You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/05/23 20:18:06 UTC
git commit: TEZ-32. Fix logging support for MapReduce tasks and AM.
(sseth)
Updated Branches:
refs/heads/TEZ-1 36198a636 -> 268f3ac97
TEZ-32. Fix logging support for MapReduce tasks and AM. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/268f3ac9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/268f3ac9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/268f3ac9
Branch: refs/heads/TEZ-1
Commit: 268f3ac97b081294fcdc22f7258890f219285fca
Parents: 36198a6
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu May 23 11:17:43 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu May 23 11:17:43 2013 -0700
----------------------------------------------------------------------
.../org/apache/tez/dag/api/TezConfiguration.java | 4 +-
.../apache/tez/dag/utils/TezEngineChildJVM.java | 1 -
.../java/org/apache/tez/mapreduce/YARNRunner.java | 113 ++++++++-------
3 files changed, 61 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/268f3ac9/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 4a407b3..3018ea1 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -45,7 +45,9 @@ public class TezConfiguration extends Configuration {
// TODO Should not be required once all tokens are handled via AppSubmissionContext
public static final String JOB_SUBMIT_DIR = TEZ_PREFIX + "jobSubmitDir";
public static final String APPLICATION_TOKENS_FILE = "appTokens";
-
+ public static final String DAG_APPLICATION_MASTER_CLASS =
+ "org.apache.tez.dag.app.DAGAppMaster";
+
public static final String DAG_AM_TASK_LISTENER_THREAD_COUNT =
TEZ_PREFIX + "task.listener.thread-count";
public static final int DAG_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT = 30;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/268f3ac9/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
index 727ded5..bc92ed8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.app.rm.container.AMContainerHelpers;
import org.apache.tez.dag.records.TezVertexID;
public class TezEngineChildJVM {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/268f3ac9/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
index 7582a5e..9ece649 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -45,8 +44,6 @@ import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TaskLog;
-import org.apache.hadoop.mapred.TaskLog.LogName;
import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.Counters;
@@ -70,7 +67,6 @@ import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIden
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.mapreduce.v2.LogParams;
-import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.Credentials;
@@ -93,11 +89,10 @@ import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.api.Edge;
@@ -120,8 +115,6 @@ public class YARNRunner implements ClientProtocol {
private static final Log LOG = LogFactory.getLog(YARNRunner.class);
- private final RecordFactory recordFactory =
- RecordFactoryProvider.getRecordFactory(null);
private ResourceMgrDelegate resMgrDelegate;
private ClientCache clientCache;
private Configuration conf;
@@ -172,6 +165,7 @@ public class YARNRunner implements ClientProtocol {
}
}
+ @VisibleForTesting
@Private
/**
* Used for testing mostly.
@@ -210,12 +204,6 @@ public class YARNRunner implements ClientProtocol {
return resMgrDelegate.getClusterMetrics();
}
- @VisibleForTesting
- Token<?> getDelegationTokenFromHS(MRClientProtocol hsProxy)
- throws IOException, InterruptedException {
- throw new UnsupportedOperationException("No HistoryServer for Tez");
- }
-
@Override
public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
throws IOException, InterruptedException {
@@ -384,17 +372,6 @@ public class YARNRunner implements ClientProtocol {
MRJobConfig.MAPRED_ADMIN_USER_ENV,
MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV));
- // FIXME is this really required?
- // Add stdout/stderr env
- environment.put(
- MRJobConfig.STDOUT_LOGFILE_ENV,
- getTaskLogFile(TaskLog.LogName.STDOUT)
- );
- environment.put(
- MRJobConfig.STDERR_LOGFILE_ENV,
- getTaskLogFile(TaskLog.LogName.STDERR)
- );
-
}
private static String getChildEnv(Configuration jobConf, boolean isMap) {
@@ -418,19 +395,7 @@ public class YARNRunner implements ClientProtocol {
}
}
- private static String getTaskLogFile(LogName filter) {
- return ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR
- + filter.toString();
- }
-
- private static void setupLog4jProperties(Configuration jobConf,
- boolean isMap,
- Vector<String> vargs,
- long logSize) {
- String logLevel = getChildLogLevel(jobConf, isMap);
- MRApps.addLog4jSystemProperties(logLevel, logSize, vargs);
- }
-
+
private void setupMapReduceEnv(Configuration jobConf,
Map<String, String> environment, boolean isMap) throws IOException {
@@ -474,13 +439,10 @@ public class YARNRunner implements ClientProtocol {
// FIXME: don't think this is also needed given we already set java
// properties.
// TODO Change this not to use JobConf.
- long logSize = TaskLog.getTaskLogLength(new JobConf(jobConf));
- Vector<String> logProps = new Vector<String>(4);
- setupLog4jProperties(jobConf, isMap, logProps, logSize);
- Iterator<String> it = logProps.iterator();
+ String log4jCmdLineProperties = getLog4jCmdLineProperties(jobConf, isMap);
StringBuffer buffer = new StringBuffer();
- while (it.hasNext()) {
- buffer.append(" " + it.next());
+ if (log4jCmdLineProperties != null && log4jCmdLineProperties != "") {
+ buffer.append(" " + log4jCmdLineProperties);
}
// FIXME supposedly required for streaming, should we remove it and let
@@ -492,7 +454,7 @@ public class YARNRunner implements ClientProtocol {
hadoopClientOpts = hadoopClientOpts + " ";
}
hadoopClientOpts = hadoopClientOpts + buffer.toString();
- // environment.put("HADOOP_CLIENT_OPTS", hadoopClientOpts);
+ //environment.put("HADOOP_CLIENT_OPTS", hadoopClientOpts);
// FIXME for this to work, we need YARN-561 and the task runtime changed
// to use YARN-561
@@ -730,7 +692,7 @@ public class YARNRunner implements ClientProtocol {
ApplicationId applicationId = resMgrDelegate.getApplicationId();
// Setup resource requirements
- Resource capability = recordFactory.newRecordInstance(Resource.class);
+ Resource capability = Records.newRecord(Resource.class);
capability.setMemory(
conf.getInt(TezConfiguration.DAG_AM_RESOURCE_MEMORY_MB,
TezConfiguration.DEFAULT_DAG_AM_RESOURCE_MEMORY_MB));
@@ -757,9 +719,10 @@ public class YARNRunner implements ClientProtocol {
// LOG.error(" !!!!!!!!!");
// vargs.add("-Xdebug -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=8002,server=y,suspend=y");
- // FIXME set up logging related properties
// TODO -Dtez.root.logger??
- // MRApps.addLog4jSystemProperties(logLevel, logSize, vargs);
+ String amLogLevel = jobConf.get(MRJobConfig.MR_AM_LOG_LEVEL,
+ MRJobConfig.DEFAULT_MR_AM_LOG_LEVEL);
+ addLog4jSystemProperties(amLogLevel, vargs);
// FIXME admin command opts and user command opts for tez?
String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
@@ -775,7 +738,7 @@ public class YARNRunner implements ClientProtocol {
MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
vargs.add(mrAppMasterUserOptions);
- vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
+ vargs.add(TezConfiguration.DAG_APPLICATION_MASTER_CLASS);
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
Path.SEPARATOR + ApplicationConstants.STDOUT);
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
@@ -859,8 +822,9 @@ public class YARNRunner implements ClientProtocol {
environment, vargsFinal, null, securityTokens, acls);
// Set up the ApplicationSubmissionContext
- ApplicationSubmissionContext appContext =
- recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+ ApplicationSubmissionContext appContext = Records
+ .newRecord(ApplicationSubmissionContext.class);
+
appContext.setApplicationId(applicationId); // ApplicationId
appContext.setResource(capability); // resource
appContext.setQueue( // Queue name
@@ -975,7 +939,7 @@ public class YARNRunner implements ClientProtocol {
private LocalResource createApplicationResource(FileContext fs, Path p, LocalResourceType type)
throws IOException {
- LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
+ LocalResource rsrc = Records.newRecord(LocalResource.class);
FileStatus rsrcStat = fs.getFileStatus(p);
rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs
.getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
@@ -1127,6 +1091,7 @@ public class YARNRunner implements ClientProtocol {
}
}
+ @SuppressWarnings("deprecation")
private String getMapJavaOpts(Configuration jobConf) {
// follows pattern from YARN MapReduceChildJVM.java
String adminOpts = "";
@@ -1142,9 +1107,11 @@ public class YARNRunner implements ClientProtocol {
JobConf.MAPRED_TASK_JAVA_OPTS,
JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS));
- return adminOpts + " " + userOpts;
+ return adminOpts.trim() + " " + userOpts.trim() + " "
+ + getLog4jCmdLineProperties(jobConf, true);
}
+ @SuppressWarnings("deprecation")
private String getReduceJavaOpts(Configuration jobConf) {
// follows pattern from YARN MapReduceChildJVM.java
String adminOpts = "";
@@ -1159,6 +1126,42 @@ public class YARNRunner implements ClientProtocol {
jobConf.get(
JobConf.MAPRED_TASK_JAVA_OPTS,
JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS));
- return adminOpts + " " + userOpts;
+
+ return adminOpts.trim() + " " + userOpts.trim() + " "
+ + getLog4jCmdLineProperties(jobConf, false);
}
-}
+
+  /**
+   * Builds the log4j-related JVM system properties for a task as a single
+   * command-line fragment.
+   *
+   * <p>The properties are produced by {@code addLog4jSystemProperties} and
+   * each one is followed by a single space, so the returned string ends with
+   * a trailing space. Since that helper always contributes entries, the
+   * result is never {@code null} and never empty; callers that test for
+   * emptiness should use {@code isEmpty()} rather than reference comparison.
+   *
+   * @param jobConf
+   *          the job configuration, forwarded to {@code getChildLogLevel}
+   * @param isMap
+   *          {@code true} when building options for a map task,
+   *          {@code false} for a reduce task; forwarded to
+   *          {@code getChildLogLevel}
+   * @return the space-separated {@code -D} options
+   */
+  private static String getLog4jCmdLineProperties(Configuration jobConf,
+      boolean isMap) {
+    // The list never leaves this method, so the synchronized legacy Vector
+    // is unnecessary; a plain ArrayList behind the List interface suffices.
+    List<String> logProps = new ArrayList<String>(4);
+    addLog4jSystemProperties(getChildLogLevel(jobConf, isMap), logProps);
+    StringBuilder sb = new StringBuilder();
+    for (String str : logProps) {
+      sb.append(str).append(" ");
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Appends to {@code vargs} the JVM system properties necessary to
+   * configure {@link ContainerLogAppender}.
+   *
+   * <p>Four options are appended, in this order: the log4j configuration
+   * file, the task log directory, the task log size and the root logger.
+   *
+   * @param logLevel
+   *          the desired log level (eg INFO/WARN/DEBUG)
+   * @param vargs
+   *          the argument list to append to
+   */
+  private static void addLog4jSystemProperties(String logLevel,
+      List<String> vargs) {
+    final String logDirOption = "-D" + MRJobConfig.TASK_LOG_DIR + "="
+        + ApplicationConstants.LOG_DIR_EXPANSION_VAR;
+    // Setting this to 0 to avoid log size restrictions.
+    // Should be enforced by YARN.
+    final String logSizeOption = "-D" + MRJobConfig.TASK_LOG_SIZE + "=" + 0;
+    final String rootLoggerOption =
+        "-Dhadoop.root.logger=" + logLevel + ",CLA";
+
+    vargs.add("-Dlog4j.configuration=container-log4j.properties");
+    // TODO Fix the remaining properties after YARN-720 is fixed.
+    // May need to introduce a log4j properties file for tez tasks.
+    vargs.add(logDirOption);
+    vargs.add(logSizeOption);
+    vargs.add(rootLoggerOption);
+  }
+
+}
\ No newline at end of file