You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2011/10/11 23:42:55 UTC

svn commit: r1182103 - in /incubator/hama/branches/HamaV2/server/src/main: java/org/apache/hama/bsp/BSPTaskLauncher.java java/org/apache/hama/bsp/JobImpl.java java/org/apache/hama/bsp/YARNBSPJob.java resources/ resources/log4j.properties

Author: tjungblut
Date: Tue Oct 11 21:42:55 2011
New Revision: 1182103

URL: http://svn.apache.org/viewvc?rev=1182103&view=rev
Log:
Added log4j properties to log outputs and changed the way tasks get submitted

Added:
    incubator/hama/branches/HamaV2/server/src/main/resources/
    incubator/hama/branches/HamaV2/server/src/main/resources/log4j.properties   (with props)
Modified:
    incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
    incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java
    incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YARNBSPJob.java

Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java?rev=1182103&r1=1182102&r2=1182103&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java Tue Oct 11 21:42:55 2011
@@ -17,7 +17,7 @@
  */
 package org.apache.hama.bsp;
 
-import java.net.InetSocketAddress;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.Callable;
@@ -28,7 +28,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ContainerManager;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
@@ -37,12 +36,12 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hama.bsp.BSPTaskLauncher.BSPTaskStatus;
@@ -52,30 +51,23 @@ public class BSPTaskLauncher implements 
   private static final Log LOG = LogFactory.getLog(BSPTaskLauncher.class);
 
   private final Container allocatedContainer;
+  private final int id;
   private final ContainerManager cm;
-  private final Path jobFile;
-  private final String user;
   private final Configuration conf;
-  private final int id;
+  private final String user;
+  private final Path jobFile;
   private final BSPJobID jobId;
 
-  public BSPTaskLauncher(BSPJobID jobId, int id, Container container,
-      Configuration conf, YarnRPC rpc, Path jobFile) throws YarnRemoteException {
+  public BSPTaskLauncher(int id, Container container, ContainerManager cm,
+      Configuration conf, Path jobFile, BSPJobID jobId)
+      throws YarnRemoteException {
     this.id = id;
-    this.jobFile = jobFile;
-    this.user = conf.get("user.name");
+    this.cm = cm;
     this.conf = conf;
-    this.jobId = jobId;
     this.allocatedContainer = container;
-    // Connect to ContainerManager on the allocated container
-    String cmIpPortStr = container.getNodeId().getHost() + ":"
-        + container.getNodeId().getPort();
-    InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
-    cm = (ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress,
-        conf);
-    LOG.info("Spawned task with id: " + this.id
-        + " for allocated container id: "
-        + this.allocatedContainer.getId().toString());
+    this.jobFile = jobFile;
+    this.jobId = jobId;
+    this.user = conf.get("bsp.user.name");
   }
 
   @Override
@@ -92,6 +84,25 @@ public class BSPTaskLauncher implements 
 
   @Override
   public BSPTaskStatus call() throws Exception {
+    LOG.info("Spawned task with id: " + this.id
+        + " for allocated container id: "
+        + this.allocatedContainer.getId().toString());
+    final GetContainerStatusRequest statusRequest = setupContainer(allocatedContainer, cm, user, id);
+
+    ContainerStatus lastStatus;
+    while ((lastStatus = cm.getContainerStatus(statusRequest).getStatus())
+        .getState() != ContainerState.COMPLETE) {
+      Thread.sleep(1000l);
+    }
+
+    return new BSPTaskStatus(id, lastStatus.getExitStatus());
+  }
+
+  private GetContainerStatusRequest setupContainer(
+      Container allocatedContainer, ContainerManager cm, String user, int id)
+      throws IOException {
+    LOG.info("Setting up a container for user " + user + " with id of " + id
+        + " and containerID of " + allocatedContainer.getId());
     // Now we setup a ContainerLaunchContext
     ContainerLaunchContext ctx = Records
         .newRecord(ContainerLaunchContext.class);
@@ -113,11 +124,12 @@ public class BSPTaskLauncher implements 
     packageResource.setType(LocalResourceType.ARCHIVE);
     packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
 
-    ctx.setCommands(Arrays.asList("${JAVA_HOME}" + "/bin/java -cp './package/*' ",
-        BSPTaskLauncher.class.getCanonicalName(), jobId.getJtIdentifier(), id
-            + "", this.jobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString(),
-        " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", " 2>"
-            + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"));
+    ctx.setCommands(Arrays.asList("${JAVA_HOME}"
+        + "/bin/java -cp './package/*' ", BSPTaskLauncher.class
+        .getCanonicalName(), jobId.getJtIdentifier(), id + "", this.jobFile
+        .makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString(), " 1>"
+        + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", " 2>"
+        + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"));
     ctx.setLocalResources(Collections.singletonMap("package", packageResource));
 
     StartContainerRequest startReq = Records
@@ -128,13 +140,7 @@ public class BSPTaskLauncher implements 
     GetContainerStatusRequest statusReq = Records
         .newRecord(GetContainerStatusRequest.class);
     statusReq.setContainerId(allocatedContainer.getId());
-
-    while (cm.getContainerStatus(statusReq).getStatus().getState() != ContainerState.COMPLETE) {
-      Thread.sleep(1000l);
-    }
-
-    return new BSPTaskStatus(id, cm.getContainerStatus(statusReq).getStatus()
-        .getExitStatus());
+    return statusReq;
   }
 
   public static class BSPTaskStatus {

Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java?rev=1182103&r1=1182102&r2=1182103&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/JobImpl.java Tue Oct 11 21:42:55 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.hama.bsp;
 
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -31,7 +32,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ContainerManager;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.AMResponse;
@@ -135,20 +138,34 @@ public class JobImpl implements Job {
   @Override
   public JobState startJob() throws YarnRemoteException, InterruptedException,
       ExecutionException {
-    
-    AllocateRequest req = BuilderUtils.newAllocateRequest(
-        appAttemptId, lastResponseID, 0.0f,createBSPTaskRequest(getTotalBSPTasks(),
-            taskMemoryInMb, priority), releasedContainers);
-
-    AllocateResponse allocateResponse = resourceManager.allocate(req);
-    AMResponse amResponse = allocateResponse.getAMResponse();
-    LOG.info("Got response! ID: " + amResponse.getResponseId()
-        + " with num of containers: "
-        + amResponse.getAllocatedContainers().size() + " and following resources: " + amResponse.getAvailableResources().getMemory()+ "mb");
-    this.lastResponseID = amResponse.getResponseId();
-    
-    this.availableResources = amResponse.getAvailableResources();
-    this.allocatedContainers = amResponse.getAllocatedContainers();
+
+    this.allocatedContainers = new ArrayList<Container>(numBSPTasks);
+    while (allocatedContainers.size() < numBSPTasks) {
+
+      AllocateRequest req = BuilderUtils.newAllocateRequest(
+          appAttemptId,
+          lastResponseID,
+          0.0f,
+          createBSPTaskRequest(numBSPTasks - allocatedContainers.size(),
+              taskMemoryInMb, priority), releasedContainers);
+
+      AllocateResponse allocateResponse = resourceManager.allocate(req);
+      AMResponse amResponse = allocateResponse.getAMResponse();
+      LOG.info("Got response! ID: " + amResponse.getResponseId()
+          + " with num of containers: "
+          + amResponse.getAllocatedContainers().size()
+          + " and following resources: "
+          + amResponse.getAvailableResources().getMemory() + "mb");
+      this.lastResponseID = amResponse.getResponseId();
+
+      this.availableResources = amResponse.getAvailableResources();
+      this.allocatedContainers.addAll(amResponse.getAllocatedContainers());
+      LOG.info("Waiting to allocate "
+          + (numBSPTasks - allocatedContainers.size()) + " more containers...");
+      Thread.sleep(1000l);
+    }
+
+    LOG.info("Got " + allocatedContainers.size() + " containers!");
 
     int launchedBSPTasks = 0;
 
@@ -162,8 +179,15 @@ public class JobImpl implements Job {
           + allocatedContainer.getState() + ", containerResourceMemory"
           + allocatedContainer.getResource().getMemory());
 
-      BSPTaskLauncher runnableLaunchContainer = new BSPTaskLauncher(jobId, id,
-          allocatedContainer, conf, yarnRPC, jobFile);
+      // Connect to ContainerManager on the allocated container
+      String cmIpPortStr = allocatedContainer.getNodeId().getHost() + ":"
+          + allocatedContainer.getNodeId().getPort();
+      InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
+      ContainerManager cm = (ContainerManager) yarnRPC.getProxy(
+          ContainerManager.class, cmAddress, conf);
+
+      BSPTaskLauncher runnableLaunchContainer = new BSPTaskLauncher(id,
+          allocatedContainer, cm, conf, jobFile, jobId);
       launchers.put(id, runnableLaunchContainer);
       completionService.submit(runnableLaunchContainer);
       id++;

Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YARNBSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YARNBSPJob.java?rev=1182103&r1=1182102&r2=1182103&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YARNBSPJob.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/YARNBSPJob.java Tue Oct 11 21:42:55 2011
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.StringTokenizer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -103,6 +105,12 @@ public class YARNBSPJob extends BSPJob {
       fs = FileSystem.get(getConf());
     }
 
+    if (conf.get("bsp.user.name") == null) {
+      String s = getUnixUserName();
+      conf.set("bsp.user.name", s);
+      LOG.info("Retrieved username: " + s);
+    }
+
     YarnConfiguration yarnConf = new YarnConfiguration(conf);
     InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
         YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS));
@@ -249,6 +257,7 @@ public class YARNBSPJob extends BSPJob {
         && localReport.getFinalApplicationStatus() != FinalApplicationStatus.FAILED
         && localReport.getFinalApplicationStatus() != FinalApplicationStatus.KILLED
         && localReport.getFinalApplicationStatus() != FinalApplicationStatus.SUCCEEDED) {
+      LOG.info("currently in state: " + localReport.getFinalApplicationStatus());
       if (verbose) {
         long remoteSuperStep = client.getCurrentSuperStep().get();
         if (clientSuperStep > remoteSuperStep) {
@@ -276,4 +285,42 @@ public class YARNBSPJob extends BSPJob {
 
   }
 
+  /*
+   * THESE FOLLOWING METHODS WILL BE IMPLEMENTED IN BSPJOBCLIENT SOON.
+   */
+
+  static String getUnixUserName() throws IOException {
+    String[] result = executeShellCommand(new String[] { Shell.USER_NAME_COMMAND });
+    if (result.length != 1) {
+      throw new IOException("Expect one token as the result of "
+          + Shell.USER_NAME_COMMAND + ": " + toString(result));
+    }
+    return result[0];
+  }
+
+  private static String toString(String[] strArray) {
+    if (strArray == null || strArray.length == 0) {
+      return "";
+    }
+    StringBuilder buf = new StringBuilder(strArray[0]);
+    for (int i = 1; i < strArray.length; i++) {
+      buf.append(' ');
+      buf.append(strArray[i]);
+    }
+    return buf.toString();
+  }
+
+  private static String[] executeShellCommand(String[] command)
+      throws IOException {
+    String groups = Shell.execCommand(command);
+    StringTokenizer tokenizer = new StringTokenizer(groups);
+    int numOfTokens = tokenizer.countTokens();
+    String[] tokens = new String[numOfTokens];
+    for (int i = 0; tokenizer.hasMoreTokens(); i++) {
+      tokens[i] = tokenizer.nextToken();
+    }
+
+    return tokens;
+  }
+
 }

Added: incubator/hama/branches/HamaV2/server/src/main/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/resources/log4j.properties?rev=1182103&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/resources/log4j.properties (added)
+++ incubator/hama/branches/HamaV2/server/src/main/resources/log4j.properties Tue Oct 11 21:42:55 2011
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Define some default values that can be overridden by system properties
+hama.root.logger=INFO,console
+hama.log.dir=.
+hama.log.file=hama.log
+
+# Define the root logger to the system property "hama.root.logger".
+log4j.rootLogger=${hama.root.logger}
+
+# Logging threshold for the whole repository (key must be spelled "threshold" for log4j to honor it)
+log4j.threshold=ALL
+
+#
+# Daily Rolling File Appender
+#
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hama.log.dir}/${hama.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hama.tasklog.taskid=null
+hama.tasklog.noKeepSplits=4
+hama.tasklog.totalLogFileSize=100
+hama.tasklog.purgeLogSplits=true
+hama.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hama.bsp.TaskLogAppender
+log4j.appender.TLA.taskId=${hama.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hama.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
+# console
+# Add "console" to log4j.rootLogger above if you want to use this
+#
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+#log4j.logger.org.apache.hadoop.dfs=DEBUG
+#log4j.logger.org.apache.hama=DEBUG

Propchange: incubator/hama/branches/HamaV2/server/src/main/resources/log4j.properties
------------------------------------------------------------------------------
    svn:eol-style = native