You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/06 11:26:28 UTC
[36/51] [abbrv] tez git commit: TEZ-2004. Define basic interface for
pluggable ContainerLaunchers. (sseth)
http://git-wip-us.apache.org/repos/asf/tez/blob/d799d3b9/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
index 07dfcd6..25fd13e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
public abstract class TaskSchedulerService extends AbstractService{
http://git-wip-us.apache.org/repos/asf/tez/blob/d799d3b9/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 6f897e1..d4cf317 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -56,7 +56,7 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.AppContext;
http://git-wip-us.apache.org/repos/asf/tez/blob/d799d3b9/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
index 8ef2a83..cecb019 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
@@ -20,7 +20,7 @@ package org.apache.tez.dag.app.rm.container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.tez.dag.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
public class AMContainerEventCompleted extends AMContainerEvent {
http://git-wip-us.apache.org/repos/asf/tez/blob/d799d3b9/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index e9e0f04..e63d86d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -28,8 +28,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.tez.common.TezUtilsInternal;
-import org.apache.tez.dag.api.ContainerEndReason;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.security.Credentials;
@@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
@@ -1068,12 +1067,12 @@ public class AMContainerImpl implements AMContainer {
}
protected void sendStartRequestToNM(ContainerLaunchContext clc) {
- sendEvent(new NMCommunicatorLaunchRequestEvent(clc, container, launcherId, taskCommId));
+ sendEvent(new NMCommunicatorLaunchRequestEvent(clc, container, launcherId, schedulerId, taskCommId));
}
protected void sendStopRequestToNM() {
sendEvent(new NMCommunicatorStopRequestEvent(containerId,
- container.getNodeId(), container.getContainerToken(), launcherId));
+ container.getNodeId(), container.getContainerToken(), launcherId, schedulerId, taskCommId));
}
protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId, TaskAttemptEndReason endReason) {
http://git-wip-us.apache.org/repos/asf/tez/blob/d799d3b9/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 0f35bba..3c3c6a7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -34,11 +34,14 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
+import org.apache.tez.serviceplugins.api.ContainerLauncher;
+import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
+import org.apache.tez.serviceplugins.api.ContainerStopRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -53,23 +56,14 @@ import org.apache.tez.dag.api.TaskHeartbeatRequest;
import org.apache.tez.dag.api.TaskHeartbeatResponse;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.app.launcher.ContainerLauncher;
import org.apache.tez.dag.app.launcher.ContainerLauncherRouter;
import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
-import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
-import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
-import org.apache.tez.dag.app.rm.container.AMContainerEvent;
-import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
-import org.apache.tez.dag.app.rm.container.AMContainerEventType;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
-import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
-import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TaskStatistics;
import org.apache.tez.runtime.api.impl.TezEvent;
@@ -89,6 +83,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class MockDAGAppMaster extends DAGAppMaster {
private static final Logger LOG = LoggerFactory.getLogger(MockDAGAppMaster.class);
+ ContainerLauncherContext containerLauncherContext;
MockContainerLauncher containerLauncher;
boolean initFailFlag;
boolean startFailFlag;
@@ -121,7 +116,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
// Upon, launch of a container is simulates the container asking for tasks
// Upon receiving a task it simulates completion of the tasks
// It can be used to preempt the container for a given task
- public class MockContainerLauncher extends AbstractService implements ContainerLauncher, Runnable {
+ public class MockContainerLauncher extends ContainerLauncher implements Runnable {
BlockingQueue<NMCommunicatorEvent> eventQueue = new LinkedBlockingQueue<NMCommunicatorEvent>();
Thread eventHandlingThread;
@@ -141,12 +136,14 @@ public class MockDAGAppMaster extends DAGAppMaster {
Map<TezTaskID, Integer> preemptedTasks = Maps.newConcurrentMap();
Map<TezTaskAttemptID, Integer> tasksWithStatusUpdates = Maps.newConcurrentMap();
-
- public MockContainerLauncher(AtomicBoolean goFlag) {
- super("MockContainerLauncher");
+
+ public MockContainerLauncher(AtomicBoolean goFlag,
+ ContainerLauncherContext containerLauncherContext) {
+ super("MockContainerLauncher", containerLauncherContext);
this.goFlag = goFlag;
}
+
public class ContainerData {
ContainerId cId;
TezTaskAttemptID taId;
@@ -211,20 +208,18 @@ public class MockDAGAppMaster extends DAGAppMaster {
executorService.shutdownNow();
}
}
-
+
+
@Override
- public void handle(NMCommunicatorEvent event) {
- switch (event.getType()) {
- case CONTAINER_LAUNCH_REQUEST:
- launch((NMCommunicatorLaunchRequestEvent) event);
- break;
- case CONTAINER_STOP_REQUEST:
- stop((NMCommunicatorStopRequestEvent)event);
- break;
- }
+ public void launchContainer(ContainerLaunchRequest launchRequest) {
+ launch(launchRequest);
}
-
-
+
+ @Override
+ public void stopContainer(ContainerStopRequest stopRequest) {
+ stop(stopRequest);
+ }
+
void waitToGo() {
if (goFlag == null) {
return;
@@ -266,20 +261,19 @@ public class MockDAGAppMaster extends DAGAppMaster {
tasksWithStatusUpdates.put(tId, numUpdates);
}
- void stop(NMCommunicatorStopRequestEvent event) {
+ void stop(ContainerStopRequest event) {
// remove from simulated container list
containers.remove(event.getContainerId());
- getContext().getEventHandler().handle(
- new AMContainerEvent(event.getContainerId(), AMContainerEventType.C_NM_STOP_SENT));
+ getContext().containerStopRequested(event.getContainerId());
}
- void launch(NMCommunicatorLaunchRequestEvent event) {
+ void launch(ContainerLaunchRequest event) {
// launch container by putting it in simulated container list
ContainerData cData = new ContainerData(event.getContainerId(),
event.getContainerLaunchContext());
containers.put(event.getContainerId(), cData);
containersToProcess.add(cData);
- getContext().getEventHandler().handle(new AMContainerEventLaunched(event.getContainerId()));
+ getContext().containerLaunched(event.getContainerId());
}
public void waitTillContainersLaunched() throws InterruptedException {
@@ -289,7 +283,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
}
void incrementTime(long inc) {
- Clock clock = getContext().getClock();
+ Clock clock = MockDAGAppMaster.this.getContext().getClock();
if (clock instanceof MockClock) {
((MockClock) clock).incrementTime(inc);
}
@@ -493,7 +487,8 @@ public class MockDAGAppMaster extends DAGAppMaster {
super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime,
isSession, workingDirectory, localDirs, logDirs, new TezApiVersionInfo().getVersion(), 1,
credentials, jobUserName);
- containerLauncher = new MockContainerLauncher(launcherGoFlag);
+ containerLauncherContext = new ContainerLauncherContextImpl(getContext(), getTaskAttemptListener());
+ containerLauncher = new MockContainerLauncher(launcherGoFlag, containerLauncherContext);
shutdownHandler = new MockDAGAppMasterShutdownHandler();
this.initFailFlag = initFailFlag;
this.startFailFlag = startFailFlag;
@@ -508,7 +503,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
String[] containerLaunchers,
boolean isLocal)
throws UnknownHostException {
- return new ContainerLauncherRouter(containerLauncher);
+ return new ContainerLauncherRouter(containerLauncher, getContext());
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/d799d3b9/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index 7f0362d..df643e4 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -51,8 +51,8 @@ import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.common.security.TokenCache;
import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.ContainerEndReason;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.TaskHeartbeatRequest;
import org.apache.tez.dag.api.TaskHeartbeatResponse;
import org.apache.tez.dag.api.TezException;
http://git-wip-us.apache.org/repos/asf/tez/blob/d799d3b9/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
index 934543f..8d776fb 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
http://git-wip-us.apache.org/repos/asf/tez/blob/d799d3b9/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 62edac9..e37ab4a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -53,12 +53,11 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.tez.common.MockDNSToSwitchMapping;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
@@ -94,7 +93,6 @@ import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.mockito.internal.matchers.Null;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
http://git-wip-us.apache.org/repos/asf/tez/blob/d799d3b9/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
index 0a642bb..b555c62 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
http://git-wip-us.apache.org/repos/asf/tez/blob/d799d3b9/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index b8b4774..7bcb6d2 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -63,8 +63,8 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.TokenCache;
-import org.apache.tez.dag.api.ContainerEndReason;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.TaskCommunicator;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
http://git-wip-us.apache.org/repos/asf/tez/blob/d799d3b9/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
index 9d22196..dbf5054 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
@@ -14,34 +14,30 @@
package org.apache.tez.dag.app.launcher;
+import java.io.IOException;
import java.net.InetSocketAddress;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
+import org.apache.tez.serviceplugins.api.ContainerLauncher;
+import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
+import org.apache.tez.serviceplugins.api.ContainerStopRequest;
+import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.app.TezTestServiceCommunicator;
-import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
-import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
-import org.apache.tez.dag.app.rm.container.AMContainerEvent;
-import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchFailed;
-import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
-import org.apache.tez.dag.app.rm.container.AMContainerEventType;
-import org.apache.tez.dag.history.DAGHistoryEvent;
-import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
import org.apache.tez.service.TezTestServiceConfConstants;
import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class TezTestServiceContainerLauncher extends AbstractService implements ContainerLauncher {
+// TODO TEZ-2003 look for all LOG.*(DBG and LOG.*(DEBUG messages
+
+public class TezTestServiceContainerLauncher extends ContainerLauncher {
// TODO Support interruptability of tasks which haven't yet been launched.
@@ -49,40 +45,32 @@ public class TezTestServiceContainerLauncher extends AbstractService implements
static final Logger LOG = LoggerFactory.getLogger(TezTestServiceContainerLauncher.class);
- private final AppContext context;
private final String tokenIdentifier;
- private final TaskAttemptListener tal;
private final int servicePort;
private final TezTestServiceCommunicator communicator;
- private final Clock clock;
private final ApplicationAttemptId appAttemptId;
+ // private final TaskAttemptListener tal;
// Configuration passed in here to set up final parameters
- public TezTestServiceContainerLauncher(AppContext appContext, Configuration conf,
- TaskAttemptListener tal) {
- super(TezTestServiceContainerLauncher.class.getName());
- this.clock = appContext.getClock();
- int numThreads = conf.getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS,
+ public TezTestServiceContainerLauncher(ContainerLauncherContext containerLauncherContext) {
+ super(TezTestServiceContainerLauncher.class.getName(), containerLauncherContext);
+ int numThreads = getContext().getInitialConfiguration().getInt(
+ TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS,
TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS_DEFAULT);
- this.servicePort = conf.getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT, -1);
+ this.servicePort = getContext().getInitialConfiguration().getInt(
+ TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT, -1);
Preconditions.checkArgument(servicePort > 0,
TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT + " must be set");
this.communicator = new TezTestServiceCommunicator(numThreads);
- this.context = appContext;
- this.tokenIdentifier = context.getApplicationID().toString();
- this.appAttemptId = appContext.getApplicationAttemptId();
- this.tal = tal;
- }
-
- @Override
- public void serviceInit(Configuration conf) {
- communicator.init(conf);
+ this.tokenIdentifier = getContext().getApplicationAttemptId().getApplicationId().toString();
+ this.appAttemptId = getContext().getApplicationAttemptId();
}
@Override
public void serviceStart() {
+ communicator.init(getContext().getInitialConfiguration());
communicator.start();
}
@@ -92,51 +80,56 @@ public class TezTestServiceContainerLauncher extends AbstractService implements
}
@Override
- public void handle(NMCommunicatorEvent event) {
- switch (event.getType()) {
- case CONTAINER_LAUNCH_REQUEST:
- final NMCommunicatorLaunchRequestEvent launchEvent = (NMCommunicatorLaunchRequestEvent) event;
- RunContainerRequestProto runRequest = constructRunContainerRequest(launchEvent);
- communicator.runContainer(runRequest, launchEvent.getNodeId().getHost(),
- launchEvent.getNodeId().getPort(),
- new TezTestServiceCommunicator.ExecuteRequestCallback<TezTestServiceProtocolProtos.RunContainerResponseProto>() {
- @Override
- public void setResponse(TezTestServiceProtocolProtos.RunContainerResponseProto response) {
- LOG.info("Container: " + launchEvent.getContainerId() + " launch succeeded on host: " + launchEvent.getNodeId());
- context.getEventHandler().handle(new AMContainerEventLaunched(launchEvent.getContainerId()));
- ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent(
- launchEvent.getContainerId(), clock.getTime(), context.getApplicationAttemptId());
- context.getHistoryHandler().handle(new DAGHistoryEvent(
- null, lEvt));
- }
-
- @Override
- public void indicateError(Throwable t) {
- LOG.error("Failed to launch container: " + launchEvent.getContainer() + " on host: " + launchEvent.getNodeId(), t);
- sendContainerLaunchFailedMsg(launchEvent.getContainerId(), t);
- }
- });
- break;
- case CONTAINER_STOP_REQUEST:
- LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + event);
- // that the container is actually done (normally received from RM)
- // TODO Sending this out for an un-launched container is invalid
- context.getEventHandler().handle(new AMContainerEvent(event.getContainerId(),
- AMContainerEventType.C_NM_STOP_SENT));
- break;
+ public void launchContainer(final ContainerLaunchRequest launchRequest) {
+ RunContainerRequestProto runRequest = null;
+ try {
+ runRequest = constructRunContainerRequest(launchRequest);
+ } catch (IOException e) {
+ getContext().containerLaunchFailed(launchRequest.getContainerId(),
+ "Failed to construct launch request, " + StringUtils.stringifyException(e));
+ return;
}
+ communicator.runContainer(runRequest, launchRequest.getNodeId().getHost(),
+ launchRequest.getNodeId().getPort(),
+ new TezTestServiceCommunicator.ExecuteRequestCallback<TezTestServiceProtocolProtos.RunContainerResponseProto>() {
+ @Override
+ public void setResponse(TezTestServiceProtocolProtos.RunContainerResponseProto response) {
+ LOG.info(
+ "Container: " + launchRequest.getContainerId() + " launch succeeded on host: " +
+ launchRequest.getNodeId());
+ getContext().containerLaunched(launchRequest.getContainerId());
+ }
+
+ @Override
+ public void indicateError(Throwable t) {
+ LOG.error(
+ "Failed to launch container: " + launchRequest.getContainerId() + " on host: " +
+ launchRequest.getNodeId(), t);
+ sendContainerLaunchFailedMsg(launchRequest.getContainerId(), t);
+ }
+ });
}
- private RunContainerRequestProto constructRunContainerRequest(NMCommunicatorLaunchRequestEvent event) {
+ @Override
+ public void stopContainer(ContainerStopRequest stopRequest) {
+ LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + stopRequest);
+ // that the container is actually done (normally received from RM)
+ // TODO Sending this out for an un-launched container is invalid
+ getContext().containerStopRequested(stopRequest.getContainerId());
+ }
+
+ private RunContainerRequestProto constructRunContainerRequest(ContainerLaunchRequest launchRequest) throws
+ IOException {
RunContainerRequestProto.Builder builder = RunContainerRequestProto.newBuilder();
- InetSocketAddress address = tal.getTaskCommunicator(event.getTaskCommId()).getAddress();
+ Preconditions.checkArgument(launchRequest.getTaskCommunicatorName().equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT));
+ InetSocketAddress address = (InetSocketAddress) getContext().getTaskCommunicatorMetaInfo(launchRequest.getTaskCommunicatorName());
builder.setAmHost(address.getHostName()).setAmPort(address.getPort());
builder.setAppAttemptNumber(appAttemptId.getAttemptId());
builder.setApplicationIdString(appAttemptId.getApplicationId().toString());
builder.setTokenIdentifier(tokenIdentifier);
- builder.setContainerIdString(event.getContainer().getId().toString());
+ builder.setContainerIdString(launchRequest.getContainerId().toString());
builder.setCredentialsBinary(
- ByteString.copyFrom(event.getContainerLaunchContext().getTokens()));
+ ByteString.copyFrom(launchRequest.getContainerLaunchContext().getTokens()));
// TODO Avoid reading this from the environment
builder.setUser(System.getenv(ApplicationConstants.Environment.USER.name()));
return builder.build();
@@ -144,6 +137,8 @@ public class TezTestServiceContainerLauncher extends AbstractService implements
@SuppressWarnings("unchecked")
void sendContainerLaunchFailedMsg(ContainerId containerId, Throwable t) {
- context.getEventHandler().handle(new AMContainerEventLaunchFailed(containerId, t == null ? "" : t.getMessage()));
+ getContext().containerLaunchFailed(containerId, t == null ? "" : t.getMessage());
}
+
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/d799d3b9/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
index 977d0d3..d3743e1 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
@@ -14,53 +14,32 @@
package org.apache.tez.dag.app.launcher;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.TaskAttemptListener;
-import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
-import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
-import org.apache.tez.dag.app.rm.container.AMContainerEvent;
-import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
-import org.apache.tez.dag.app.rm.container.AMContainerEventType;
-import org.apache.tez.dag.history.DAGHistoryEvent;
-import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
+import org.apache.tez.serviceplugins.api.ContainerLauncher;
+import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
+import org.apache.tez.serviceplugins.api.ContainerStopRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class TezTestServiceNoOpContainerLauncher extends AbstractService implements ContainerLauncher {
+public class TezTestServiceNoOpContainerLauncher extends ContainerLauncher {
static final Logger LOG = LoggerFactory.getLogger(TezTestServiceNoOpContainerLauncher.class);
- private final AppContext context;
- private final Clock clock;
- public TezTestServiceNoOpContainerLauncher(AppContext appContext, Configuration conf,
- TaskAttemptListener tal) {
- super(TezTestServiceNoOpContainerLauncher.class.getName());
- this.context = appContext;
- this.clock = appContext.getClock();
+ public TezTestServiceNoOpContainerLauncher(ContainerLauncherContext containerLauncherContext) {
+ super(TezTestServiceNoOpContainerLauncher.class.getName(), containerLauncherContext);
}
@Override
- public void handle(NMCommunicatorEvent event) {
- switch(event.getType()) {
- case CONTAINER_LAUNCH_REQUEST:
- final NMCommunicatorLaunchRequestEvent launchEvent = (NMCommunicatorLaunchRequestEvent) event;
- LOG.info("No-op launch for container: " + launchEvent.getContainerId() + " succeeded on host: " + launchEvent.getNodeId());
- context.getEventHandler().handle(new AMContainerEventLaunched(launchEvent.getContainerId()));
- ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent(
- launchEvent.getContainerId(), clock.getTime(), context.getApplicationAttemptId());
- context.getHistoryHandler().handle(new DAGHistoryEvent(
- null, lEvt));
- break;
- case CONTAINER_STOP_REQUEST:
- LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + event);
- context.getEventHandler().handle(new AMContainerEvent(event.getContainerId(),
- AMContainerEventType.C_NM_STOP_SENT));
- break;
- }
+ public void launchContainer(ContainerLaunchRequest launchRequest) {
+ LOG.info("No-op launch for container {} succeeded on host: {}", launchRequest.getContainerId(),
+ launchRequest.getNodeId());
+ getContext().containerLaunched(launchRequest.getContainerId());
+ }
+ @Override
+ public void stopContainer(ContainerStopRequest stopRequest) {
+ LOG.info("DEBUG: Ignoring STOP_REQUEST {}", stopRequest);
+ getContext().containerStopRequested(stopRequest.getContainerId());
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/d799d3b9/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
index 073cb50..506e991 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.service.TezTestServiceConfConstants;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/tez/blob/d799d3b9/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
index 98673a6..444498e 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
@@ -30,8 +30,8 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.dag.api.ContainerEndReason;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.TaskCommunicatorContext;
import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
import org.apache.tez.dag.app.TezTestServiceCommunicator;