You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/09 03:43:28 UTC

tez git commit: TEZ-2433. Fixes after rebase 05/08. (sseth)

Repository: tez
Updated Branches:
  refs/heads/TEZ-2003 d03e330fa -> 620e095c5


TEZ-2433. Fixes after rebase 05/08. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/620e095c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/620e095c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/620e095c

Branch: refs/heads/TEZ-2003
Commit: 620e095c584331b634bb4cbf1973db3bc893db21
Parents: d03e330
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri May 8 18:43:16 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri May 8 18:43:16 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |  1 +
 .../tez/dag/api/TaskHeartbeatResponse.java      | 10 ++++++--
 .../dag/app/TaskAttemptListenerImpTezDag.java   | 26 ++++++++++----------
 .../tez/dag/app/TezTaskCommunicatorImpl.java    |  9 +++----
 .../app/TestTaskAttemptListenerImplTezDag.java  | 11 +++------
 .../library/common/shuffle/TestFetcher.java     |  8 ++----
 6 files changed, 31 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/620e095c/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 9b2339f..ad167ab 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -22,5 +22,6 @@ ALL CHANGES:
   TEZ-2388. Send dag identifier as part of the fetcher request string.
   TEZ-2414. LogicalIOProcessorRuntimeTask, RuntimeTask, TezTaskRunner should handle interrupts & carry out necessary cleanups.
   TEZ-2420. TaskRunner returning before executing the task.
+  TEZ-2433. Fixes after rebase 05/08
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/620e095c/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
index c82a743..b826e76 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
@@ -22,11 +22,13 @@ import org.apache.tez.runtime.api.impl.TezEvent;
 public class TaskHeartbeatResponse {
 
   private final boolean shouldDie;
-  private List<TezEvent> events;
+  private final int nextFromEventId;
+  private final List<TezEvent> events;
 
-  public TaskHeartbeatResponse(boolean shouldDie, List<TezEvent> events) {
+  public TaskHeartbeatResponse(boolean shouldDie, List<TezEvent> events, int nextFromEventId) {
     this.shouldDie = shouldDie;
     this.events = events;
+    this.nextFromEventId = nextFromEventId;
   }
 
   public boolean isShouldDie() {
@@ -36,4 +38,8 @@ public class TaskHeartbeatResponse {
   public List<TezEvent> getEvents() {
     return events;
   }
+
+  public int getNextFromEventId() {
+    return nextFromEventId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/620e095c/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index d30919b..1182d54 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -79,7 +79,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   protected final TaskHeartbeatHandler taskHeartbeatHandler;
   protected final ContainerHeartbeatHandler containerHeartbeatHandler;
 
-  private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true, null);
+  private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true, null, 0);
 
   private final ConcurrentMap<TezTaskAttemptID, ContainerId> registeredAttempts =
       new ConcurrentHashMap<TezTaskAttemptID, ContainerId>();
@@ -195,7 +195,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     // So - avoiding synchronization.
 
     pingContainerHeartbeatHandler(containerId);
-    List<TezEvent> outEvents = null;
+    TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(0, null);
     TezTaskAttemptID taskAttemptID = request.getTaskAttemptId();
     if (taskAttemptID != null) {
       ContainerId containerIdFromMap = registeredAttempts.get(taskAttemptID);
@@ -217,12 +217,17 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
       }
 
       List<TezEvent> otherEvents = new ArrayList<TezEvent>();
+      // route TASK_STATUS_UPDATE_EVENT directly to TaskAttempt and route other events
+      // (DATA_MOVEMENT_EVENT, TASK_ATTEMPT_COMPLETED_EVENT, TASK_ATTEMPT_FAILED_EVENT)
+      // to VertexImpl to ensure the events ordering
+      //  1. DataMovementEvent is logged as RecoveryEvent before TaskAttemptFinishedEvent
+      //  2. TaskStatusEvent is handled before TaskAttemptFinishedEvent
       for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
         final EventType eventType = tezEvent.getEventType();
-        if (eventType == EventType.TASK_STATUS_UPDATE_EVENT ||
-            eventType == EventType.TASK_ATTEMPT_COMPLETED_EVENT) {
-          context.getEventHandler()
-              .handle(getTaskAttemptEventFromTezEvent(taskAttemptID, tezEvent));
+        if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) {
+          TaskAttemptEvent taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
+              (TaskStatusUpdateEvent) tezEvent.getEvent());
+          context.getEventHandler().handle(taskAttemptEvent);
         } else {
           otherEvents.add(tezEvent);
         }
@@ -233,14 +238,13 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
             new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(otherEvents)));
       }
       taskHeartbeatHandler.pinged(taskAttemptID);
-      outEvents = context
+      eventInfo = context
           .getCurrentDAG()
           .getVertex(taskAttemptID.getTaskID().getVertexID())
-          .getTask(taskAttemptID.getTaskID())
           .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(),
               request.getMaxEvents());
     }
-    return new TaskHeartbeatResponse(false, outEvents);
+    return new TaskHeartbeatResponse(false, eventInfo.getEvents(), eventInfo.getNextFromEventId());
   }
   public void taskAlive(TezTaskAttemptID taskAttemptId) {
     taskHeartbeatHandler.pinged(taskAttemptId);
@@ -436,8 +440,4 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
           + ", ContainerId not known for this attempt");
     }
   }
-
-  public TaskCommunicator getTaskCommunicator() {
-    return taskCommunicators[0];
-  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/620e095c/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index 825a4d2..34c8822 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -362,13 +362,10 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
             request.getMaxEvents());
         tResponse = taskCommunicatorContext.heartbeat(tRequest);
       }
-      TezHeartbeatResponse response;
-      if (tResponse == null) {
-        response = new TezHeartbeatResponse();
-      } else {
-        response = new TezHeartbeatResponse(tResponse.getEvents());
-      }
+      TezHeartbeatResponse response = new TezHeartbeatResponse();
       response.setLastRequestId(requestId);
+      response.setEvents(tResponse.getEvents());
+      response.setNextFromEventId(tResponse.getNextFromEventId());
       containerInfo.lastRequestId = requestId;
       containerInfo.lastResponse = response;
       return response;

http://git-wip-us.apache.org/repos/asf/tez/blob/620e095c/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index ae9ebc0..5924fc1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -44,6 +44,7 @@ import org.apache.tez.common.ContainerContext;
 import org.apache.tez.common.ContainerTask;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.TaskHeartbeatRequest;
+import org.apache.tez.dag.api.TaskHeartbeatResponse;
 import org.apache.tez.dag.api.TezException;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
@@ -61,14 +62,11 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.InputInitializerEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.impl.EventType;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
-import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
-import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -252,14 +250,13 @@ public class TestTaskAttemptListenerImplTezDag {
   public void testTaskHeartbeatResponse() throws Exception {
     List<TezEvent> events = new ArrayList<TezEvent>();
     List<TezEvent> eventsToSend = new ArrayList<TezEvent>();
-    TezHeartbeatResponse response = generateHeartbeat(events, 0, 1, 2, eventsToSend);
+    TaskHeartbeatResponse response = generateHeartbeat(events, 0, 1, 2, eventsToSend);
     
     assertEquals(2, response.getNextFromEventId());
-    assertEquals(1, response.getLastRequestId());
     assertEquals(eventsToSend, response.getEvents());
   }
 
-  private TezHeartbeatResponse generateHeartbeat(List<TezEvent> events,
+  private TaskHeartbeatResponse generateHeartbeat(List<TezEvent> events,
       int fromEventId, int maxEvents, int nextFromEventId,
       List<TezEvent> sendEvents) throws IOException, TezException {
     ContainerId containerId = createContainerId(appId, 1);
@@ -274,7 +271,7 @@ public class TestTaskAttemptListenerImplTezDag {
     taskAttemptListener.registerTaskAttempt(amContainerTask, containerId, 0);
 
     TaskHeartbeatRequest request = mock(TaskHeartbeatRequest.class);
-
+    doReturn(containerId.toString()).when(request).getContainerIdentifier();
     doReturn(containerId.toString()).when(request).getContainerIdentifier();
     doReturn(taskAttemptID).when(request).getTaskAttemptId();
     doReturn(events).when(request).getEvents();

http://git-wip-us.apache.org/repos/asf/tez/blob/620e095c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
index d2b0bde..1a7d628 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
@@ -31,7 +31,6 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 
@@ -39,11 +38,8 @@ import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.runtime.api.ExecutionContext;
-import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
@@ -87,7 +83,7 @@ public class TestFetcher {
 
     // when enabled and hostname does not match use http fetch.
     builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
-        ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST,
+        ApplicationId.newInstance(0, 1), -1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST,
         PORT);
     builder.assignWork(HOST + "_OTHER", PORT, 0, Arrays.asList(srcAttempts));
     fetcher = spy(builder.build());
@@ -103,7 +99,7 @@ public class TestFetcher {
 
     // when enabled and port does not match use http fetch.
     builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
-        ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, PORT);
+        ApplicationId.newInstance(0, 1), -1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, PORT);
     builder.assignWork(HOST, PORT + 1, 0, Arrays.asList(srcAttempts));
     fetcher = spy(builder.build());