You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/16 23:46:24 UTC
git commit: TEZ-452. Handle the case where
heartbeatResponse.eventList is empty (part of TEZ-398). (sseth)
Updated Branches:
refs/heads/TEZ-398 6e66c5e7b -> 2d5302a4b
TEZ-452. Handle the case where heartbeatResponse.eventList is empty
(part of TEZ-398). (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/2d5302a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/2d5302a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/2d5302a4
Branch: refs/heads/TEZ-398
Commit: 2d5302a4bf8f96ccae2587fc15c0f6f78d01bfc6
Parents: 6e66c5e
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Sep 16 14:46:00 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Sep 16 14:46:00 2013 -0700
----------------------------------------------------------------------
.../dag/app/TaskAttemptListenerImpTezDag.java | 1 +
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 3 +++
.../LogicalIOProcessorRuntimeTask.java | 23 +++++++++++++-------
3 files changed, 19 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2d5302a4/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index cc99af2..3db7a1e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -594,6 +594,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
synchronized (attemptInfo) {
if(attemptInfo.lastRequestId == requestId) {
+ LOG.warn("Old sequenceId received: " + requestId + ", Re-sending last response to client");
return attemptInfo.lastReponse;
}
if(attemptInfo.lastRequestId+1 < requestId) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2d5302a4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 1186caa..6e6e109 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1460,6 +1460,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
srcEdge.sendTezEventToSourceTasks(tezEvent);
}
break;
+ case TASK_STATUS_UPDATE_EVENT:
+ // TODO NEWTEZ FIXME: Handle this event
+ break;
default:
throw new TezUncheckedException("Unhandled tez event type: "
+ tezEvent.getEventType());
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2d5302a4/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
index eb3b28f..a73a261 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
@@ -215,9 +215,12 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
private TezInputContext createInputContext(InputSpec inputSpec) {
TezInputContext inputContext = new TezInputContextImpl(tezConf,
- tezUmbilical, taskSpec.getVertexName(), inputSpec.getSourceVertexName(),
- taskSpec.getTaskAttemptID(), tezCounters,
- inputSpec.getInputDescriptor().getUserPayload(), this,
+ tezUmbilical, taskSpec.getVertexName(),
+ inputSpec.getSourceVertexName(), taskSpec.getTaskAttemptID(),
+ tezCounters,
+ inputSpec.getInputDescriptor().getUserPayload() == null ? taskSpec
+ .getProcessorDescriptor().getUserPayload() : inputSpec
+ .getInputDescriptor().getUserPayload(), this,
serviceConsumerMetadata);
return inputContext;
}
@@ -225,9 +228,11 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
private TezOutputContext createOutputContext(OutputSpec outputSpec) {
TezOutputContext outputContext = new TezOutputContextImpl(tezConf,
tezUmbilical, taskSpec.getVertexName(),
- outputSpec.getDestinationVertexName(),
- taskSpec.getTaskAttemptID(), tezCounters,
- outputSpec.getOutputDescriptor().getUserPayload(), this,
+ outputSpec.getDestinationVertexName(), taskSpec.getTaskAttemptID(),
+ tezCounters,
+ outputSpec.getOutputDescriptor().getUserPayload() == null ? taskSpec
+ .getProcessorDescriptor().getUserPayload() : outputSpec
+ .getOutputDescriptor().getUserPayload(), this,
serviceConsumerMetadata);
return outputContext;
}
@@ -347,8 +352,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
@Override
public synchronized void handleEvents(Collection<TezEvent> events) {
- eventsToBeProcessed.addAll(events);
- eventCounter.addAndGet(events.size());
+ if (!(events == null || events.size() == 0)) {
+ eventsToBeProcessed.addAll(events);
+ eventCounter.addAndGet(events.size());
+ }
}
private void startRouterThread() {