You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/02/21 02:19:17 UTC
tez git commit: TEZ-2130. Send the sessionToken as part of the AM CLC. (sseth)
Repository: tez
Updated Branches:
refs/heads/master 49f76ad90 -> c9a74d77b
TEZ-2130. Send the sessionToken as part of the AM CLC. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c9a74d77
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c9a74d77
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c9a74d77
Branch: refs/heads/master
Commit: c9a74d77b2d97a30f76fc498c243eca0a603077b
Parents: 49f76ad
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Feb 20 17:19:03 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Feb 20 17:19:03 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/client/TezClientUtils.java | 10 +++-
.../org/apache/tez/common/TezCommonUtils.java | 20 +++++++
.../apache/tez/client/TestTezClientUtils.java | 59 ++++++++++++++++++--
.../app/rm/container/AMContainerHelpers.java | 26 +--------
5 files changed, 87 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/c9a74d77/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 77b0985..c2d5e75 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2130. Send the sessionToken as part of the AM CLC.
TEZ-1935. Organization should be removed from http://tez.apache.org/team-list.html.
TEZ-2009. Change license/copyright headers to 2015.
TEZ-2085. PipelinedSorter should bail out (on BufferOverflowException) instead of retrying continuously.
http://git-wip-us.apache.org/repos/asf/tez/blob/c9a74d77/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 597789c..69bc08e 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -621,10 +622,17 @@ public class TezClientUtils {
}
}
+ // Send the shuffle token as part of the AM launch context, so that the NM running the AM can
+ // provide this to AuxServices running on the AM node - in case tasks run within the AM,
+ // and no other task runs on this node.
+ Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
+ serviceData.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
+ TezCommonUtils.serializeServiceData(TokenCache.getSessionToken(amLaunchCredentials)));
+
// Setup ContainerLaunchContext for AM container
ContainerLaunchContext amContainer =
ContainerLaunchContext.newInstance(amLocalResources, environment,
- vargsFinal, null, securityTokens, acls);
+ vargsFinal, serviceData, securityTokens, acls);
// Set up the ApplicationSubmissionContext
ApplicationSubmissionContext appContext = Records
http://git-wip-us.apache.org/repos/asf/tez/blob/c9a74d77/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
index fe570a5..05e868c 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
@@ -439,4 +439,24 @@ public class TezCommonUtils {
return str.split("\\s*,\\s*");
}
+
+ /**
+ * A helper function to serialize the JobTokenIdentifier to be sent to the
+ * ShuffleHandler as ServiceData.
+ *
+ * *NOTE* This is a copy of what is done by the MapReduce ShuffleHandler. Not using that directly
+ * to avoid a dependency on mapreduce.
+ *
+ * @param jobToken
+ * the job token to be used for authentication of shuffle data
+ * requests.
+ * @return the serialized version of the jobToken.
+ */
+ public static ByteBuffer serializeServiceData(Token<JobTokenIdentifier> jobToken)
+ throws IOException {
+ // TODO these bytes should be versioned
+ DataOutputBuffer jobToken_dob = new DataOutputBuffer();
+ jobToken.write(jobToken_dob);
+ return ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength());
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/c9a74d77/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
index 468361b..ea73ab3 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
@@ -29,7 +29,9 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -43,16 +45,23 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.tez.common.security.HistoryACLPolicyManager;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.common.security.TokenCache;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
@@ -196,20 +205,59 @@ public class TestTezClientUtils {
}
@Test(timeout = 5000)
+ public void testSessionTokenInAmClc() throws IOException, YarnException {
+
+ TezConfiguration tezConf = new TezConfiguration();
+
+ ApplicationId appId = ApplicationId.newInstance(1000, 1);
+ DAG dag = DAG.create("testdag");
+ dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"), 1)
+ .setTaskLaunchCmdOpts("initialLaunchOpts"));
+
+ Credentials credentials = new Credentials();
+ JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager();
+ TezClientUtils.createSessionToken(appId.toString(), jobTokenSecretManager, credentials);
+ Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
+ assertNotNull(jobToken);
+
+ AMConfiguration amConf =
+ new AMConfiguration(tezConf, new HashMap<String, LocalResource>(), credentials);
+ ApplicationSubmissionContext appSubmissionContext =
+ TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf,
+ new HashMap<String, LocalResource>(), credentials, false, new TezApiVersionInfo(),
+ mock(HistoryACLPolicyManager.class));
+
+ ContainerLaunchContext amClc = appSubmissionContext.getAMContainerSpec();
+ Map<String, ByteBuffer> amServiceData = amClc.getServiceData();
+ assertNotNull(amServiceData);
+ assertEquals(1, amServiceData.size());
+
+ DataInputByteBuffer dibb = new DataInputByteBuffer();
+ dibb.reset(amServiceData.values().iterator().next());
+ Token<JobTokenIdentifier> jtSent = new Token<JobTokenIdentifier>();
+ jtSent.readFields(dibb);
+
+ assertTrue(Arrays.equals(jobToken.getIdentifier(), jtSent.getIdentifier()));
+ }
+
+ @Test(timeout = 5000)
public void testAMLoggingOptsSimple() throws IOException, YarnException {
TezConfiguration tezConf = new TezConfiguration();
tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "WARN");
ApplicationId appId = ApplicationId.newInstance(1000, 1);
+ Credentials credentials = new Credentials();
+ JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager();
+ TezClientUtils.createSessionToken(appId.toString(), jobTokenSecretManager, credentials);
DAG dag = DAG.create("testdag");
dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"), 1)
.setTaskLaunchCmdOpts("initialLaunchOpts"));
AMConfiguration amConf =
- new AMConfiguration(tezConf, new HashMap<String, LocalResource>(), new Credentials());
+ new AMConfiguration(tezConf, new HashMap<String, LocalResource>(), credentials);
ApplicationSubmissionContext appSubmissionContext =
TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf,
- new HashMap<String, LocalResource>(), new Credentials(), false, new TezApiVersionInfo(),
+ new HashMap<String, LocalResource>(), credentials, false, new TezApiVersionInfo(),
mock(HistoryACLPolicyManager.class));
List<String> expectedCommands = new LinkedList<String>();
@@ -238,14 +286,17 @@ public class TestTezClientUtils {
tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "WARN;org.apache.hadoop.ipc=DEBUG;org.apache.hadoop.security=DEBUG");
ApplicationId appId = ApplicationId.newInstance(1000, 1);
+ Credentials credentials = new Credentials();
+ JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager();
+ TezClientUtils.createSessionToken(appId.toString(), jobTokenSecretManager, credentials);
DAG dag = DAG.create("testdag");
dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"), 1)
.setTaskLaunchCmdOpts("initialLaunchOpts"));
AMConfiguration amConf =
- new AMConfiguration(tezConf, new HashMap<String, LocalResource>(), new Credentials());
+ new AMConfiguration(tezConf, new HashMap<String, LocalResource>(), credentials);
ApplicationSubmissionContext appSubmissionContext =
TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf,
- new HashMap<String, LocalResource>(), new Credentials(), false, new TezApiVersionInfo(),
+ new HashMap<String, LocalResource>(), credentials, false, new TezApiVersionInfo(),
mock(HistoryACLPolicyManager.class));
List<String> expectedCommands = new LinkedList<String>();
http://git-wip-us.apache.org/repos/asf/tez/blob/c9a74d77/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index b776349..d1b2ea8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -46,7 +45,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.client.TezClientUtils;
-import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.security.TokenCache;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
@@ -121,7 +120,7 @@ public class AMContainerHelpers {
// Add shuffle token
LOG.info("Putting shuffle token in serviceData");
serviceData.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
- serializeServiceData(TokenCache.getSessionToken(containerCredentials)));
+ TezCommonUtils.serializeServiceData(TokenCache.getSessionToken(containerCredentials)));
} catch (IOException e) {
throw new TezUncheckedException(e);
}
@@ -211,25 +210,4 @@ public class AMContainerHelpers {
return container;
}
-
- /**
- * A helper function to serialize the JobTokenIdentifier to be sent to the
- * ShuffleHandler as ServiceData.
- *
- * *NOTE* This is a copy of what is done by the MapReduce ShuffleHandler. Not using that directly
- * to avoid a dependency on mapreduce.
- *
- * @param jobToken
- * the job token to be used for authentication of shuffle data
- * requests.
- * @return the serialized version of the jobToken.
- */
- private static ByteBuffer serializeServiceData(Token<JobTokenIdentifier> jobToken)
- throws IOException {
- // TODO these bytes should be versioned
- DataOutputBuffer jobToken_dob = new DataOutputBuffer();
- jobToken.write(jobToken_dob);
- return ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength());
- }
-
}