You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2011/10/11 23:42:55 UTC
svn commit: r1182103 - in /incubator/hama/branches/HamaV2/server/src/main:
java/org/apache/hama/bsp/BSPTaskLauncher.java
java/org/apache/hama/bsp/JobImpl.java
java/org/apache/hama/bsp/YARNBSPJob.java resources/
resources/log4j.properties
Author: tjungblut
Date: Tue Oct 11 21:42:55 2011
New Revision: 1182103
URL: http://svn.apache.org/viewvc?rev=1182103&view=rev
Log:
Added log4j properties to log outputs and changed the way tasks get submitted
Added:
incubator/hama/branches/HamaV2/server/src/main/resources/
incubator/hama/branches/HamaV2/server/src/main/resources/log4j.properties (with props)
Modified:
incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java
incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YARNBSPJob.java
Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java?rev=1182103&r1=1182102&r2=1182103&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java Tue Oct 11 21:42:55 2011
@@ -17,7 +17,7 @@
*/
package org.apache.hama.bsp;
-import java.net.InetSocketAddress;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.Callable;
@@ -28,7 +28,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
@@ -37,12 +36,12 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hama.bsp.BSPTaskLauncher.BSPTaskStatus;
@@ -52,30 +51,23 @@ public class BSPTaskLauncher implements
private static final Log LOG = LogFactory.getLog(BSPTaskLauncher.class);
private final Container allocatedContainer;
+ private final int id;
private final ContainerManager cm;
- private final Path jobFile;
- private final String user;
private final Configuration conf;
- private final int id;
+ private final String user;
+ private final Path jobFile;
private final BSPJobID jobId;
- public BSPTaskLauncher(BSPJobID jobId, int id, Container container,
- Configuration conf, YarnRPC rpc, Path jobFile) throws YarnRemoteException {
+ public BSPTaskLauncher(int id, Container container, ContainerManager cm,
+ Configuration conf, Path jobFile, BSPJobID jobId)
+ throws YarnRemoteException {
this.id = id;
- this.jobFile = jobFile;
- this.user = conf.get("user.name");
+ this.cm = cm;
this.conf = conf;
- this.jobId = jobId;
this.allocatedContainer = container;
- // Connect to ContainerManager on the allocated container
- String cmIpPortStr = container.getNodeId().getHost() + ":"
- + container.getNodeId().getPort();
- InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
- cm = (ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress,
- conf);
- LOG.info("Spawned task with id: " + this.id
- + " for allocated container id: "
- + this.allocatedContainer.getId().toString());
+ this.jobFile = jobFile;
+ this.jobId = jobId;
+ this.user = conf.get("bsp.user.name");
}
@Override
@@ -92,6 +84,25 @@ public class BSPTaskLauncher implements
@Override
public BSPTaskStatus call() throws Exception {
+ LOG.info("Spawned task with id: " + this.id
+ + " for allocated container id: "
+ + this.allocatedContainer.getId().toString());
+ final GetContainerStatusRequest statusRequest = setupContainer(allocatedContainer, cm, user, id);
+
+ ContainerStatus lastStatus;
+ while ((lastStatus = cm.getContainerStatus(statusRequest).getStatus())
+ .getState() != ContainerState.COMPLETE) {
+ Thread.sleep(1000l);
+ }
+
+ return new BSPTaskStatus(id, lastStatus.getExitStatus());
+ }
+
+ private GetContainerStatusRequest setupContainer(
+ Container allocatedContainer, ContainerManager cm, String user, int id)
+ throws IOException {
+ LOG.info("Setting up a container for user " + user + " with id of " + id
+ + " and containerID of " + allocatedContainer.getId());
// Now we setup a ContainerLaunchContext
ContainerLaunchContext ctx = Records
.newRecord(ContainerLaunchContext.class);
@@ -113,11 +124,12 @@ public class BSPTaskLauncher implements
packageResource.setType(LocalResourceType.ARCHIVE);
packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
- ctx.setCommands(Arrays.asList("${JAVA_HOME}" + "/bin/java -cp './package/*' ",
- BSPTaskLauncher.class.getCanonicalName(), jobId.getJtIdentifier(), id
- + "", this.jobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString(),
- " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", " 2>"
- + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"));
+ ctx.setCommands(Arrays.asList("${JAVA_HOME}"
+ + "/bin/java -cp './package/*' ", BSPTaskLauncher.class
+ .getCanonicalName(), jobId.getJtIdentifier(), id + "", this.jobFile
+ .makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString(), " 1>"
+ + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", " 2>"
+ + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"));
ctx.setLocalResources(Collections.singletonMap("package", packageResource));
StartContainerRequest startReq = Records
@@ -128,13 +140,7 @@ public class BSPTaskLauncher implements
GetContainerStatusRequest statusReq = Records
.newRecord(GetContainerStatusRequest.class);
statusReq.setContainerId(allocatedContainer.getId());
-
- while (cm.getContainerStatus(statusReq).getStatus().getState() != ContainerState.COMPLETE) {
- Thread.sleep(1000l);
- }
-
- return new BSPTaskStatus(id, cm.getContainerStatus(statusReq).getStatus()
- .getExitStatus());
+ return statusReq;
}
public static class BSPTaskStatus {
Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java?rev=1182103&r1=1182102&r2=1182103&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java Tue Oct 11 21:42:55 2011
@@ -17,6 +17,7 @@
*/
package org.apache.hama.bsp;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -31,7 +32,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
@@ -135,20 +138,34 @@ public class JobImpl implements Job {
@Override
public JobState startJob() throws YarnRemoteException, InterruptedException,
ExecutionException {
-
- AllocateRequest req = BuilderUtils.newAllocateRequest(
- appAttemptId, lastResponseID, 0.0f,createBSPTaskRequest(getTotalBSPTasks(),
- taskMemoryInMb, priority), releasedContainers);
-
- AllocateResponse allocateResponse = resourceManager.allocate(req);
- AMResponse amResponse = allocateResponse.getAMResponse();
- LOG.info("Got response! ID: " + amResponse.getResponseId()
- + " with num of containers: "
- + amResponse.getAllocatedContainers().size() + " and following resources: " + amResponse.getAvailableResources().getMemory()+ "mb");
- this.lastResponseID = amResponse.getResponseId();
-
- this.availableResources = amResponse.getAvailableResources();
- this.allocatedContainers = amResponse.getAllocatedContainers();
+
+ this.allocatedContainers = new ArrayList<Container>(numBSPTasks);
+ while (allocatedContainers.size() < numBSPTasks) {
+
+ AllocateRequest req = BuilderUtils.newAllocateRequest(
+ appAttemptId,
+ lastResponseID,
+ 0.0f,
+ createBSPTaskRequest(numBSPTasks - allocatedContainers.size(),
+ taskMemoryInMb, priority), releasedContainers);
+
+ AllocateResponse allocateResponse = resourceManager.allocate(req);
+ AMResponse amResponse = allocateResponse.getAMResponse();
+ LOG.info("Got response! ID: " + amResponse.getResponseId()
+ + " with num of containers: "
+ + amResponse.getAllocatedContainers().size()
+ + " and following resources: "
+ + amResponse.getAvailableResources().getMemory() + "mb");
+ this.lastResponseID = amResponse.getResponseId();
+
+ this.availableResources = amResponse.getAvailableResources();
+ this.allocatedContainers.addAll(amResponse.getAllocatedContainers());
+ LOG.info("Waiting to allocate "
+ + (numBSPTasks - allocatedContainers.size()) + " more containers...");
+ Thread.sleep(1000l);
+ }
+
+ LOG.info("Got " + allocatedContainers.size() + " containers!");
int launchedBSPTasks = 0;
@@ -162,8 +179,15 @@ public class JobImpl implements Job {
+ allocatedContainer.getState() + ", containerResourceMemory"
+ allocatedContainer.getResource().getMemory());
- BSPTaskLauncher runnableLaunchContainer = new BSPTaskLauncher(jobId, id,
- allocatedContainer, conf, yarnRPC, jobFile);
+ // Connect to ContainerManager on the allocated container
+ String cmIpPortStr = allocatedContainer.getNodeId().getHost() + ":"
+ + allocatedContainer.getNodeId().getPort();
+ InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
+ ContainerManager cm = (ContainerManager) yarnRPC.getProxy(
+ ContainerManager.class, cmAddress, conf);
+
+ BSPTaskLauncher runnableLaunchContainer = new BSPTaskLauncher(id,
+ allocatedContainer, cm, conf, jobFile, jobId);
launchers.put(id, runnableLaunchContainer);
completionService.submit(runnableLaunchContainer);
id++;
Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YARNBSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YARNBSPJob.java?rev=1182103&r1=1182102&r2=1182103&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YARNBSPJob.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YARNBSPJob.java Tue Oct 11 21:42:55 2011
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.StringTokenizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -32,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -103,6 +105,12 @@ public class YARNBSPJob extends BSPJob {
fs = FileSystem.get(getConf());
}
+ if (conf.get("bsp.user.name") == null) {
+ String s = getUnixUserName();
+ conf.set("bsp.user.name", s);
+ LOG.info("Retrieved username: " + s);
+ }
+
YarnConfiguration yarnConf = new YarnConfiguration(conf);
InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS));
@@ -249,6 +257,7 @@ public class YARNBSPJob extends BSPJob {
&& localReport.getFinalApplicationStatus() != FinalApplicationStatus.FAILED
&& localReport.getFinalApplicationStatus() != FinalApplicationStatus.KILLED
&& localReport.getFinalApplicationStatus() != FinalApplicationStatus.SUCCEEDED) {
+ LOG.info("currently in state: " + localReport.getFinalApplicationStatus());
if (verbose) {
long remoteSuperStep = client.getCurrentSuperStep().get();
if (clientSuperStep > remoteSuperStep) {
@@ -276,4 +285,42 @@ public class YARNBSPJob extends BSPJob {
}
+ /*
+ * THESE FOLLOWING METHODS WILL BE IMPLEMENTED IN BSPJOBCLIENT SOON.
+ */
+
+ static String getUnixUserName() throws IOException {
+ String[] result = executeShellCommand(new String[] { Shell.USER_NAME_COMMAND });
+ if (result.length != 1) {
+ throw new IOException("Expect one token as the result of "
+ + Shell.USER_NAME_COMMAND + ": " + toString(result));
+ }
+ return result[0];
+ }
+
+ private static String toString(String[] strArray) {
+ if (strArray == null || strArray.length == 0) {
+ return "";
+ }
+ StringBuilder buf = new StringBuilder(strArray[0]);
+ for (int i = 1; i < strArray.length; i++) {
+ buf.append(' ');
+ buf.append(strArray[i]);
+ }
+ return buf.toString();
+ }
+
+ private static String[] executeShellCommand(String[] command)
+ throws IOException {
+ String groups = Shell.execCommand(command);
+ StringTokenizer tokenizer = new StringTokenizer(groups);
+ int numOfTokens = tokenizer.countTokens();
+ String[] tokens = new String[numOfTokens];
+ for (int i = 0; tokenizer.hasMoreTokens(); i++) {
+ tokens[i] = tokenizer.nextToken();
+ }
+
+ return tokens;
+ }
+
}
Added: incubator/hama/branches/HamaV2/server/src/main/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/resources/log4j.properties?rev=1182103&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/resources/log4j.properties (added)
+++ incubator/hama/branches/HamaV2/server/src/main/resources/log4j.properties Tue Oct 11 21:42:55 2011
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Define some default values that can be overridden by system properties
+hama.root.logger=INFO,console
+hama.log.dir=.
+hama.log.file=hama.log
+
+# Define the root logger to the system property "hama.root.logger".
+log4j.rootLogger=${hama.root.logger}
+
+# Logging Threshold
+log4j.threshhold=ALL
+
+#
+# Daily Rolling File Appender
+#
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hama.log.dir}/${hama.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hama.tasklog.taskid=null
+hama.tasklog.noKeepSplits=4
+hama.tasklog.totalLogFileSize=100
+hama.tasklog.purgeLogSplits=true
+hama.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hama.bsp.TaskLogAppender
+log4j.appender.TLA.taskId=${hama.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hama.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+#log4j.logger.org.apache.hadoop.dfs=DEBUG
+#log4j.logger.org.apache.hama=DEBUG
Propchange: incubator/hama/branches/HamaV2/server/src/main/resources/log4j.properties
------------------------------------------------------------------------------
svn:eol-style = native