You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/26 01:48:51 UTC
[4/7] tez git commit: TEZ-2708. Rename classes and variables post
TEZ-2003 changes. (sseth)
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
deleted file mode 100644
index 5159aff..0000000
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ /dev/null
@@ -1,426 +0,0 @@
-/*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.common.ContainerContext;
-import org.apache.tez.common.ContainerTask;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.common.security.JobTokenIdentifier;
-import org.apache.tez.common.security.JobTokenSecretManager;
-import org.apache.tez.common.security.TokenCache;
-import org.apache.tez.dag.api.NamedEntityDescriptor;
-import org.apache.tez.dag.api.TaskCommunicator;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.serviceplugins.api.ContainerEndReason;
-import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.dag.api.TaskHeartbeatRequest;
-import org.apache.tez.dag.api.TaskHeartbeatResponse;
-import org.apache.tez.dag.api.TezException;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.tez.common.TezTaskUmbilicalProtocol;
-import org.apache.tez.dag.api.TaskCommunicatorContext;
-import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
-import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
-import org.apache.tez.dag.app.dag.event.VertexEventType;
-import org.apache.tez.dag.app.rm.container.AMContainer;
-import org.apache.tez.dag.app.rm.container.AMContainerMap;
-import org.apache.tez.dag.app.rm.container.AMContainerTask;
-import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
-import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
-import org.apache.tez.runtime.api.impl.EventType;
-import org.apache.tez.runtime.api.impl.TaskSpec;
-import org.apache.tez.runtime.api.impl.TezEvent;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-
-@SuppressWarnings("unchecked")
-// TODO TEZ-2003 (post) TEZ-2696 Rename to TestTezTaskCommunicator
-public class TestTaskAttemptListenerImplTezDag {
- private ApplicationId appId;
- private ApplicationAttemptId appAttemptId;
- private AppContext appContext;
- Credentials credentials;
- AMContainerMap amContainerMap;
- EventHandler eventHandler;
- DAG dag;
- TaskAttemptListenerImpTezDag taskAttemptListener;
- ContainerTask containerTask;
- AMContainerTask amContainerTask;
- TaskSpec taskSpec;
-
- TezVertexID vertexID;
- TezTaskID taskID;
- TezTaskAttemptID taskAttemptID;
-
- @Before
- public void setUp() {
- appId = ApplicationId.newInstance(1000, 1);
- appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
- dag = mock(DAG.class);
- TezDAGID dagID = TezDAGID.getInstance(appId, 1);
- vertexID = TezVertexID.getInstance(dagID, 1);
- taskID = TezTaskID.getInstance(vertexID, 1);
- taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1);
- credentials = new Credentials();
-
- amContainerMap = mock(AMContainerMap.class);
- Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>();
-
- eventHandler = mock(EventHandler.class);
-
- MockClock clock = new MockClock();
-
- appContext = mock(AppContext.class);
- doReturn(eventHandler).when(appContext).getEventHandler();
- doReturn(dag).when(appContext).getCurrentDAG();
- doReturn(appAcls).when(appContext).getApplicationACLs();
- doReturn(amContainerMap).when(appContext).getAllContainers();
- doReturn(clock).when(appContext).getClock();
-
- doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
- doReturn(credentials).when(appContext).getAppCredentials();
- NodeId nodeId = NodeId.newInstance("localhost", 0);
- AMContainer amContainer = mock(AMContainer.class);
- Container container = mock(Container.class);
- doReturn(nodeId).when(container).getNodeId();
- doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class));
- doReturn(container).when(amContainer).getContainer();
-
- Configuration conf = new TezConfiguration();
- UserPayload defaultPayload;
- try {
- defaultPayload = TezUtils.createUserPayloadFromConf(conf);
- } catch (IOException e) {
- throw new TezUncheckedException(e);
- }
- taskAttemptListener = new TaskAttemptListenerImplForTest(appContext,
- mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class),
- Lists.newArrayList(
- new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
- .setUserPayload(defaultPayload)));
-
- taskSpec = mock(TaskSpec.class);
- doReturn(taskAttemptID).when(taskSpec).getTaskAttemptID();
- amContainerTask = new AMContainerTask(taskSpec, null, null, false, 0);
- containerTask = null;
- }
-
- @Test(timeout = 5000)
- public void testGetTask() throws IOException {
-
- TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator(0);
- TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical();
-
- ContainerId containerId1 = createContainerId(appId, 1);
- ContainerContext containerContext1 = new ContainerContext(containerId1.toString());
- containerTask = tezUmbilical.getTask(containerContext1);
- assertTrue(containerTask.shouldDie());
-
- ContainerId containerId2 = createContainerId(appId, 2);
- ContainerContext containerContext2 = new ContainerContext(containerId2.toString());
- taskAttemptListener.registerRunningContainer(containerId2, 0);
- containerTask = tezUmbilical.getTask(containerContext2);
- assertNull(containerTask);
-
- // Valid task registered
- taskAttemptListener.registerTaskAttempt(amContainerTask, containerId2, 0);
- containerTask = tezUmbilical.getTask(containerContext2);
- assertFalse(containerTask.shouldDie());
- assertEquals(taskSpec, containerTask.getTaskSpec());
-
- // Task unregistered. Should respond to heartbeats
- taskAttemptListener.unregisterTaskAttempt(taskAttemptID, 0, TaskAttemptEndReason.OTHER, null);
- containerTask = tezUmbilical.getTask(containerContext2);
- assertNull(containerTask);
-
- // Container unregistered. Should send a shouldDie = true
- taskAttemptListener.unregisterRunningContainer(containerId2, 0, ContainerEndReason.OTHER, null);
- containerTask = tezUmbilical.getTask(containerContext2);
- assertTrue(containerTask.shouldDie());
-
- ContainerId containerId3 = createContainerId(appId, 3);
- ContainerContext containerContext3 = new ContainerContext(containerId3.toString());
- taskAttemptListener.registerRunningContainer(containerId3, 0);
-
- // Register a task to container3, then unregister container3 altogether
- TaskSpec taskSpec2 = mock(TaskSpec.class);
- TezTaskAttemptID taskAttemptId2 = mock(TezTaskAttemptID.class);
- doReturn(taskAttemptId2).when(taskSpec2).getTaskAttemptID();
- AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec, null, null, false, 0);
- taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId3, 0);
- taskAttemptListener.unregisterRunningContainer(containerId3, 0, ContainerEndReason.OTHER, null);
- containerTask = tezUmbilical.getTask(containerContext3);
- assertTrue(containerTask.shouldDie());
- }
-
- @Test(timeout = 5000)
- public void testGetTaskMultiplePulls() throws IOException {
- TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator(0);
- TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical();
-
- ContainerId containerId1 = createContainerId(appId, 1);
-
- ContainerContext containerContext1 = new ContainerContext(containerId1.toString());
- taskAttemptListener.registerRunningContainer(containerId1, 0);
- containerTask = tezUmbilical.getTask(containerContext1);
- assertNull(containerTask);
-
- // Register task
- taskAttemptListener.registerTaskAttempt(amContainerTask, containerId1, 0);
- containerTask = tezUmbilical.getTask(containerContext1);
- assertFalse(containerTask.shouldDie());
- assertEquals(taskSpec, containerTask.getTaskSpec());
-
- // Try pulling again - simulates re-use pull
- containerTask = tezUmbilical.getTask(containerContext1);
- assertNull(containerTask);
- }
-
- @Test (timeout = 5000)
- public void testTaskEventRouting() throws Exception {
- List<TezEvent> events = Arrays.asList(
- new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null), null),
- new TezEvent(DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])), null),
- new TezEvent(new TaskAttemptCompletedEvent(), null)
- );
-
- generateHeartbeat(events, 0, 1, 0, new ArrayList<TezEvent>());
-
- ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
- verify(eventHandler, times(2)).handle(arg.capture());
- final List<Event> argAllValues = arg.getAllValues();
-
- final Event statusUpdateEvent = argAllValues.get(0);
- assertEquals("First event should be status update", TaskAttemptEventType.TA_STATUS_UPDATE,
- statusUpdateEvent.getType());
-
- final Event vertexEvent = argAllValues.get(1);
- final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent;
- assertEquals("First event should be routed to vertex", VertexEventType.V_ROUTE_EVENT,
- vertexEvent.getType());
- assertEquals(EventType.DATA_MOVEMENT_EVENT,
- vertexRouteEvent.getEvents().get(0).getEventType());
- assertEquals(EventType.TASK_ATTEMPT_COMPLETED_EVENT,
- vertexRouteEvent.getEvents().get(1).getEventType());
-
- }
-
- @Test (timeout = 5000)
- public void testTaskEventRoutingTaskAttemptOnly() throws Exception {
- List<TezEvent> events = Arrays.asList(
- new TezEvent(new TaskAttemptCompletedEvent(), null)
- );
- generateHeartbeat(events, 0, 1, 0, new ArrayList<TezEvent>());
-
- ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
- verify(eventHandler, times(1)).handle(arg.capture());
- final List<Event> argAllValues = arg.getAllValues();
-
- final Event event = argAllValues.get(0);
- assertEquals("only event should be route event", VertexEventType.V_ROUTE_EVENT,
- event.getType());
- }
-
- @Test (timeout = 5000)
- public void testTaskHeartbeatResponse() throws Exception {
- List<TezEvent> events = new ArrayList<TezEvent>();
- List<TezEvent> eventsToSend = new ArrayList<TezEvent>();
- TaskHeartbeatResponse response = generateHeartbeat(events, 0, 1, 2, eventsToSend);
-
- assertEquals(2, response.getNextFromEventId());
- assertEquals(eventsToSend, response.getEvents());
- }
-
- // Try up to 10 times to allocate a random port; fail the test if none of the attempts succeeds.
- @Test (timeout = 5000)
- public void testPortRange() {
- boolean succeedToAllocate = false;
- Random rand = new Random();
- for (int i = 0; i < 10; ++i) {
- int nextPort = 1024 + rand.nextInt(65535 - 1024);
- if (testPortRange(nextPort)) {
- succeedToAllocate = true;
- break;
- }
- }
- if (!succeedToAllocate) {
- fail("Can not allocate free port even in 10 iterations for TaskAttemptListener");
- }
- }
-
- // TODO TEZ-2003 Move this into TestTezTaskCommunicator. Potentially other tests as well.
- @Test (timeout= 5000)
- public void testPortRange_NotSpecified() throws IOException {
- Configuration conf = new Configuration();
- JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(
- "fakeIdentifier"));
- Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier,
- new JobTokenSecretManager());
- sessionToken.setService(identifier.getJobId());
- TokenCache.setSessionToken(sessionToken, credentials);
- UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
- taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext,
- mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), Lists.newArrayList(
- new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
- .setUserPayload(userPayload)));
- // No exception should be thrown; the listener should start properly.
- taskAttemptListener.init(conf);
- taskAttemptListener.start();
- }
-
- private boolean testPortRange(int port) {
- boolean succeedToAllocate = true;
- try {
- Configuration conf = new Configuration();
-
- JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(
- "fakeIdentifier"));
- Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier,
- new JobTokenSecretManager());
- sessionToken.setService(identifier.getJobId());
- TokenCache.setSessionToken(sessionToken, credentials);
-
- conf.set(TezConfiguration.TEZ_AM_TASK_AM_PORT_RANGE, port + "-" + port);
- UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
-
- taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext,
- mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), Lists
- .newArrayList(new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
- .setUserPayload(userPayload)));
- taskAttemptListener.init(conf);
- taskAttemptListener.start();
- int resultedPort = taskAttemptListener.getTaskCommunicator(0).getAddress().getPort();
- assertEquals(port, resultedPort);
- } catch (Exception e) {
- succeedToAllocate = false;
- } finally {
- if (taskAttemptListener != null) {
- try {
- taskAttemptListener.close();
- } catch (IOException e) {
- e.printStackTrace();
- fail("fail to stop TaskAttemptListener");
- }
- }
- }
- return succeedToAllocate;
- }
-
- private TaskHeartbeatResponse generateHeartbeat(List<TezEvent> events,
- int fromEventId, int maxEvents, int nextFromEventId,
- List<TezEvent> sendEvents) throws IOException, TezException {
- ContainerId containerId = createContainerId(appId, 1);
- Vertex vertex = mock(Vertex.class);
-
- doReturn(vertex).when(dag).getVertex(vertexID);
- doReturn("test_vertex").when(vertex).getName();
- TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(nextFromEventId, sendEvents, 0);
- doReturn(eventInfo).when(vertex).getTaskAttemptTezEvents(taskAttemptID, fromEventId, 0, maxEvents);
-
- taskAttemptListener.registerRunningContainer(containerId, 0);
- taskAttemptListener.registerTaskAttempt(amContainerTask, containerId, 0);
-
- TaskHeartbeatRequest request = mock(TaskHeartbeatRequest.class);
- doReturn(containerId.toString()).when(request).getContainerIdentifier();
- doReturn(containerId.toString()).when(request).getContainerIdentifier();
- doReturn(taskAttemptID).when(request).getTaskAttemptId();
- doReturn(events).when(request).getEvents();
- doReturn(maxEvents).when(request).getMaxEvents();
- doReturn(fromEventId).when(request).getStartIndex();
-
- return taskAttemptListener.heartbeat(request);
- }
-
-
- private ContainerId createContainerId(ApplicationId applicationId, int containerIdx) {
- ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, 1);
- return ContainerId.newInstance(appAttemptId, containerIdx);
- }
-
- private static class TaskAttemptListenerImplForTest extends TaskAttemptListenerImpTezDag {
-
- public TaskAttemptListenerImplForTest(AppContext context,
- TaskHeartbeatHandler thh,
- ContainerHeartbeatHandler chh,
- List<NamedEntityDescriptor> taskCommDescriptors) {
- super(context, thh, chh, taskCommDescriptors);
- }
-
- @Override
- TaskCommunicator createDefaultTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
- return new TezTaskCommunicatorImplForTest(taskCommunicatorContext);
- }
-
- }
-
- private static class TezTaskCommunicatorImplForTest extends TezTaskCommunicatorImpl {
-
- public TezTaskCommunicatorImplForTest(
- TaskCommunicatorContext taskCommunicatorContext) {
- super(taskCommunicatorContext);
- }
-
- @Override
- protected void startRpcServer() {
- }
-
- @Override
- protected void stopRpcServer() {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
deleted file mode 100644
index 74468f2..0000000
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.NamedEntityDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
-import org.apache.tez.dag.app.rm.container.AMContainer;
-import org.apache.tez.dag.app.rm.container.AMContainerMap;
-import org.apache.tez.dag.app.rm.container.AMContainerTask;
-import org.apache.tez.dag.records.TaskAttemptTerminationCause;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.runtime.api.impl.TaskSpec;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-
-// TODO TEZ-2003. Rename to TestTaskAttemptListener | whatever TaskAttemptListener is renamed to.
-public class TestTaskAttemptListenerImplTezDag2 {
-
- @Test(timeout = 5000)
- public void testTaskAttemptFailedKilled() throws IOException {
- ApplicationId appId = ApplicationId.newInstance(1000, 1);
- ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
- Credentials credentials = new Credentials();
- AppContext appContext = mock(AppContext.class);
- EventHandler eventHandler = mock(EventHandler.class);
- DAG dag = mock(DAG.class);
- AMContainerMap amContainerMap = mock(AMContainerMap.class);
- Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>();
- doReturn(eventHandler).when(appContext).getEventHandler();
- doReturn(dag).when(appContext).getCurrentDAG();
- doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
- doReturn(credentials).when(appContext).getAppCredentials();
- doReturn(appAcls).when(appContext).getApplicationACLs();
- doReturn(amContainerMap).when(appContext).getAllContainers();
- NodeId nodeId = NodeId.newInstance("localhost", 0);
- AMContainer amContainer = mock(AMContainer.class);
- Container container = mock(Container.class);
- doReturn(nodeId).when(container).getNodeId();
- doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class));
- doReturn(container).when(amContainer).getContainer();
-
- Configuration conf = new TezConfiguration();
- UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
- TaskAttemptListenerImpTezDag taskAttemptListener =
- new TaskAttemptListenerImpTezDag(appContext, mock(TaskHeartbeatHandler.class),
- mock(ContainerHeartbeatHandler.class), Lists.newArrayList(new NamedEntityDescriptor(
- TezConstants.getTezYarnServicePluginName(), null).setUserPayload(userPayload)));
-
- TaskSpec taskSpec1 = mock(TaskSpec.class);
- TezTaskAttemptID taskAttemptId1 = mock(TezTaskAttemptID.class);
- doReturn(taskAttemptId1).when(taskSpec1).getTaskAttemptID();
- AMContainerTask amContainerTask1 = new AMContainerTask(taskSpec1, null, null, false, 10);
-
- TaskSpec taskSpec2 = mock(TaskSpec.class);
- TezTaskAttemptID taskAttemptId2 = mock(TezTaskAttemptID.class);
- doReturn(taskAttemptId2).when(taskSpec2).getTaskAttemptID();
- AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec2, null, null, false, 10);
-
- ContainerId containerId1 = createContainerId(appId, 1);
- taskAttemptListener.registerRunningContainer(containerId1, 0);
- taskAttemptListener.registerTaskAttempt(amContainerTask1, containerId1, 0);
- ContainerId containerId2 = createContainerId(appId, 2);
- taskAttemptListener.registerRunningContainer(containerId2, 0);
- taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId2, 0);
-
-
- taskAttemptListener
- .taskFailed(taskAttemptId1, TaskAttemptEndReason.COMMUNICATION_ERROR, "Diagnostics1");
- taskAttemptListener
- .taskKilled(taskAttemptId2, TaskAttemptEndReason.EXECUTOR_BUSY, "Diagnostics2");
-
- ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class);
- verify(eventHandler, times(2)).handle(argumentCaptor.capture());
- assertTrue(argumentCaptor.getAllValues().get(0) instanceof TaskAttemptEventAttemptFailed);
- assertTrue(argumentCaptor.getAllValues().get(1) instanceof TaskAttemptEventAttemptKilled);
- TaskAttemptEventAttemptFailed failedEvent =
- (TaskAttemptEventAttemptFailed) argumentCaptor.getAllValues().get(0);
- TaskAttemptEventAttemptKilled killedEvent =
- (TaskAttemptEventAttemptKilled) argumentCaptor.getAllValues().get(1);
-
- assertEquals("Diagnostics1", failedEvent.getDiagnosticInfo());
- assertEquals(TaskAttemptTerminationCause.COMMUNICATION_ERROR,
- failedEvent.getTerminationCause());
-
- assertEquals("Diagnostics2", killedEvent.getDiagnosticInfo());
- assertEquals(TaskAttemptTerminationCause.SERVICE_BUSY, killedEvent.getTerminationCause());
- // TODO TEZ-2003. Verify unregistration from the registered list
- }
-
- private ContainerId createContainerId(ApplicationId applicationId, int containerIdx) {
- ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, 1);
- ContainerId containerId = ContainerId.newInstance(appAttemptId, containerIdx);
- return containerId;
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java
index 1545eb4..5222a2d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java
@@ -34,7 +34,7 @@ public class TestTaskCommunicatorContextImpl {
@Test(timeout = 5000)
public void testIsKnownContainer() {
AppContext appContext = mock(AppContext.class);
- TaskAttemptListenerImpTezDag tal = mock(TaskAttemptListenerImpTezDag.class);
+ TaskCommunicatorManager tal = mock(TaskCommunicatorManager.class);
AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class), tal, mock(
ContainerSignatureMatcher.class), appContext);
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
index 4f68fab..be7adde 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
@@ -220,7 +220,7 @@ public class TestTaskCommunicatorManager {
}
- static class TaskCommManagerForMultipleCommTest extends TaskAttemptListenerImpTezDag {
+ static class TaskCommManagerForMultipleCommTest extends TaskCommunicatorManager {
// All variables setup as static since methods being overridden are invoked by the ContainerLauncherRouter ctor,
// and regular variables will not be initialized at this point.
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
new file mode 100644
index 0000000..e8ce429
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
@@ -0,0 +1,425 @@
+/*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.common.ContainerContext;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.TaskCommunicator;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
+import org.apache.tez.dag.api.TaskHeartbeatRequest;
+import org.apache.tez.dag.api.TaskHeartbeatResponse;
+import org.apache.tez.dag.api.TezException;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.dag.app.rm.container.AMContainer;
+import org.apache.tez.dag.app.rm.container.AMContainerMap;
+import org.apache.tez.dag.app.rm.container.AMContainerTask;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
+import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.runtime.api.impl.EventType;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+@SuppressWarnings("unchecked")
+public class TestTaskCommunicatorManager1 {
+ private ApplicationId appId;
+ private ApplicationAttemptId appAttemptId;
+ private AppContext appContext;
+ Credentials credentials;
+ AMContainerMap amContainerMap;
+ EventHandler eventHandler;
+ DAG dag;
+ TaskCommunicatorManager taskAttemptListener;
+ ContainerTask containerTask;
+ AMContainerTask amContainerTask;
+ TaskSpec taskSpec;
+
+ TezVertexID vertexID;
+ TezTaskID taskID;
+ TezTaskAttemptID taskAttemptID;
+
+  /**
+   * Builds the fixture shared by every test: ids for a single task attempt, a mocked
+   * AppContext with its collaborators, and the manager under test wired to the test
+   * communicator defined at the bottom of this class.
+   */
+  @Before
+  public void setUp() {
+    appId = ApplicationId.newInstance(1000, 1);
+    appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    vertexID = TezVertexID.getInstance(dagId, 1);
+    taskID = TezTaskID.getInstance(vertexID, 1);
+    taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1);
+
+    // Collaborators that the mocked AppContext hands out.
+    dag = mock(DAG.class);
+    credentials = new Credentials();
+    amContainerMap = mock(AMContainerMap.class);
+    eventHandler = mock(EventHandler.class);
+    Map<ApplicationAccessType, String> acls = new HashMap<ApplicationAccessType, String>();
+    MockClock mockClock = new MockClock();
+
+    appContext = mock(AppContext.class);
+    doReturn(eventHandler).when(appContext).getEventHandler();
+    doReturn(dag).when(appContext).getCurrentDAG();
+    doReturn(acls).when(appContext).getApplicationACLs();
+    doReturn(amContainerMap).when(appContext).getAllContainers();
+    doReturn(mockClock).when(appContext).getClock();
+    doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
+    doReturn(credentials).when(appContext).getAppCredentials();
+
+    // Any container id looked up in the map resolves to one AMContainer on localhost:0.
+    NodeId localNode = NodeId.newInstance("localhost", 0);
+    Container yarnContainer = mock(Container.class);
+    doReturn(localNode).when(yarnContainer).getNodeId();
+    AMContainer amContainer = mock(AMContainer.class);
+    doReturn(yarnContainer).when(amContainer).getContainer();
+    doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class));
+
+    Configuration conf = new TezConfiguration();
+    UserPayload defaultPayload;
+    try {
+      defaultPayload = TezUtils.createUserPayloadFromConf(conf);
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+    taskAttemptListener = new TaskCommunicatorManagerInterfaceImplForTest(appContext,
+        mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class),
+        Lists.newArrayList(
+            new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
+                .setUserPayload(defaultPayload)));
+
+    // Task spec for the single attempt most tests register.
+    taskSpec = mock(TaskSpec.class);
+    doReturn(taskAttemptID).when(taskSpec).getTaskAttemptID();
+    amContainerTask = new AMContainerTask(taskSpec, null, null, false, 0);
+    containerTask = null;
+  }
+
+  /**
+   * Drives getTask() through a container's lifecycle: unknown container, registered but idle,
+   * task assigned, task unregistered, and container unregistered.
+   */
+  @Test(timeout = 5000)
+  public void testGetTask() throws IOException {
+
+    TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator(0);
+    TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical();
+
+    // Container 1 is never registered, so it is told to die.
+    ContainerId containerId1 = createContainerId(appId, 1);
+    ContainerContext containerContext1 = new ContainerContext(containerId1.toString());
+    containerTask = tezUmbilical.getTask(containerContext1);
+    assertTrue(containerTask.shouldDie());
+
+    // Container 2 is registered but has no task yet: nothing to hand out.
+    ContainerId containerId2 = createContainerId(appId, 2);
+    ContainerContext containerContext2 = new ContainerContext(containerId2.toString());
+    taskAttemptListener.registerRunningContainer(containerId2, 0);
+    containerTask = tezUmbilical.getTask(containerContext2);
+    assertNull(containerTask);
+
+    // Valid task registered
+    taskAttemptListener.registerTaskAttempt(amContainerTask, containerId2, 0);
+    containerTask = tezUmbilical.getTask(containerContext2);
+    assertFalse(containerTask.shouldDie());
+    assertEquals(taskSpec, containerTask.getTaskSpec());
+
+    // Task unregistered. The container itself is still registered, so it gets no task (null)
+    // rather than a die signal.
+    taskAttemptListener.unregisterTaskAttempt(taskAttemptID, 0, TaskAttemptEndReason.OTHER, null);
+    containerTask = tezUmbilical.getTask(containerContext2);
+    assertNull(containerTask);
+
+    // Container unregistered. Should send a shouldDie = true
+    taskAttemptListener.unregisterRunningContainer(containerId2, 0, ContainerEndReason.OTHER, null);
+    containerTask = tezUmbilical.getTask(containerContext2);
+    assertTrue(containerTask.shouldDie());
+
+    ContainerId containerId3 = createContainerId(appId, 3);
+    ContainerContext containerContext3 = new ContainerContext(containerId3.toString());
+    taskAttemptListener.registerRunningContainer(containerId3, 0);
+
+    // Register a second task to container 3, then unregister container 3 altogether
+    // while that task is still assigned.
+    TaskSpec taskSpec2 = mock(TaskSpec.class);
+    TezTaskAttemptID taskAttemptId2 = mock(TezTaskAttemptID.class);
+    doReturn(taskAttemptId2).when(taskSpec2).getTaskAttemptID();
+    // Use the second spec built above (previously the first spec was passed by mistake).
+    AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec2, null, null, false, 0);
+    taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId3, 0);
+    taskAttemptListener.unregisterRunningContainer(containerId3, 0, ContainerEndReason.OTHER, null);
+    containerTask = tezUmbilical.getTask(containerContext3);
+    assertTrue(containerTask.shouldDie());
+  }
+
+  /**
+   * A registered task is delivered by the first pull only; the pull that follows the
+   * delivery yields null.
+   */
+  @Test(timeout = 5000)
+  public void testGetTaskMultiplePulls() throws IOException {
+    TezTaskUmbilicalProtocol umbilical =
+        ((TezTaskCommunicatorImpl) taskAttemptListener.getTaskCommunicator(0)).getUmbilical();
+    ContainerId runningContainer = createContainerId(appId, 1);
+    ContainerContext pullContext = new ContainerContext(runningContainer.toString());
+
+    // Registered container with nothing assigned yet.
+    taskAttemptListener.registerRunningContainer(runningContainer, 0);
+    containerTask = umbilical.getTask(pullContext);
+    assertNull(containerTask);
+
+    // First pull after registering the attempt delivers the task spec.
+    taskAttemptListener.registerTaskAttempt(amContainerTask, runningContainer, 0);
+    containerTask = umbilical.getTask(pullContext);
+    assertFalse(containerTask.shouldDie());
+    assertEquals(taskSpec, containerTask.getTaskSpec());
+
+    // Second pull simulates a container-reuse poll: the task was already handed out.
+    containerTask = umbilical.getTask(pullContext);
+    assertNull(containerTask);
+  }
+
+  /**
+   * Sends a heartbeat carrying a status update, a data movement event and a task attempt
+   * completed event, and checks that the handler sees exactly two events: the status update
+   * first, then one vertex route event wrapping the remaining two.
+   */
+  @Test (timeout = 5000)
+  public void testTaskEventRouting() throws Exception {
+    List<TezEvent> events = Arrays.asList(
+        new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null), null),
+        new TezEvent(DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])), null),
+        new TezEvent(new TaskAttemptCompletedEvent(), null)
+    );
+
+    generateHeartbeat(events, 0, 1, 0, new ArrayList<TezEvent>());
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(2)).handle(arg.capture());
+    final List<Event> argAllValues = arg.getAllValues();
+
+    final Event statusUpdateEvent = argAllValues.get(0);
+    assertEquals("First event should be status update", TaskAttemptEventType.TA_STATUS_UPDATE,
+        statusUpdateEvent.getType());
+
+    final Event vertexEvent = argAllValues.get(1);
+    // Check the type before casting so a wrong event fails with this message, not a
+    // ClassCastException.
+    assertEquals("Second event should be routed to vertex", VertexEventType.V_ROUTE_EVENT,
+        vertexEvent.getType());
+    final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent;
+    assertEquals(EventType.DATA_MOVEMENT_EVENT,
+        vertexRouteEvent.getEvents().get(0).getEventType());
+    assertEquals(EventType.TASK_ATTEMPT_COMPLETED_EVENT,
+        vertexRouteEvent.getEvents().get(1).getEventType());
+  }
+
+  /**
+   * A heartbeat carrying only a TaskAttemptCompletedEvent must produce exactly one handled
+   * event, and that event must be a vertex route event.
+   */
+  @Test (timeout = 5000)
+  public void testTaskEventRoutingTaskAttemptOnly() throws Exception {
+    TezEvent completedEvent = new TezEvent(new TaskAttemptCompletedEvent(), null);
+    List<TezEvent> heartbeatEvents = Arrays.asList(completedEvent);
+    generateHeartbeat(heartbeatEvents, 0, 1, 0, new ArrayList<TezEvent>());
+
+    ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(1)).handle(captor.capture());
+    List<Event> handled = captor.getAllValues();
+
+    assertEquals("only event should be route event", VertexEventType.V_ROUTE_EVENT,
+        handled.get(0).getType());
+  }
+
+  /**
+   * The heartbeat response must carry the next-event id and the event list that the mocked
+   * vertex returns for the attempt.
+   */
+  @Test (timeout = 5000)
+  public void testTaskHeartbeatResponse() throws Exception {
+    List<TezEvent> outgoing = new ArrayList<TezEvent>();
+    TaskHeartbeatResponse response =
+        generateHeartbeat(new ArrayList<TezEvent>(), 0, 1, 2, outgoing);
+
+    assertEquals(2, response.getNextFromEventId());
+    assertEquals(outgoing, response.getEvents());
+  }
+
+  /**
+   * Tries up to 10 randomly chosen ports and passes as soon as one of them can be allocated;
+   * fails only if none of the attempts succeeds.
+   */
+  @Test (timeout = 5000)
+  public void testPortRange() {
+    Random portPicker = new Random();
+    for (int attempt = 0; attempt < 10; ++attempt) {
+      int candidate = 1024 + portPicker.nextInt(65535 - 1024);
+      if (testPortRange(candidate)) {
+        // One successful allocation is all the test needs.
+        return;
+      }
+    }
+    fail("Can not allocate free port even in 10 iterations for TaskAttemptListener");
+  }
+
+  // TODO TEZ-2003 Move this into TestTezTaskCommunicator. Potentially other tests as well.
+  /**
+   * With no port range configured, the manager must initialize and start without throwing.
+   * The started manager is always closed afterwards so the test does not leave it running.
+   */
+  @Test (timeout= 5000)
+  public void testPortRange_NotSpecified() throws IOException {
+    Configuration conf = new Configuration();
+    JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(
+        "fakeIdentifier"));
+    Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier,
+        new JobTokenSecretManager());
+    sessionToken.setService(identifier.getJobId());
+    TokenCache.setSessionToken(sessionToken, credentials);
+    UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
+    taskAttemptListener = new TaskCommunicatorManager(appContext,
+        mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), Lists.newArrayList(
+        new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
+            .setUserPayload(userPayload)));
+    try {
+      // No exception should be thrown; the service should start properly.
+      taskAttemptListener.init(conf);
+      taskAttemptListener.start();
+    } finally {
+      // Same cleanup testPortRange(int) performs.
+      taskAttemptListener.close();
+    }
+  }
+
+  /**
+   * Starts a TaskCommunicatorManager whose port range is the single given port and asserts
+   * that communicator 0 reports that port in its address. The manager is closed before
+   * returning.
+   *
+   * @param port the only port offered through TEZ_AM_TASK_AM_PORT_RANGE
+   * @return true if setup, start and the port assertion all completed; false if any
+   *         Exception was thrown on the way (an assertion failure is an Error and propagates)
+   */
+  private boolean testPortRange(int port) {
+    try {
+      Configuration portConf = new Configuration();
+
+      JobTokenIdentifier tokenId = new JobTokenIdentifier(new Text("fakeIdentifier"));
+      Token<JobTokenIdentifier> token =
+          new Token<JobTokenIdentifier>(tokenId, new JobTokenSecretManager());
+      token.setService(tokenId.getJobId());
+      TokenCache.setSessionToken(token, credentials);
+
+      portConf.set(TezConfiguration.TEZ_AM_TASK_AM_PORT_RANGE, port + "-" + port);
+      UserPayload payload = TezUtils.createUserPayloadFromConf(portConf);
+
+      taskAttemptListener = new TaskCommunicatorManager(appContext,
+          mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class),
+          Lists.newArrayList(
+              new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
+                  .setUserPayload(payload)));
+      taskAttemptListener.init(portConf);
+      taskAttemptListener.start();
+      assertEquals(port, taskAttemptListener.getTaskCommunicator(0).getAddress().getPort());
+      return true;
+    } catch (Exception e) {
+      // Reported as "could not allocate"; testPortRange() then tries another port.
+      return false;
+    } finally {
+      if (taskAttemptListener != null) {
+        try {
+          taskAttemptListener.close();
+        } catch (IOException e) {
+          e.printStackTrace();
+          fail("fail to stop TaskAttemptListener");
+        }
+      }
+    }
+  }
+
+  /**
+   * Registers container 1 together with the shared task attempt and sends one mocked
+   * heartbeat for it to the manager under test.
+   *
+   * @param events events the heartbeat request reports
+   * @param fromEventId start index the request asks for
+   * @param maxEvents maximum number of events the request asks for
+   * @param nextFromEventId next-event id the mocked vertex returns for the attempt
+   * @param sendEvents events the mocked vertex returns for the attempt
+   * @return the response produced by the manager
+   */
+  private TaskHeartbeatResponse generateHeartbeat(List<TezEvent> events,
+      int fromEventId, int maxEvents, int nextFromEventId,
+      List<TezEvent> sendEvents) throws IOException, TezException {
+    ContainerId containerId = createContainerId(appId, 1);
+    Vertex vertex = mock(Vertex.class);
+
+    // The DAG resolves the attempt's vertex to a mock that answers the event query.
+    doReturn(vertex).when(dag).getVertex(vertexID);
+    doReturn("test_vertex").when(vertex).getName();
+    TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(nextFromEventId, sendEvents, 0);
+    doReturn(eventInfo).when(vertex).getTaskAttemptTezEvents(taskAttemptID, fromEventId, 0, maxEvents);
+
+    taskAttemptListener.registerRunningContainer(containerId, 0);
+    taskAttemptListener.registerTaskAttempt(amContainerTask, containerId, 0);
+
+    // Mocked request; the container-identifier stub is set once (it used to be duplicated).
+    TaskHeartbeatRequest request = mock(TaskHeartbeatRequest.class);
+    doReturn(containerId.toString()).when(request).getContainerIdentifier();
+    doReturn(taskAttemptID).when(request).getTaskAttemptId();
+    doReturn(events).when(request).getEvents();
+    doReturn(maxEvents).when(request).getMaxEvents();
+    doReturn(fromEventId).when(request).getStartIndex();
+
+    return taskAttemptListener.heartbeat(request);
+  }
+
+
+  /** Builds the id of container {@code containerIdx} under attempt 1 of {@code applicationId}. */
+  private ContainerId createContainerId(ApplicationId applicationId, int containerIdx) {
+    return ContainerId.newInstance(
+        ApplicationAttemptId.newInstance(applicationId, 1), containerIdx);
+  }
+
+  /**
+   * TaskCommunicatorManager whose default task communicator is
+   * {@link TezTaskCommunicatorImplForTest}.
+   */
+  private static class TaskCommunicatorManagerInterfaceImplForTest extends TaskCommunicatorManager {
+
+    public TaskCommunicatorManagerInterfaceImplForTest(AppContext appContext,
+        TaskHeartbeatHandler taskHeartbeatHandler,
+        ContainerHeartbeatHandler containerHeartbeatHandler,
+        List<NamedEntityDescriptor> descriptors) {
+      super(appContext, taskHeartbeatHandler, containerHeartbeatHandler, descriptors);
+    }
+
+    @Override
+    TaskCommunicator createDefaultTaskCommunicator(TaskCommunicatorContext communicatorContext) {
+      // Swap in the communicator variant defined in this test class.
+      return new TezTaskCommunicatorImplForTest(communicatorContext);
+    }
+
+  }
+
+  /** TezTaskCommunicatorImpl whose RPC server start and stop hooks do nothing. */
+  private static class TezTaskCommunicatorImplForTest extends TezTaskCommunicatorImpl {
+
+    public TezTaskCommunicatorImplForTest(TaskCommunicatorContext communicatorContext) {
+      super(communicatorContext);
+    }
+
+    @Override
+    protected void startRpcServer() {
+      // Intentionally empty.
+    }
+
+    @Override
+    protected void stopRpcServer() {
+      // Intentionally empty.
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java
new file mode 100644
index 0000000..d75b0e5
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
+import org.apache.tez.dag.app.rm.container.AMContainer;
+import org.apache.tez.dag.app.rm.container.AMContainerMap;
+import org.apache.tez.dag.app.rm.container.AMContainerTask;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+public class TestTaskCommunicatorManager2 {
+
+  /**
+   * Reports one registered attempt as failed and another as killed, then checks that the
+   * event handler receives a failed event followed by a killed event, each carrying the
+   * supplied diagnostics and the expected termination cause.
+   */
+  @Test(timeout = 5000)
+  public void testTaskAttemptFailedKilled() throws IOException {
+    ApplicationId appId = ApplicationId.newInstance(1000, 1);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+
+    // Mocked application context and the collaborators it returns.
+    EventHandler eventHandler = mock(EventHandler.class);
+    DAG dag = mock(DAG.class);
+    AMContainerMap amContainerMap = mock(AMContainerMap.class);
+    Credentials credentials = new Credentials();
+    Map<ApplicationAccessType, String> acls = new HashMap<ApplicationAccessType, String>();
+    AppContext appContext = mock(AppContext.class);
+    doReturn(eventHandler).when(appContext).getEventHandler();
+    doReturn(dag).when(appContext).getCurrentDAG();
+    doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
+    doReturn(credentials).when(appContext).getAppCredentials();
+    doReturn(acls).when(appContext).getApplicationACLs();
+    doReturn(amContainerMap).when(appContext).getAllContainers();
+
+    // Any container id looked up in the map resolves to one AMContainer on localhost:0.
+    NodeId localNode = NodeId.newInstance("localhost", 0);
+    Container yarnContainer = mock(Container.class);
+    doReturn(localNode).when(yarnContainer).getNodeId();
+    AMContainer amContainer = mock(AMContainer.class);
+    doReturn(yarnContainer).when(amContainer).getContainer();
+    doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class));
+
+    Configuration conf = new TezConfiguration();
+    UserPayload payload = TezUtils.createUserPayloadFromConf(conf);
+    TaskCommunicatorManager taskAttemptListener =
+        new TaskCommunicatorManager(appContext, mock(TaskHeartbeatHandler.class),
+            mock(ContainerHeartbeatHandler.class), Lists.newArrayList(new NamedEntityDescriptor(
+            TezConstants.getTezYarnServicePluginName(), null).setUserPayload(payload)));
+
+    // Two attempts, each running in its own container.
+    TezTaskAttemptID failedAttemptId = mock(TezTaskAttemptID.class);
+    TaskSpec failedSpec = mock(TaskSpec.class);
+    doReturn(failedAttemptId).when(failedSpec).getTaskAttemptID();
+    AMContainerTask failedTask = new AMContainerTask(failedSpec, null, null, false, 10);
+
+    TezTaskAttemptID killedAttemptId = mock(TezTaskAttemptID.class);
+    TaskSpec killedSpec = mock(TaskSpec.class);
+    doReturn(killedAttemptId).when(killedSpec).getTaskAttemptID();
+    AMContainerTask killedTask = new AMContainerTask(killedSpec, null, null, false, 10);
+
+    ContainerId firstContainer = createContainerId(appId, 1);
+    taskAttemptListener.registerRunningContainer(firstContainer, 0);
+    taskAttemptListener.registerTaskAttempt(failedTask, firstContainer, 0);
+    ContainerId secondContainer = createContainerId(appId, 2);
+    taskAttemptListener.registerRunningContainer(secondContainer, 0);
+    taskAttemptListener.registerTaskAttempt(killedTask, secondContainer, 0);
+
+    taskAttemptListener
+        .taskFailed(failedAttemptId, TaskAttemptEndReason.COMMUNICATION_ERROR, "Diagnostics1");
+    taskAttemptListener
+        .taskKilled(killedAttemptId, TaskAttemptEndReason.EXECUTOR_BUSY, "Diagnostics2");
+
+    ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(2)).handle(captor.capture());
+    Event first = captor.getAllValues().get(0);
+    Event second = captor.getAllValues().get(1);
+    assertTrue(first instanceof TaskAttemptEventAttemptFailed);
+    assertTrue(second instanceof TaskAttemptEventAttemptKilled);
+    TaskAttemptEventAttemptFailed failedEvent = (TaskAttemptEventAttemptFailed) first;
+    TaskAttemptEventAttemptKilled killedEvent = (TaskAttemptEventAttemptKilled) second;
+
+    assertEquals("Diagnostics1", failedEvent.getDiagnosticInfo());
+    assertEquals(TaskAttemptTerminationCause.COMMUNICATION_ERROR,
+        failedEvent.getTerminationCause());
+
+    assertEquals("Diagnostics2", killedEvent.getDiagnosticInfo());
+    assertEquals(TaskAttemptTerminationCause.SERVICE_BUSY, killedEvent.getTerminationCause());
+    // TODO TEZ-2003. Verify unregistration from the registered list
+  }
+
+  /** Builds the id of container {@code containerIdx} under attempt 1 of {@code applicationId}. */
+  private ContainerId createContainerId(ApplicationId applicationId, int containerIdx) {
+    return ContainerId.newInstance(
+        ApplicationAttemptId.newInstance(applicationId, 1), containerIdx);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
index 83421a2..f0b89c8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
@@ -70,7 +70,7 @@ import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ClusterInfo;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.DAGTerminationCause;
@@ -142,7 +142,7 @@ public class TestCommit {
private TaskEventDispatcher taskEventDispatcher;
private VertexEventDispatcher vertexEventDispatcher;
private DagEventDispatcher dagEventDispatcher;
- private TaskAttemptListener taskAttemptListener;
+ private TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
private TaskHeartbeatHandler thh;
private Clock clock = new SystemClock();
private DAGFinishEventHandler dagFinishEventHandler;
@@ -317,7 +317,7 @@ public class TestCommit {
doReturn(historyEventHandler).when(appContext).getHistoryHandler();
doReturn(aclManager).when(appContext).getAMACLManager();
dag = new DAGImpl(dagId, conf, dagPlan, dispatcher.getEventHandler(),
- taskAttemptListener, fsTokens, clock, "user", thh, appContext);
+ taskCommunicatorManagerInterface, fsTokens, clock, "user", thh, appContext);
doReturn(dag).when(appContext).getCurrentDAG();
doReturn(dispatcher.getEventHandler()).when(appContext).getEventHandler();
ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192,10));
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index e268a99..ac4f61b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -88,7 +88,7 @@ import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ClusterInfo;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAGScheduler;
import org.apache.tez.dag.app.dag.DAGState;
@@ -165,7 +165,7 @@ public class TestDAGImpl {
private TaskEventDispatcher taskEventDispatcher;
private VertexEventDispatcher vertexEventDispatcher;
private DagEventDispatcher dagEventDispatcher;
- private TaskAttemptListener taskAttemptListener;
+ private TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
private TaskHeartbeatHandler thh;
private Clock clock = new SystemClock();
private DAGFinishEventHandler dagFinishEventHandler;
@@ -784,7 +784,7 @@ public class TestDAGImpl {
doReturn(historyEventHandler).when(appContext).getHistoryHandler();
doReturn(aclManager).when(appContext).getAMACLManager();
dag = new DAGImpl(dagId, conf, dagPlan,
- dispatcher.getEventHandler(), taskAttemptListener,
+ dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
fsTokens, clock, "user", thh, appContext);
dag.entityUpdateTracker = new StateChangeNotifierForTest(dag);
doReturn(dag).when(appContext).getCurrentDAG();
@@ -795,7 +795,7 @@ public class TestDAGImpl {
mrrDagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 2);
mrrDagPlan = createTestMRRDAGPlan();
mrrDag = new DAGImpl(mrrDagId, conf, mrrDagPlan,
- dispatcher.getEventHandler(), taskAttemptListener,
+ dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
fsTokens, clock, "user", thh,
mrrAppContext);
mrrDag.entityUpdateTracker = new StateChangeNotifierForTest(mrrDag);
@@ -811,7 +811,7 @@ public class TestDAGImpl {
groupDagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 3);
groupDagPlan = createGroupDAGPlan();
groupDag = new DAGImpl(groupDagId, conf, groupDagPlan,
- dispatcher.getEventHandler(), taskAttemptListener,
+ dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
fsTokens, clock, "user", thh,
groupAppContext);
groupDag.entityUpdateTracker = new StateChangeNotifierForTest(groupDag);
@@ -881,7 +881,7 @@ public class TestDAGImpl {
dagWithCustomEdgeAppContext = mock(AppContext.class);
doReturn(aclManager).when(dagWithCustomEdgeAppContext).getAMACLManager();
dagWithCustomEdge = new DAGImpl(dagWithCustomEdgeId, conf, dagPlanWithCustomEdge,
- dispatcher.getEventHandler(), taskAttemptListener,
+ dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
fsTokens, clock, "user", thh, dagWithCustomEdgeAppContext);
dagWithCustomEdge.entityUpdateTracker = new StateChangeNotifierForTest(dagWithCustomEdge);
doReturn(conf).when(dagWithCustomEdgeAppContext).getAMConf();
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
index 792fa63..409c506 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -37,7 +37,7 @@ import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ClusterInfo;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.DAGTerminationCause;
@@ -91,7 +91,7 @@ public class TestDAGRecovery {
DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan();
dag =
new DAGImpl(dagId, new Configuration(), dagPlan, mockEventHandler,
- mock(TaskAttemptListener.class), new Credentials(),
+ mock(TaskCommunicatorManagerInterface.class), new Credentials(),
new SystemClock(), user, mock(TaskHeartbeatHandler.class),
mockAppContext);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 04bb2df..13c9202 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -64,7 +64,7 @@ import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ClusterInfo;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
@@ -145,7 +145,7 @@ public class TestTaskAttempt {
TezTaskID taskID = TezTaskID.getInstance(
TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1);
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
- mock(TaskAttemptListener.class), new Configuration(), new SystemClock(),
+ mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(),
mock(TaskHeartbeatHandler.class), mock(AppContext.class),
false, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
@@ -180,12 +180,12 @@ public class TestTaskAttempt {
TezTaskID taskID = TezTaskID.getInstance(
TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1);
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
- mock(TaskAttemptListener.class), new Configuration(), new SystemClock(),
+ mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(),
mock(TaskHeartbeatHandler.class), mock(AppContext.class),
false, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
TaskAttemptImpl taImplReScheduled = new MockTaskAttemptImpl(taskID, 1, eventHandler,
- mock(TaskAttemptListener.class), new Configuration(), new SystemClock(),
+ mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(),
mock(TaskHeartbeatHandler.class), mock(AppContext.class),
true, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
@@ -243,7 +243,7 @@ public class TestTaskAttempt {
TezTaskID taskID = TezTaskID.getInstance(
TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1);
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
- mock(TaskAttemptListener.class), new Configuration(),
+ mock(TaskCommunicatorManagerInterface.class), new Configuration(),
new SystemClock(), mock(TaskHeartbeatHandler.class),
mock(AppContext.class), false, Resource.newInstance(1024,
1), createFakeContainerContext(), false);
@@ -285,7 +285,7 @@ public class TestTaskAttempt {
TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0);
MockEventHandler eventHandler = new MockEventHandler();
- TaskAttemptListener taListener = createMockTaskAttemptListener();
+ TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -334,7 +334,7 @@ public class TestTaskAttempt {
TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
MockEventHandler eventHandler = spy(new MockEventHandler());
- TaskAttemptListener taListener = createMockTaskAttemptListener();
+ TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -354,7 +354,7 @@ public class TestTaskAttempt {
AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
- mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+ mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
new ContainerContextMatcher(), appCtx);
containers.addContainerIfNew(container, 0, 0, 0);
@@ -434,7 +434,7 @@ public class TestTaskAttempt {
TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
MockEventHandler eventHandler = new MockEventHandler();
- TaskAttemptListener taListener = createMockTaskAttemptListener();
+ TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -454,7 +454,7 @@ public class TestTaskAttempt {
AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
- mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+ mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
new ContainerContextMatcher(), appCtx);
containers.addContainerIfNew(container, 0, 0, 0);
@@ -498,7 +498,7 @@ public class TestTaskAttempt {
TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
MockEventHandler eventHandler = spy(new MockEventHandler());
- TaskAttemptListener taListener = createMockTaskAttemptListener();
+ TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -518,7 +518,7 @@ public class TestTaskAttempt {
AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
- mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+ mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
new ContainerContextMatcher(), appCtx);
containers.addContainerIfNew(container, 0, 0, 0);
@@ -589,7 +589,7 @@ public class TestTaskAttempt {
TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
MockEventHandler eventHandler = spy(new MockEventHandler());
- TaskAttemptListener taListener = createMockTaskAttemptListener();
+ TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -610,7 +610,7 @@ public class TestTaskAttempt {
AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
- mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+ mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
new ContainerContextMatcher(), appCtx);
containers.addContainerIfNew(container, 0, 0, 0);
@@ -719,7 +719,7 @@ public class TestTaskAttempt {
TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
MockEventHandler eventHandler = spy(new MockEventHandler());
- TaskAttemptListener taListener = createMockTaskAttemptListener();
+ TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -740,7 +740,7 @@ public class TestTaskAttempt {
AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
- mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+ mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
new ContainerContextMatcher(), appCtx);
containers.addContainerIfNew(container, 0, 0, 0);
@@ -810,7 +810,7 @@ public class TestTaskAttempt {
TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
MockEventHandler eventHandler = spy(new MockEventHandler());
- TaskAttemptListener taListener = createMockTaskAttemptListener();
+ TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -830,7 +830,7 @@ public class TestTaskAttempt {
AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
- mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+ mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
new ContainerContextMatcher(), appCtx);
containers.addContainerIfNew(container, 0, 0, 0);
@@ -904,7 +904,7 @@ public class TestTaskAttempt {
TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
MockEventHandler eventHandler = spy(new MockEventHandler());
- TaskAttemptListener taListener = createMockTaskAttemptListener();
+ TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -924,7 +924,7 @@ public class TestTaskAttempt {
AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
- mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+ mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
new ContainerContextMatcher(), appCtx);
containers.addContainerIfNew(container, 0, 0, 0);
@@ -1006,7 +1006,7 @@ public class TestTaskAttempt {
TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
MockEventHandler eventHandler = spy(new MockEventHandler());
- TaskAttemptListener taListener = createMockTaskAttemptListener();
+ TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -1026,7 +1026,7 @@ public class TestTaskAttempt {
AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
- mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+ mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
new ContainerContextMatcher(), appCtx);
containers.addContainerIfNew(container, 0, 0, 0);
@@ -1105,7 +1105,7 @@ public class TestTaskAttempt {
MockEventHandler mockEh = new MockEventHandler();
MockEventHandler eventHandler = spy(mockEh);
- TaskAttemptListener taListener = createMockTaskAttemptListener();
+ TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -1125,7 +1125,7 @@ public class TestTaskAttempt {
AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
- mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+ mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
new ContainerContextMatcher(), appCtx);
containers.addContainerIfNew(container, 0, 0, 0);
@@ -1249,7 +1249,7 @@ public class TestTaskAttempt {
TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
MockEventHandler eventHandler = spy(new MockEventHandler());
- TaskAttemptListener taListener = createMockTaskAttemptListener();
+ TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -1268,7 +1268,7 @@ public class TestTaskAttempt {
AppContext appCtx = mock(AppContext.class);
AMContainerMap containers = new AMContainerMap(
- mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+ mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
new ContainerContextMatcher(), appCtx);
containers.addContainerIfNew(container, 0, 0, 0);
@@ -1324,7 +1324,7 @@ public class TestTaskAttempt {
public int taskAttemptStartedEventLogged = 0;
public int taskAttemptFinishedEventLogged = 0;
public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber,
- EventHandler eventHandler, TaskAttemptListener tal,
+ EventHandler eventHandler, TaskCommunicatorManagerInterface tal,
Configuration conf, Clock clock,
TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
boolean isRescheduled,
@@ -1378,8 +1378,8 @@ public class TestTaskAttempt {
new Credentials(), new HashMap<String, String>(), "");
}
- private TaskAttemptListener createMockTaskAttemptListener() {
- TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ private TaskCommunicatorManagerInterface createMockTaskAttemptListener() {
+ TaskCommunicatorManagerInterface taListener = mock(TaskCommunicatorManagerInterface.class);
TaskCommunicator taskComm = mock(TaskCommunicator.class);
doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
doReturn(taskComm).when(taListener).getTaskCommunicator(0);
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index 4a797e0..6bbfc3d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -42,7 +42,7 @@ import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
@@ -146,7 +146,7 @@ public class TestTaskAttemptRecovery {
TezTaskID.fromString("task_1407371892933_0001_1_00_000000");
ta =
new TaskAttemptImpl(taskId, 0, mockEventHandler,
- mock(TaskAttemptListener.class), new Configuration(),
+ mock(TaskCommunicatorManagerInterface.class), new Configuration(),
new SystemClock(), mock(TaskHeartbeatHandler.class),
mockAppContext, false, Resource.newInstance(1, 1),
mock(ContainerContext.class), false, mockTask);
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 807f277..24c9664 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -50,7 +50,7 @@ import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.TaskStateInternal;
@@ -85,7 +85,7 @@ public class TestTaskImpl {
private final int partition = 1;
private Configuration conf;
- private TaskAttemptListener taskAttemptListener;
+ private TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
private TaskHeartbeatHandler taskHeartbeatHandler;
private Credentials credentials;
private Clock clock;
@@ -122,7 +122,7 @@ public class TestTaskImpl {
@Before
public void setup() {
conf = new Configuration();
- taskAttemptListener = mock(TaskAttemptListener.class);
+ taskCommunicatorManagerInterface = mock(TaskCommunicatorManagerInterface.class);
taskHeartbeatHandler = mock(TaskHeartbeatHandler.class);
credentials = new Credentials();
clock = new SystemClock();
@@ -151,7 +151,7 @@ public class TestTaskImpl {
eventHandler = new TestEventHandler();
mockTask = new MockTaskImpl(vertexId, partition,
- eventHandler, conf, taskAttemptListener, clock,
+ eventHandler, conf, taskCommunicatorManagerInterface, clock,
taskHeartbeatHandler, appContext, leafVertex,
taskResource, containerContext, vertex);
mockTaskSpec = mock(TaskSpec.class);
@@ -698,11 +698,11 @@ public class TestTaskImpl {
public MockTaskImpl(TezVertexID vertexId, int partition,
EventHandler eventHandler, Configuration conf,
- TaskAttemptListener taskAttemptListener, Clock clock,
+ TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Clock clock,
TaskHeartbeatHandler thh, AppContext appContext, boolean leafVertex,
Resource resource,
ContainerContext containerContext, Vertex vertex) {
- super(vertexId, partition, eventHandler, conf, taskAttemptListener,
+ super(vertexId, partition, eventHandler, conf, taskCommunicatorManagerInterface,
clock, thh, appContext, leafVertex, resource,
containerContext, mock(StateChangeNotifier.class), vertex);
this.vertex = vertex;
@@ -711,7 +711,7 @@ public class TestTaskImpl {
@Override
protected TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedCausalTA) {
MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getTaskId(),
- attemptNumber, eventHandler, taskAttemptListener,
+ attemptNumber, eventHandler, taskCommunicatorManagerInterface,
conf, clock, taskHeartbeatHandler, appContext,
true, taskResource, containerContext, schedCausalTA);
taskAttempts.add(attempt);
@@ -757,7 +757,7 @@ public class TestTaskImpl {
private TaskAttemptState state = TaskAttemptState.NEW;
public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber,
- EventHandler eventHandler, TaskAttemptListener tal, Configuration conf,
+ EventHandler eventHandler, TaskCommunicatorManagerInterface tal, Configuration conf,
Clock clock, TaskHeartbeatHandler thh, AppContext appContext,
boolean isRescheduled,
Resource resource, ContainerContext containerContext, TezTaskAttemptID schedCausalTA) {
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
index 1d22e06..eca8274 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
@@ -46,7 +46,7 @@ import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
@@ -189,7 +189,7 @@ public class TestTaskRecovery {
when(mockAppContext.getHistoryHandler()).thenReturn(mockHistoryEventHandler);
task =
new TaskImpl(vertexId, 0, dispatcher.getEventHandler(),
- new Configuration(), mock(TaskAttemptListener.class),
+ new Configuration(), mock(TaskCommunicatorManagerInterface.class),
new SystemClock(), mock(TaskHeartbeatHandler.class),
mockAppContext, false, Resource.newInstance(1, 1),
mock(ContainerContext.class), mock(StateChangeNotifier.class), vertex);