You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/22 03:18:53 UTC
[03/50] [abbrv] tez git commit: TEZ-2433. Fixes after rebase 05/08. (sseth)
TEZ-2433. Fixes after rebase 05/08. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4bb34211
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4bb34211
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4bb34211
Branch: refs/heads/TEZ-2003
Commit: 4bb342115a5af2c28d5ad99245e861e7f62c4298
Parents: df91ad5
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri May 8 18:43:16 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 21 18:13:54 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../tez/dag/api/TaskHeartbeatResponse.java | 10 ++++++--
.../dag/app/TaskAttemptListenerImpTezDag.java | 27 ++++++++++----------
.../tez/dag/app/TezTaskCommunicatorImpl.java | 9 +++----
.../app/TestTaskAttemptListenerImplTezDag.java | 10 +++-----
.../library/common/shuffle/TestFetcher.java | 8 ++----
6 files changed, 31 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/4bb34211/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 9b2339f..ad167ab 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -22,5 +22,6 @@ ALL CHANGES:
TEZ-2388. Send dag identifier as part of the fetcher request string.
TEZ-2414. LogicalIOProcessorRuntimeTask, RuntimeTask, TezTaskRunner should handle interrupts & carry out necessary cleanups.
TEZ-2420. TaskRunner returning before executing the task.
+ TEZ-2433. Fixes after rebase 05/08
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/4bb34211/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
index c82a743..b826e76 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
@@ -22,11 +22,13 @@ import org.apache.tez.runtime.api.impl.TezEvent;
public class TaskHeartbeatResponse {
private final boolean shouldDie;
- private List<TezEvent> events;
+ private final int nextFromEventId;
+ private final List<TezEvent> events;
- public TaskHeartbeatResponse(boolean shouldDie, List<TezEvent> events) {
+ public TaskHeartbeatResponse(boolean shouldDie, List<TezEvent> events, int nextFromEventId) {
this.shouldDie = shouldDie;
this.events = events;
+ this.nextFromEventId = nextFromEventId;
}
public boolean isShouldDie() {
@@ -36,4 +38,8 @@ public class TaskHeartbeatResponse {
public List<TezEvent> getEvents() {
return events;
}
+
+ public int getNextFromEventId() {
+ return nextFromEventId;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4bb34211/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index cbaed99..db78fa9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -78,7 +78,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
protected final TaskHeartbeatHandler taskHeartbeatHandler;
protected final ContainerHeartbeatHandler containerHeartbeatHandler;
- private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true, null);
+ private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true, null, 0);
private final ConcurrentMap<TezTaskAttemptID, ContainerId> registeredAttempts =
new ConcurrentHashMap<TezTaskAttemptID, ContainerId>();
@@ -194,7 +194,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
// So - avoiding synchronization.
pingContainerHeartbeatHandler(containerId);
- List<TezEvent> outEvents = null;
+ TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(0, null);
TezTaskAttemptID taskAttemptID = request.getTaskAttemptId();
if (taskAttemptID != null) {
ContainerId containerIdFromMap = registeredAttempts.get(taskAttemptID);
@@ -216,12 +216,17 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
List<TezEvent> otherEvents = new ArrayList<TezEvent>();
+ // Route TASK_STATUS_UPDATE_EVENT directly to the TaskAttempt, and route all other events
+ // (DATA_MOVEMENT_EVENT, TASK_ATTEMPT_COMPLETED_EVENT, TASK_ATTEMPT_FAILED_EVENT)
+ // through VertexImpl, in order to guarantee the following event ordering:
+ // 1. DataMovementEvent is logged as a RecoveryEvent before TaskAttemptFinishedEvent.
+ // 2. TaskStatusEvent is handled before TaskAttemptFinishedEvent.
for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
final EventType eventType = tezEvent.getEventType();
- if (eventType == EventType.TASK_STATUS_UPDATE_EVENT ||
- eventType == EventType.TASK_ATTEMPT_COMPLETED_EVENT) {
- context.getEventHandler()
- .handle(getTaskAttemptEventFromTezEvent(taskAttemptID, tezEvent));
+ if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) {
+ TaskAttemptEvent taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
+ (TaskStatusUpdateEvent) tezEvent.getEvent());
+ context.getEventHandler().handle(taskAttemptEvent);
} else {
otherEvents.add(tezEvent);
}
@@ -232,14 +237,13 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(otherEvents)));
}
taskHeartbeatHandler.pinged(taskAttemptID);
- outEvents = context
+ eventInfo = context
.getCurrentDAG()
.getVertex(taskAttemptID.getTaskID().getVertexID())
- .getTask(taskAttemptID.getTaskID())
.getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(),
request.getMaxEvents());
}
- return new TaskHeartbeatResponse(false, outEvents);
+ return new TaskHeartbeatResponse(false, eventInfo.getEvents(), eventInfo.getNextFromEventId());
}
public void taskAlive(TezTaskAttemptID taskAttemptId) {
taskHeartbeatHandler.pinged(taskAttemptId);
@@ -435,9 +439,4 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
+ ", ContainerId not known for this attempt");
}
}
-
-
- public TaskCommunicator getTaskCommunicator() {
- return taskCommunicators[0];
- }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4bb34211/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index 6200a5b..accde2c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -364,13 +364,10 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
request.getMaxEvents());
tResponse = taskCommunicatorContext.heartbeat(tRequest);
}
- TezHeartbeatResponse response;
- if (tResponse == null) {
- response = new TezHeartbeatResponse();
- } else {
- response = new TezHeartbeatResponse(tResponse.getEvents());
- }
+ TezHeartbeatResponse response = new TezHeartbeatResponse();
response.setLastRequestId(requestId);
+ response.setEvents(tResponse.getEvents());
+ response.setNextFromEventId(tResponse.getNextFromEventId());
containerInfo.lastRequestId = requestId;
containerInfo.lastResponse = response;
return response;
http://git-wip-us.apache.org/repos/asf/tez/blob/4bb34211/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index 2208220..34b9792 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -48,6 +48,7 @@ import org.apache.tez.common.ContainerTask;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TaskHeartbeatRequest;
+import org.apache.tez.dag.api.TaskHeartbeatResponse;
import org.apache.tez.dag.api.TezException;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
@@ -70,8 +71,6 @@ import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.EventType;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
-import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
-import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -260,10 +259,9 @@ public class TestTaskAttemptListenerImplTezDag {
public void testTaskHeartbeatResponse() throws Exception {
List<TezEvent> events = new ArrayList<TezEvent>();
List<TezEvent> eventsToSend = new ArrayList<TezEvent>();
- TezHeartbeatResponse response = generateHeartbeat(events, 0, 1, 2, eventsToSend);
+ TaskHeartbeatResponse response = generateHeartbeat(events, 0, 1, 2, eventsToSend);
assertEquals(2, response.getNextFromEventId());
- assertEquals(1, response.getLastRequestId());
assertEquals(eventsToSend, response.getEvents());
}
@@ -320,7 +318,7 @@ public class TestTaskAttemptListenerImplTezDag {
return succeedToAllocate;
}
- private TezHeartbeatResponse generateHeartbeat(List<TezEvent> events,
+ private TaskHeartbeatResponse generateHeartbeat(List<TezEvent> events,
int fromEventId, int maxEvents, int nextFromEventId,
List<TezEvent> sendEvents) throws IOException, TezException {
ContainerId containerId = createContainerId(appId, 1);
@@ -335,7 +333,7 @@ public class TestTaskAttemptListenerImplTezDag {
taskAttemptListener.registerTaskAttempt(amContainerTask, containerId, 0);
TaskHeartbeatRequest request = mock(TaskHeartbeatRequest.class);
-
+ doReturn(containerId.toString()).when(request).getContainerIdentifier();
doReturn(containerId.toString()).when(request).getContainerIdentifier();
doReturn(taskAttemptID).when(request).getTaskAttemptId();
doReturn(events).when(request).getEvents();
http://git-wip-us.apache.org/repos/asf/tez/blob/4bb34211/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
index 85e3540..08efb3e 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
@@ -31,7 +31,6 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
@@ -43,11 +42,8 @@ import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.runtime.api.ExecutionContext;
-import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.InputIdentifier;
@@ -93,7 +89,7 @@ public class TestFetcher {
// when enabled and hostname does not match use http fetch.
builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
- ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST,
+ ApplicationId.newInstance(0, 1), -1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST,
PORT, false);
builder.assignWork(HOST + "_OTHER", PORT, 0, Arrays.asList(srcAttempts));
fetcher = spy(builder.build());
@@ -109,7 +105,7 @@ public class TestFetcher {
// when enabled and port does not match use http fetch.
builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
- ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, PORT, false);
+ ApplicationId.newInstance(0, 1), -1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, PORT, false);
builder.assignWork(HOST, PORT + 1, 0, Arrays.asList(srcAttempts));
fetcher = spy(builder.build());