You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/01/16 20:21:17 UTC
tez git commit: TEZ-1879. Create local UGI instances for each task
and the AM, when running in LocalMode. (sseth)
Repository: tez
Updated Branches:
refs/heads/master 2ff8c945a -> ea46f459c
TEZ-1879. Create local UGI instances for each task and the AM, when
running in LocalMode. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ea46f459
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ea46f459
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ea46f459
Branch: refs/heads/master
Commit: ea46f459c0e88b0f9b0c714f8c6ac9d9a6f03c5e
Parents: 2ff8c94
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Jan 16 11:20:57 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Jan 16 11:20:57 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/common/TezCommonUtils.java | 14 +++++++
.../java/org/apache/tez/client/LocalClient.java | 24 +++++------
.../org/apache/tez/dag/app/DAGAppMaster.java | 43 +++++++++-----------
.../app/launcher/LocalContainerLauncher.java | 12 ++++--
.../apache/tez/dag/app/MockDAGAppMaster.java | 8 +++-
.../org/apache/tez/dag/app/MockLocalClient.java | 5 ++-
.../org/apache/tez/runtime/task/TezChild.java | 22 ++++++----
8 files changed, 76 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/ea46f459/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 418475e..af4c60e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-1879. Create local UGI instances for each task and the AM, when running in LocalMode.
TEZ-1661. LocalTaskScheduler hangs when shutdown.
TEZ-1934. TestAMRecovery may fail due to the execution order is not determined.
TEZ-1951. Fix general findbugs warnings in tez-dag.
http://git-wip-us.apache.org/repos/asf/tez/blob/ea46f459/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
index 685c728..fe570a5 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
@@ -396,6 +397,19 @@ public class TezCommonUtils {
return bb;
}
+ public static Credentials parseCredentialsBytes(byte[] credentialsBytes) throws IOException {
+ Credentials credentials = new Credentials();
+ DataInputBuffer dib = new DataInputBuffer();
+ try {
+ byte[] tokenBytes = credentialsBytes;
+ dib.reset(tokenBytes, tokenBytes.length);
+ credentials.readTokenStorageStream(dib);
+ return credentials;
+ } finally {
+ dib.close();
+ }
+ }
+
public static void logCredentials(Log log, Credentials credentials, String identifier) {
if (log.isDebugEnabled()) {
log.debug(getCredentialsInfo(credentials, identifier));
http://git-wip-us.apache.org/repos/asf/tez/blob/ea46f459/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index 94b8474..f309b02 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -23,7 +23,6 @@ import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.List;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -36,7 +35,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -46,7 +44,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.log4j.Logger;
-import org.apache.tez.common.EnvironmentUpdateUtils;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
@@ -274,17 +271,15 @@ public class LocalClient extends FrameworkClient {
fs.mkdirs(logDir);
fs.mkdirs(localDir);
+ UserGroupInformation.setConfiguration(conf);
// Add session specific credentials to the AM credentials.
- UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
ByteBuffer tokens = appContext.getAMContainerSpec().getTokens();
+ Credentials amCredentials;
if (tokens != null) {
- Credentials amCredentials = new Credentials();
- DataInputByteBuffer dibb = new DataInputByteBuffer();
- dibb.reset(tokens);
- amCredentials.readTokenStorageStream(dibb);
- tokens.rewind();
- currentUser.addCredentials(amCredentials);
+ amCredentials = TezCommonUtils.parseCredentialsBytes(tokens.array());
+ } else {
+ amCredentials = new Credentials();
}
// Construct, initialize, and start the DAGAppMaster
@@ -298,9 +293,10 @@ public class LocalClient extends FrameworkClient {
dagAppMaster =
createDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
new SystemClock(), appSubmitTime, isSession, userDir.toUri().getPath(),
- new String[] {localDir.toUri().getPath()}, new String[] {logDir.toUri().getPath()});
+ new String[] {localDir.toUri().getPath()}, new String[] {logDir.toUri().getPath()},
+ amCredentials, UserGroupInformation.getCurrentUser().getShortUserName());
clientHandler = new DAGClientHandler(dagAppMaster);
- DAGAppMaster.initAndStartAppMaster(dagAppMaster, currentUser.getShortUserName());
+ DAGAppMaster.initAndStartAppMaster(dagAppMaster, conf);
} catch (Throwable t) {
LOG.fatal("Error starting DAGAppMaster", t);
@@ -323,10 +319,10 @@ public class LocalClient extends FrameworkClient {
protected DAGAppMaster createDAGAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId cId, String currentHost, int nmPort, int nmHttpPort,
Clock clock, long appSubmitTime, boolean isSession, String userDir,
- String[] localDirs, String[] logDirs) {
+ String[] localDirs, String[] logDirs, Credentials credentials, String jobUserName) {
return new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
new SystemClock(), appSubmitTime, isSession, userDir, localDirs, logDirs,
- versionInfo.getVersion(), 1);
+ versionInfo.getVersion(), 1, credentials, jobUserName);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ea46f459/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 6dddf6a..e1ab3b7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -240,8 +240,8 @@ public class DAGAppMaster extends AbstractService {
private DAGClientHandler clientHandler;
private DAG currentDAG;
- private Credentials amTokens = new Credentials(); // Filled during init
- private UserGroupInformation appMasterUgi;
+ private final Credentials amCredentials;
+ private final UserGroupInformation appMasterUgi;
private AtomicBoolean sessionStopped = new AtomicBoolean(false);
private long sessionTimeoutInterval;
@@ -283,7 +283,8 @@ public class DAGAppMaster extends AbstractService {
public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
Clock clock, long appSubmitTime, boolean isSession, String workingDirectory,
- String [] localDirs, String[] logDirs, String clientVersion, int maxAppAttempts) {
+ String [] localDirs, String[] logDirs, String clientVersion, int maxAppAttempts,
+ Credentials credentials, String jobUserName) {
super(DAGAppMaster.class.getName());
this.clock = clock;
this.startTime = clock.getTime();
@@ -302,6 +303,11 @@ public class DAGAppMaster extends AbstractService {
this.dagVersionInfo = new TezDagVersionInfo();
this.clientVersion = clientVersion;
this.maxAppAttempts = maxAppAttempts;
+ this.amCredentials = credentials;
+ this.appMasterUgi = UserGroupInformation
+ .createRemoteUser(jobUserName);
+ this.appMasterUgi.addCredentials(amCredentials);
+
// TODO Metrics
//this.metrics = DAGAppMetrics.create();
@@ -341,8 +347,6 @@ public class DAGAppMaster extends AbstractService {
}
if (isLocal) {
- UserGroupInformation.setConfiguration(conf);
- appMasterUgi = UserGroupInformation.getCurrentUser();
conf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS_DEFAULT);
@@ -369,7 +373,7 @@ public class DAGAppMaster extends AbstractService {
addIfService(containerHeartbeatHandler, true);
sessionToken =
- TokenCache.getSessionToken(amTokens);
+ TokenCache.getSessionToken(amCredentials);
if (sessionToken == null) {
throw new RuntimeException("Could not find session token in AM Credentials");
}
@@ -1838,6 +1842,12 @@ public class DAGAppMaster extends AbstractService {
CommandLine cliParser = new GnuParser().parse(opts, args);
+ // TODO Does this really need to be a YarnConfiguration ?
+ Configuration conf = new Configuration(new YarnConfiguration());
+ TezUtilsInternal.addUserSpecifiedTezConfiguration(System.getenv(Environment.PWD.name()), conf);
+ UserGroupInformation.setConfiguration(conf);
+ Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
+
DAGAppMaster appMaster =
new DAGAppMaster(applicationAttemptId, containerId, nodeHostString,
Integer.parseInt(nodePortString),
@@ -1846,11 +1856,11 @@ public class DAGAppMaster extends AbstractService {
System.getenv(Environment.PWD.name()),
TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name())),
TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOG_DIRS.name())),
- clientVersion, maxAppAttempts);
+ clientVersion, maxAppAttempts, credentials, jobUserName);
ShutdownHookManager.get().addShutdownHook(
new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
- initAndStartAppMaster(appMaster, jobUserName);
+ initAndStartAppMaster(appMaster, conf);
} catch (Throwable t) {
LOG.fatal("Error starting DAGAppMaster", t);
@@ -2004,29 +2014,18 @@ public class DAGAppMaster extends AbstractService {
sendEvent(startDagEvent);
}
- // TODO XXX Does this really need to be a YarnConfiguration ?
public static void initAndStartAppMaster(final DAGAppMaster appMaster,
- String jobUserName) throws IOException,
+ final Configuration conf) throws IOException,
InterruptedException {
- final Configuration conf = new Configuration(new YarnConfiguration());
- TezUtilsInternal.addUserSpecifiedTezConfiguration(appMaster.workingDirectory, conf);
-
// Do not automatically close FileSystem objects so that in case of
// SIGTERM I have a chance to write out the job history. I'll be closing
// the objects myself.
conf.setBoolean("fs.automatic.close", false);
Limits.setConfiguration(conf);
- UserGroupInformation.setConfiguration(conf);
- Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
-
- appMaster.appMasterUgi = UserGroupInformation
- .createRemoteUser(jobUserName);
- appMaster.appMasterUgi.addCredentials(credentials);
-
// Now remove the AM->RM token so tasks don't have it
- Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
+ Iterator<Token<?>> iter = appMaster.amCredentials.getAllTokens().iterator();
while (iter.hasNext()) {
Token<?> token = iter.next();
if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
@@ -2034,8 +2033,6 @@ public class DAGAppMaster extends AbstractService {
}
}
- appMaster.amTokens = credentials;
-
appMaster.appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
http://git-wip-us.apache.org/repos/asf/tez/blob/ea46f459/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index 0343828..2f29569 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -46,10 +46,12 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
+import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
@@ -203,7 +205,8 @@ public class LocalContainerLauncher extends AbstractService implements
tezChild =
createTezChild(context.getAMConf(), event.getContainerId(), tokenIdentifier,
context.getApplicationAttemptId().getAttemptId(), context.getLocalDirs(),
- (TezTaskUmbilicalProtocol) taskAttemptListener);
+ (TezTaskUmbilicalProtocol) taskAttemptListener,
+ TezCommonUtils.parseCredentialsBytes(event.getContainerLaunchContext().getTokens().array()));
} catch (InterruptedException e) {
handleLaunchFailed(e, event.getContainerId());
return;
@@ -318,7 +321,8 @@ public class LocalContainerLauncher extends AbstractService implements
private TezChild createTezChild(Configuration defaultConf, ContainerId containerId,
String tokenIdentifier, int attemptNumber, String[] localDirs,
- TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol) throws
+ TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol,
+ Credentials credentials) throws
InterruptedException, TezException, IOException {
Map<String, String> containerEnv = new HashMap<String, String>();
containerEnv.putAll(localEnv);
@@ -326,7 +330,7 @@ public class LocalContainerLauncher extends AbstractService implements
TezChild tezChild =
TezChild.newTezChild(defaultConf, null, 0, containerId.toString(), tokenIdentifier,
- attemptNumber, localDirs, workingDirectory, containerEnv, "", ExecutionContext);
+ attemptNumber, localDirs, workingDirectory, containerEnv, "", ExecutionContext, credentials);
tezChild.setUmbilical(tezTaskUmbilicalProtocol);
return tezChild;
}
@@ -342,4 +346,4 @@ public class LocalContainerLauncher extends AbstractService implements
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/ea46f459/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index aca572b..d34532b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -30,6 +30,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@@ -302,9 +304,11 @@ public class MockDAGAppMaster extends DAGAppMaster {
public MockDAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId,
String nmHost, int nmPort, int nmHttpPort, Clock clock, long appSubmitTime,
boolean isSession, String workingDirectory, String[] localDirs, String[] logDirs,
- AtomicBoolean launcherGoFlag, boolean initFailFlag, boolean startFailFlag) {
+ AtomicBoolean launcherGoFlag, boolean initFailFlag, boolean startFailFlag,
+ Credentials credentials, String jobUserName) {
super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime,
- isSession, workingDirectory, localDirs, logDirs, new TezApiVersionInfo().getVersion(), 1);
+ isSession, workingDirectory, localDirs, logDirs, new TezApiVersionInfo().getVersion(), 1,
+ credentials, jobUserName);
containerLauncher = new MockContainerLauncher(launcherGoFlag);
shutdownHandler = new MockDAGAppMasterShutdownHandler();
this.initFailFlag = initFailFlag;
http://git-wip-us.apache.org/repos/asf/tez/blob/ea46f459/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
index 4a2fa9b..3cb9d8c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.app;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.Clock;
@@ -51,10 +52,10 @@ public class MockLocalClient extends LocalClient {
protected DAGAppMaster createDAGAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId cId, String currentHost, int nmPort, int nmHttpPort,
Clock clock, long appSubmitTime, boolean isSession, String userDir,
- String[] localDirs, String[] logDirs) {
+ String[] localDirs, String[] logDirs, Credentials credentials, String jobUserName) {
mockApp = new MockDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
(mockClock!=null ? mockClock : clock), appSubmitTime, isSession, userDir, localDirs, logDirs,
- mockAppLauncherGoFlag, initFailFlag, startFailFlag);
+ mockAppLauncherGoFlag, initFailFlag, startFailFlag, credentials, jobUserName);
return mockApp;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ea46f459/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index a71fc55..7a9b600 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -102,6 +102,7 @@ public class TezChild {
private final ExecutionContext ExecutionContext;
private final Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
private final Map<String, String> serviceProviderEnvMap;
+ private final Credentials credentials;
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
private Multimap<String, String> startedInputsMap = HashMultimap.create();
@@ -115,7 +116,8 @@ public class TezChild {
String tokenIdentifier, int appAttemptNumber, String workingDir, String[] localDirs,
Map<String, String> serviceProviderEnvMap,
ObjectRegistryImpl objectRegistry, String pid,
- ExecutionContext ExecutionContext)
+ ExecutionContext ExecutionContext,
+ Credentials credentials)
throws IOException, InterruptedException {
this.defaultConf = conf;
this.containerIdString = containerIdentifier;
@@ -125,6 +127,7 @@ public class TezChild {
this.workingDir = workingDir;
this.pid = pid;
this.ExecutionContext = ExecutionContext;
+ this.credentials = credentials;
getTaskMaxSleepTime = defaultConf.getInt(
TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX,
@@ -146,8 +149,7 @@ public class TezChild {
this.objectRegistry = objectRegistry;
- // Security framework already loaded the tokens into current ugi
- Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
+
if (LOG.isDebugEnabled()) {
LOG.debug("Executing with tokens:");
for (Token<?> token : credentials.getAllTokens()) {
@@ -407,14 +409,12 @@ public class TezChild {
public static TezChild newTezChild(Configuration conf, String host, int port, String containerIdentifier,
String tokenIdentifier, int attemptNumber, String[] localDirs, String workingDirectory,
Map<String, String> serviceProviderEnvMap, @Nullable String pid,
- ExecutionContext ExecutionContext)
+ ExecutionContext ExecutionContext, Credentials credentials)
throws IOException, InterruptedException, TezException {
// Pull in configuration specified for the session.
// TODO TEZ-1233. This needs to be moved over the wire rather than localizing the file
// for each and every task, and reading it back from disk. Also needs to be per vertex.
- TezUtilsInternal.addUserSpecifiedTezConfiguration(workingDirectory, conf);
- UserGroupInformation.setConfiguration(conf);
Limits.setConfiguration(conf);
// Should this be part of main - Metrics and ObjectRegistry. TezTask setup should be independent
@@ -426,7 +426,7 @@ public class TezChild {
return new TezChild(conf, host, port, containerIdentifier, tokenIdentifier,
attemptNumber, workingDirectory, localDirs, serviceProviderEnvMap, objectRegistry, pid,
- ExecutionContext);
+ ExecutionContext, credentials);
}
public static void main(String[] args) throws IOException, InterruptedException, TezException {
@@ -451,9 +451,15 @@ public class TezChild {
+ " containerIdentifier: " + containerIdentifier + " appAttemptNumber: " + attemptNumber
+ " tokenIdentifier: " + tokenIdentifier);
}
+
+ // Security framework already loaded the tokens into current ugi
+ TezUtilsInternal.addUserSpecifiedTezConfiguration(System.getenv(Environment.PWD.name()), defaultConf);
+ UserGroupInformation.setConfiguration(defaultConf);
+ Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
TezChild tezChild = newTezChild(defaultConf, host, port, containerIdentifier,
tokenIdentifier, attemptNumber, localDirs, System.getenv(Environment.PWD.name()),
- System.getenv(), pid, new ExecutionContextImpl(System.getenv(Environment.NM_HOST.name())));
+ System.getenv(), pid, new ExecutionContextImpl(System.getenv(Environment.NM_HOST.name())),
+ credentials);
tezChild.run();
}