You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/06 11:26:34 UTC
[42/51] [abbrv] tez git commit: TEZ-2651. Pluggable services should not extend AbstractService. (sseth)
TEZ-2651. Pluggable services should not extend AbstractService. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f5da29df
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f5da29df
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f5da29df
Branch: refs/heads/TEZ-2003
Commit: f5da29dfffe709d0b78ad69cf09a4c1b93b13411
Parents: 55a9eed
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Jul 28 14:55:40 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Aug 6 01:26:11 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../serviceplugins/api/ContainerLauncher.java | 18 ++++++++++--
.../apache/tez/dag/api/TaskCommunicator.java | 30 +++++++++++++++++---
.../tez/dag/api/TaskCommunicatorContext.java | 5 ++++
.../org/apache/tez/dag/app/DAGAppMaster.java | 4 +--
.../dag/app/TaskAttemptListenerImpTezDag.java | 16 ++++++-----
.../dag/app/TaskCommunicatorContextImpl.java | 9 ++++++
.../tez/dag/app/TezTaskCommunicatorImpl.java | 24 ++++++----------
.../dag/app/launcher/ContainerLauncherImpl.java | 6 ++--
.../app/launcher/ContainerLauncherRouter.java | 12 ++++++--
.../app/launcher/LocalContainerLauncher.java | 6 ++--
.../apache/tez/dag/app/MockDAGAppMaster.java | 6 ++--
.../app/TestTaskAttemptListenerImplTezDag.java | 8 +++---
.../TezTestServiceContainerLauncher.java | 6 ++--
.../TezTestServiceNoOpContainerLauncher.java | 2 +-
.../TezTestServiceTaskCommunicatorImpl.java | 29 +++++++++----------
16 files changed, 116 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/f5da29df/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index a51669d..e57f76f 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -36,5 +36,6 @@ ALL CHANGES:
TEZ-2124. Change Node tracking to work per external container source.
TEZ-2004. Define basic interface for pluggable ContainerLaunchers.
TEZ-2005. Define basic interface for pluggable TaskScheduler.
+ TEZ-2651. Pluggable services should not extend AbstractService.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/f5da29df/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
index 218edb6..8337dcb 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
@@ -17,6 +17,7 @@ package org.apache.tez.serviceplugins.api;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.common.ServicePluginLifecycle;
/**
* Plugin to allow custom container launchers to be written to launch containers on different types
@@ -25,18 +26,29 @@ import org.apache.hadoop.service.AbstractService;
@InterfaceAudience.Public
@InterfaceStability.Unstable
-public abstract class ContainerLauncher extends AbstractService {
+public abstract class ContainerLauncher implements ServicePluginLifecycle {
private final ContainerLauncherContext containerLauncherContext;
// TODO TEZ-2003 Simplify this by moving away from AbstractService. Potentially Guava AbstractService.
// A serviceInit(Configuration) is not likely to be very useful, and will expose unnecessary internal
// configuration to the services if populated with the AM Configuration
- public ContainerLauncher(String name, ContainerLauncherContext containerLauncherContext) {
- super(name);
+ public ContainerLauncher(ContainerLauncherContext containerLauncherContext) {
this.containerLauncherContext = containerLauncherContext;
}
+ @Override
+ public void initialize() throws Exception {
+ }
+
+ @Override
+ public void start() throws Exception {
+ }
+
+ @Override
+ public void shutdown() throws Exception {
+ }
+
public final ContainerLauncherContext getContext() {
return this.containerLauncherContext;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f5da29df/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index 05e437c..f221414 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -18,9 +18,9 @@ import java.net.InetSocketAddress;
import java.util.Map;
import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.common.ServicePluginLifecycle;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
@@ -28,11 +28,33 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.impl.TaskSpec;
// TODO TEZ-2003 Move this into the tez-api module
-public abstract class TaskCommunicator extends AbstractService {
- public TaskCommunicator(String name) {
- super(name);
+public abstract class TaskCommunicator implements ServicePluginLifecycle {
+
+ private final TaskCommunicatorContext taskCommunicatorContext;
+
+ public TaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
+ this.taskCommunicatorContext = taskCommunicatorContext;
+ }
+
+ public TaskCommunicatorContext getContext() {
+ return taskCommunicatorContext;
+ }
+
+ @Override
+ public void initialize() throws Exception {
}
+ @Override
+ public void start() throws Exception {
+ }
+
+ @Override
+ public void shutdown() throws Exception {
+ }
+
+ // TODO Post TEZ-2003 Move this into the API module. Moving this requires abstractions for
+ // TaskSpec and related classes. (assuming that's efficient for execution)
+
// TODO TEZ-2003 Ideally, don't expose YARN containerId; instead expose a Tez specific construct.
// TODO When talking to an external service, this plugin implementer may need access to a host:port
public abstract void registerRunningContainer(ContainerId containerId, String hostname, int port);
http://git-wip-us.apache.org/repos/asf/tez/blob/f5da29df/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
index b6e63f7..ab32ec1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
@@ -18,6 +18,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -35,6 +36,9 @@ public interface TaskCommunicatorContext {
// TODO TEZ-2003 Maybe add book-keeping as a helper library, instead of each impl tracking container to task etc.
+ // TODO TEZ-2003 To be replaced by getInitialPayload
+ Configuration getInitialConfiguration();
+
ApplicationAttemptId getApplicationAttemptId();
Credentials getCredentials();
@@ -42,6 +46,7 @@ public interface TaskCommunicatorContext {
boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException;
// TODO TEZ-2003 Split the heartbeat API to a liveness check and a status update
+ // TODO TEZ-2003 Rename this API
TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException, TezException;
boolean isKnownContainer(ContainerId containerId);
http://git-wip-us.apache.org/repos/asf/tez/blob/f5da29df/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index ef27ddf..f3914d8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -1047,8 +1047,8 @@ public class DAGAppMaster extends AbstractService {
String[] taskCommunicatorClasses,
boolean isLocal) {
TaskAttemptListener lis =
- new TaskAttemptListenerImpTezDag(context, thh, chh, jobTokenSecretManager,
- taskCommunicatorClasses, isLocal);
+ new TaskAttemptListenerImpTezDag(context, thh, chh,
+ taskCommunicatorClasses, amConf, isLocal);
return lis;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f5da29df/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 47b63dd..599c208 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentMap;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.collections4.ListUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
@@ -61,7 +62,6 @@ import org.apache.tez.dag.app.rm.container.AMContainerTask;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.impl.TezEvent;
-import org.apache.tez.common.security.JobTokenSecretManager;
@SuppressWarnings("unchecked")
@@ -75,6 +75,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
private final AppContext context;
private final TaskCommunicator[] taskCommunicators;
private final TaskCommunicatorContext[] taskCommunicatorContexts;
+ protected final ServicePluginLifecycleAbstractService []taskCommunicatorServiceWrappers;
protected final TaskHeartbeatHandler taskHeartbeatHandler;
protected final ContainerHeartbeatHandler containerHeartbeatHandler;
@@ -99,9 +100,8 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
public TaskAttemptListenerImpTezDag(AppContext context,
TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh,
- // TODO TEZ-2003 pre-merge. Remove reference to JobTokenSecretManager.
- JobTokenSecretManager jobTokenSecretManager,
String [] taskCommunicatorClassIdentifiers,
+ Configuration conf,
boolean isPureLocalMode) {
super(TaskAttemptListenerImpTezDag.class.getName());
this.context = context;
@@ -118,9 +118,11 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
this.taskCommunicators = new TaskCommunicator[taskCommunicatorClassIdentifiers.length];
this.taskCommunicatorContexts = new TaskCommunicatorContext[taskCommunicatorClassIdentifiers.length];
+ this.taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[taskCommunicatorClassIdentifiers.length];
for (int i = 0 ; i < taskCommunicatorClassIdentifiers.length ; i++) {
- taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, i);
+ taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, conf, i);
taskCommunicators[i] = createTaskCommunicator(taskCommunicatorClassIdentifiers[i], i);
+ taskCommunicatorServiceWrappers[i] = new ServicePluginLifecycleAbstractService(taskCommunicators[i]);
}
// TODO TEZ-2118 Start using taskCommunicator indices properly
}
@@ -129,15 +131,15 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
public void serviceStart() {
// TODO Why is init tied to serviceStart
for (int i = 0 ; i < taskCommunicators.length ; i++) {
- taskCommunicators[i].init(getConfig());
- taskCommunicators[i].start();
+ taskCommunicatorServiceWrappers[i].init(getConfig());
+ taskCommunicatorServiceWrappers[i].start();
}
}
@Override
public void serviceStop() {
for (int i = 0 ; i < taskCommunicators.length ; i++) {
- taskCommunicators[i].stop();
+ taskCommunicatorServiceWrappers[i].stop();
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f5da29df/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
index 50e006d..035db93 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -23,6 +23,7 @@ import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -48,14 +49,17 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
private final int taskCommunicatorIndex;
private final ReentrantReadWriteLock.ReadLock dagChangedReadLock;
private final ReentrantReadWriteLock.WriteLock dagChangedWriteLock;
+ private final Configuration conf;
private DAG dag;
public TaskCommunicatorContextImpl(AppContext appContext,
TaskAttemptListenerImpTezDag taskAttemptListener,
+ Configuration conf,
int taskCommunicatorIndex) {
this.context = appContext;
this.taskAttemptListener = taskAttemptListener;
+ this.conf = conf;
this.taskCommunicatorIndex = taskCommunicatorIndex;
ReentrantReadWriteLock dagChangedLock = new ReentrantReadWriteLock();
@@ -64,6 +68,11 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
}
@Override
+ public Configuration getInitialConfiguration() {
+ return conf;
+ }
+
+ @Override
public ApplicationAttemptId getApplicationAttemptId() {
return context.getApplicationAttemptId();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f5da29df/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index 0374022..93b5b43 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -67,7 +67,6 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
private static final ContainerTask TASK_FOR_INVALID_JVM = new ContainerTask(
null, true, null, null, false);
- private final TaskCommunicatorContext taskCommunicatorContext;
private final TezTaskUmbilicalProtocol taskUmbilical;
protected final ConcurrentMap<ContainerId, ContainerInfo> registeredContainers =
@@ -116,25 +115,24 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
* Construct the service.
*/
public TezTaskCommunicatorImpl(TaskCommunicatorContext taskCommunicatorContext) {
- super(TezTaskCommunicatorImpl.class.getName());
- this.taskCommunicatorContext = taskCommunicatorContext;
+ super(taskCommunicatorContext);
this.taskUmbilical = new TezTaskUmbilicalProtocolImpl();
- this.tokenIdentifier = this.taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString();
+ this.tokenIdentifier = taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString();
this.sessionToken = TokenCache.getSessionToken(taskCommunicatorContext.getCredentials());
}
@Override
- public void serviceStart() {
+ public void start() {
startRpcServer();
}
@Override
- public void serviceStop() {
+ public void shutdown() {
stopRpcServer();
}
protected void startRpcServer() {
- Configuration conf = getConfig();
+ Configuration conf = getContext().getInitialConfiguration();
try {
JobTokenSecretManager jobTokenSecretManager =
new JobTokenSecretManager();
@@ -281,10 +279,6 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
return sessionToken;
}
- protected TaskCommunicatorContext getTaskCommunicatorContext() {
- return taskCommunicatorContext;
- }
-
public TezTaskUmbilicalProtocol getUmbilical() {
return this.taskUmbilical;
}
@@ -305,7 +299,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
}
task = getContainerTask(containerId);
if (task != null && !task.shouldDie()) {
- taskCommunicatorContext
+ getContext()
.taskStartedRemotely(task.getTaskSpec().getTaskAttemptID(), containerId);
}
}
@@ -317,7 +311,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
@Override
public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException {
- return taskCommunicatorContext.canCommit(taskAttemptId);
+ return getContext().canCommit(taskAttemptId);
}
@Override
@@ -370,7 +364,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
TaskHeartbeatRequest tRequest = new TaskHeartbeatRequest(request.getContainerIdentifier(),
request.getCurrentTaskAttemptID(), request.getEvents(), request.getStartIndex(),
request.getPreRoutedStartIndex(), request.getMaxEvents());
- tResponse = taskCommunicatorContext.heartbeat(tRequest);
+ tResponse = getContext().heartbeat(tRequest);
}
TezHeartbeatResponse response = new TezHeartbeatResponse();
response.setLastRequestId(requestId);
@@ -402,7 +396,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
ContainerInfo containerInfo = registeredContainers.get(containerId);
ContainerTask task = null;
if (containerInfo == null) {
- if (taskCommunicatorContext.isKnownContainer(containerId)) {
+ if (getContext().isKnownContainer(containerId)) {
LOG.info("Container with id: " + containerId
+ " is valid, but no longer registered, and will be killed");
} else {
http://git-wip-us.apache.org/repos/asf/tez/blob/f5da29df/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
index fe0178c..34c7bc0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
@@ -223,7 +223,7 @@ public class ContainerLauncherImpl extends ContainerLauncher {
}
public ContainerLauncherImpl(ContainerLauncherContext containerLauncherContext) {
- super(ContainerLauncherImpl.class.getName(), containerLauncherContext);
+ super(containerLauncherContext);
this.conf = new Configuration(containerLauncherContext.getInitialConfiguration());
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
@@ -235,7 +235,7 @@ public class ContainerLauncherImpl extends ContainerLauncher {
}
@Override
- public void serviceStart() {
+ public void start() {
// pass a copy of config to ContainerManagementProtocolProxy until YARN-3497 is fixed
cmProxy =
new ContainerManagementProtocolProxy(conf);
@@ -307,7 +307,7 @@ public class ContainerLauncherImpl extends ContainerLauncher {
}
@Override
- public void serviceStop() {
+ public void shutdown() {
if(!serviceStopped.compareAndSet(false, true)) {
LOG.info("Ignoring multiple stops");
return;
http://git-wip-us.apache.org/repos/asf/tez/blob/f5da29df/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
index 9f741cf..7c6a6a4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
import org.apache.tez.serviceplugins.api.ContainerLauncher;
import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
@@ -45,6 +46,7 @@ public class ContainerLauncherRouter extends AbstractService
private final ContainerLauncher containerLaunchers[];
private final ContainerLauncherContext containerLauncherContexts[];
+ protected final ServicePluginLifecycleAbstractService[] containerLauncherServiceWrappers;
private final AppContext appContext;
@VisibleForTesting
@@ -53,6 +55,8 @@ public class ContainerLauncherRouter extends AbstractService
this.appContext = context;
containerLaunchers = new ContainerLauncher[] {containerLauncher};
containerLauncherContexts = new ContainerLauncherContext[] {containerLauncher.getContext()};
+ containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[]{
+ new ServicePluginLifecycleAbstractService(containerLauncher)};
}
// Accepting conf to setup final parameters, if required.
@@ -75,6 +79,7 @@ public class ContainerLauncherRouter extends AbstractService
}
containerLauncherContexts = new ContainerLauncherContext[containerLauncherClassIdentifiers.length];
containerLaunchers = new ContainerLauncher[containerLauncherClassIdentifiers.length];
+ containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[containerLauncherClassIdentifiers.length];
for (int i = 0; i < containerLauncherClassIdentifiers.length; i++) {
@@ -82,6 +87,7 @@ public class ContainerLauncherRouter extends AbstractService
containerLauncherContexts[i] = containerLauncherContext;
containerLaunchers[i] = createContainerLauncher(containerLauncherClassIdentifiers[i], context,
containerLauncherContext, taskAttemptListener, workingDirectory, isPureLocalMode, conf);
+ containerLauncherServiceWrappers[i] = new ServicePluginLifecycleAbstractService(containerLaunchers[i]);
}
}
@@ -130,21 +136,21 @@ public class ContainerLauncherRouter extends AbstractService
@Override
public void serviceInit(Configuration conf) {
for (int i = 0 ; i < containerLaunchers.length ; i++) {
- ((AbstractService) containerLaunchers[i]).init(conf);
+ containerLauncherServiceWrappers[i].init(conf);
}
}
@Override
public void serviceStart() {
for (int i = 0 ; i < containerLaunchers.length ; i++) {
- ((AbstractService) containerLaunchers[i]).start();
+ containerLauncherServiceWrappers[i].start();
}
}
@Override
public void serviceStop() {
for (int i = 0 ; i < containerLaunchers.length ; i++) {
- ((AbstractService) containerLaunchers[i]).stop();
+ containerLauncherServiceWrappers[i].stop();
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f5da29df/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index a1b8e29..3975111 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -111,7 +111,7 @@ public class LocalContainerLauncher extends ContainerLauncher {
// starts up. It's not possible to set these up via a static payload.
// Will need some kind of mechanism to dynamically create payloads / bind to parameters
// after the AM starts up.
- super(LocalContainerLauncher.class.getName(), containerLauncherContext);
+ super(containerLauncherContext);
this.context = context;
this.tal = taskAttemptListener;
this.workingDirectory = workingDirectory;
@@ -139,14 +139,14 @@ public class LocalContainerLauncher extends ContainerLauncher {
}
@Override
- public void serviceStart() throws Exception {
+ public void start() throws Exception {
eventHandlingThread =
new Thread(new TezSubTaskRunner(), "LocalContainerLauncher-SubTaskRunner");
eventHandlingThread.start();
}
@Override
- public void serviceStop() throws Exception {
+ public void shutdown() throws Exception {
if (!serviceStopped.compareAndSet(false, true)) {
LOG.info("Service Already stopped. Ignoring additional stop");
return;
http://git-wip-us.apache.org/repos/asf/tez/blob/f5da29df/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 3c3c6a7..21ae5f7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -139,7 +139,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
public MockContainerLauncher(AtomicBoolean goFlag,
ContainerLauncherContext containerLauncherContext) {
- super("MockContainerLauncher", containerLauncherContext);
+ super(containerLauncherContext);
this.goFlag = goFlag;
}
@@ -182,7 +182,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
}
@Override
- public void serviceStart() throws Exception {
+ public void start() throws Exception {
taListener = (TaskAttemptListenerImpTezDag) getTaskAttemptListener();
taskCommunicator = (TezTaskCommunicatorImpl) taListener.getTaskCommunicator(0);
eventHandlingThread = new Thread(this);
@@ -199,7 +199,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
}
@Override
- public void serviceStop() throws Exception {
+ public void shutdown() throws Exception {
if (eventHandlingThread != null) {
eventHandlingThread.interrupt();
eventHandlingThread.join(2000l);
http://git-wip-us.apache.org/repos/asf/tez/blob/f5da29df/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index df643e4..41a7373 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -299,7 +299,7 @@ public class TestTaskAttemptListenerImplTezDag {
sessionToken.setService(identifier.getJobId());
TokenCache.setSessionToken(sessionToken, credentials);
taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext,
- mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, null, false);
+ mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, conf, false);
// no exception should happen; the listener should start properly
taskAttemptListener.init(conf);
taskAttemptListener.start();
@@ -319,7 +319,7 @@ public class TestTaskAttemptListenerImplTezDag {
conf.set(TezConfiguration.TEZ_AM_TASK_AM_PORT_RANGE, port + "-" + port);
taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext,
- mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, null, false);
+ mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, conf, false);
taskAttemptListener.init(conf);
taskAttemptListener.start();
int resultedPort = taskAttemptListener.getTaskCommunicator(0).getAddress().getPort();
@@ -375,10 +375,10 @@ public class TestTaskAttemptListenerImplTezDag {
public TaskAttemptListenerImplForTest(AppContext context,
TaskHeartbeatHandler thh,
ContainerHeartbeatHandler chh,
- JobTokenSecretManager jobTokenSecretManager,
String[] taskCommunicatorClassIdentifiers,
+ Configuration conf,
boolean isPureLocalMode) {
- super(context, thh, chh, jobTokenSecretManager, taskCommunicatorClassIdentifiers,
+ super(context, thh, chh, taskCommunicatorClassIdentifiers, conf,
isPureLocalMode);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f5da29df/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
index dbf5054..85f9415 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
@@ -54,7 +54,7 @@ public class TezTestServiceContainerLauncher extends ContainerLauncher {
// Configuration passed in here to set up final parameters
public TezTestServiceContainerLauncher(ContainerLauncherContext containerLauncherContext) {
- super(TezTestServiceContainerLauncher.class.getName(), containerLauncherContext);
+ super(containerLauncherContext);
int numThreads = getContext().getInitialConfiguration().getInt(
TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS,
TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS_DEFAULT);
@@ -69,13 +69,13 @@ public class TezTestServiceContainerLauncher extends ContainerLauncher {
}
@Override
- public void serviceStart() {
+ public void start() {
communicator.init(getContext().getInitialConfiguration());
communicator.start();
}
@Override
- public void serviceStop() {
+ public void shutdown() {
communicator.stop();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f5da29df/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
index d3743e1..7b42296 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
@@ -27,7 +27,7 @@ public class TezTestServiceNoOpContainerLauncher extends ContainerLauncher {
public TezTestServiceNoOpContainerLauncher(ContainerLauncherContext containerLauncherContext) {
- super(TezTestServiceNoOpContainerLauncher.class.getName(), containerLauncherContext);
+ super(containerLauncherContext);
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/f5da29df/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
index 444498e..078ea79 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
@@ -23,7 +23,6 @@ import java.util.concurrent.RejectedExecutionException;
import com.google.protobuf.ByteString;
import com.google.protobuf.ServiceException;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.Credentials;
@@ -75,20 +74,20 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
}
@Override
- public void serviceInit(Configuration conf) throws Exception {
- super.serviceInit(conf);
- this.communicator.init(conf);
+ public void initialize() throws Exception {
+ super.initialize();
+ this.communicator.init(getContext().getInitialConfiguration());
}
@Override
- public void serviceStart() {
- super.serviceStart();
+ public void start() {
+ super.start();
this.communicator.start();
}
@Override
- public void serviceStop() {
- super.serviceStop();
+ public void shutdown() {
+ super.shutdown();
this.communicator.stop();
}
@@ -132,7 +131,7 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
}
// Have to register this up front right now. Otherwise, it's possible for the task to start
// sending out status/DONE/KILLED/FAILED messages before TAImpl knows how to handle them.
- getTaskCommunicatorContext()
+ getContext()
.taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
communicator.submitWork(requestProto, host, port,
new TezTestServiceCommunicator.ExecuteRequestCallback<SubmitWorkResponseProto>() {
@@ -154,19 +153,19 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
RemoteException re = (RemoteException) t;
String message = re.toString();
if (message.contains(RejectedExecutionException.class.getName())) {
- getTaskCommunicatorContext().taskKilled(taskSpec.getTaskAttemptID(),
+ getContext().taskKilled(taskSpec.getTaskAttemptID(),
TaskAttemptEndReason.SERVICE_BUSY, "Service Busy");
} else {
- getTaskCommunicatorContext()
+ getContext()
.taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
t.toString());
}
} else {
if (t instanceof IOException) {
- getTaskCommunicatorContext().taskKilled(taskSpec.getTaskAttemptID(),
+ getContext().taskKilled(taskSpec.getTaskAttemptID(),
TaskAttemptEndReason.COMMUNICATION_ERROR, "Communication Error");
} else {
- getTaskCommunicatorContext()
+ getContext()
.taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
t.getMessage());
}
@@ -191,11 +190,11 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
builder.setAmPort(getAddress().getPort());
Credentials taskCredentials = new Credentials();
// Credentials can change across DAGs. Ideally construct only once per DAG.
- taskCredentials.addAll(getTaskCommunicatorContext().getCredentials());
+ taskCredentials.addAll(getContext().getCredentials());
ByteBuffer credentialsBinary = credentialMap.get(taskSpec.getDAGName());
if (credentialsBinary == null) {
- credentialsBinary = serializeCredentials(getTaskCommunicatorContext().getCredentials());
+ credentialsBinary = serializeCredentials(getContext().getCredentials());
credentialMap.putIfAbsent(taskSpec.getDAGName(), credentialsBinary.duplicate());
} else {
credentialsBinary = credentialsBinary.duplicate();