You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/10/17 02:35:38 UTC
tez git commit: TEZ-2886. Ability to merge AM credentials with DAG
credentials. Contributed by Jason Lowe.
Repository: tez
Updated Branches:
refs/heads/master 25f0247ee -> d336ebdbc
TEZ-2886. Ability to merge AM credentials with DAG credentials. Contributed by Jason Lowe.
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d336ebdb
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d336ebdb
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d336ebdb
Branch: refs/heads/master
Commit: d336ebdbc4ea86dce81a44b16d36fa2846cc4401
Parents: 25f0247
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Oct 16 17:34:56 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Oct 16 17:34:56 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/tez/dag/api/TezConfiguration.java | 10 +
.../org/apache/tez/dag/app/DAGAppMaster.java | 6 +
.../apache/tez/dag/app/TestDAGAppMaster.java | 212 +++++++++++++++++++
4 files changed, 230 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/d336ebdb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7d90402..6f94f0a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.2: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2886. Ability to merge AM credentials with DAG credentials.
TEZ-2896. Fix thread names used during Input/Output initialization.
TEZ-2866. Tez UI: Newly added columns wont be displayed by default in tables
TEZ-2887. Tez build failure due to missing dependency in pom files.
@@ -215,6 +216,7 @@ Release 0.7.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES
+ TEZ-2886. Ability to merge AM credentials with DAG credentials.
TEZ-2896. Fix thread names used during Input/Output initialization.
TEZ-2866. Tez UI: Newly added columns wont be displayed by default in tables
TEZ-2885. Remove counter logs from AMWebController.
http://git-wip-us.apache.org/repos/asf/tez/blob/d336ebdb/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 12435ca..80b9860 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -149,6 +149,16 @@ public class TezConfiguration extends Configuration {
@ConfigurationProperty
public static final String TEZ_CREDENTIALS_PATH = TEZ_PREFIX + "credentials.path";
+ /**
+ * Boolean value. If true then Tez will add the ApplicationMaster credentials
+ * to all task credentials.
+ */
+ @ConfigurationScope(Scope.AM)
+ @ConfigurationProperty(type="boolean")
+ public static final String TEZ_AM_CREDENTIALS_MERGE = TEZ_AM_PREFIX
+ + "credentials-merge";
+ public static boolean TEZ_AM_CREDENTIALS_MERGE_DEFAULT = true;
+
@Private
@ConfigurationScope(Scope.AM)
public static final String TEZ_AM_USE_CONCURRENT_DISPATCHER = TEZ_AM_PREFIX
http://git-wip-us.apache.org/repos/asf/tez/blob/d336ebdb/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index d774b9a..f4c57e3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -935,6 +935,12 @@ public class DAGAppMaster extends AbstractService {
} else {
dagCredentials = new Credentials();
}
+ if (getConfig().getBoolean(TezConfiguration.TEZ_AM_CREDENTIALS_MERGE,
+ TezConfiguration.TEZ_AM_CREDENTIALS_MERGE_DEFAULT)) {
+ LOG.info("Merging AM credentials into DAG credentials");
+ dagCredentials.mergeAll(amCredentials);
+ }
+
// TODO Does this move to the client in case of work-preserving recovery.
TokenCache.setSessionToken(sessionToken, dagCredentials);
http://git-wip-us.apache.org/repos/asf/tez/blob/d336ebdb/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
index fa5d87c..a81c964 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
@@ -15,11 +15,18 @@
package org.apache.tez.dag.app;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -28,15 +35,44 @@ import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.client.TezApiVersionInfo;
+import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.TezUserPayloadProto;
+import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.records.TezDAGID;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
public class TestDAGAppMaster {
@@ -47,6 +83,21 @@ public class TestDAGAppMaster {
private static final String CL_NAME = "CL";
private static final String TC_NAME = "TC";
private static final String CLASS_SUFFIX = "_CLASS";
+ private static final File TEST_DIR = new File(
+ System.getProperty("test.build.data",
+ System.getProperty("java.io.tmpdir")),
+ TestDAGAppMaster.class.getSimpleName()).getAbsoluteFile();
+
+ @Before
+ public void setup() {
+ FileUtil.fullyDelete(TEST_DIR);
+ TEST_DIR.mkdir();
+ }
+
+ @After
+ public void teardown() {
+ FileUtil.fullyDelete(TEST_DIR);
+ }
@Test(timeout = 5000)
public void testPluginParsing() throws IOException {
@@ -297,4 +348,165 @@ public class TestDAGAppMaster {
}
+ @Test
+ public void testDagCredentialsWithoutMerge() throws Exception {
+ testDagCredentials(false);
+ }
+
+ @Test
+ public void testDagCredentialsWithMerge() throws Exception {
+ testDagCredentials(true);
+ }
+
+ private void testDagCredentials(boolean doMerge) throws IOException {
+ TezConfiguration conf = new TezConfiguration();
+ conf.setBoolean(TezConfiguration.TEZ_AM_CREDENTIALS_MERGE, doMerge);
+ conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+ conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, TEST_DIR.toString());
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+ ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
+
+ // create some sample AM credentials
+ Credentials amCreds = new Credentials();
+ JobTokenSecretManager jtsm = new JobTokenSecretManager();
+ JobTokenIdentifier identifier = new JobTokenIdentifier(
+ new Text(appId.toString()));
+ Token<JobTokenIdentifier> sessionToken =
+ new Token<JobTokenIdentifier>(identifier, jtsm);
+ sessionToken.setService(identifier.getJobId());
+ TokenCache.setSessionToken(sessionToken, amCreds);
+ TestTokenSecretManager ttsm = new TestTokenSecretManager();
+ Text tokenAlias1 = new Text("alias1");
+ Token<TestTokenIdentifier> amToken1 = new Token<TestTokenIdentifier>(
+ new TestTokenIdentifier(new Text("amtoken1")), ttsm);
+ amCreds.addToken(tokenAlias1, amToken1);
+ Text tokenAlias2 = new Text("alias2");
+ Token<TestTokenIdentifier> amToken2 = new Token<TestTokenIdentifier>(
+ new TestTokenIdentifier(new Text("amtoken2")), ttsm);
+ amCreds.addToken(tokenAlias2, amToken2);
+
+ FileSystem fs = FileSystem.getLocal(conf);
+ FSDataOutputStream sessionJarsPBOutStream =
+ TezCommonUtils.createFileForAM(fs, new Path(TEST_DIR.toString(),
+ TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME));
+ DAGProtos.PlanLocalResourcesProto.getDefaultInstance()
+ .writeDelimitedTo(sessionJarsPBOutStream);
+ sessionJarsPBOutStream.close();
+ DAGAppMaster am = new DAGAppMaster(attemptId,
+ ContainerId.newContainerId(attemptId, 1),
+ "127.0.0.1", 0, 0, new SystemClock(), 1, true,
+ TEST_DIR.toString(), new String[] {TEST_DIR.toString()},
+ new String[] {TEST_DIR.toString()},
+ new TezApiVersionInfo().getVersion(), 1, amCreds,
+ "someuser", null);
+ am.init(conf);
+ am.start();
+
+ // create some sample DAG credentials
+ Credentials dagCreds = new Credentials();
+ Token<TestTokenIdentifier> dagToken1 = new Token<TestTokenIdentifier>(
+ new TestTokenIdentifier(new Text("dagtoken1")), ttsm);
+ dagCreds.addToken(tokenAlias2, dagToken1);
+ Text tokenAlias3 = new Text("alias3");
+ Token<TestTokenIdentifier> dagToken2 = new Token<TestTokenIdentifier>(
+ new TestTokenIdentifier(new Text("dagtoken2")), ttsm);
+ dagCreds.addToken(tokenAlias3, dagToken2);
+
+ TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+ DAGPlan dagPlan = DAGPlan.newBuilder()
+ .setName("somedag")
+ .setCredentialsBinary(
+ DagTypeConverters.convertCredentialsToProto(dagCreds))
+ .build();
+ DAGImpl dag = am.createDAG(dagPlan, dagId);
+ Credentials fetchedDagCreds = dag.getCredentials();
+ am.stop();
+
+ Token<? extends TokenIdentifier> fetchedToken1 =
+ fetchedDagCreds.getToken(tokenAlias1);
+ if (doMerge) {
+ assertNotNull("AM creds missing from DAG creds", fetchedToken1);
+ compareTestTokens(amToken1, fetchedDagCreds.getToken(tokenAlias1));
+ } else {
+ assertNull("AM creds leaked to DAG creds", fetchedToken1);
+ }
+ compareTestTokens(dagToken1, fetchedDagCreds.getToken(tokenAlias2));
+ compareTestTokens(dagToken2, fetchedDagCreds.getToken(tokenAlias3));
+ }
+
+ private static void compareTestTokens(
+ Token<? extends TokenIdentifier> expected,
+ Token<? extends TokenIdentifier> actual) throws IOException {
+ TestTokenIdentifier expectedId = getTestTokenIdentifier(expected);
+ TestTokenIdentifier actualId = getTestTokenIdentifier(actual);
+ assertEquals("Token id not preserved", expectedId.getTestId(),
+ actualId.getTestId());
+ }
+
+ private static TestTokenIdentifier getTestTokenIdentifier(
+ Token<? extends TokenIdentifier> token) throws IOException {
+ ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+ DataInputStream in = new DataInputStream(buf);
+ TestTokenIdentifier tokenId = new TestTokenIdentifier();
+ tokenId.readFields(in);
+ in.close();
+ return tokenId;
+ }
+
+ private static class TestTokenIdentifier extends TokenIdentifier {
+ private static Text KIND_NAME = new Text("test-token");
+
+ private Text testId;
+
+ public TestTokenIdentifier() {
+ this(new Text());
+ }
+
+ public TestTokenIdentifier(Text id) {
+ testId = id;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ testId.readFields(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ testId.write(out);
+ }
+
+ @Override
+ public Text getKind() {
+ return KIND_NAME;
+ }
+
+ @Override
+ public UserGroupInformation getUser() {
+ return UserGroupInformation.createRemoteUser("token-user");
+ }
+
+ public Text getTestId() {
+ return testId;
+ }
+ }
+
+ private static class TestTokenSecretManager extends
+ SecretManager<TestTokenIdentifier> {
+ @Override
+ public byte[] createPassword(TestTokenIdentifier id) {
+ return id.getBytes();
+ }
+
+ @Override
+ public byte[] retrievePassword(TestTokenIdentifier id)
+ throws InvalidToken {
+ return id.getBytes();
+ }
+
+ @Override
+ public TestTokenIdentifier createIdentifier() {
+ return new TestTokenIdentifier();
+ }
+ }
}