You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/06 11:26:15 UTC
[23/51] [abbrv] tez git commit: TEZ-2381. Fixes after rebase 04/28. (sseth)
TEZ-2381. Fixes after rebase 04/28. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/57745276
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/57745276
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/57745276
Branch: refs/heads/TEZ-2003
Commit: 57745276e1f8352c9a76693f737096bf6cff7b4c
Parents: 9d38581
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Apr 28 13:41:12 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Aug 6 01:25:34 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../dag/app/TaskAttemptListenerImpTezDag.java | 17 ++++----
.../app/TestTaskAttemptListenerImplTezDag.java | 44 +++++++++++++++-----
3 files changed, 42 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/57745276/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index f6bc8e7..d42aaf8 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -18,5 +18,6 @@ ALL CHANGES:
TEZ-2302. Allow TaskCommunicators to subscribe for Vertex updates.
TEZ-2347. Expose additional information in TaskCommunicatorContext.
TEZ-2361. Propagate dag completion to TaskCommunicator.
+ TEZ-2381. Fixes after rebase 04/28.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/57745276/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 7cdf292..cbaed99 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -17,27 +17,21 @@
package org.apache.tez.dag.app;
-import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
-import java.net.InetSocketAddress;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.collections4.ListUtils;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.EventType;
-import com.google.common.base.Preconditions;
-import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,7 +55,6 @@ import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.rm.container.AMContainerTask;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -150,7 +143,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
private TaskCommunicator createTaskCommunicator(String taskCommClassIdentifier, int taskCommIndex) {
if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
LOG.info("Using Default Task Communicator");
- return new TezTaskCommunicatorImpl(taskCommunicatorContexts[taskCommIndex]);
+ return createTezTaskCommunicator(taskCommunicatorContexts[taskCommIndex]);
} else if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
LOG.info("Using Default Local Task Communicator");
return new TezLocalTaskCommunicatorImpl(taskCommunicatorContexts[taskCommIndex]);
@@ -173,6 +166,12 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
}
}
+
+ @VisibleForTesting
+ protected TezTaskCommunicatorImpl createTezTaskCommunicator(TaskCommunicatorContext context) {
+ return new TezTaskCommunicatorImpl(context);
+ }
+
public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request)
throws IOException, TezException {
ContainerId containerId = ConverterUtils.toContainerId(request
http://git-wip-us.apache.org/repos/asf/tez/blob/57745276/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index ab9fafe..2208220 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -47,11 +47,9 @@ import org.apache.tez.common.ContainerContext;
import org.apache.tez.common.ContainerTask;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TaskHeartbeatRequest;
import org.apache.tez.dag.api.TezException;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.common.ContainerContext;
-import org.apache.tez.common.ContainerTask;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.dag.api.TaskCommunicatorContext;
import org.apache.tez.dag.app.dag.DAG;
@@ -82,7 +80,9 @@ import org.mockito.ArgumentCaptor;
// TODO TEZ-2003 Rename to TestTezTaskCommunicator
public class TestTaskAttemptListenerImplTezDag {
private ApplicationId appId;
+ private ApplicationAttemptId appAttemptId;
private AppContext appContext;
+ Credentials credentials;
AMContainerMap amContainerMap;
EventHandler eventHandler;
DAG dag;
@@ -98,11 +98,13 @@ public class TestTaskAttemptListenerImplTezDag {
@Before
public void setUp() {
appId = ApplicationId.newInstance(1000, 1);
+ appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
dag = mock(DAG.class);
TezDAGID dagID = TezDAGID.getInstance(appId, 1);
vertexID = TezVertexID.getInstance(dagID, 1);
taskID = TezTaskID.getInstance(vertexID, 1);
taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1);
+ credentials = new Credentials();
amContainerMap = mock(AMContainerMap.class);
Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>();
@@ -118,6 +120,8 @@ public class TestTaskAttemptListenerImplTezDag {
doReturn(amContainerMap).when(appContext).getAllContainers();
doReturn(clock).when(appContext).getClock();
+ doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
+ doReturn(credentials).when(appContext).getAppCredentials();
NodeId nodeId = NodeId.newInstance("localhost", 0);
AMContainer amContainer = mock(AMContainer.class);
Container container = mock(Container.class);
@@ -160,7 +164,7 @@ public class TestTaskAttemptListenerImplTezDag {
assertEquals(taskSpec, containerTask.getTaskSpec());
// Task unregistered. Should respond to heartbeats
- taskAttemptListener.unregisterTaskAttempt(taskAttemptId, 0);
+ taskAttemptListener.unregisterTaskAttempt(taskAttemptID, 0);
containerTask = tezUmbilical.getTask(containerContext2);
assertNull(containerTask);
@@ -190,7 +194,7 @@ public class TestTaskAttemptListenerImplTezDag {
TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical();
ContainerId containerId1 = createContainerId(appId, 1);
- doReturn(mock(AMContainer.class)).when(amContainerMap).get(containerId1);
+
ContainerContext containerContext1 = new ContainerContext(containerId1.toString());
taskAttemptListener.registerRunningContainer(containerId1, 0);
containerTask = tezUmbilical.getTask(containerContext1);
@@ -320,7 +324,6 @@ public class TestTaskAttemptListenerImplTezDag {
int fromEventId, int maxEvents, int nextFromEventId,
List<TezEvent> sendEvents) throws IOException, TezException {
ContainerId containerId = createContainerId(appId, 1);
- long requestId = 0;
Vertex vertex = mock(Vertex.class);
doReturn(vertex).when(dag).getVertex(vertexID);
@@ -328,13 +331,13 @@ public class TestTaskAttemptListenerImplTezDag {
TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(nextFromEventId, sendEvents, 0);
doReturn(eventInfo).when(vertex).getTaskAttemptTezEvents(taskAttemptID, fromEventId, 0, maxEvents);
- taskAttemptListener.registerRunningContainer(containerId);
- taskAttemptListener.registerTaskAttempt(amContainerTask, containerId);
+ taskAttemptListener.registerRunningContainer(containerId, 0);
+ taskAttemptListener.registerTaskAttempt(amContainerTask, containerId, 0);
+
+ TaskHeartbeatRequest request = mock(TaskHeartbeatRequest.class);
- TezHeartbeatRequest request = mock(TezHeartbeatRequest.class);
doReturn(containerId.toString()).when(request).getContainerIdentifier();
- doReturn(taskAttemptID).when(request).getCurrentTaskAttemptID();
- doReturn(++requestId).when(request).getRequestId();
+ doReturn(taskAttemptID).when(request).getTaskAttemptId();
doReturn(events).when(request).getEvents();
doReturn(maxEvents).when(request).getMaxEvents();
doReturn(fromEventId).when(request).getStartIndex();
@@ -348,6 +351,25 @@ public class TestTaskAttemptListenerImplTezDag {
return ContainerId.newInstance(appAttemptId, containerIdx);
}
+ private static class TaskAttemptListenerImplForTest extends TaskAttemptListenerImpTezDag {
+
+ public TaskAttemptListenerImplForTest(AppContext context,
+ TaskHeartbeatHandler thh,
+ ContainerHeartbeatHandler chh,
+ JobTokenSecretManager jobTokenSecretManager,
+ String[] taskCommunicatorClassIdentifiers,
+ boolean isPureLocalMode) {
+ super(context, thh, chh, jobTokenSecretManager, taskCommunicatorClassIdentifiers,
+ isPureLocalMode);
+ }
+
+ @Override
+ protected TezTaskCommunicatorImpl createTezTaskCommunicator(TaskCommunicatorContext context) {
+ return new TezTaskCommunicatorImplForTest(context);
+ }
+
+ }
+
private static class TezTaskCommunicatorImplForTest extends TezTaskCommunicatorImpl {
public TezTaskCommunicatorImplForTest(