You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/14 22:58:44 UTC
[26/50] [abbrv] tez git commit: TEZ-2508. rebase 06/01. (sseth)
TEZ-2508. rebase 06/01. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/dcd767e6
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/dcd767e6
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/dcd767e6
Branch: refs/heads/TEZ-2003
Commit: dcd767e61047c6cbab845a518f1af050c63bb5b1
Parents: 2a3e2b3
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Jun 1 16:37:26 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 14 13:46:44 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../java/org/apache/tez/dag/api/TaskHeartbeatRequest.java | 7 +++++++
.../java/org/apache/tez/dag/api/TaskHeartbeatResponse.java | 8 +++++++-
.../org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java | 8 ++++----
.../java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java | 3 ++-
.../apache/tez/runtime/LogicalIOProcessorRuntimeTask.java | 3 ---
6 files changed, 21 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/dcd767e6/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 42c2e1e..55002fe 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -30,5 +30,6 @@ ALL CHANGES:
TEZ-2471. NPE in LogicalIOProcessorRuntimeTask while printing thread info.
TEZ-2495. Inform TaskCommunicaor about Task and Container termination reasons.
TEZ-2502. Fix for TezTaskRunner2 not killing tasks properly in all situations.
+ TEZ-2508. rebase 06/01
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/dcd767e6/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java
index f6bc8f0..b5ff991 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java
@@ -29,15 +29,18 @@ public class TaskHeartbeatRequest {
private final TezTaskAttemptID taskAttemptId;
private final List<TezEvent> events;
private final int startIndex;
+ private final int preRoutedStartIndex;
private final int maxEvents;
public TaskHeartbeatRequest(String containerIdentifier, TezTaskAttemptID taskAttemptId, List<TezEvent> events, int startIndex,
+ int preRoutedStartIndex,
int maxEvents) {
this.containerIdentifier = containerIdentifier;
this.taskAttemptId = taskAttemptId;
this.events = events;
this.startIndex = startIndex;
+ this.preRoutedStartIndex = preRoutedStartIndex;
this.maxEvents = maxEvents;
}
@@ -57,6 +60,10 @@ public class TaskHeartbeatRequest {
return startIndex;
}
+ public int getPreRoutedStartIndex() {
+ return preRoutedStartIndex;
+ }
+
public int getMaxEvents() {
return maxEvents;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/dcd767e6/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
index b826e76..7f063c4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
@@ -23,12 +23,14 @@ public class TaskHeartbeatResponse {
private final boolean shouldDie;
private final int nextFromEventId;
+ private final int nextPreRoutedEventId;
private final List<TezEvent> events;
- public TaskHeartbeatResponse(boolean shouldDie, List<TezEvent> events, int nextFromEventId) {
+ public TaskHeartbeatResponse(boolean shouldDie, List<TezEvent> events, int nextFromEventId, int nextPreRoutedEventId) {
this.shouldDie = shouldDie;
this.events = events;
this.nextFromEventId = nextFromEventId;
+ this.nextPreRoutedEventId = nextPreRoutedEventId;
}
public boolean isShouldDie() {
@@ -42,4 +44,8 @@ public class TaskHeartbeatResponse {
public int getNextFromEventId() {
return nextFromEventId;
}
+
+ public int getNextPreRoutedEventId() {
+ return nextPreRoutedEventId;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/dcd767e6/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 1c61a0d..e2d44e2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -79,7 +79,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
protected final TaskHeartbeatHandler taskHeartbeatHandler;
protected final ContainerHeartbeatHandler containerHeartbeatHandler;
- private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true, null, 0);
+ private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true, null, 0, 0);
private final ConcurrentMap<TezTaskAttemptID, ContainerId> registeredAttempts =
new ConcurrentHashMap<TezTaskAttemptID, ContainerId>();
@@ -195,7 +195,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
// So - avoiding synchronization.
pingContainerHeartbeatHandler(containerId);
- TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(0, null);
+ TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(0, null, 0);
TezTaskAttemptID taskAttemptID = request.getTaskAttemptId();
if (taskAttemptID != null) {
ContainerId containerIdFromMap = registeredAttempts.get(taskAttemptID);
@@ -241,10 +241,10 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
eventInfo = context
.getCurrentDAG()
.getVertex(taskAttemptID.getTaskID().getVertexID())
- .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(),
+ .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(), request.getPreRoutedStartIndex(),
request.getMaxEvents());
}
- return new TaskHeartbeatResponse(false, eventInfo.getEvents(), eventInfo.getNextFromEventId());
+ return new TaskHeartbeatResponse(false, eventInfo.getEvents(), eventInfo.getNextFromEventId(), eventInfo.getNextPreRoutedFromEventId());
}
public void taskAlive(TezTaskAttemptID taskAttemptId) {
taskHeartbeatHandler.pinged(taskAttemptId);
http://git-wip-us.apache.org/repos/asf/tez/blob/dcd767e6/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index 3774eb4..83322f2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -363,13 +363,14 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
}
TaskHeartbeatRequest tRequest = new TaskHeartbeatRequest(request.getContainerIdentifier(),
request.getCurrentTaskAttemptID(), request.getEvents(), request.getStartIndex(),
- request.getMaxEvents());
+ request.getPreRoutedStartIndex(), request.getMaxEvents());
tResponse = taskCommunicatorContext.heartbeat(tRequest);
}
TezHeartbeatResponse response = new TezHeartbeatResponse();
response.setLastRequestId(requestId);
response.setEvents(tResponse.getEvents());
response.setNextFromEventId(tResponse.getNextFromEventId());
+ response.setNextPreRoutedEventId(tResponse.getNextPreRoutedEventId());
containerInfo.lastRequestId = requestId;
containerInfo.lastResponse = response;
return response;
http://git-wip-us.apache.org/repos/asf/tez/blob/dcd767e6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 449fa0f..c79da5d 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -172,9 +172,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
this.outputsMap = new ConcurrentHashMap<String, LogicalOutput>(numOutputs);
this.outputContextMap = new ConcurrentHashMap<String, OutputContext>(numOutputs);
- this.initializedInputs = new ConcurrentHashMap<String, LogicalInput>();
- this.initializedOutputs = new ConcurrentHashMap<String, LogicalOutput>();
-
this.runInputMap = new LinkedHashMap<String, LogicalInput>();
this.runOutputMap = new LinkedHashMap<String, LogicalOutput>();