You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/14 22:58:29 UTC

[11/50] [abbrv] tez git commit: TEZ-2381. Fixes after rebase 04/28. (sseth)

TEZ-2381. Fixes after rebase 04/28. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3a602428
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3a602428
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3a602428

Branch: refs/heads/TEZ-2003
Commit: 3a60242875720efa5c367e55a434264b098309f5
Parents: a5f872e
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Apr 28 13:41:12 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 14 13:46:43 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |  1 +
 .../dag/app/TaskAttemptListenerImpTezDag.java   | 17 ++++----
 .../app/TestTaskAttemptListenerImplTezDag.java  | 44 +++++++++++++++-----
 3 files changed, 42 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/3a602428/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index f6bc8e7..d42aaf8 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -18,5 +18,6 @@ ALL CHANGES:
   TEZ-2302. Allow TaskCommunicators to subscribe for Vertex updates.
   TEZ-2347. Expose additional information in TaskCommunicatorContext.
   TEZ-2361. Propagate dag completion to TaskCommunicator.
+  TEZ-2381. Fixes after rebase 04/28.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/3a602428/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 7cdf292..cbaed99 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -17,27 +17,21 @@
 
 package org.apache.tez.dag.app;
 
-import javax.annotation.Nullable;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
-import java.net.InetSocketAddress;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.collections4.ListUtils;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.impl.EventType;
-import com.google.common.base.Preconditions;
-import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,7 +55,6 @@ import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.rm.container.AMContainerTask;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -150,7 +143,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   private TaskCommunicator createTaskCommunicator(String taskCommClassIdentifier, int taskCommIndex) {
     if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
       LOG.info("Using Default Task Communicator");
-      return new TezTaskCommunicatorImpl(taskCommunicatorContexts[taskCommIndex]);
+      return createTezTaskCommunicator(taskCommunicatorContexts[taskCommIndex]);
     } else if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
       LOG.info("Using Default Local Task Communicator");
       return new TezLocalTaskCommunicatorImpl(taskCommunicatorContexts[taskCommIndex]);
@@ -173,6 +166,12 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
       }
     }
   }
+
+  @VisibleForTesting
+  protected TezTaskCommunicatorImpl createTezTaskCommunicator(TaskCommunicatorContext context) {
+    return new TezTaskCommunicatorImpl(context);
+  }
+
   public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request)
       throws IOException, TezException {
     ContainerId containerId = ConverterUtils.toContainerId(request

http://git-wip-us.apache.org/repos/asf/tez/blob/3a602428/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index ab9fafe..2208220 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -47,11 +47,9 @@ import org.apache.tez.common.ContainerContext;
 import org.apache.tez.common.ContainerTask;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TaskHeartbeatRequest;
 import org.apache.tez.dag.api.TezException;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.common.ContainerContext;
-import org.apache.tez.common.ContainerTask;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
 import org.apache.tez.dag.api.TaskCommunicatorContext;
 import org.apache.tez.dag.app.dag.DAG;
@@ -82,7 +80,9 @@ import org.mockito.ArgumentCaptor;
 // TODO TEZ-2003 Rename to TestTezTaskCommunicator
 public class TestTaskAttemptListenerImplTezDag {
   private ApplicationId appId;
+  private ApplicationAttemptId appAttemptId;
   private AppContext appContext;
+  Credentials credentials;
   AMContainerMap amContainerMap;
   EventHandler eventHandler;
   DAG dag;
@@ -98,11 +98,13 @@ public class TestTaskAttemptListenerImplTezDag {
   @Before
   public void setUp() {
     appId = ApplicationId.newInstance(1000, 1);
+    appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
     dag = mock(DAG.class);
     TezDAGID dagID = TezDAGID.getInstance(appId, 1);
     vertexID = TezVertexID.getInstance(dagID, 1);
     taskID = TezTaskID.getInstance(vertexID, 1);
     taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1);
+    credentials = new Credentials();
 
     amContainerMap = mock(AMContainerMap.class);
     Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>();
@@ -118,6 +120,8 @@ public class TestTaskAttemptListenerImplTezDag {
     doReturn(amContainerMap).when(appContext).getAllContainers();
     doReturn(clock).when(appContext).getClock();
     
+    doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
+    doReturn(credentials).when(appContext).getAppCredentials();
     NodeId nodeId = NodeId.newInstance("localhost", 0);
     AMContainer amContainer = mock(AMContainer.class);
     Container container = mock(Container.class);
@@ -160,7 +164,7 @@ public class TestTaskAttemptListenerImplTezDag {
     assertEquals(taskSpec, containerTask.getTaskSpec());
 
     // Task unregistered. Should respond to heartbeats
-    taskAttemptListener.unregisterTaskAttempt(taskAttemptId, 0);
+    taskAttemptListener.unregisterTaskAttempt(taskAttemptID, 0);
     containerTask = tezUmbilical.getTask(containerContext2);
     assertNull(containerTask);
 
@@ -190,7 +194,7 @@ public class TestTaskAttemptListenerImplTezDag {
     TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical();
 
     ContainerId containerId1 = createContainerId(appId, 1);
-    doReturn(mock(AMContainer.class)).when(amContainerMap).get(containerId1);
+
     ContainerContext containerContext1 = new ContainerContext(containerId1.toString());
     taskAttemptListener.registerRunningContainer(containerId1, 0);
     containerTask = tezUmbilical.getTask(containerContext1);
@@ -320,7 +324,6 @@ public class TestTaskAttemptListenerImplTezDag {
       int fromEventId, int maxEvents, int nextFromEventId,
       List<TezEvent> sendEvents) throws IOException, TezException {
     ContainerId containerId = createContainerId(appId, 1);
-    long requestId = 0;
     Vertex vertex = mock(Vertex.class);
 
     doReturn(vertex).when(dag).getVertex(vertexID);
@@ -328,13 +331,13 @@ public class TestTaskAttemptListenerImplTezDag {
     TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(nextFromEventId, sendEvents, 0);
     doReturn(eventInfo).when(vertex).getTaskAttemptTezEvents(taskAttemptID, fromEventId, 0, maxEvents);
 
-    taskAttemptListener.registerRunningContainer(containerId);
-    taskAttemptListener.registerTaskAttempt(amContainerTask, containerId);
+    taskAttemptListener.registerRunningContainer(containerId, 0);
+    taskAttemptListener.registerTaskAttempt(amContainerTask, containerId, 0);
+
+    TaskHeartbeatRequest request = mock(TaskHeartbeatRequest.class);
 
-    TezHeartbeatRequest request = mock(TezHeartbeatRequest.class);
     doReturn(containerId.toString()).when(request).getContainerIdentifier();
-    doReturn(taskAttemptID).when(request).getCurrentTaskAttemptID();
-    doReturn(++requestId).when(request).getRequestId();
+    doReturn(taskAttemptID).when(request).getTaskAttemptId();
     doReturn(events).when(request).getEvents();
     doReturn(maxEvents).when(request).getMaxEvents();
     doReturn(fromEventId).when(request).getStartIndex();
@@ -348,6 +351,25 @@ public class TestTaskAttemptListenerImplTezDag {
     return ContainerId.newInstance(appAttemptId, containerIdx);
   }
 
+  private static class TaskAttemptListenerImplForTest extends TaskAttemptListenerImpTezDag {
+
+    public TaskAttemptListenerImplForTest(AppContext context,
+                                          TaskHeartbeatHandler thh,
+                                          ContainerHeartbeatHandler chh,
+                                          JobTokenSecretManager jobTokenSecretManager,
+                                          String[] taskCommunicatorClassIdentifiers,
+                                          boolean isPureLocalMode) {
+      super(context, thh, chh, jobTokenSecretManager, taskCommunicatorClassIdentifiers,
+          isPureLocalMode);
+    }
+
+    @Override
+    protected TezTaskCommunicatorImpl createTezTaskCommunicator(TaskCommunicatorContext context) {
+      return new TezTaskCommunicatorImplForTest(context);
+    }
+
+  }
+
   private static class TezTaskCommunicatorImplForTest extends TezTaskCommunicatorImpl {
 
     public TezTaskCommunicatorImplForTest(