You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/11 02:06:30 UTC
git commit: TEZ-429. Basic event router for Logical Runtime task. (hitesh)
Updated Branches:
refs/heads/TEZ-398 0bd0f7941 -> d9fc91d56
TEZ-429. Basic event router for Logical Runtime task. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/d9fc91d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/d9fc91d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/d9fc91d5
Branch: refs/heads/TEZ-398
Commit: d9fc91d56587193f046c06d0984de7f56c4f7a61
Parents: 0bd0f79
Author: Hitesh Shah <hi...@apache.org>
Authored: Tue Sep 10 17:05:43 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue Sep 10 17:05:43 2013 -0700
----------------------------------------------------------------------
.../apache/hadoop/mapred/YarnTezDagChild.java | 54 ++++++++++++++------
.../LogicalIOProcessorRuntimeTask.java | 30 ++++++++++-
2 files changed, 67 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d9fc91d5/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 3141910..462c30e 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -30,7 +30,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -73,6 +74,9 @@ import org.apache.tez.engine.common.objectregistry.ObjectRegistryImpl;
import org.apache.tez.engine.common.objectregistry.ObjectRegistryModule;
import org.apache.tez.engine.common.security.JobTokenIdentifier;
import org.apache.tez.engine.common.security.TokenCache;
+import org.apache.tez.engine.newapi.events.TaskAttemptCompletedEvent;
+import org.apache.tez.engine.newapi.events.TaskAttemptFailedEvent;
+import org.apache.tez.engine.newapi.impl.EventMetaData;
import org.apache.tez.engine.newapi.impl.InputSpec;
import org.apache.tez.engine.newapi.impl.OutputSpec;
import org.apache.tez.engine.newapi.impl.TaskSpec;
@@ -80,6 +84,7 @@ import org.apache.tez.engine.newapi.impl.TezEvent;
import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
import org.apache.tez.engine.newapi.impl.TezUmbilical;
+import org.apache.tez.engine.newapi.impl.EventMetaData.EventGenerator;
import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.mapreduce.input.SimpleInput;
import org.apache.tez.mapreduce.output.SimpleOutput;
@@ -100,10 +105,10 @@ public class YarnTezDagChild {
private static String containerIdStr;
private static int eventCounter = 0;
private static int maxEventsToGet = 0;
- private static LinkedList<TezEvent> eventsToSend =
- new LinkedList<TezEvent>();
- private static ConcurrentLinkedQueue<TezEvent> eventsToBeProcessed =
- new ConcurrentLinkedQueue<TezEvent>();
+ private static LinkedBlockingQueue<TezEvent> eventsToSend =
+ new LinkedBlockingQueue<TezEvent>();
+ private static LinkedBlockingQueue<TezEvent> eventsToBeProcessed =
+ new LinkedBlockingQueue<TezEvent>();
private static AtomicLong requestCounter = new AtomicLong(0);
private static TezTaskAttemptID currentTaskAttemptID;
private static long amPollInterval;
@@ -168,9 +173,8 @@ public class YarnTezDagChild {
}
}
try {
- TezEvent e = eventsToBeProcessed.poll();
+ TezEvent e = eventsToBeProcessed.poll(10, TimeUnit.MILLISECONDS);
if (e == null) {
- eventsToBeProcessed.wait();
continue;
}
// TODO TODONEWTEZ
@@ -182,7 +186,12 @@ public class YarnTezDagChild {
try {
taskLock.readLock().lock();
if (currentTask != null) {
- currentTask.handleEvent(e);
+ try {
+ currentTask.handleEvent(e);
+ } catch (Throwable t) {
+ LOG.warn("Failed to handle event", t);
+ // TODONEWTEZ
+ }
}
} finally {
taskLock.readLock().unlock();
@@ -213,7 +222,7 @@ public class YarnTezDagChild {
}
synchronized (eventLock) {
List<TezEvent> events = new ArrayList<TezEvent>();
- events.addAll(eventsToSend);
+ eventsToSend.drainTo(events);
long reqId = requestCounter.incrementAndGet();
TezHeartbeatRequest request = new TezHeartbeatRequest(reqId, events,
currentTaskAttemptID, eventCounter, maxEventsToGet);
@@ -222,10 +231,8 @@ public class YarnTezDagChild {
// TODO TODONEWTEZ
throw new TezException("AM and Task out of sync");
}
- eventsToSend.clear();
eventCounter += response.getEvents().size();
eventsToBeProcessed.addAll(response.getEvents());
- eventsToBeProcessed.notifyAll();
}
}
@@ -314,7 +321,6 @@ public class YarnTezDagChild {
if (LOG.isDebugEnabled()) {
LOG.debug("PID, containerId: " + pid + ", " + containerIdentifier);
}
- TaskSpec taskSpec = null;
ContainerTask containerTask = null;
UserGroupInformation childUGI = null;
ContainerContext containerContext = new ContainerContext(
@@ -347,7 +353,7 @@ public class YarnTezDagChild {
return;
}
taskCount++;
- taskSpec = containerTask.getTaskSpec();
+ final TaskSpec taskSpec = containerTask.getTaskSpec();
if (LOG.isDebugEnabled()) {
LOG.debug("New container task context:"
+ taskSpec.toString());
@@ -386,9 +392,25 @@ public class YarnTezDagChild {
childUGI.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
- currentTask.initialize();
- currentTask.run();
- currentTask.close();
+ EventMetaData sourceInfo = new EventMetaData(EventGenerator.SYSTEM,
+ taskSpec.getVertexName(), "", currentTaskAttemptID);
+ try {
+ currentTask.initialize();
+ currentTask.run();
+ currentTask.close();
+ // TODONEWTEZ check if task had a fatal error before
+ // sending completed event
+ TezEvent taskCompletedEvent =
+ new TezEvent(new TaskAttemptCompletedEvent(), sourceInfo);
+ umbilical.taskAttemptCompleted(currentTaskAttemptID,
+ taskCompletedEvent);
+ } catch (Throwable t) {
+ TezEvent taskAttemptFailedEvent =
+ new TezEvent(new TaskAttemptFailedEvent(t.getMessage()),
+ sourceInfo);
+ umbilical.taskAttemptCompleted(currentTaskAttemptID,
+ taskAttemptFailedEvent);
+ }
try {
taskLock.writeLock().lock();
currentTask = null;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d9fc91d5/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
index f077831..f0ac36e 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
@@ -20,6 +20,7 @@ package org.apache.tez.engine.newruntime;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -290,6 +291,33 @@ public class LogicalIOProcessorRuntimeTask {
}
public void handleEvent(TezEvent e) {
- // TODO TODONEWTEZ
+ switch (e.getDestinationInfo().getEventGenerator()) {
+ case INPUT:
+ LogicalInput input = inputMap.get(
+ e.getDestinationInfo().getEdgeVertexName());
+ if (input != null) {
+ input.handleEvents(Collections.singletonList(e.getEvent()));
+ } else {
+ throw new TezUncheckedException("Unhandled event for invalid target: "
+ + e);
+ }
+ break;
+ case OUTPUT:
+ LogicalOutput output = outputMap.get(
+ e.getDestinationInfo().getEdgeVertexName());
+ if (output != null) {
+ output.handleEvents(Collections.singletonList(e.getEvent()));
+ } else {
+ throw new TezUncheckedException("Unhandled event for invalid target: "
+ + e);
+ }
+ break;
+ case PROCESSOR:
+ processor.handleEvents(Collections.singletonList(e.getEvent()));
+ break;
+ case SYSTEM:
+ LOG.warn("Trying to send a System event in a Task: " + e);
+ break;
+ }
}
}