You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/04/22 20:32:21 UTC

svn commit: r1470658 - in /incubator/tez/branches/TEZ-1: tez-dag-api/src/main/java/org/apache/tez/dag/api/ tez-dag/src/main/java/org/apache/hadoop/mapred/ tez-dag/src/main/java/org/apache/tez/dag/app/ tez-dag/src/main/java/org/apache/tez/dag/app/rm/con...

Author: sseth
Date: Mon Apr 22 18:32:20 2013
New Revision: 1470658

URL: http://svn.apache.org/r1470658
Log:
TEZ-58. ContainerLaunchContext construction should not require MR config. (sseth)

Added:
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
Removed:
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM2.java
Modified:
    incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java

Modified: incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java?rev=1470658&r1=1470657&r2=1470658&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java Mon Apr 22 18:32:20 2013
@@ -60,6 +60,9 @@ public class TezConfiguration extends Co
     TEZ_HOME_ENV + "/lib/*"
   };
 
+  public static final String APPLICATION_ATTEMPT_ID_ENV = "APPLICATION_ATTEMPT_ID_ENV";
+  
+  
   public static final String DAG_AM_PLAN_CONFIG_XML = "tez-dag.xml";
 
 }

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java?rev=1470658&r1=1470657&r2=1470658&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java Mon Apr 22 18:32:20 2013
@@ -67,6 +67,7 @@ import org.apache.log4j.LogManager;
 import org.apache.tez.common.ContainerTask;
 import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.engine.api.Task;
 import org.apache.tez.engine.records.TezTaskAttemptID;
 import org.apache.tez.engine.runtime.TezEngineFactory;
@@ -290,7 +291,7 @@ public class YarnTezDagChild {
     JobConf job = task.getConf();
     
     String appAttemptIdEnv = System
-        .getenv(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV);
+        .getenv(TezConfiguration.APPLICATION_ATTEMPT_ID_ENV);
     LOG.debug("APPLICATION_ATTEMPT_ID: " + appAttemptIdEnv);
     // Set it in conf, so as to be able to be used the the OutputCommitter.
     job.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, Integer

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java?rev=1470658&r1=1470657&r2=1470658&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java Mon Apr 22 18:32:20 2013
@@ -36,7 +36,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -1305,8 +1304,6 @@ public class DAGAppMaster extends Compos
       dagConf.setBoolean("fs.automatic.close", false);
 
       dagConf.set(DAGConfiguration.USER_NAME, jobUserName);
-      // TODO Remove after fixing TaskLanch JVM construction
-      dagConf.set(MRJobConfig.USER_NAME, jobUserName);
 
       initAndStartAppMaster(appMaster, new YarnConfiguration(dagConf),
           jobUserName);

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java?rev=1470658&r1=1470657&r2=1470658&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java Mon Apr 22 18:32:20 2013
@@ -33,9 +33,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceChildJVM2;
-import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -50,11 +47,14 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.dag.api.DAGConfiguration;
+import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.utils.TezEngineChildJVM;
 import org.apache.tez.engine.common.security.JobTokenIdentifier;
+import org.apache.tez.engine.common.security.TokenCache;
 import org.apache.tez.engine.common.shuffle.server.ShuffleHandler;
 import org.apache.tez.engine.records.TezVertexID;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -138,7 +138,7 @@ public class AMContainerHelpers {
     // The null fields are per-container and will be constructed for each
     // container separately.
     ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
-        conf.get(MRJobConfig.USER_NAME), localResources,
+        conf.get(DAGConfiguration.USER_NAME), localResources,
         environment, null, serviceData, taskCredentialsBuffer, applicationACLs);
 
     return container;
@@ -147,17 +147,17 @@ public class AMContainerHelpers {
   @VisibleForTesting
   public static ContainerLaunchContext createContainerLaunchContext(
       Map<ApplicationAccessType, String> acls,
-      ContainerId containerId, JobConf jobConf, TezVertexID vertexId,
+      ContainerId containerId, Configuration conf, TezVertexID vertexId,
       Token<JobTokenIdentifier> jobToken,
       Resource assignedCapability, Map<String, LocalResource> localResources,
       Map<String, String> vertexEnv,
       TaskAttemptListener taskAttemptListener, Credentials credentials,
-      boolean shouldProfile) {
+      boolean shouldProfile, AppContext appContext) {
 
     synchronized (commonContainerSpecLock) {
       if (commonContainerSpec == null) {
         commonContainerSpec = createCommonContainerLaunchContext(
-            acls, jobConf, jobToken, vertexId, credentials);
+            acls, conf, jobToken, vertexId, credentials);
       }
     }
 
@@ -174,11 +174,11 @@ public class AMContainerHelpers {
     myEnv.putAll(env);
     myEnv.putAll(vertexEnv);
     // TODO TEZ-38 MRChildJVM2.setEnv should become a no-op
-    MapReduceChildJVM2.setVMEnv(myEnv, jobConf, vertexId);
+    TezEngineChildJVM.setVMEnv(myEnv, conf, vertexId, appContext);
 
     // Set up the launch command
-    List<String> commands = MapReduceChildJVM2.getVMCommand(
-        taskAttemptListener.getAddress(), jobConf, vertexId, containerId,
+    List<String> commands = TezEngineChildJVM.getVMCommand(
+        taskAttemptListener.getAddress(), conf, vertexId, containerId,
         vertexId.getDAGId().getApplicationId(), shouldProfile);
 
     // Duplicate the ByteBuffers for access by multiple containers.

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java?rev=1470658&r1=1470657&r2=1470658&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java Mon Apr 22 18:32:20 2013
@@ -31,7 +31,6 @@ import java.util.concurrent.locks.Reentr
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -319,19 +318,17 @@ public class AMContainerImpl implements 
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       AMContainerEventLaunchRequest event = (AMContainerEventLaunchRequest) cEvent;
-      
-      JobConf jobConf = new JobConf(event.getConf());
-      
+
       container.clc = AMContainerHelpers.createContainerLaunchContext(
           container.appContext.getApplicationACLs(),
-          container.getContainerId(), jobConf,
+          container.getContainerId(), event.getConf(),
           event.getVertexId(),
           event.getJobToken(),
           container.getContainer().getResource(),
           event.getLocalResources(),
           event.getEnvironment(),
           container.taskAttemptListener, event.getCredentials(),
-          event.shouldProfile());
+          event.shouldProfile(), container.appContext);
 
       container.registerWithTAListener();
       container.sendStartRequestToNM();

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java?rev=1470658&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java Mon Apr 22 18:32:20 2013
@@ -0,0 +1,130 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.utils;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.YarnTezDagChild;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.engine.records.TezVertexID;
+
+public class TezEngineChildJVM {
+
+  // FIXME 
+  public static enum LogName {
+    /** Log on the stdout of the task. */
+    STDOUT ("stdout"),
+
+    /** Log on the stderr of the task. */
+    STDERR ("stderr"),
+    
+    /** Log on the map-reduce system logs of the task. */
+    SYSLOG ("syslog"),
+    
+    /** The java profiler information. */
+    PROFILE ("profile.out"),
+    
+    /** Log the debug script's stdout  */
+    DEBUGOUT ("debugout");
+        
+    private String prefix;
+    
+    private LogName(String prefix) {
+      this.prefix = prefix;
+    }
+    
+    @Override
+    public String toString() {
+      return prefix;
+    }
+  }
+  
+  private static String getTaskLogFile(LogName filter) {
+    return ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + 
+        filter.toString();
+  }
+  
+  public static void setVMEnv(Map<String, String> environment, Configuration conf,
+      TezVertexID vertexId, AppContext appContext) {
+
+    // FIXME this should be derivable from the container id set by the NM
+    // and not require the AM to set
+    environment.put(TezConfiguration.APPLICATION_ATTEMPT_ID_ENV,
+        String.valueOf(appContext.getApplicationAttemptId().getAttemptId()));
+  }
+
+  public static List<String> getVMCommand(
+      InetSocketAddress taskAttemptListenerAddr, Configuration conf, 
+      TezVertexID vertexId, 
+      ContainerId containerId, ApplicationId jobID, boolean shouldProfile) {
+
+    Vector<String> vargs = new Vector<String>(9);
+
+    vargs.add("exec");
+    vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
+
+    // Add child (task) java-vm options.
+    // FIXME add support for child java opts
+
+    Path childTmpDir = new Path(Environment.PWD.$(),
+        YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
+    vargs.add("-Djava.io.tmpdir=" + childTmpDir);
+
+    // FIXME Setup the log4j properties
+
+    // Decision to profile needs to be made in the scheduler.
+    if (shouldProfile) {
+      // FIXME add support for profiling
+    }
+
+    // Add main class and its arguments 
+    vargs.add(YarnTezDagChild.class.getName());  // main of Child
+    // pass TaskAttemptListener's address
+    vargs.add(taskAttemptListenerAddr.getAddress().getHostAddress()); 
+    vargs.add(Integer.toString(taskAttemptListenerAddr.getPort()));
+    // Set the job id
+    vargs.add(jobID.toString());
+
+    // Finally add the containerId.
+    vargs.add(String.valueOf(containerId.toString()));
+    vargs.add("1>" + getTaskLogFile(LogName.STDOUT));
+    vargs.add("2>" + getTaskLogFile(LogName.STDERR));
+
+    // Final command
+    StringBuilder mergedCommand = new StringBuilder();
+    for (CharSequence str : vargs) {
+      mergedCommand.append(str).append(" ");
+    }
+    Vector<String> vargsFinal = new Vector<String>(1);
+    vargsFinal.add(mergedCommand.toString());
+    return vargsFinal;
+  }
+
+}