You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/05/23 20:18:06 UTC

git commit: TEZ-32. Fix logging support for MapReduce tasks and AM. (sseth)

Updated Branches:
  refs/heads/TEZ-1 36198a636 -> 268f3ac97


TEZ-32. Fix logging support for MapReduce tasks and AM. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/268f3ac9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/268f3ac9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/268f3ac9

Branch: refs/heads/TEZ-1
Commit: 268f3ac97b081294fcdc22f7258890f219285fca
Parents: 36198a6
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu May 23 11:17:43 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu May 23 11:17:43 2013 -0700

----------------------------------------------------------------------
 .../org/apache/tez/dag/api/TezConfiguration.java   |    4 +-
 .../apache/tez/dag/utils/TezEngineChildJVM.java    |    1 -
 .../java/org/apache/tez/mapreduce/YARNRunner.java  |  113 ++++++++-------
 3 files changed, 61 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/268f3ac9/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 4a407b3..3018ea1 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -45,7 +45,9 @@ public class TezConfiguration extends Configuration {
   // TODO Should not be required once all tokens are handled via AppSubmissionContext
   public static final String JOB_SUBMIT_DIR = TEZ_PREFIX + "jobSubmitDir";
   public static final String APPLICATION_TOKENS_FILE = "appTokens";
-  
+  public static final String DAG_APPLICATION_MASTER_CLASS = 
+      "org.apache.tez.dag.app.DAGAppMaster";
+
   public static final String DAG_AM_TASK_LISTENER_THREAD_COUNT = 
                                 TEZ_PREFIX + "task.listener.thread-count";
   public static final int DAG_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT = 30;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/268f3ac9/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
index 727ded5..bc92ed8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.app.rm.container.AMContainerHelpers;
 import org.apache.tez.dag.records.TezVertexID;
 
 public class TezEngineChildJVM {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/268f3ac9/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
index 7582a5e..9ece649 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -45,8 +44,6 @@ import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TaskLog;
-import org.apache.hadoop.mapred.TaskLog.LogName;
 import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
 import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.mapreduce.Counters;
@@ -70,7 +67,6 @@ import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIden
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
 import org.apache.hadoop.mapreduce.v2.LogParams;
-import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.Credentials;
@@ -93,11 +89,10 @@ import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.Edge;
@@ -120,8 +115,6 @@ public class YARNRunner implements ClientProtocol {
 
   private static final Log LOG = LogFactory.getLog(YARNRunner.class);
 
-  private final RecordFactory recordFactory =
-      RecordFactoryProvider.getRecordFactory(null);
   private ResourceMgrDelegate resMgrDelegate;
   private ClientCache clientCache;
   private Configuration conf;
@@ -172,6 +165,7 @@ public class YARNRunner implements ClientProtocol {
     }
   }
 
+  @VisibleForTesting
   @Private
   /**
    * Used for testing mostly.
@@ -210,12 +204,6 @@ public class YARNRunner implements ClientProtocol {
     return resMgrDelegate.getClusterMetrics();
   }
 
-  @VisibleForTesting
-  Token<?> getDelegationTokenFromHS(MRClientProtocol hsProxy)
-      throws IOException, InterruptedException {
-    throw new UnsupportedOperationException("No HistoryServer for Tez");
-  }
-
   @Override
   public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
       throws IOException, InterruptedException {
@@ -384,17 +372,6 @@ public class YARNRunner implements ClientProtocol {
         MRJobConfig.MAPRED_ADMIN_USER_ENV,
         MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV));
 
-    // FIXME is this really required?
-    // Add stdout/stderr env
-    environment.put(
-        MRJobConfig.STDOUT_LOGFILE_ENV,
-        getTaskLogFile(TaskLog.LogName.STDOUT)
-        );
-    environment.put(
-        MRJobConfig.STDERR_LOGFILE_ENV,
-        getTaskLogFile(TaskLog.LogName.STDERR)
-        );
-
   }
 
   private static String getChildEnv(Configuration jobConf, boolean isMap) {
@@ -418,19 +395,7 @@ public class YARNRunner implements ClientProtocol {
     }
   }
 
-  private static String getTaskLogFile(LogName filter) {
-    return ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR
-        + filter.toString();
-  }
-
-  private static void setupLog4jProperties(Configuration jobConf,
-      boolean isMap,
-      Vector<String> vargs,
-      long logSize) {
-    String logLevel = getChildLogLevel(jobConf, isMap);
-    MRApps.addLog4jSystemProperties(logLevel, logSize, vargs);
-  }
-
+  
   private void setupMapReduceEnv(Configuration jobConf,
       Map<String, String> environment, boolean isMap) throws IOException {
 
@@ -474,13 +439,10 @@ public class YARNRunner implements ClientProtocol {
     // FIXME: don't think this is also needed given we already set java
     // properties.
     // TODO Change this not to use JobConf.
-    long logSize = TaskLog.getTaskLogLength(new JobConf(jobConf));
-    Vector<String> logProps = new Vector<String>(4);
-    setupLog4jProperties(jobConf, isMap, logProps, logSize);
-    Iterator<String> it = logProps.iterator();
+    String log4jCmdLineProperties = getLog4jCmdLineProperties(jobConf, isMap);
     StringBuffer buffer = new StringBuffer();
-    while (it.hasNext()) {
-      buffer.append(" " + it.next());
+    if (log4jCmdLineProperties != null && log4jCmdLineProperties != "") {
+      buffer.append(" " + log4jCmdLineProperties);
     }
 
     // FIXME supposedly required for streaming, should we remove it and let
@@ -492,7 +454,7 @@ public class YARNRunner implements ClientProtocol {
       hadoopClientOpts = hadoopClientOpts + " ";
     }
     hadoopClientOpts = hadoopClientOpts + buffer.toString();
-    // environment.put("HADOOP_CLIENT_OPTS", hadoopClientOpts);
+    //environment.put("HADOOP_CLIENT_OPTS", hadoopClientOpts);
 
     // FIXME for this to work, we need YARN-561 and the task runtime changed
     // to use YARN-561
@@ -730,7 +692,7 @@ public class YARNRunner implements ClientProtocol {
     ApplicationId applicationId = resMgrDelegate.getApplicationId();
 
     // Setup resource requirements
-    Resource capability = recordFactory.newRecordInstance(Resource.class);
+    Resource capability = Records.newRecord(Resource.class);
     capability.setMemory(
         conf.getInt(TezConfiguration.DAG_AM_RESOURCE_MEMORY_MB,
             TezConfiguration.DEFAULT_DAG_AM_RESOURCE_MEMORY_MB));
@@ -757,9 +719,10 @@ public class YARNRunner implements ClientProtocol {
 //     LOG.error(" !!!!!!!!!");
 //     vargs.add("-Xdebug -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=8002,server=y,suspend=y");
     
-    // FIXME set up logging related properties
     // TODO -Dtez.root.logger??
-    // MRApps.addLog4jSystemProperties(logLevel, logSize, vargs);
+    String amLogLevel = jobConf.get(MRJobConfig.MR_AM_LOG_LEVEL,
+        MRJobConfig.DEFAULT_MR_AM_LOG_LEVEL);
+    addLog4jSystemProperties(amLogLevel, vargs);
 
     // FIXME admin command opts and user command opts for tez?
     String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
@@ -775,7 +738,7 @@ public class YARNRunner implements ClientProtocol {
         MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
     vargs.add(mrAppMasterUserOptions);
 
-    vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
+    vargs.add(TezConfiguration.DAG_APPLICATION_MASTER_CLASS);
     vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
         Path.SEPARATOR + ApplicationConstants.STDOUT);
     vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
@@ -859,8 +822,9 @@ public class YARNRunner implements ClientProtocol {
             environment, vargsFinal, null, securityTokens, acls);
 
     // Set up the ApplicationSubmissionContext
-    ApplicationSubmissionContext appContext =
-        recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+    ApplicationSubmissionContext appContext = Records
+        .newRecord(ApplicationSubmissionContext.class);
+    
     appContext.setApplicationId(applicationId);                // ApplicationId
     appContext.setResource(capability);                        // resource
     appContext.setQueue(                                       // Queue name
@@ -975,7 +939,7 @@ public class YARNRunner implements ClientProtocol {
 
   private LocalResource createApplicationResource(FileContext fs, Path p, LocalResourceType type)
       throws IOException {
-    LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
+    LocalResource rsrc = Records.newRecord(LocalResource.class);
     FileStatus rsrcStat = fs.getFileStatus(p);
     rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs
         .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
@@ -1127,6 +1091,7 @@ public class YARNRunner implements ClientProtocol {
     }
   }
   
+  @SuppressWarnings("deprecation")
   private String getMapJavaOpts(Configuration jobConf) {
     // follows pattern from YARN MapReduceChildJVM.java
     String adminOpts = "";
@@ -1142,9 +1107,11 @@ public class YARNRunner implements ClientProtocol {
                     JobConf.MAPRED_TASK_JAVA_OPTS,
                     JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS));
 
-    return adminOpts + " " + userOpts;
+    return adminOpts.trim() + " " + userOpts.trim() + " "
+    + getLog4jCmdLineProperties(jobConf, true);
   }
   
+  @SuppressWarnings("deprecation")
   private String getReduceJavaOpts(Configuration jobConf) {
     // follows pattern from YARN MapReduceChildJVM.java 
     String adminOpts = "";
@@ -1159,6 +1126,42 @@ public class YARNRunner implements ClientProtocol {
                 jobConf.get(
                     JobConf.MAPRED_TASK_JAVA_OPTS,
                     JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS));
-    return adminOpts + " " + userOpts;
+    
+    return adminOpts.trim() + " " + userOpts.trim() + " "
+        + getLog4jCmdLineProperties(jobConf, false);
   }
-}
+
+  private static String getLog4jCmdLineProperties(Configuration jobConf,
+      boolean isMap) { // returns the log4j -D options joined into one string
+    Vector<String> logProps = new Vector<String>(4); // helper below adds 4 entries
+    addLog4jSystemProperties(getChildLogLevel(jobConf, isMap), logProps);
+    StringBuilder sb = new StringBuilder();
+    for (String str : logProps) {
+      sb.append(str).append(" "); // space-separated; result ends with a space
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Add the JVM system properties necessary to configure
+   * {@link ContainerLogAppender}. Four {@code -D} options are appended: the
+   * log4j configuration file, the task log directory, the task log size and
+   * the root logger setting ({@code <logLevel>,CLA}).
+   * 
+   * @param logLevel the desired log level (eg INFO/WARN/DEBUG)
+   * @param vargs the argument list to append to
+   */
+  private static void addLog4jSystemProperties(String logLevel,
+      List<String> vargs) {
+    vargs.add("-Dlog4j.configuration=container-log4j.properties");
+    // TODO Fix the remaining properties after YARN-720 is fixed.
+    // May need to introduce a log4j properties file for tez tasks.
+    vargs.add("-D" + MRJobConfig.TASK_LOG_DIR + "="
+        + ApplicationConstants.LOG_DIR_EXPANSION_VAR); // presumably expanded by YARN at launch - confirm
+    // Setting this to 0 to avoid log size restrictions.
+    // Should be enforced by YARN.
+    vargs.add("-D" + MRJobConfig.TASK_LOG_SIZE + "=" + 0);
+    vargs.add("-Dhadoop.root.logger=" + logLevel + ",CLA"); // CLA: presumably the appender name - confirm
+  }
+
+}
\ No newline at end of file