You are viewing a plain text version of this content. (The canonical link to the original was a hyperlink and is not preserved in this text.)
Posted to commits@tez.apache.org by ss...@apache.org on 2013/04/22 20:32:21 UTC
svn commit: r1470658 - in /incubator/tez/branches/TEZ-1:
tez-dag-api/src/main/java/org/apache/tez/dag/api/
tez-dag/src/main/java/org/apache/hadoop/mapred/
tez-dag/src/main/java/org/apache/tez/dag/app/
tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/
tez-dag/src/main/java/org/apache/tez/dag/utils/
Author: sseth
Date: Mon Apr 22 18:32:20 2013
New Revision: 1470658
URL: http://svn.apache.org/r1470658
Log:
TEZ-58. ContainerLaunchContext construction should not require MR config. (sseth)
Added:
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
Removed:
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM2.java
Modified:
incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
Modified: incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java?rev=1470658&r1=1470657&r2=1470658&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java Mon Apr 22 18:32:20 2013
@@ -60,6 +60,9 @@ public class TezConfiguration extends Co
TEZ_HOME_ENV + "/lib/*"
};
+ public static final String APPLICATION_ATTEMPT_ID_ENV = "APPLICATION_ATTEMPT_ID_ENV";
+
+
public static final String DAG_AM_PLAN_CONFIG_XML = "tez-dag.xml";
}
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java?rev=1470658&r1=1470657&r2=1470658&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java Mon Apr 22 18:32:20 2013
@@ -67,6 +67,7 @@ import org.apache.log4j.LogManager;
import org.apache.tez.common.ContainerTask;
import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.engine.api.Task;
import org.apache.tez.engine.records.TezTaskAttemptID;
import org.apache.tez.engine.runtime.TezEngineFactory;
@@ -290,7 +291,7 @@ public class YarnTezDagChild {
JobConf job = task.getConf();
String appAttemptIdEnv = System
- .getenv(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV);
+ .getenv(TezConfiguration.APPLICATION_ATTEMPT_ID_ENV);
LOG.debug("APPLICATION_ATTEMPT_ID: " + appAttemptIdEnv);
// Set it in conf, so as to be able to be used the the OutputCommitter.
job.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, Integer
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java?rev=1470658&r1=1470657&r2=1470658&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java Mon Apr 22 18:32:20 2013
@@ -36,7 +36,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@@ -1305,8 +1304,6 @@ public class DAGAppMaster extends Compos
dagConf.setBoolean("fs.automatic.close", false);
dagConf.set(DAGConfiguration.USER_NAME, jobUserName);
- // TODO Remove after fixing TaskLanch JVM construction
- dagConf.set(MRJobConfig.USER_NAME, jobUserName);
initAndStartAppMaster(appMaster, new YarnConfiguration(dagConf),
jobUserName);
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java?rev=1470658&r1=1470657&r2=1470658&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java Mon Apr 22 18:32:20 2013
@@ -33,9 +33,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceChildJVM2;
-import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@@ -50,11 +47,14 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.dag.api.DAGConfiguration;
+import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.utils.TezEngineChildJVM;
import org.apache.tez.engine.common.security.JobTokenIdentifier;
+import org.apache.tez.engine.common.security.TokenCache;
import org.apache.tez.engine.common.shuffle.server.ShuffleHandler;
import org.apache.tez.engine.records.TezVertexID;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import com.google.common.annotations.VisibleForTesting;
@@ -138,7 +138,7 @@ public class AMContainerHelpers {
// The null fields are per-container and will be constructed for each
// container separately.
ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
- conf.get(MRJobConfig.USER_NAME), localResources,
+ conf.get(DAGConfiguration.USER_NAME), localResources,
environment, null, serviceData, taskCredentialsBuffer, applicationACLs);
return container;
@@ -147,17 +147,17 @@ public class AMContainerHelpers {
@VisibleForTesting
public static ContainerLaunchContext createContainerLaunchContext(
Map<ApplicationAccessType, String> acls,
- ContainerId containerId, JobConf jobConf, TezVertexID vertexId,
+ ContainerId containerId, Configuration conf, TezVertexID vertexId,
Token<JobTokenIdentifier> jobToken,
Resource assignedCapability, Map<String, LocalResource> localResources,
Map<String, String> vertexEnv,
TaskAttemptListener taskAttemptListener, Credentials credentials,
- boolean shouldProfile) {
+ boolean shouldProfile, AppContext appContext) {
synchronized (commonContainerSpecLock) {
if (commonContainerSpec == null) {
commonContainerSpec = createCommonContainerLaunchContext(
- acls, jobConf, jobToken, vertexId, credentials);
+ acls, conf, jobToken, vertexId, credentials);
}
}
@@ -174,11 +174,11 @@ public class AMContainerHelpers {
myEnv.putAll(env);
myEnv.putAll(vertexEnv);
// TODO TEZ-38 MRChildJVM2.setEnv should become a no-op
- MapReduceChildJVM2.setVMEnv(myEnv, jobConf, vertexId);
+ TezEngineChildJVM.setVMEnv(myEnv, conf, vertexId, appContext);
// Set up the launch command
- List<String> commands = MapReduceChildJVM2.getVMCommand(
- taskAttemptListener.getAddress(), jobConf, vertexId, containerId,
+ List<String> commands = TezEngineChildJVM.getVMCommand(
+ taskAttemptListener.getAddress(), conf, vertexId, containerId,
vertexId.getDAGId().getApplicationId(), shouldProfile);
// Duplicate the ByteBuffers for access by multiple containers.
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java?rev=1470658&r1=1470657&r2=1470658&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java Mon Apr 22 18:32:20 2013
@@ -31,7 +31,6 @@ import java.util.concurrent.locks.Reentr
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -319,19 +318,17 @@ public class AMContainerImpl implements
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
AMContainerEventLaunchRequest event = (AMContainerEventLaunchRequest) cEvent;
-
- JobConf jobConf = new JobConf(event.getConf());
-
+
container.clc = AMContainerHelpers.createContainerLaunchContext(
container.appContext.getApplicationACLs(),
- container.getContainerId(), jobConf,
+ container.getContainerId(), event.getConf(),
event.getVertexId(),
event.getJobToken(),
container.getContainer().getResource(),
event.getLocalResources(),
event.getEnvironment(),
container.taskAttemptListener, event.getCredentials(),
- event.shouldProfile());
+ event.shouldProfile(), container.appContext);
container.registerWithTAListener();
container.sendStartRequestToNM();
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java?rev=1470658&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java Mon Apr 22 18:32:20 2013
@@ -0,0 +1,175 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.utils;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.YarnTezDagChild;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.engine.records.TezVertexID;
+
+/**
+ * Builds the container environment and launch command for a Tez task JVM
+ * (main class {@link YarnTezDagChild}).
+ */
+public final class TezEngineChildJVM {
+
+  private TezEngineChildJVM() {
+    // Static utility class; not meant to be instantiated.
+  }
+
+  // FIXME
+  /**
+   * Names of the per-task log files. {@link #toString()} returns the file
+   * name placed under the container log directory.
+   */
+  public static enum LogName {
+    /** Log on the stdout of the task. */
+    STDOUT ("stdout"),
+
+    /** Log on the stderr of the task. */
+    STDERR ("stderr"),
+
+    /** The system log of the task. */
+    SYSLOG ("syslog"),
+
+    /** The java profiler information. */
+    PROFILE ("profile.out"),
+
+    /** Log the debug script's stdout */
+    DEBUGOUT ("debugout");
+
+    private final String prefix;
+
+    private LogName(String prefix) {
+      this.prefix = prefix;
+    }
+
+    @Override
+    public String toString() {
+      return prefix;
+    }
+  }
+
+  /**
+   * Returns the location of the given log file, relative to
+   * {@link ApplicationConstants#LOG_DIR_EXPANSION_VAR} (presumably
+   * expanded to the real container log directory at launch).
+   */
+  private static String getTaskLogFile(LogName filter) {
+    return ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR +
+        filter.toString();
+  }
+
+  /**
+   * Adds Tez-specific entries to a task container's environment. Currently
+   * only sets {@link TezConfiguration#APPLICATION_ATTEMPT_ID_ENV} to the
+   * AM's application attempt number.
+   *
+   * @param environment environment map, modified in place
+   * @param conf currently unused
+   * @param vertexId currently unused
+   * @param appContext source of the current application attempt id
+   */
+  public static void setVMEnv(Map<String, String> environment,
+      Configuration conf, TezVertexID vertexId, AppContext appContext) {
+    // FIXME this should be derivable from the container id set by the NM
+    // and not require the AM to set
+    environment.put(TezConfiguration.APPLICATION_ATTEMPT_ID_ENV,
+        String.valueOf(appContext.getApplicationAttemptId().getAttemptId()));
+  }
+
+  /**
+   * Builds the shell command that launches a task JVM in a container: the
+   * java binary under JAVA_HOME with java.io.tmpdir set to the container
+   * temp dir, running {@link YarnTezDagChild} with the listener host,
+   * listener port, application id and container id as arguments, and
+   * stdout/stderr redirected to per-task log files.
+   *
+   * @param taskAttemptListenerAddr address the child reports back to
+   * @param conf currently unused (child java opts are not yet supported)
+   * @param vertexId currently unused
+   * @param containerId id of the container being launched
+   * @param jobID application id passed to the child
+   * @param shouldProfile currently ignored (profiling not yet supported)
+   * @return a mutable one-element list holding the whole command line,
+   *     with arguments separated by single spaces
+   */
+  public static List<String> getVMCommand(
+      InetSocketAddress taskAttemptListenerAddr, Configuration conf,
+      TezVertexID vertexId,
+      ContainerId containerId, ApplicationId jobID, boolean shouldProfile) {
+
+    List<String> vargs = new ArrayList<String>(9);
+
+    vargs.add("exec");
+    vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
+
+    // Add child (task) java-vm options.
+    // FIXME add support for child java opts
+
+    Path childTmpDir = new Path(Environment.PWD.$(),
+        YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
+    vargs.add("-Djava.io.tmpdir=" + childTmpDir);
+
+    // FIXME Setup the log4j properties
+
+    // Decision to profile needs to be made in the scheduler.
+    // FIXME add support for profiling; shouldProfile is ignored until then.
+
+    // Add main class and its arguments
+    vargs.add(YarnTezDagChild.class.getName()); // main of Child
+    // Pass the TaskAttemptListener's address: the numeric IP when resolved,
+    // otherwise the host name (getAddress() is null for an unresolved
+    // address and would throw NullPointerException here).
+    InetAddress listenerIp = taskAttemptListenerAddr.getAddress();
+    vargs.add(listenerIp != null
+        ? listenerIp.getHostAddress()
+        : taskAttemptListenerAddr.getHostName());
+    vargs.add(Integer.toString(taskAttemptListenerAddr.getPort()));
+    // Set the job id
+    vargs.add(jobID.toString());
+
+    // Finally add the containerId.
+    vargs.add(containerId.toString());
+    vargs.add("1>" + getTaskLogFile(LogName.STDOUT));
+    vargs.add("2>" + getTaskLogFile(LogName.STDERR));
+
+    // Final command: every argument followed by a single space.
+    StringBuilder mergedCommand = new StringBuilder();
+    for (String arg : vargs) {
+      mergedCommand.append(arg).append(' ');
+    }
+    List<String> vargsFinal = new ArrayList<String>(1);
+    vargsFinal.add(mergedCommand.toString());
+    return vargsFinal;
+  }
+}