You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/16 23:46:24 UTC

git commit: TEZ-452. Handle the case where heartbeatResponse.eventList is empty (part of TEZ-398). (sseth)

Updated Branches:
  refs/heads/TEZ-398 6e66c5e7b -> 2d5302a4b


TEZ-452. Handle the case where heartbeatResponse.eventList is empty
(part of TEZ-398). (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/2d5302a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/2d5302a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/2d5302a4

Branch: refs/heads/TEZ-398
Commit: 2d5302a4bf8f96ccae2587fc15c0f6f78d01bfc6
Parents: 6e66c5e
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Sep 16 14:46:00 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Sep 16 14:46:00 2013 -0700

----------------------------------------------------------------------
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  1 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  3 +++
 .../LogicalIOProcessorRuntimeTask.java          | 23 +++++++++++++-------
 3 files changed, 19 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2d5302a4/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index cc99af2..3db7a1e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -594,6 +594,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     }
     synchronized (attemptInfo) {      
       if(attemptInfo.lastRequestId == requestId) {
+        LOG.warn("Old sequenceId received: " + requestId + ", Re-sending last response to client");
         return attemptInfo.lastReponse;
       }
       if(attemptInfo.lastRequestId+1 < requestId) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2d5302a4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 1186caa..6e6e109 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1460,6 +1460,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             srcEdge.sendTezEventToSourceTasks(tezEvent);
           }
           break;
+        case TASK_STATUS_UPDATE_EVENT:
+          // TODO NEWTEZ FIXME: Handle this event
+          break;
         default:
           throw new TezUncheckedException("Unhandled tez event type: "
               + tezEvent.getEventType());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2d5302a4/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
index eb3b28f..a73a261 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
@@ -215,9 +215,12 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
 
   private TezInputContext createInputContext(InputSpec inputSpec) {
     TezInputContext inputContext = new TezInputContextImpl(tezConf,
-        tezUmbilical, taskSpec.getVertexName(), inputSpec.getSourceVertexName(),
-        taskSpec.getTaskAttemptID(), tezCounters,
-        inputSpec.getInputDescriptor().getUserPayload(), this,
+        tezUmbilical, taskSpec.getVertexName(),
+        inputSpec.getSourceVertexName(), taskSpec.getTaskAttemptID(),
+        tezCounters,
+        inputSpec.getInputDescriptor().getUserPayload() == null ? taskSpec
+            .getProcessorDescriptor().getUserPayload() : inputSpec
+            .getInputDescriptor().getUserPayload(), this,
         serviceConsumerMetadata);
     return inputContext;
   }
@@ -225,9 +228,11 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private TezOutputContext createOutputContext(OutputSpec outputSpec) {
     TezOutputContext outputContext = new TezOutputContextImpl(tezConf,
         tezUmbilical, taskSpec.getVertexName(),
-        outputSpec.getDestinationVertexName(),
-        taskSpec.getTaskAttemptID(), tezCounters,
-        outputSpec.getOutputDescriptor().getUserPayload(), this,
+        outputSpec.getDestinationVertexName(), taskSpec.getTaskAttemptID(),
+        tezCounters,
+        outputSpec.getOutputDescriptor().getUserPayload() == null ? taskSpec
+            .getProcessorDescriptor().getUserPayload() : outputSpec
+            .getOutputDescriptor().getUserPayload(), this,
         serviceConsumerMetadata);
     return outputContext;
   }
@@ -347,8 +352,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
 
   @Override
   public synchronized void handleEvents(Collection<TezEvent> events) {
-    eventsToBeProcessed.addAll(events);
-    eventCounter.addAndGet(events.size());
+    if (!(events == null || events.size() == 0)) {
+      eventsToBeProcessed.addAll(events);
+      eventCounter.addAndGet(events.size());
+    }
   }
 
   private void startRouterThread() {