You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/14 22:58:44 UTC

[26/50] [abbrv] tez git commit: TEZ-2508. rebase 06/01. (sseth)

TEZ-2508. rebase 06/01. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/dcd767e6
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/dcd767e6
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/dcd767e6

Branch: refs/heads/TEZ-2003
Commit: dcd767e61047c6cbab845a518f1af050c63bb5b1
Parents: 2a3e2b3
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Jun 1 16:37:26 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 14 13:46:44 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                                         | 1 +
 .../java/org/apache/tez/dag/api/TaskHeartbeatRequest.java    | 7 +++++++
 .../java/org/apache/tez/dag/api/TaskHeartbeatResponse.java   | 8 +++++++-
 .../org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java | 8 ++++----
 .../java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java | 3 ++-
 .../apache/tez/runtime/LogicalIOProcessorRuntimeTask.java    | 3 ---
 6 files changed, 21 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/dcd767e6/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 42c2e1e..55002fe 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -30,5 +30,6 @@ ALL CHANGES:
   TEZ-2471. NPE in LogicalIOProcessorRuntimeTask while printing thread info.
   TEZ-2495. Inform TaskCommunicaor about Task and Container termination reasons.
   TEZ-2502. Fix for TezTaskRunner2 not killing tasks properly in all situations.
+  TEZ-2508. rebase 06/01
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/dcd767e6/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java
index f6bc8f0..b5ff991 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java
@@ -29,15 +29,18 @@ public class TaskHeartbeatRequest {
   private final TezTaskAttemptID taskAttemptId;
   private final List<TezEvent> events;
   private final int startIndex;
+  private final int preRoutedStartIndex;
   private final int maxEvents;
 
 
   public TaskHeartbeatRequest(String containerIdentifier, TezTaskAttemptID taskAttemptId, List<TezEvent> events, int startIndex,
+                              int preRoutedStartIndex,
                               int maxEvents) {
     this.containerIdentifier = containerIdentifier;
     this.taskAttemptId = taskAttemptId;
     this.events = events;
     this.startIndex = startIndex;
+    this.preRoutedStartIndex = preRoutedStartIndex;
     this.maxEvents = maxEvents;
   }
 
@@ -57,6 +60,10 @@ public class TaskHeartbeatRequest {
     return startIndex;
   }
 
+  public int getPreRoutedStartIndex() {
+    return preRoutedStartIndex;
+  }
+
   public int getMaxEvents() {
     return maxEvents;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/dcd767e6/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
index b826e76..7f063c4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
@@ -23,12 +23,14 @@ public class TaskHeartbeatResponse {
 
   private final boolean shouldDie;
   private final int nextFromEventId;
+  private final int nextPreRoutedEventId;
   private final List<TezEvent> events;
 
-  public TaskHeartbeatResponse(boolean shouldDie, List<TezEvent> events, int nextFromEventId) {
+  public TaskHeartbeatResponse(boolean shouldDie, List<TezEvent> events, int nextFromEventId, int nextPreRoutedEventId) {
     this.shouldDie = shouldDie;
     this.events = events;
     this.nextFromEventId = nextFromEventId;
+    this.nextPreRoutedEventId = nextPreRoutedEventId;
   }
 
   public boolean isShouldDie() {
@@ -42,4 +44,8 @@ public class TaskHeartbeatResponse {
   public int getNextFromEventId() {
     return nextFromEventId;
   }
+
+  public int getNextPreRoutedEventId() {
+    return nextPreRoutedEventId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/dcd767e6/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 1c61a0d..e2d44e2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -79,7 +79,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   protected final TaskHeartbeatHandler taskHeartbeatHandler;
   protected final ContainerHeartbeatHandler containerHeartbeatHandler;
 
-  private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true, null, 0);
+  private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true, null, 0, 0);
 
   private final ConcurrentMap<TezTaskAttemptID, ContainerId> registeredAttempts =
       new ConcurrentHashMap<TezTaskAttemptID, ContainerId>();
@@ -195,7 +195,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     // So - avoiding synchronization.
 
     pingContainerHeartbeatHandler(containerId);
-    TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(0, null);
+    TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(0, null, 0);
     TezTaskAttemptID taskAttemptID = request.getTaskAttemptId();
     if (taskAttemptID != null) {
       ContainerId containerIdFromMap = registeredAttempts.get(taskAttemptID);
@@ -241,10 +241,10 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
       eventInfo = context
           .getCurrentDAG()
           .getVertex(taskAttemptID.getTaskID().getVertexID())
-          .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(),
+          .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(), request.getPreRoutedStartIndex(),
               request.getMaxEvents());
     }
-    return new TaskHeartbeatResponse(false, eventInfo.getEvents(), eventInfo.getNextFromEventId());
+    return new TaskHeartbeatResponse(false, eventInfo.getEvents(), eventInfo.getNextFromEventId(), eventInfo.getNextPreRoutedFromEventId());
   }
   public void taskAlive(TezTaskAttemptID taskAttemptId) {
     taskHeartbeatHandler.pinged(taskAttemptId);

http://git-wip-us.apache.org/repos/asf/tez/blob/dcd767e6/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index 3774eb4..83322f2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -363,13 +363,14 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
         }
         TaskHeartbeatRequest tRequest = new TaskHeartbeatRequest(request.getContainerIdentifier(),
             request.getCurrentTaskAttemptID(), request.getEvents(), request.getStartIndex(),
-            request.getMaxEvents());
+            request.getPreRoutedStartIndex(), request.getMaxEvents());
         tResponse = taskCommunicatorContext.heartbeat(tRequest);
       }
       TezHeartbeatResponse response = new TezHeartbeatResponse();
       response.setLastRequestId(requestId);
       response.setEvents(tResponse.getEvents());
       response.setNextFromEventId(tResponse.getNextFromEventId());
+      response.setNextPreRoutedEventId(tResponse.getNextPreRoutedEventId());
       containerInfo.lastRequestId = requestId;
       containerInfo.lastResponse = response;
       return response;

http://git-wip-us.apache.org/repos/asf/tez/blob/dcd767e6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 449fa0f..c79da5d 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -172,9 +172,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     this.outputsMap = new ConcurrentHashMap<String, LogicalOutput>(numOutputs);
     this.outputContextMap = new ConcurrentHashMap<String, OutputContext>(numOutputs);
 
-    this.initializedInputs = new ConcurrentHashMap<String, LogicalInput>();
-    this.initializedOutputs = new ConcurrentHashMap<String, LogicalOutput>();
-
     this.runInputMap = new LinkedHashMap<String, LogicalInput>();
     this.runOutputMap = new LinkedHashMap<String, LogicalOutput>();