You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/02/21 02:19:17 UTC

tez git commit: TEZ-2130. Send the sessionToken as part of the AM CLC. (sseth)

Repository: tez
Updated Branches:
  refs/heads/master 49f76ad90 -> c9a74d77b


TEZ-2130. Send the sessionToken as part of the AM CLC. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c9a74d77
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c9a74d77
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c9a74d77

Branch: refs/heads/master
Commit: c9a74d77b2d97a30f76fc498c243eca0a603077b
Parents: 49f76ad
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Feb 20 17:19:03 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Feb 20 17:19:03 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/tez/client/TezClientUtils.java   | 10 +++-
 .../org/apache/tez/common/TezCommonUtils.java   | 20 +++++++
 .../apache/tez/client/TestTezClientUtils.java   | 59 ++++++++++++++++++--
 .../app/rm/container/AMContainerHelpers.java    | 26 +--------
 5 files changed, 87 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/c9a74d77/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 77b0985..c2d5e75 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2130. Send the sessionToken as part of the AM CLC.
   TEZ-1935. Organization should be removed from http://tez.apache.org/team-list.html.
   TEZ-2009. Change license/copyright headers to 2015.
   TEZ-2085. PipelinedSorter should bail out (on BufferOverflowException) instead of retrying continuously.

http://git-wip-us.apache.org/repos/asf/tez/blob/c9a74d77/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 597789c..69bc08e 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -621,10 +622,17 @@ public class TezClientUtils {
       }
     }
 
+    // Send the shuffle token as part of the AM launch context, so that the NM running the AM can
+    // provide this to AuxServices running on the AM node - in case tasks run within the AM,
+    // and no other task runs on this node.
+    Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
+    serviceData.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
+        TezCommonUtils.serializeServiceData(TokenCache.getSessionToken(amLaunchCredentials)));
+
     // Setup ContainerLaunchContext for AM container
     ContainerLaunchContext amContainer =
         ContainerLaunchContext.newInstance(amLocalResources, environment,
-            vargsFinal, null, securityTokens, acls);
+            vargsFinal, serviceData, securityTokens, acls);
 
     // Set up the ApplicationSubmissionContext
     ApplicationSubmissionContext appContext = Records

http://git-wip-us.apache.org/repos/asf/tez/blob/c9a74d77/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
index fe570a5..05e868c 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
@@ -439,4 +439,24 @@ public class TezCommonUtils {
 
     return str.split("\\s*,\\s*");
   }
+
+  /**
+   * A helper function to serialize the JobTokenIdentifier to be sent to the
+   * ShuffleHandler as ServiceData.
+   *
+   * *NOTE* This is a copy of what is done by the MapReduce ShuffleHandler. Not using that directly
+   * to avoid a dependency on mapreduce.
+   *
+   * @param jobToken
+   *          the job token to be used for authentication of shuffle data
+   *          requests.
+   * @return the serialized version of the jobToken.
+   */
+  public static ByteBuffer serializeServiceData(Token<JobTokenIdentifier> jobToken)
+      throws IOException {
+    // TODO these bytes should be versioned
+    DataOutputBuffer jobToken_dob = new DataOutputBuffer();
+    jobToken.write(jobToken_dob);
+    return ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength());
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/c9a74d77/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
index 468361b..ea73ab3 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
@@ -29,7 +29,9 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -43,16 +45,23 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.tez.common.security.HistoryACLPolicyManager;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -196,20 +205,59 @@ public class TestTezClientUtils {
   }
 
   @Test(timeout = 5000)
+  public void testSessionTokenInAmClc() throws IOException, YarnException {
+
+    TezConfiguration tezConf = new TezConfiguration();
+
+    ApplicationId appId = ApplicationId.newInstance(1000, 1);
+    DAG dag = DAG.create("testdag");
+    dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"), 1)
+        .setTaskLaunchCmdOpts("initialLaunchOpts"));
+
+    Credentials credentials = new Credentials();
+    JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager();
+    TezClientUtils.createSessionToken(appId.toString(), jobTokenSecretManager, credentials);
+    Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
+    assertNotNull(jobToken);
+
+    AMConfiguration amConf =
+        new AMConfiguration(tezConf, new HashMap<String, LocalResource>(), credentials);
+    ApplicationSubmissionContext appSubmissionContext =
+        TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf,
+            new HashMap<String, LocalResource>(), credentials, false, new TezApiVersionInfo(),
+            mock(HistoryACLPolicyManager.class));
+
+    ContainerLaunchContext amClc = appSubmissionContext.getAMContainerSpec();
+    Map<String, ByteBuffer> amServiceData = amClc.getServiceData();
+    assertNotNull(amServiceData);
+    assertEquals(1, amServiceData.size());
+
+    DataInputByteBuffer dibb = new DataInputByteBuffer();
+    dibb.reset(amServiceData.values().iterator().next());
+    Token<JobTokenIdentifier> jtSent = new Token<JobTokenIdentifier>();
+    jtSent.readFields(dibb);
+
+    assertTrue(Arrays.equals(jobToken.getIdentifier(), jtSent.getIdentifier()));
+  }
+
+  @Test(timeout = 5000)
   public void testAMLoggingOptsSimple() throws IOException, YarnException {
 
     TezConfiguration tezConf = new TezConfiguration();
     tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "WARN");
 
     ApplicationId appId = ApplicationId.newInstance(1000, 1);
+    Credentials credentials = new Credentials();
+    JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager();
+    TezClientUtils.createSessionToken(appId.toString(), jobTokenSecretManager, credentials);
     DAG dag = DAG.create("testdag");
     dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"), 1)
         .setTaskLaunchCmdOpts("initialLaunchOpts"));
     AMConfiguration amConf =
-        new AMConfiguration(tezConf, new HashMap<String, LocalResource>(), new Credentials());
+        new AMConfiguration(tezConf, new HashMap<String, LocalResource>(), credentials);
     ApplicationSubmissionContext appSubmissionContext =
         TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf,
-            new HashMap<String, LocalResource>(), new Credentials(), false, new TezApiVersionInfo(),
+            new HashMap<String, LocalResource>(), credentials, false, new TezApiVersionInfo(),
             mock(HistoryACLPolicyManager.class));
 
     List<String> expectedCommands = new LinkedList<String>();
@@ -238,14 +286,17 @@ public class TestTezClientUtils {
     tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "WARN;org.apache.hadoop.ipc=DEBUG;org.apache.hadoop.security=DEBUG");
 
     ApplicationId appId = ApplicationId.newInstance(1000, 1);
+    Credentials credentials = new Credentials();
+    JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager();
+    TezClientUtils.createSessionToken(appId.toString(), jobTokenSecretManager, credentials);
     DAG dag = DAG.create("testdag");
     dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"), 1)
         .setTaskLaunchCmdOpts("initialLaunchOpts"));
     AMConfiguration amConf =
-        new AMConfiguration(tezConf, new HashMap<String, LocalResource>(), new Credentials());
+        new AMConfiguration(tezConf, new HashMap<String, LocalResource>(), credentials);
     ApplicationSubmissionContext appSubmissionContext =
         TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf,
-            new HashMap<String, LocalResource>(), new Credentials(), false, new TezApiVersionInfo(),
+            new HashMap<String, LocalResource>(), credentials, false, new TezApiVersionInfo(),
             mock(HistoryACLPolicyManager.class));
 
     List<String> expectedCommands = new LinkedList<String>();

http://git-wip-us.apache.org/repos/asf/tez/blob/c9a74d77/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index b776349..d1b2ea8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -46,7 +45,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.client.TezClientUtils;
-import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
@@ -121,7 +120,7 @@ public class AMContainerHelpers {
       // Add shuffle token
       LOG.info("Putting shuffle token in serviceData");
       serviceData.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
-          serializeServiceData(TokenCache.getSessionToken(containerCredentials)));
+          TezCommonUtils.serializeServiceData(TokenCache.getSessionToken(containerCredentials)));
     } catch (IOException e) {
       throw new TezUncheckedException(e);
     }
@@ -211,25 +210,4 @@ public class AMContainerHelpers {
 
     return container;
   }
-  
-  /**
-   * A helper function to serialize the JobTokenIdentifier to be sent to the
-   * ShuffleHandler as ServiceData.
-   * 
-   * *NOTE* This is a copy of what is done by the MapReduce ShuffleHandler. Not using that directly
-   * to avoid a dependency on mapreduce.
-   * 
-   * @param jobToken
-   *          the job token to be used for authentication of shuffle data
-   *          requests.
-   * @return the serialized version of the jobToken.
-   */
-  private static ByteBuffer serializeServiceData(Token<JobTokenIdentifier> jobToken)
-      throws IOException {
-    // TODO these bytes should be versioned
-    DataOutputBuffer jobToken_dob = new DataOutputBuffer();
-    jobToken.write(jobToken_dob);
-    return ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength());
-  }
-
 }