You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/22 03:19:19 UTC
[29/50] [abbrv] tez git commit: TEZ-2653. Change service contexts to
expose a user specified payload instead of the AM configuration. (sseth)
TEZ-2653. Change service contexts to expose a user specified payload
instead of the AM configuration. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4a18c5d5
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4a18c5d5
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4a18c5d5
Branch: refs/heads/TEZ-2003
Commit: 4a18c5d5baaaab4565da9d3c3085d98dfc91d07e
Parents: ec5acd8
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Jul 28 14:56:56 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 21 18:14:40 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../tez/dag/api/NamedEntityDescriptor.java | 7 ++-
.../api/ContainerLauncherContext.java | 5 +-
.../api/TaskSchedulerContext.java | 5 +-
.../tez/dag/api/TaskCommunicatorContext.java | 4 +-
.../dag/app/ContainerLauncherContextImpl.java | 10 ++--
.../org/apache/tez/dag/app/DAGAppMaster.java | 20 +++++--
.../dag/app/TaskAttemptListenerImpTezDag.java | 19 ++++--
.../dag/app/TaskCommunicatorContextImpl.java | 12 ++--
.../tez/dag/app/TezTaskCommunicatorImpl.java | 12 +++-
.../dag/app/launcher/ContainerLauncherImpl.java | 8 ++-
.../app/launcher/ContainerLauncherRouter.java | 24 +++++---
.../app/launcher/LocalContainerLauncher.java | 10 +++-
.../dag/app/rm/LocalTaskSchedulerService.java | 10 +++-
.../dag/app/rm/TaskSchedulerContextImpl.java | 12 ++--
.../app/rm/TaskSchedulerContextImplWrapper.java | 6 +-
.../dag/app/rm/TaskSchedulerEventHandler.java | 61 ++++++++++++--------
.../dag/app/rm/YarnTaskSchedulerService.java | 21 +++++--
.../apache/tez/dag/app/MockDAGAppMaster.java | 15 ++++-
.../app/TestTaskAttemptListenerImplTezDag.java | 32 ++++++++--
.../app/TestTaskAttemptListenerImplTezDag2.java | 12 +++-
.../tez/dag/app/rm/TestContainerReuse.java | 19 +++---
.../tez/dag/app/rm/TestTaskScheduler.java | 3 +-
.../app/rm/TestTaskSchedulerEventHandler.java | 18 +++++-
.../dag/app/rm/TestTaskSchedulerHelpers.java | 26 +++++++--
.../TezTestServiceContainerLauncher.java | 15 +++--
.../rm/TezTestServiceTaskSchedulerService.java | 10 +++-
.../TezTestServiceTaskCommunicatorImpl.java | 2 +-
.../tez/tests/TestExternalTezServices.java | 13 ++++-
29 files changed, 294 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index a201942..b88044b 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -38,5 +38,6 @@ ALL CHANGES:
TEZ-2005. Define basic interface for pluggable TaskScheduler.
TEZ-2651. Pluggable services should not extend AbstractService.
TEZ-2652. Cleanup the way services are specified for an AM and vertices.
+ TEZ-2653. Change service contexts to expose a user specified payload instead of the AM configuration.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java
index bad0d10..723d43f 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java
@@ -17,7 +17,7 @@ package org.apache.tez.dag.api;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
-public class NamedEntityDescriptor<T extends EntityDescriptor<T>> extends EntityDescriptor<T> {
+public class NamedEntityDescriptor<T extends NamedEntityDescriptor<T>> extends EntityDescriptor<NamedEntityDescriptor<T>> {
private final String entityName;
@InterfaceAudience.Private
@@ -30,4 +30,9 @@ public class NamedEntityDescriptor<T extends EntityDescriptor<T>> extends Entity
public String getEntityName() {
return entityName;
}
+
+ public T setUserPayload(UserPayload userPayload) {
+ super.setUserPayload(userPayload);
+ return (T) this;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
index 836dc4a..5da38b8 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
@@ -16,9 +16,9 @@ package org.apache.tez.serviceplugins.api;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.api.UserPayload;
@InterfaceAudience.Public
@InterfaceStability.Unstable
@@ -43,8 +43,7 @@ public interface ContainerLauncherContext {
// Lookup APIs
- // TODO TEZ-2003. To be replaced by getInitialPayload once the DAG API is changed.
- Configuration getInitialConfiguration();
+ UserPayload getInitialUserPayload();
int getNumNodes(String sourceName);
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
index b2c8799..6f37641 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
@@ -20,7 +20,6 @@ import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -30,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.common.ContainerSignatureMatcher;
+import org.apache.tez.dag.api.UserPayload;
@InterfaceAudience.Public
@InterfaceStability.Unstable
@@ -84,8 +84,7 @@ public interface TaskSchedulerContext {
// Getters
- // TODO TEZ-2003. To be replaced by getInitialPayload
- public Configuration getInitialConfiguration();
+ public UserPayload getInitialUserPayload();
public String getAppTrackingUrl();
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
index ab32ec1..a1e94a3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
@@ -18,7 +18,6 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Set;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -36,8 +35,7 @@ public interface TaskCommunicatorContext {
// TODO TEZ-2003 Maybe add book-keeping as a helper library, instead of each impl tracking container to task etc.
- // TODO TEZ-2003 To be replaced by getInitialPayload
- Configuration getInitialConfiguration();
+ UserPayload getInitialUserPayload();
ApplicationAttemptId getApplicationAttemptId();
Credentials getCredentials();
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
index 997775a..92bbbdc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
@@ -14,10 +14,10 @@
package org.apache.tez.dag.app;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.app.rm.container.AMContainerEvent;
@@ -33,10 +33,12 @@ public class ContainerLauncherContextImpl implements ContainerLauncherContext {
private final AppContext context;
private final TaskAttemptListener tal;
+ private final UserPayload initialUserPayload;
- public ContainerLauncherContextImpl(AppContext appContext, TaskAttemptListener tal) {
+ public ContainerLauncherContextImpl(AppContext appContext, TaskAttemptListener tal, UserPayload initialUserPayload) {
this.context = appContext;
this.tal = tal;
+ this.initialUserPayload = initialUserPayload;
}
@Override
@@ -76,8 +78,8 @@ public class ContainerLauncherContextImpl implements ContainerLauncherContext {
}
@Override
- public Configuration getInitialConfiguration() {
- return context.getAMConf();
+ public UserPayload getInitialUserPayload() {
+ return initialUserPayload;
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 4f75891..52621bd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -62,8 +62,10 @@ import com.google.common.collect.HashBiMap;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.SessionNotRunning;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
@@ -492,9 +494,12 @@ public class DAGAppMaster extends AbstractService {
jobTokenSecretManager.addTokenForJob(
appAttemptID.getApplicationId().toString(), sessionToken);
+ UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(amConf);
+
//service to handle requests to TaskUmbilicalProtocol
taskAttemptListener = createTaskAttemptListener(context,
- taskHeartbeatHandler, containerHeartbeatHandler, taskCommunicatorDescriptors, isLocal);
+ taskHeartbeatHandler, containerHeartbeatHandler, taskCommunicatorDescriptors,
+ defaultPayload, isLocal);
addIfService(taskAttemptListener, true);
containerSignatureMatcher = createContainerSignatureMatcher();
@@ -540,9 +545,11 @@ public class DAGAppMaster extends AbstractService {
}
}
+
+
this.taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService,
- taskSchedulerDescriptors, isLocal);
+ taskSchedulerDescriptors, defaultPayload, isLocal);
addIfService(taskSchedulerEventHandler, true);
if (enableWebUIService()) {
@@ -560,7 +567,7 @@ public class DAGAppMaster extends AbstractService {
taskSchedulerEventHandler);
addIfServiceDependency(taskSchedulerEventHandler, clientRpcServer);
- this.containerLauncherRouter = createContainerLauncherRouter(conf, containerLauncherDescriptors, isLocal);
+ this.containerLauncherRouter = createContainerLauncherRouter(defaultPayload, containerLauncherDescriptors, isLocal);
addIfService(containerLauncherRouter, true);
dispatcher.register(NMCommunicatorEventType.class, containerLauncherRouter);
@@ -1071,10 +1078,11 @@ public class DAGAppMaster extends AbstractService {
TaskHeartbeatHandler thh,
ContainerHeartbeatHandler chh,
List<NamedEntityDescriptor> entityDescriptors,
+ UserPayload defaultUserPayload,
boolean isLocal) {
TaskAttemptListener lis =
new TaskAttemptListenerImpTezDag(context, thh, chh,
- entityDescriptors, amConf, isLocal);
+ entityDescriptors, defaultUserPayload, isLocal);
return lis;
}
@@ -1095,11 +1103,11 @@ public class DAGAppMaster extends AbstractService {
return chh;
}
- protected ContainerLauncherRouter createContainerLauncherRouter(Configuration conf,
+ protected ContainerLauncherRouter createContainerLauncherRouter(UserPayload defaultPayload,
List<NamedEntityDescriptor> containerLauncherDescriptors,
boolean isLocal) throws
UnknownHostException {
- return new ContainerLauncherRouter(conf, context, taskAttemptListener, workingDirectory,
+ return new ContainerLauncherRouter(defaultPayload, context, taskAttemptListener, workingDirectory,
containerLauncherDescriptors, isLocal);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 1e34184..cc109a6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -29,9 +29,9 @@ import java.util.concurrent.ConcurrentMap;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.commons.collections4.ListUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
@@ -103,7 +103,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
public TaskAttemptListenerImpTezDag(AppContext context,
TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh,
List<NamedEntityDescriptor> taskCommunicatorDescriptors,
- Configuration conf,
+ UserPayload defaultUserPayload,
boolean isPureLocalMode) {
super(TaskAttemptListenerImpTezDag.class.getName());
this.context = context;
@@ -112,17 +112,26 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
if (taskCommunicatorDescriptors == null || taskCommunicatorDescriptors.isEmpty()) {
if (isPureLocalMode) {
taskCommunicatorDescriptors = Lists.newArrayList(new NamedEntityDescriptor(
- TezConstants.getTezUberServicePluginName(), null));
+ TezConstants.getTezUberServicePluginName(), null).setUserPayload(defaultUserPayload));
} else {
taskCommunicatorDescriptors = Lists.newArrayList(new NamedEntityDescriptor(
- TezConstants.getTezYarnServicePluginName(), null));
+ TezConstants.getTezYarnServicePluginName(), null).setUserPayload(defaultUserPayload));
}
}
this.taskCommunicators = new TaskCommunicator[taskCommunicatorDescriptors.size()];
this.taskCommunicatorContexts = new TaskCommunicatorContext[taskCommunicatorDescriptors.size()];
this.taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[taskCommunicatorDescriptors.size()];
for (int i = 0 ; i < taskCommunicatorDescriptors.size() ; i++) {
- taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, conf, i);
+ UserPayload userPayload;
+ if (taskCommunicatorDescriptors.get(i).getEntityName()
+ .equals(TezConstants.getTezYarnServicePluginName()) ||
+ taskCommunicatorDescriptors.get(i).getEntityName()
+ .equals(TezConstants.getTezUberServicePluginName())) {
+ userPayload = defaultUserPayload;
+ } else {
+ userPayload = taskCommunicatorDescriptors.get(i).getUserPayload();
+ }
+ taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, userPayload, i);
taskCommunicators[i] = createTaskCommunicator(taskCommunicatorDescriptors.get(i), i);
taskCommunicatorServiceWrappers[i] = new ServicePluginLifecycleAbstractService(taskCommunicators[i]);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
index 035db93..cc315b7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -23,10 +23,10 @@ import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.TaskCommunicatorContext;
import org.apache.tez.dag.api.TaskHeartbeatRequest;
@@ -49,17 +49,17 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
private final int taskCommunicatorIndex;
private final ReentrantReadWriteLock.ReadLock dagChangedReadLock;
private final ReentrantReadWriteLock.WriteLock dagChangedWriteLock;
- private final Configuration conf;
+ private final UserPayload userPayload;
private DAG dag;
public TaskCommunicatorContextImpl(AppContext appContext,
TaskAttemptListenerImpTezDag taskAttemptListener,
- Configuration conf,
+ UserPayload userPayload,
int taskCommunicatorIndex) {
this.context = appContext;
this.taskAttemptListener = taskAttemptListener;
- this.conf = conf;
+ this.userPayload = userPayload;
this.taskCommunicatorIndex = taskCommunicatorIndex;
ReentrantReadWriteLock dagChangedLock = new ReentrantReadWriteLock();
@@ -68,8 +68,8 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
}
@Override
- public Configuration getInitialConfiguration() {
- return conf;
+ public UserPayload getInitialUserPayload() {
+ return userPayload;
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index 93b5b43..2a5c80e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -77,6 +77,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
protected final String tokenIdentifier;
protected final Token<JobTokenIdentifier> sessionToken;
+ protected final Configuration conf;
protected InetSocketAddress address;
protected volatile Server server;
@@ -119,6 +120,12 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
this.taskUmbilical = new TezTaskUmbilicalProtocolImpl();
this.tokenIdentifier = taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString();
this.sessionToken = TokenCache.getSessionToken(taskCommunicatorContext.getCredentials());
+ try {
+ conf = TezUtils.createConfFromUserPayload(getContext().getInitialUserPayload());
+ } catch (IOException e) {
+ throw new TezUncheckedException(
+ "Unable to parse user payload for " + TezTaskCommunicatorImpl.class.getSimpleName(), e);
+ }
}
@Override
@@ -132,7 +139,6 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
}
protected void startRpcServer() {
- Configuration conf = getContext().getInitialConfiguration();
try {
JobTokenSecretManager jobTokenSecretManager =
new JobTokenSecretManager();
@@ -171,6 +177,10 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
}
}
+ protected Configuration getConf() {
+ return this.conf;
+ }
+
private void refreshServiceAcls(Configuration configuration,
PolicyProvider policyProvider) {
this.server.refreshServiceAcl(configuration, policyProvider);
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
index cba5c80..07d269d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
import org.apache.tez.serviceplugins.api.ContainerLauncher;
@@ -224,7 +225,12 @@ public class ContainerLauncherImpl extends ContainerLauncher {
public ContainerLauncherImpl(ContainerLauncherContext containerLauncherContext) {
super(containerLauncherContext);
- this.conf = new Configuration(containerLauncherContext.getInitialConfiguration());
+ try {
+ this.conf = TezUtils.createConfFromUserPayload(containerLauncherContext.getInitialUserPayload());
+ } catch (IOException e) {
+ throw new TezUncheckedException(
+ "Failed to parse user payload for " + ContainerLauncherImpl.class.getSimpleName(), e);
+ }
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
0);
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
index 594e6d3..2d56bfe 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
import org.apache.tez.serviceplugins.api.ContainerLauncher;
@@ -63,7 +64,7 @@ public class ContainerLauncherRouter extends AbstractService
}
// Accepting conf to setup final parameters, if required.
- public ContainerLauncherRouter(Configuration conf, AppContext context,
+ public ContainerLauncherRouter(UserPayload defaultUserPayload, AppContext context,
TaskAttemptListener taskAttemptListener,
String workingDirectory,
List<NamedEntityDescriptor> containerLauncherDescriptors,
@@ -74,10 +75,10 @@ public class ContainerLauncherRouter extends AbstractService
if (containerLauncherDescriptors == null || containerLauncherDescriptors.isEmpty()) {
if (isPureLocalMode) {
containerLauncherDescriptors = Lists.newArrayList(new NamedEntityDescriptor(
- TezConstants.getTezUberServicePluginName(), null));
+ TezConstants.getTezUberServicePluginName(), null).setUserPayload(defaultUserPayload));
} else {
containerLauncherDescriptors = Lists.newArrayList(new NamedEntityDescriptor(
- TezConstants.getTezYarnServicePluginName(), null));
+ TezConstants.getTezYarnServicePluginName(), null).setUserPayload(defaultUserPayload));
}
}
containerLauncherContexts = new ContainerLauncherContext[containerLauncherDescriptors.size()];
@@ -86,10 +87,20 @@ public class ContainerLauncherRouter extends AbstractService
for (int i = 0; i < containerLauncherDescriptors.size(); i++) {
- ContainerLauncherContext containerLauncherContext = new ContainerLauncherContextImpl(context, taskAttemptListener);
+ UserPayload userPayload;
+ if (containerLauncherDescriptors.get(i).getEntityName()
+ .equals(TezConstants.getTezYarnServicePluginName()) ||
+ containerLauncherDescriptors.get(i).getEntityName()
+ .equals(TezConstants.getTezUberServicePluginName())) {
+ userPayload = defaultUserPayload;
+ } else {
+ userPayload = containerLauncherDescriptors.get(i).getUserPayload();
+ }
+ ContainerLauncherContext containerLauncherContext =
+ new ContainerLauncherContextImpl(context, taskAttemptListener, userPayload);
containerLauncherContexts[i] = containerLauncherContext;
containerLaunchers[i] = createContainerLauncher(containerLauncherDescriptors.get(i), context,
- containerLauncherContext, taskAttemptListener, workingDirectory, isPureLocalMode, conf);
+ containerLauncherContext, taskAttemptListener, workingDirectory, isPureLocalMode);
containerLauncherServiceWrappers[i] = new ServicePluginLifecycleAbstractService(containerLaunchers[i]);
}
}
@@ -99,8 +110,7 @@ public class ContainerLauncherRouter extends AbstractService
ContainerLauncherContext containerLauncherContext,
TaskAttemptListener taskAttemptListener,
String workingDirectory,
- boolean isPureLocalMode,
- Configuration conf) throws
+ boolean isPureLocalMode) throws
UnknownHostException {
if (containerLauncherDescriptor.getEntityName().equals(
TezConstants.getTezYarnServicePluginName())) {
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index 3975111..1d3e6df 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -44,6 +44,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
import org.apache.tez.serviceplugins.api.ContainerLauncher;
import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
@@ -129,7 +130,14 @@ public class LocalContainerLauncher extends ContainerLauncher {
System.getenv(Environment.NM_HOST.name());
executionContext = new ExecutionContextImpl(host);
- numExecutors = getContext().getInitialConfiguration().getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS,
+ Configuration conf;
+ try {
+ conf = TezUtils.createConfFromUserPayload(getContext().getInitialUserPayload());
+ } catch (IOException e) {
+ throw new TezUncheckedException(
+ "Failed to parse user payload for " + LocalContainerLauncher.class.getSimpleName(), e);
+ }
+ numExecutors = conf.getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS,
TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS_DEFAULT);
Preconditions.checkState(numExecutors >=1, "Must have at least 1 executor");
ExecutorService rawExecutor = Executors.newFixedThreadPool(numExecutors,
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
index 1d889ae..395589c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
@@ -18,6 +18,7 @@
package org.apache.tez.dag.app.rm;
+import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
@@ -27,6 +28,7 @@ import java.util.LinkedHashMap;
import com.google.common.primitives.Ints;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.serviceplugins.api.TaskScheduler;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
import org.slf4j.Logger;
@@ -65,7 +67,13 @@ public class LocalTaskSchedulerService extends TaskScheduler {
this.appTrackingUrl = taskSchedulerContext.getAppTrackingUrl();
this.containerSignatureMatcher = taskSchedulerContext.getContainerSignatureMatcher();
this.customContainerAppId = taskSchedulerContext.getCustomClusterIdentifier();
- this.conf = taskSchedulerContext.getInitialConfiguration();
+ try {
+ this.conf = TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload());
+ } catch (IOException e) {
+ throw new TezUncheckedException(
+ "Failed to deserialize payload for " + LocalTaskSchedulerService.class.getSimpleName(),
+ e);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
index 890870e..7f1d5a3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
@@ -18,7 +18,6 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -28,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.common.ContainerSignatureMatcher;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
@@ -40,12 +40,12 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext {
private final long customClusterIdentifier;
private final String appHostName;
private final int clientPort;
- private final Configuration conf;
+ private final UserPayload initialUserPayload;
public TaskSchedulerContextImpl(TaskSchedulerEventHandler tseh, AppContext appContext,
int schedulerId, String trackingUrl, long customClusterIdentifier,
String appHostname, int clientPort,
- Configuration conf) {
+ UserPayload initialUserPayload) {
this.tseh = tseh;
this.appContext = appContext;
this.schedulerId = schedulerId;
@@ -53,7 +53,7 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext {
this.customClusterIdentifier = customClusterIdentifier;
this.appHostName = appHostname;
this.clientPort = clientPort;
- this.conf = conf;
+ this.initialUserPayload = initialUserPayload;
}
@@ -110,8 +110,8 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext {
}
@Override
- public Configuration getInitialConfiguration() {
- return conf;
+ public UserPayload getInitialUserPayload() {
+ return initialUserPayload;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
index e64ef43..9e4c8e0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
@@ -27,7 +27,6 @@ import java.util.concurrent.Future;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -37,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.common.ContainerSignatureMatcher;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
/**
@@ -132,8 +132,8 @@ class TaskSchedulerContextImplWrapper implements TaskSchedulerContext {
// does not use locks.
@Override
- public Configuration getInitialConfiguration() {
- return real.getInitialConfiguration();
+ public UserPayload getInitialUserPayload() {
+ return real.getInitialUserPayload();
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 8038e2c..4899f82 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -37,6 +37,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
import org.apache.tez.serviceplugins.api.TaskScheduler;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
@@ -114,7 +115,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements
private AtomicBoolean shouldUnregisterFlag =
new AtomicBoolean(false);
private final WebUIService webUI;
- private final String[] taskSchedulerClasses;
+ private final NamedEntityDescriptor[] taskSchedulerDescriptors;
protected final TaskScheduler[]taskSchedulers;
protected final ServicePluginLifecycleAbstractService []taskSchedulerServiceWrappers;
@@ -152,7 +153,8 @@ public class TaskSchedulerEventHandler extends AbstractService implements
public TaskSchedulerEventHandler(AppContext appContext,
DAGClientServer clientService, EventHandler eventHandler,
ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI,
- List<NamedEntityDescriptor> schedulerDescriptors, boolean isPureLocalMode) {
+ List<NamedEntityDescriptor> schedulerDescriptors, UserPayload defaultPayload,
+ boolean isPureLocalMode) {
super(TaskSchedulerEventHandler.class.getName());
this.appContext = appContext;
this.eventHandler = eventHandler;
@@ -168,39 +170,50 @@ public class TaskSchedulerEventHandler extends AbstractService implements
// Override everything for pure local mode
if (isPureLocalMode) {
- this.taskSchedulerClasses = new String[] {TezConstants.getTezUberServicePluginName()};
+ this.taskSchedulerDescriptors = new NamedEntityDescriptor[]{
+ new NamedEntityDescriptor(TezConstants.getTezUberServicePluginName(), null)
+ .setUserPayload(defaultPayload)};
this.yarnTaskSchedulerIndex = -1;
} else {
if (schedulerDescriptors == null || schedulerDescriptors.isEmpty()) {
- this.taskSchedulerClasses = new String[] {TezConstants.getTezYarnServicePluginName()};
+ this.taskSchedulerDescriptors = new NamedEntityDescriptor[]{
+ new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
+ .setUserPayload(defaultPayload)};
this.yarnTaskSchedulerIndex = 0;
} else {
// Ensure the YarnScheduler will be setup and note it's index. This will be responsible for heartbeats and YARN registration.
int foundYarnTaskSchedulerIndex = -1;
- List<String> taskSchedulerClassList = new LinkedList<>();
+ List<NamedEntityDescriptor> schedulerDescriptorList = new LinkedList<>();
for (int i = 0 ; i < schedulerDescriptors.size() ; i++) {
if (schedulerDescriptors.get(i).getEntityName().equals(
TezConstants.getTezYarnServicePluginName())) {
- taskSchedulerClassList.add(schedulerDescriptors.get(i).getEntityName());
+ schedulerDescriptorList.add(
+ new NamedEntityDescriptor(schedulerDescriptors.get(i).getEntityName(), null)
+ .setUserPayload(
+ defaultPayload));
foundYarnTaskSchedulerIndex = i;
} else if (schedulerDescriptors.get(i).getEntityName().equals(
TezConstants.getTezUberServicePluginName())) {
- taskSchedulerClassList.add(schedulerDescriptors.get(i).getEntityName());
+ schedulerDescriptorList.add(
+ new NamedEntityDescriptor(schedulerDescriptors.get(i).getEntityName(), null)
+ .setUserPayload(
+ defaultPayload));
} else {
- taskSchedulerClassList.add(schedulerDescriptors.get(i).getClassName());
+ schedulerDescriptorList.add(schedulerDescriptors.get(i));
}
}
if (foundYarnTaskSchedulerIndex == -1) {
- taskSchedulerClassList.add(YarnTaskSchedulerService.class.getName());
- foundYarnTaskSchedulerIndex = taskSchedulerClassList.size() -1;
+ schedulerDescriptorList.add(new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null).setUserPayload(
+ defaultPayload));
+ foundYarnTaskSchedulerIndex = schedulerDescriptorList.size() -1;
}
- this.taskSchedulerClasses = taskSchedulerClassList.toArray(new String[taskSchedulerClassList.size()]);
+ this.taskSchedulerDescriptors = schedulerDescriptorList.toArray(new NamedEntityDescriptor[schedulerDescriptorList.size()]);
this.yarnTaskSchedulerIndex = foundYarnTaskSchedulerIndex;
}
}
- taskSchedulers = new TaskScheduler[this.taskSchedulerClasses.length];
- taskSchedulerServiceWrappers = new ServicePluginLifecycleAbstractService[this.taskSchedulerClasses.length];
+ taskSchedulers = new TaskScheduler[this.taskSchedulerDescriptors.length];
+ taskSchedulerServiceWrappers = new ServicePluginLifecycleAbstractService[this.taskSchedulerDescriptors.length];
}
public Map<ApplicationAccessType, String> getApplicationAcls() {
@@ -417,23 +430,24 @@ public class TaskSchedulerEventHandler extends AbstractService implements
private TaskScheduler createTaskScheduler(String host, int port, String trackingUrl,
AppContext appContext,
- String schedulerClassName,
+ NamedEntityDescriptor taskSchedulerDescriptor,
long customAppIdIdentifier,
int schedulerId) {
TaskSchedulerContext rawContext =
new TaskSchedulerContextImpl(this, appContext, schedulerId, trackingUrl,
- customAppIdIdentifier, host, port, getConfig());
+ customAppIdIdentifier, host, port, taskSchedulerDescriptor.getUserPayload());
TaskSchedulerContext wrappedContext = new TaskSchedulerContextImplWrapper(rawContext, appCallbackExecutor);
- if (schedulerClassName.equals(TezConstants.getTezYarnServicePluginName())) {
+ String schedulerName = taskSchedulerDescriptor.getEntityName();
+ if (schedulerName.equals(TezConstants.getTezYarnServicePluginName())) {
LOG.info("Creating TaskScheduler: YarnTaskSchedulerService");
return new YarnTaskSchedulerService(wrappedContext);
- } else if (schedulerClassName.equals(TezConstants.getTezUberServicePluginName())) {
+ } else if (schedulerName.equals(TezConstants.getTezUberServicePluginName())) {
LOG.info("Creating TaskScheduler: Local TaskScheduler");
return new LocalTaskSchedulerService(wrappedContext);
} else {
- LOG.info("Creating custom TaskScheduler: " + schedulerClassName);
+ LOG.info("Creating custom TaskScheduler {}:{}", taskSchedulerDescriptor.getEntityName(), taskSchedulerDescriptor.getClassName());
Class<? extends TaskScheduler> taskSchedulerClazz =
- (Class<? extends TaskScheduler>) ReflectionUtils.getClazz(schedulerClassName);
+ (Class<? extends TaskScheduler>) ReflectionUtils.getClazz(taskSchedulerDescriptor.getClassName());
try {
Constructor<? extends TaskScheduler> ctor = taskSchedulerClazz
.getConstructor(TaskSchedulerContext.class);
@@ -453,21 +467,21 @@ public class TaskSchedulerEventHandler extends AbstractService implements
@VisibleForTesting
protected void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) {
- // TODO Add error checking for components being used in the Vertex when running in pure local mode.
// Iterate over the list and create all the taskSchedulers
int j = 0;
- for (int i = 0; i < taskSchedulerClasses.length; i++) {
+ for (int i = 0; i < taskSchedulerDescriptors.length; i++) {
long customAppIdIdentifier;
- if (isPureLocalMode || taskSchedulerClasses[i].equals(
+ // Match on the descriptor's entity name: a NamedEntityDescriptor never equals() a String.
+ if (isPureLocalMode || taskSchedulerDescriptors[i].getEntityName().equals(
TezConstants.getTezYarnServicePluginName())) { // Use the app identifier from the appId.
customAppIdIdentifier = appContext.getApplicationID().getClusterTimestamp();
} else {
customAppIdIdentifier = SCHEDULER_APP_ID_BASE + (j++ * SCHEDULER_APP_ID_INCREMENT);
}
- LOG.info("ClusterIdentifier for TaskScheduler [" + i + ":" + taskSchedulerClasses[i] + "]=" +
+ LOG.info("ClusterIdentifier for TaskScheduler [" + i + ":" + taskSchedulerDescriptors[i].getEntityName() + "]=" +
customAppIdIdentifier);
taskSchedulers[i] = createTaskScheduler(host, port,
- trackingUrl, appContext, taskSchedulerClasses[i], customAppIdIdentifier, i);
+ trackingUrl, appContext, taskSchedulerDescriptors[i], customAppIdIdentifier, i);
taskSchedulerServiceWrappers[i] = new ServicePluginLifecycleAbstractService(taskSchedulers[i]);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 08821b0..b4d1f26 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -31,13 +31,11 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.serviceplugins.api.TaskScheduler;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AMState;
@@ -72,7 +70,6 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
/* TODO not yet updating cluster nodes on every allocate response
* from RMContainerRequestor
@@ -220,7 +217,13 @@ public class YarnTaskSchedulerService extends TaskScheduler
this.appHostName = taskSchedulerContext.getAppHostName();
this.appHostPort = taskSchedulerContext.getAppClientPort();
this.appTrackingUrl = taskSchedulerContext.getAppTrackingUrl();
- this.conf = taskSchedulerContext.getInitialConfiguration();
+ try {
+ this.conf = TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload());
+ } catch (IOException e) {
+ throw new TezUncheckedException(
+ "Failed to deserialize payload for " + YarnTaskSchedulerService.class.getSimpleName(),
+ e);
+ }
}
@Private
@@ -233,7 +236,13 @@ public class YarnTaskSchedulerService extends TaskScheduler
this.appHostName = taskSchedulerContext.getAppHostName();
this.appHostPort = taskSchedulerContext.getAppClientPort();
this.appTrackingUrl = taskSchedulerContext.getAppTrackingUrl();
- this.conf = taskSchedulerContext.getInitialConfiguration();
+ try {
+ this.conf = TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload());
+ } catch (IOException e) {
+ throw new TezUncheckedException(
+ "Failed to deserialize payload for " + YarnTaskSchedulerService.class.getSimpleName(),
+ e);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 17feeaa..0723dbc 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -18,6 +18,7 @@
package org.apache.tez.dag.app;
+import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.net.UnknownHostException;
@@ -34,7 +35,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
import org.apache.tez.serviceplugins.api.ContainerLauncher;
import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
@@ -488,7 +491,15 @@ public class MockDAGAppMaster extends DAGAppMaster {
super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime,
isSession, workingDirectory, localDirs, logDirs, new TezApiVersionInfo().getVersion(), 1,
credentials, jobUserName, null);
- containerLauncherContext = new ContainerLauncherContextImpl(getContext(), getTaskAttemptListener());
+ Configuration conf = new Configuration(false);
+ UserPayload userPayload;
+ try {
+ userPayload = TezUtils.createUserPayloadFromConf(conf);
+ } catch (IOException e) {
+ throw new TezUncheckedException(e);
+ }
+ containerLauncherContext =
+ new ContainerLauncherContextImpl(getContext(), getTaskAttemptListener(), userPayload);
containerLauncher = new MockContainerLauncher(launcherGoFlag, containerLauncherContext);
shutdownHandler = new MockDAGAppMasterShutdownHandler();
this.initFailFlag = initFailFlag;
@@ -500,7 +511,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
// use mock container launcher for tests
@Override
- protected ContainerLauncherRouter createContainerLauncherRouter(final Configuration conf,
+ protected ContainerLauncherRouter createContainerLauncherRouter(final UserPayload defaultUserPayload,
List<NamedEntityDescriptor> containerLauncherDescirptors,
boolean isLocal)
throws UnknownHostException {
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index e45b0a2..1cb69a8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -47,11 +47,14 @@ import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.ContainerContext;
import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.common.security.TokenCache;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.TaskHeartbeatRequest;
@@ -135,8 +138,15 @@ public class TestTaskAttemptListenerImplTezDag {
doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class));
doReturn(container).when(amContainer).getContainer();
+ Configuration conf = new TezConfiguration();
+ UserPayload defaultPayload;
+ try {
+ defaultPayload = TezUtils.createUserPayloadFromConf(conf);
+ } catch (IOException e) {
+ throw new TezUncheckedException(e);
+ }
taskAttemptListener = new TaskAttemptListenerImplForTest(appContext,
- mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, null, false);
+ mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, defaultPayload, false);
TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator();
TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical();
@@ -299,8 +309,14 @@ public class TestTaskAttemptListenerImplTezDag {
new JobTokenSecretManager());
sessionToken.setService(identifier.getJobId());
TokenCache.setSessionToken(sessionToken, credentials);
+ UserPayload userPayload = null;
+ try {
+ userPayload = TezUtils.createUserPayloadFromConf(conf);
+ } catch (IOException e) {
+ throw new TezUncheckedException(e);
+ }
taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext,
- mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, conf, false);
+ mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, userPayload, false);
// no exception happen, should started properly
taskAttemptListener.init(conf);
taskAttemptListener.start();
@@ -319,8 +335,14 @@ public class TestTaskAttemptListenerImplTezDag {
TokenCache.setSessionToken(sessionToken, credentials);
conf.set(TezConfiguration.TEZ_AM_TASK_AM_PORT_RANGE, port + "-" + port);
+ UserPayload userPayload = null;
+ try {
+ userPayload = TezUtils.createUserPayloadFromConf(conf);
+ } catch (IOException e) {
+ throw new TezUncheckedException(e);
+ }
taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext,
- mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, conf, false);
+ mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, userPayload, false);
taskAttemptListener.init(conf);
taskAttemptListener.start();
int resultedPort = taskAttemptListener.getTaskCommunicator(0).getAddress().getPort();
@@ -377,9 +399,9 @@ public class TestTaskAttemptListenerImplTezDag {
TaskHeartbeatHandler thh,
ContainerHeartbeatHandler chh,
List<NamedEntityDescriptor> taskCommDescriptors,
- Configuration conf,
+ UserPayload userPayload,
boolean isPureLocalMode) {
- super(context, thh, chh, taskCommDescriptors, conf,
+ super(context, thh, chh, taskCommDescriptors, userPayload,
isPureLocalMode);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
index 8d776fb..1c82bd8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
@@ -22,9 +22,11 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -34,6 +36,10 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
@@ -51,7 +57,7 @@ import org.mockito.ArgumentCaptor;
public class TestTaskAttemptListenerImplTezDag2 {
@Test(timeout = 5000)
- public void testTaskAttemptFailedKilled() {
+ public void testTaskAttemptFailedKilled() throws IOException {
ApplicationId appId = ApplicationId.newInstance(1000, 1);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
Credentials credentials = new Credentials();
@@ -73,9 +79,11 @@ public class TestTaskAttemptListenerImplTezDag2 {
doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class));
doReturn(container).when(amContainer).getContainer();
+ Configuration conf = new TezConfiguration();
+ UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
TaskAttemptListenerImpTezDag taskAttemptListener =
new TaskAttemptListenerImpTezDag(appContext, mock(TaskHeartbeatHandler.class),
- mock(ContainerHeartbeatHandler.class), null, null, false);
+ mock(ContainerHeartbeatHandler.class), null, userPayload, false);
TaskSpec taskSpec1 = mock(TaskSpec.class);
TezTaskAttemptID taskAttemptId1 = mock(TezTaskAttemptID.class);
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 88f6066..8e8224a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -37,6 +37,7 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.tez.common.TezUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -144,7 +145,7 @@ public class TestContainerReuse {
TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
new TaskSchedulerEventHandlerForTest(
appContext, eventHandler, rmClient,
- new AlwaysMatchesContainerMatcher());
+ new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(conf));
TaskSchedulerEventHandler taskSchedulerEventHandler =
spy(taskSchedulerEventHandlerReal);
taskSchedulerEventHandler.init(conf);
@@ -279,7 +280,7 @@ public class TestContainerReuse {
TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient,
- new AlwaysMatchesContainerMatcher());
+ new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(conf));
TaskSchedulerEventHandler taskSchedulerEventHandler =
spy(taskSchedulerEventHandlerReal);
taskSchedulerEventHandler.init(conf);
@@ -378,7 +379,7 @@ public class TestContainerReuse {
doReturn(dagID).when(appContext).getCurrentDAGID();
doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
- TaskSchedulerEventHandler taskSchedulerEventHandlerReal = new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher());
+ TaskSchedulerEventHandler taskSchedulerEventHandlerReal = new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal);
taskSchedulerEventHandler.init(tezConf);
taskSchedulerEventHandler.start();
@@ -514,7 +515,7 @@ public class TestContainerReuse {
//Use ContainerContextMatcher here. Otherwise it would not match the JVM options
TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
- new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new ContainerContextMatcher());
+ new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new ContainerContextMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal);
taskSchedulerEventHandler.init(tezConf);
taskSchedulerEventHandler.start();
@@ -709,7 +710,7 @@ public class TestContainerReuse {
TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
new TaskSchedulerEventHandlerForTest(
appContext, eventHandler, rmClient,
- new AlwaysMatchesContainerMatcher());
+ new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
TaskSchedulerEventHandler taskSchedulerEventHandler =
spy(taskSchedulerEventHandlerReal);
taskSchedulerEventHandler.init(tezConf);
@@ -833,7 +834,7 @@ public class TestContainerReuse {
TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient,
- new AlwaysMatchesContainerMatcher());
+ new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
TaskSchedulerEventHandler taskSchedulerEventHandler =
spy(taskSchedulerEventHandlerReal);
taskSchedulerEventHandler.init(tezConf);
@@ -947,7 +948,7 @@ public class TestContainerReuse {
doAnswer(dagIDAnswer).when(appContext).getCurrentDAGID();
doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
- TaskSchedulerEventHandler taskSchedulerEventHandlerReal = new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher());
+ TaskSchedulerEventHandler taskSchedulerEventHandlerReal = new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal);
taskSchedulerEventHandler.init(tezConf);
taskSchedulerEventHandler.start();
@@ -1105,7 +1106,7 @@ public class TestContainerReuse {
TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient,
- new ContainerContextMatcher());
+ new ContainerContextMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal);
taskSchedulerEventHandler.init(tezConf);
taskSchedulerEventHandler.start();
@@ -1259,7 +1260,7 @@ public class TestContainerReuse {
TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient,
- new AlwaysMatchesContainerMatcher());
+ new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal);
taskSchedulerEventHandler.init(tezConf);
taskSchedulerEventHandler.start();
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 7b9ac4f..cf21a1d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -145,7 +145,8 @@ public class TestTaskScheduler {
scheduler.initialize();
drainableAppCallback.drain();
- verify(mockRMClient).init(conf);
+ // Verifying the validity of the configuration via the interval only instead of making sure
+ // it's the same instance.
verify(mockRMClient).setHeartbeatInterval(interval);
RegisterApplicationMasterResponse mockRegResponse =
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index f191175..f8aa1e2 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -49,9 +49,12 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.ContainerSignatureMatcher;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.client.DAGClientServer;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
@@ -92,8 +95,10 @@ public class TestTaskSchedulerEventHandler {
public MockTaskSchedulerEventHandler(AppContext appContext,
DAGClientServer clientService, EventHandler eventHandler,
- ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI) {
- super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI, new LinkedList<NamedEntityDescriptor>(), false);
+ ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI,
+ UserPayload defaultPayload) {
+ super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI,
+ new LinkedList<NamedEntityDescriptor>(), defaultPayload, false);
}
@Override
@@ -134,8 +139,15 @@ public class TestTaskSchedulerEventHandler {
mockWebUIService = mock(WebUIService.class);
when(mockAppContext.getAllContainers()).thenReturn(mockAMContainerMap);
when(mockClientService.getBindAddress()).thenReturn(new InetSocketAddress(10000));
+ Configuration conf = new Configuration(false);
+ UserPayload userPayload;
+ try {
+ userPayload = TezUtils.createUserPayloadFromConf(conf);
+ } catch (IOException e) {
+ throw new TezUncheckedException(e);
+ }
schedulerHandler = new MockTaskSchedulerEventHandler(
- mockAppContext, mockClientService, mockEventHandler, mockSigMatcher, mockWebUIService);
+ mockAppContext, mockClientService, mockEventHandler, mockSigMatcher, mockWebUIService, userPayload);
}
@Test(timeout = 5000)
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index 60d37e9..59ab00a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.LinkedList;
@@ -59,7 +60,10 @@ import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.ContainerSignatureMatcher;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest;
@@ -125,22 +129,26 @@ class TestTaskSchedulerHelpers {
private TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync;
private ContainerSignatureMatcher containerSignatureMatcher;
+ private UserPayload defaultPayload;
@SuppressWarnings("rawtypes")
public TaskSchedulerEventHandlerForTest(AppContext appContext,
EventHandler eventHandler,
TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync,
- ContainerSignatureMatcher containerSignatureMatcher) {
- super(appContext, null, eventHandler, containerSignatureMatcher, null, new LinkedList<NamedEntityDescriptor>(), false);
+ ContainerSignatureMatcher containerSignatureMatcher,
+ UserPayload defaultPayload) {
+ super(appContext, null, eventHandler, containerSignatureMatcher, null,
+ new LinkedList<NamedEntityDescriptor>(), defaultPayload, false);
this.amrmClientAsync = amrmClientAsync;
this.containerSignatureMatcher = containerSignatureMatcher;
+ this.defaultPayload = defaultPayload;
}
@Override
public void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) {
TaskSchedulerContext taskSchedulerContext =
new TaskSchedulerContextImpl(this, appContext, 0, trackingUrl, 1000, host, port,
- getConfig());
+ defaultPayload);
TaskSchedulerContextImplWrapper wrapper =
new TaskSchedulerContextImplWrapper(taskSchedulerContext,
new CountingExecutorService(appCallbackExecutor));
@@ -287,8 +295,8 @@ class TestTaskSchedulerHelpers {
// Not incrementing invocations for methods which do not obtain locks,
// and do not go via the executor service.
@Override
- public Configuration getInitialConfiguration() {
- return real.getInitialConfiguration();
+ public UserPayload getInitialUserPayload() {
+ return real.getInitialUserPayload();
}
@Override
@@ -523,7 +531,13 @@ class TestTaskSchedulerHelpers {
when(mockContext.getAppTrackingUrl()).thenReturn(appUrl);
when(mockContext.getAMState()).thenReturn(TaskSchedulerContext.AMState.RUNNING_APP);
- when(mockContext.getInitialConfiguration()).thenReturn(conf);
+ UserPayload userPayload;
+ try {
+ userPayload = TezUtils.createUserPayloadFromConf(conf);
+ } catch (IOException e) {
+ throw new TezUncheckedException(e);
+ }
+ when(mockContext.getInitialUserPayload()).thenReturn(userPayload);
when(mockContext.isSession()).thenReturn(isSession);
if (containerSignatureMatcher != null) {
when(mockContext.getContainerSignatureMatcher())
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
index 0002b42..f31a07b 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
@@ -19,10 +19,12 @@ import java.net.InetSocketAddress;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
import org.apache.tez.serviceplugins.api.ContainerLauncher;
import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
@@ -49,17 +51,22 @@ public class TezTestServiceContainerLauncher extends ContainerLauncher {
private final int servicePort;
private final TezTestServiceCommunicator communicator;
private final ApplicationAttemptId appAttemptId;
- // private final TaskAttemptListener tal;
+ private final Configuration conf;
// Configuration passed in here to set up final parameters
public TezTestServiceContainerLauncher(ContainerLauncherContext containerLauncherContext) {
super(containerLauncherContext);
- int numThreads = getContext().getInitialConfiguration().getInt(
+ try {
+ conf = TezUtils.createConfFromUserPayload(getContext().getInitialUserPayload());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ int numThreads = conf.getInt(
TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS,
TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS_DEFAULT);
- this.servicePort = getContext().getInitialConfiguration().getInt(
+ this.servicePort = conf.getInt(
TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT, -1);
Preconditions.checkArgument(servicePort > 0,
TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT + " must be set");
@@ -70,7 +77,7 @@ public class TezTestServiceContainerLauncher extends ContainerLauncher {
@Override
public void start() {
- communicator.init(getContext().getInitialConfiguration());
+ communicator.init(conf);
communicator.start();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
index 7d209bc..0d87995 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
@@ -14,6 +14,7 @@
package org.apache.tez.dag.app.rm;
+import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
@@ -32,6 +33,8 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.service.TezTestServiceConfConstants;
import org.apache.tez.serviceplugins.api.TaskScheduler;
@@ -74,7 +77,12 @@ public class TezTestServiceTaskSchedulerService extends TaskScheduler {
this.containerFactory = new ContainerFactory(taskSchedulerContext.getApplicationAttemptId(),
taskSchedulerContext.getCustomClusterIdentifier());
- Configuration conf = taskSchedulerContext.getInitialConfiguration();
+ Configuration conf = null;
+ try {
+ conf = TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload());
+ } catch (IOException e) {
+ throw new TezUncheckedException(e);
+ }
this.memoryPerInstance = conf
.getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB, -1);
Preconditions.checkArgument(memoryPerInstance > 0,
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
index 078ea79..ef8f9e4 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
@@ -76,7 +76,7 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
@Override
public void initialize() throws Exception {
super.initialize();
- this.communicator.init(getContext().getInitialConfiguration());
+ this.communicator.init(getConf());
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/4a18c5d5/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
index 07dd363..2c52ae3 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
@@ -24,10 +24,12 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.tez.client.TezClient;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
import org.apache.tez.dag.api.client.DAGClient;
@@ -136,17 +138,22 @@ public class TestExternalTezServices {
confForJobs.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
confForJobs.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
+ UserPayload userPayload = TezUtils.createUserPayloadFromConf(confForJobs);
+
TaskSchedulerDescriptor[] taskSchedulerDescriptors = new TaskSchedulerDescriptor[]{
TaskSchedulerDescriptor
- .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskSchedulerService.class.getName())};
+ .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskSchedulerService.class.getName())
+ .setUserPayload(userPayload)};
ContainerLauncherDescriptor[] containerLauncherDescriptors = new ContainerLauncherDescriptor[]{
ContainerLauncherDescriptor
- .create(EXT_PUSH_ENTITY_NAME, TezTestServiceNoOpContainerLauncher.class.getName())};
+ .create(EXT_PUSH_ENTITY_NAME, TezTestServiceNoOpContainerLauncher.class.getName())
+ .setUserPayload(userPayload)};
TaskCommunicatorDescriptor[] taskCommunicatorDescriptors = new TaskCommunicatorDescriptor[]{
TaskCommunicatorDescriptor
- .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskCommunicatorImpl.class.getName())};
+ .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskCommunicatorImpl.class.getName())
+ .setUserPayload(userPayload)};
ServicePluginsDescriptor servicePluginsDescriptor = ServicePluginsDescriptor.create(true, true,
taskSchedulerDescriptors, containerLauncherDescriptors, taskCommunicatorDescriptors);