You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/10/26 01:00:52 UTC
git commit: TEZ-192. Read tokens from UGI rather than a tokens file.
(sseth)
Updated Branches:
refs/heads/master 6d8646c32 -> 1c44f634c
TEZ-192. Read tokens from UGI rather than a tokens file. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/1c44f634
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/1c44f634
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/1c44f634
Branch: refs/heads/master
Commit: 1c44f634c2820ee5ced891a83654060273ab4a0d
Parents: 6d8646c
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Oct 25 16:00:06 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Oct 25 16:00:06 2013 -0700
----------------------------------------------------------------------
.../apache/tez/dag/api/TezConfiguration.java | 7 --
.../tez/dag/app/ContainerHeartbeatHandler.java | 4 +-
.../org/apache/tez/dag/app/DAGAppMaster.java | 67 ++++++--------------
.../tez/dag/app/HeartbeatHandlerBase.java | 6 +-
.../tez/dag/app/TaskHeartbeatHandler.java | 4 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 27 +++++---
.../tez/mapreduce/hadoop/DeprecatedKeys.java | 5 --
.../tez/mapreduce/hadoop/MRJobConfig.java | 6 --
.../org/apache/tez/mapreduce/YARNRunner.java | 14 +---
9 files changed, 48 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c44f634/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 8d3832e..04b96ba 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -43,9 +43,6 @@ public class TezConfiguration extends Configuration {
public static final String TEZ_AM_STAGING_DIR = TEZ_PREFIX + "staging-dir";
public static final String TEZ_AM_STAGING_DIR_DEFAULT = "/tmp/tez/staging";
- // TODO Should not be required once all tokens are handled via AppSubmissionContext
- public static final String JOB_SUBMIT_DIR = TEZ_PREFIX + "jobSubmitDir";
- public static final String APPLICATION_TOKENS_FILE = "appTokens";
public static final String TEZ_APPLICATION_MASTER_CLASS =
"org.apache.tez.dag.app.DAGAppMaster";
@@ -65,10 +62,6 @@ public class TezConfiguration extends Configuration {
TEZ_AM_PREFIX + "task.listener.thread-count";
public static final int TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT = 30;
- public static final String TEZ_AM_CONTAINER_LISTENER_THREAD_COUNT =
- TEZ_AM_PREFIX + "container.listener.thread-count";
- public static final int TEZ_AM_CONTAINER_LISTENER_THREAD_COUNT_DEFAULT = 30;
-
// TODO Some of the DAG properties are job specific and not AM specific. Rename accordingly.
// TODO Are any of these node blacklisting properties required. (other than for MR compat)
public static final String TEZ_AM_MAX_TASK_FAILURES_PER_NODE = TEZ_AM_PREFIX
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c44f634/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerHeartbeatHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerHeartbeatHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerHeartbeatHandler.java
index 2e813ff..55ce7da 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerHeartbeatHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerHeartbeatHandler.java
@@ -29,8 +29,8 @@ public class ContainerHeartbeatHandler extends
public ContainerHeartbeatHandler(AppContext context,
- int numThreads) {
- super(context, numThreads, "ContainerHeartbeatHandler");
+ int expectedConcurrency) {
+ super(context, expectedConcurrency, "ContainerHeartbeatHandler");
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c44f634/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 4e4e82e..6d2ec19 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -46,7 +46,6 @@ import org.apache.commons.cli.Options;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -54,7 +53,6 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.service.ServiceOperations;
@@ -74,6 +72,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.SystemClock;
@@ -248,8 +247,6 @@ public class DAGAppMaster extends AbstractService {
this.amConf = conf;
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
- downloadTokensAndSetupUGI(conf);
-
context = new RunningAppContext(conf);
clientHandler = new DAGClientHandler();
@@ -480,7 +477,7 @@ public class DAGAppMaster extends AbstractService {
// Prepare the TaskAttemptListener server for authentication of Containers
// TaskAttemptListener gets the information via jobTokenSecretManager.
- String dagIdString = dagId.toString().replace("application", "job");
+ String dagIdString = dagId.toString();
jobTokenSecretManager.addTokenForJob(dagIdString, sessionToken);
LOG.info("Adding job token for " + dagIdString
+ " to jobTokenSecretManager");
@@ -503,43 +500,7 @@ public class DAGAppMaster extends AbstractService {
return newDag;
} // end createDag()
-
-
- /**
- * Obtain the tokens needed by the job and put them in the UGI
- * @param conf
- */
- protected void downloadTokensAndSetupUGI(Configuration conf) {
- // TODO remove - TEZ-71
- try {
- this.currentUser = UserGroupInformation.getCurrentUser();
-
- if (UserGroupInformation.isSecurityEnabled()) {
- // Read the file-system tokens from the localized tokens-file.
- Path jobSubmitDir =
- FileContext.getLocalFSFileContext().makeQualified(
- new Path(new File(TezConfiguration.JOB_SUBMIT_DIR)
- .getAbsolutePath()));
- Path jobTokenFile =
- new Path(jobSubmitDir, TezConfiguration.APPLICATION_TOKENS_FILE);
- tokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
- LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
- + jobTokenFile);
-
- for (Token<? extends TokenIdentifier> tk : tokens.getAllTokens()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Token of kind " + tk.getKind()
- + "in current ugi in the AppMaster for service "
- + tk.getService());
- }
- currentUser.addToken(tk); // For use by AppMaster itself.
- }
- }
- } catch (IOException e) {
- throw new TezUncheckedException(e);
- }
- }
-
+
protected void addIfService(Object object, boolean addDispatcher) {
if (object instanceof Service) {
Service service = (Service) object;
@@ -580,8 +541,8 @@ public class DAGAppMaster extends AbstractService {
AppContext context, Configuration conf) {
ContainerHeartbeatHandler chh = new ContainerHeartbeatHandler(context,
conf.getInt(
- TezConfiguration.TEZ_AM_CONTAINER_LISTENER_THREAD_COUNT,
- TezConfiguration.TEZ_AM_CONTAINER_LISTENER_THREAD_COUNT_DEFAULT));
+ TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT,
+ TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT));
return chh;
}
@@ -1449,12 +1410,26 @@ public class DAGAppMaster extends AbstractService {
protected static void initAndStartAppMaster(final DAGAppMaster appMaster,
final Configuration conf, String jobUserName) throws IOException,
InterruptedException {
- Credentials credentials =
- UserGroupInformation.getCurrentUser().getCredentials();
UserGroupInformation.setConfiguration(conf);
+ appMaster.currentUser = UserGroupInformation.getCurrentUser();
+ Credentials credentials =
+ UserGroupInformation.getCurrentUser().getCredentials();
+
UserGroupInformation appMasterUgi = UserGroupInformation
.createRemoteUser(jobUserName);
appMasterUgi.addCredentials(credentials);
+
+ // Now remove the AM->RM token so tasks don't have it
+ Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
+ while (iter.hasNext()) {
+ Token<?> token = iter.next();
+ if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+ iter.remove();
+ }
+ }
+
+ appMaster.tokens = credentials;
+
appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c44f634/tez-dag/src/main/java/org/apache/tez/dag/app/HeartbeatHandlerBase.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/HeartbeatHandlerBase.java b/tez-dag/src/main/java/org/apache/tez/dag/app/HeartbeatHandlerBase.java
index 1abd934..3741150 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/HeartbeatHandlerBase.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/HeartbeatHandlerBase.java
@@ -44,15 +44,15 @@ public abstract class HeartbeatHandlerBase<T> extends AbstractService {
private ConcurrentMap<T, ReportTime> runningMap;
private volatile boolean stopped;
- public HeartbeatHandlerBase(AppContext appContext, int numThreads, String name) {
+ public HeartbeatHandlerBase(AppContext appContext, int expectedConcurrency, String name) {
super(name);
this.name = name;
this.eventHandler = appContext.getEventHandler();
this.clock = appContext.getClock();
this.appContext = appContext;
- numThreads = numThreads == 0 ? 1 : numThreads;
+ expectedConcurrency = expectedConcurrency == 0 ? 1 : expectedConcurrency;
this.runningMap = new ConcurrentHashMap<T, HeartbeatHandlerBase.ReportTime>(
- 16, 0.75f, numThreads);
+ 16, 0.75f, expectedConcurrency);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c44f634/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
index 9f576cc..c41244d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
@@ -35,8 +35,8 @@ import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@SuppressWarnings({"unchecked"})
public class TaskHeartbeatHandler extends HeartbeatHandlerBase<TezTaskAttemptID> {
- public TaskHeartbeatHandler(AppContext context, int numThreads) {
- super(context, numThreads, "TaskHeartbeatHandler");
+ public TaskHeartbeatHandler(AppContext context, int expectedConcurrency) {
+ super(context, expectedConcurrency, "TaskHeartbeatHandler");
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c44f634/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 853bacf..9b33557 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -863,6 +863,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
writeLock.lock();
try {
this.vertexLocationHint = vertexLocationHint;
+ if (LOG.isDebugEnabled()) {
+ logLocationHints(this.vertexLocationHint);
+ }
} finally {
writeLock.unlock();
}
@@ -1984,15 +1987,23 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
for (TaskLocationHint taskLocationHint : locationHint
.getTaskLocationHints()) {
StringBuilder sb = new StringBuilder();
- sb.append("Hosts: ");
- for (String host : taskLocationHint.getDataLocalHosts()) {
- hosts.add(host);
- sb.append(host).append(", ");
+ if (taskLocationHint.getDataLocalHosts() == null) {
+ sb.append("No Hosts");
+ } else {
+ sb.append("Hosts: ");
+ for (String host : taskLocationHint.getDataLocalHosts()) {
+ hosts.add(host);
+ sb.append(host).append(", ");
+ }
}
- sb.append("Racks: ");
- for (String rack : taskLocationHint.getRacks()) {
- racks.add(rack);
- sb.append(rack).append(", ");
+ if (taskLocationHint.getRacks() == null) {
+ sb.append("No Racks");
+ } else {
+ sb.append("Racks: ");
+ for (String rack : taskLocationHint.getRacks()) {
+ racks.add(rack);
+ sb.append(rack).append(", ");
+ }
}
LOG.debug("Location: " + counter + " : " + sb.toString());
counter++;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c44f634/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
index dac92ed..f0cb289 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
@@ -107,11 +107,6 @@ public class DeprecatedKeys {
}
private static void populateMRToDagParamMap() {
- mrParamToDAGParamMap.put(MRJobConfig.JOB_SUBMIT_DIR,
- TezConfiguration.JOB_SUBMIT_DIR);
- mrParamToDAGParamMap.put(MRJobConfig.APPLICATION_TOKENS_FILE,
- TezConfiguration.APPLICATION_TOKENS_FILE);
-
// TODO Default value handling.
mrParamToDAGParamMap.put(MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT,
TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c44f634/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
index 65eca5f..940b1e0 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
@@ -572,12 +572,6 @@ public interface MRJobConfig {
public static final String APPLICATION_MASTER_CLASS =
"org.apache.tez.dag.app.DAGAppMaster";
- // The token file for the application. Should contain tokens for access to
- // remote file system and may optionally contain application specific tokens.
- // For now, generated by the AppManagers and used by NodeManagers and the
- // Containers.
- public static final String APPLICATION_TOKENS_FILE = "appTokens";
-
/** The log directory for the containers */
public static final String TASK_LOG_DIR = MR_PREFIX + "container.log.dir";
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c44f634/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
index 87df0b0..96f6936 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
@@ -90,7 +90,6 @@ import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
import org.apache.tez.dag.api.client.DAGClient;
@@ -310,8 +309,7 @@ public class YARNRunner implements ClientProtocol {
// TODO gross hack
for (String s : new String[] {
MRJobConfig.JOB_SPLIT,
- MRJobConfig.JOB_SPLIT_METAINFO,
- MRJobConfig.APPLICATION_TOKENS_FILE }) {
+ MRJobConfig.JOB_SPLIT_METAINFO}) {
localResources.put(s,
createApplicationResource(defaultFileContext,
new Path(jobSubmitDir, s), LocalResourceType.FILE));
@@ -507,16 +505,6 @@ public class YARNRunner implements ClientProtocol {
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
throws IOException, InterruptedException {
- // TEZ-192 - stop using token file
- // Upload only in security mode: TODO
- Path applicationTokensFile =
- new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
- try {
- ts.writeTokenStorageFile(applicationTokensFile, conf);
- } catch (IOException e) {
- throw new TezUncheckedException(e);
- }
-
ApplicationId appId = resMgrDelegate.getApplicationId();
FileSystem fs = FileSystem.get(conf);