You are viewing a plain text version of this content. The canonical link to it (a hyperlink on the original archive page) is not preserved in this copy.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/10/26 01:00:52 UTC

git commit: TEZ-192. Read tokens from UGI rather than a tokens file. (sseth)

Updated Branches:
  refs/heads/master 6d8646c32 -> 1c44f634c


TEZ-192. Read tokens from UGI rather than a tokens file. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/1c44f634
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/1c44f634
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/1c44f634

Branch: refs/heads/master
Commit: 1c44f634c2820ee5ced891a83654060273ab4a0d
Parents: 6d8646c
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Oct 25 16:00:06 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Oct 25 16:00:06 2013 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/api/TezConfiguration.java    |  7 --
 .../tez/dag/app/ContainerHeartbeatHandler.java  |  4 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 67 ++++++--------------
 .../tez/dag/app/HeartbeatHandlerBase.java       |  6 +-
 .../tez/dag/app/TaskHeartbeatHandler.java       |  4 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 27 +++++---
 .../tez/mapreduce/hadoop/DeprecatedKeys.java    |  5 --
 .../tez/mapreduce/hadoop/MRJobConfig.java       |  6 --
 .../org/apache/tez/mapreduce/YARNRunner.java    | 14 +---
 9 files changed, 48 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c44f634/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 8d3832e..04b96ba 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -43,9 +43,6 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_AM_STAGING_DIR = TEZ_PREFIX + "staging-dir";
   public static final String TEZ_AM_STAGING_DIR_DEFAULT = "/tmp/tez/staging";
 
-  // TODO Should not be required once all tokens are handled via AppSubmissionContext
-  public static final String JOB_SUBMIT_DIR = TEZ_PREFIX + "jobSubmitDir";
-  public static final String APPLICATION_TOKENS_FILE = "appTokens";
   public static final String TEZ_APPLICATION_MASTER_CLASS =
       "org.apache.tez.dag.app.DAGAppMaster";
 
@@ -65,10 +62,6 @@ public class TezConfiguration extends Configuration {
       TEZ_AM_PREFIX + "task.listener.thread-count";
   public static final int TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT = 30;
 
-  public static final String TEZ_AM_CONTAINER_LISTENER_THREAD_COUNT =
-      TEZ_AM_PREFIX + "container.listener.thread-count";
-  public static final int TEZ_AM_CONTAINER_LISTENER_THREAD_COUNT_DEFAULT = 30;
-
   // TODO Some of the DAG properties are job specific and not AM specific. Rename accordingly.
   // TODO Are any of these node blacklisting properties required. (other than for MR compat)
   public static final String TEZ_AM_MAX_TASK_FAILURES_PER_NODE = TEZ_AM_PREFIX

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c44f634/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerHeartbeatHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerHeartbeatHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerHeartbeatHandler.java
index 2e813ff..55ce7da 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerHeartbeatHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerHeartbeatHandler.java
@@ -29,8 +29,8 @@ public class ContainerHeartbeatHandler extends
 
  
   public ContainerHeartbeatHandler(AppContext context, 
-      int numThreads) {
-    super(context, numThreads, "ContainerHeartbeatHandler");
+      int expectedConcurrency) {
+    super(context, expectedConcurrency, "ContainerHeartbeatHandler");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c44f634/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 4e4e82e..6d2ec19 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -46,7 +46,6 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -54,7 +53,6 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.service.ServiceOperations;
@@ -74,6 +72,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.SystemClock;
@@ -248,8 +247,6 @@ public class DAGAppMaster extends AbstractService {
     this.amConf = conf;
     conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
 
-    downloadTokensAndSetupUGI(conf);
-
     context = new RunningAppContext(conf);
 
     clientHandler = new DAGClientHandler();
@@ -480,7 +477,7 @@ public class DAGAppMaster extends AbstractService {
     
     // Prepare the TaskAttemptListener server for authentication of Containers
     // TaskAttemptListener gets the information via jobTokenSecretManager.
-    String dagIdString = dagId.toString().replace("application", "job");
+    String dagIdString = dagId.toString();
     jobTokenSecretManager.addTokenForJob(dagIdString, sessionToken);
     LOG.info("Adding job token for " + dagIdString
         + " to jobTokenSecretManager");
@@ -503,43 +500,7 @@ public class DAGAppMaster extends AbstractService {
 
     return newDag;
   } // end createDag()
-
-
-  /**
-   * Obtain the tokens needed by the job and put them in the UGI
-   * @param conf
-   */
-  protected void downloadTokensAndSetupUGI(Configuration conf) {
-    // TODO remove - TEZ-71
-    try {
-      this.currentUser = UserGroupInformation.getCurrentUser();
-
-      if (UserGroupInformation.isSecurityEnabled()) {
-        // Read the file-system tokens from the localized tokens-file.
-        Path jobSubmitDir =
-            FileContext.getLocalFSFileContext().makeQualified(
-                new Path(new File(TezConfiguration.JOB_SUBMIT_DIR)
-                    .getAbsolutePath()));
-        Path jobTokenFile =
-            new Path(jobSubmitDir, TezConfiguration.APPLICATION_TOKENS_FILE);
-        tokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
-        LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
-            + jobTokenFile);
-
-        for (Token<? extends TokenIdentifier> tk : tokens.getAllTokens()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Token of kind " + tk.getKind()
-                + "in current ugi in the AppMaster for service "
-                + tk.getService());
-          }
-          currentUser.addToken(tk); // For use by AppMaster itself.
-        }
-      }
-    } catch (IOException e) {
-      throw new TezUncheckedException(e);
-    }
-  }
-
+  
   protected void addIfService(Object object, boolean addDispatcher) {
     if (object instanceof Service) {
       Service service = (Service) object;
@@ -580,8 +541,8 @@ public class DAGAppMaster extends AbstractService {
       AppContext context, Configuration conf) {
     ContainerHeartbeatHandler chh = new ContainerHeartbeatHandler(context,
         conf.getInt(
-            TezConfiguration.TEZ_AM_CONTAINER_LISTENER_THREAD_COUNT,
-            TezConfiguration.TEZ_AM_CONTAINER_LISTENER_THREAD_COUNT_DEFAULT));
+            TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT,
+            TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT));
     return chh;
   }
 
@@ -1449,12 +1410,26 @@ public class DAGAppMaster extends AbstractService {
   protected static void initAndStartAppMaster(final DAGAppMaster appMaster,
       final Configuration conf, String jobUserName) throws IOException,
       InterruptedException {
-    Credentials credentials =
-        UserGroupInformation.getCurrentUser().getCredentials();
     UserGroupInformation.setConfiguration(conf);
+    appMaster.currentUser = UserGroupInformation.getCurrentUser();
+        Credentials credentials =
+        UserGroupInformation.getCurrentUser().getCredentials();
+    
     UserGroupInformation appMasterUgi = UserGroupInformation
         .createRemoteUser(jobUserName);
     appMasterUgi.addCredentials(credentials);
+    
+    // Now remove the AM->RM token so tasks don't have it
+    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
+    while (iter.hasNext()) {
+      Token<?> token = iter.next();
+      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+        iter.remove();
+      }
+    }
+    
+    appMaster.tokens = credentials;
+
     appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
       @Override
       public Object run() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c44f634/tez-dag/src/main/java/org/apache/tez/dag/app/HeartbeatHandlerBase.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/HeartbeatHandlerBase.java b/tez-dag/src/main/java/org/apache/tez/dag/app/HeartbeatHandlerBase.java
index 1abd934..3741150 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/HeartbeatHandlerBase.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/HeartbeatHandlerBase.java
@@ -44,15 +44,15 @@ public abstract class HeartbeatHandlerBase<T> extends AbstractService {
   private ConcurrentMap<T, ReportTime> runningMap;
   private volatile boolean stopped;
 
-  public HeartbeatHandlerBase(AppContext appContext, int numThreads, String name) {
+  public HeartbeatHandlerBase(AppContext appContext, int expectedConcurrency, String name) {
     super(name);
     this.name = name;
     this.eventHandler = appContext.getEventHandler();
     this.clock = appContext.getClock();
     this.appContext = appContext;
-    numThreads = numThreads == 0 ? 1 : numThreads;
+    expectedConcurrency = expectedConcurrency == 0 ? 1 : expectedConcurrency;
     this.runningMap = new ConcurrentHashMap<T, HeartbeatHandlerBase.ReportTime>(
-        16, 0.75f, numThreads);
+        16, 0.75f, expectedConcurrency);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c44f634/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
index 9f576cc..c41244d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
@@ -35,8 +35,8 @@ import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 @SuppressWarnings({"unchecked"})
 public class TaskHeartbeatHandler extends HeartbeatHandlerBase<TezTaskAttemptID> {
 
-  public TaskHeartbeatHandler(AppContext context, int numThreads) {
-    super(context, numThreads, "TaskHeartbeatHandler");
+  public TaskHeartbeatHandler(AppContext context, int expectedConcurrency) {
+    super(context, expectedConcurrency, "TaskHeartbeatHandler");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c44f634/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 853bacf..9b33557 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -863,6 +863,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     writeLock.lock();
     try {
       this.vertexLocationHint = vertexLocationHint;
+      if (LOG.isDebugEnabled()) {
+        logLocationHints(this.vertexLocationHint);
+      }
     } finally {
       writeLock.unlock();
     }
@@ -1984,15 +1987,23 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     for (TaskLocationHint taskLocationHint : locationHint
         .getTaskLocationHints()) {
       StringBuilder sb = new StringBuilder();
-      sb.append("Hosts: ");
-      for (String host : taskLocationHint.getDataLocalHosts()) {
-        hosts.add(host);
-        sb.append(host).append(", ");
+      if (taskLocationHint.getDataLocalHosts() == null) {
+        sb.append("No Hosts");
+      } else {
+        sb.append("Hosts: ");
+        for (String host : taskLocationHint.getDataLocalHosts()) {
+          hosts.add(host);
+          sb.append(host).append(", ");
+        }
       }
-      sb.append("Racks: ");
-      for (String rack : taskLocationHint.getRacks()) {
-        racks.add(rack);
-        sb.append(rack).append(", ");
+      if (taskLocationHint.getRacks() == null) {
+        sb.append("No Racks");
+      } else {
+        sb.append("Racks: ");
+        for (String rack : taskLocationHint.getRacks()) {
+          racks.add(rack);
+          sb.append(rack).append(", ");
+        }
       }
       LOG.debug("Location: " + counter + " : " + sb.toString());
       counter++;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c44f634/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
index dac92ed..f0cb289 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
@@ -107,11 +107,6 @@ public class DeprecatedKeys {
   }
   
   private static void populateMRToDagParamMap() {
-    mrParamToDAGParamMap.put(MRJobConfig.JOB_SUBMIT_DIR,
-        TezConfiguration.JOB_SUBMIT_DIR);
-    mrParamToDAGParamMap.put(MRJobConfig.APPLICATION_TOKENS_FILE,
-        TezConfiguration.APPLICATION_TOKENS_FILE);
-
     // TODO Default value handling.
     mrParamToDAGParamMap.put(MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT,
         TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c44f634/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
index 65eca5f..940b1e0 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
@@ -572,12 +572,6 @@ public interface MRJobConfig {
   public static final String APPLICATION_MASTER_CLASS =
       "org.apache.tez.dag.app.DAGAppMaster";
 
-  // The token file for the application. Should contain tokens for access to
-  // remote file system and may optionally contain application specific tokens.
-  // For now, generated by the AppManagers and used by NodeManagers and the
-  // Containers.
-  public static final String APPLICATION_TOKENS_FILE = "appTokens";
-  
   /** The log directory for the containers */
   public static final String TASK_LOG_DIR = MR_PREFIX + "container.log.dir";
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c44f634/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
index 87df0b0..96f6936 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
@@ -90,7 +90,6 @@ import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.dag.api.client.DAGClient;
@@ -310,8 +309,7 @@ public class YARNRunner implements ClientProtocol {
     // TODO gross hack
     for (String s : new String[] {
         MRJobConfig.JOB_SPLIT,
-        MRJobConfig.JOB_SPLIT_METAINFO,
-        MRJobConfig.APPLICATION_TOKENS_FILE }) {
+        MRJobConfig.JOB_SPLIT_METAINFO}) {
       localResources.put(s,
           createApplicationResource(defaultFileContext,
               new Path(jobSubmitDir, s), LocalResourceType.FILE));
@@ -507,16 +505,6 @@ public class YARNRunner implements ClientProtocol {
   public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
   throws IOException, InterruptedException {
 
-    // TEZ-192 - stop using token file
-    // Upload only in security mode: TODO
-    Path applicationTokensFile =
-        new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
-    try {
-      ts.writeTokenStorageFile(applicationTokensFile, conf);
-    } catch (IOException e) {
-      throw new TezUncheckedException(e);
-    }
-
     ApplicationId appId = resMgrDelegate.getApplicationId();
 
     FileSystem fs = FileSystem.get(conf);