You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/14 01:03:04 UTC
git commit: TEZ-441. Move tez event handling thread into the LogicalTask from YarnTezDagChild. (hitesh)
Updated Branches:
refs/heads/TEZ-398 057239907 -> 62f65223a
TEZ-441. Move tez event handling thread into the LogicalTask from YarnTezDagChild. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/62f65223
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/62f65223
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/62f65223
Branch: refs/heads/TEZ-398
Commit: 62f65223a7e46dff3937060967b5fdc26eec1475
Parents: 0572399
Author: Hitesh Shah <hi...@apache.org>
Authored: Fri Sep 13 16:02:46 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Fri Sep 13 16:02:46 2013 -0700
----------------------------------------------------------------------
.../apache/hadoop/mapred/YarnTezDagChild.java | 117 +++---------
.../LogicalIOProcessorRuntimeTask.java | 176 +++++++++++++------
.../tez/engine/newruntime/RuntimeTask.java | 33 +++-
3 files changed, 181 insertions(+), 145 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/62f65223/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 10170eb..2ebdb18 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -30,7 +30,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -103,12 +102,9 @@ public class YarnTezDagChild {
private static AtomicBoolean stopped = new AtomicBoolean(false);
private static String containerIdStr;
- private static int eventCounter = 0;
private static int maxEventsToGet = 0;
private static LinkedBlockingQueue<TezEvent> eventsToSend =
new LinkedBlockingQueue<TezEvent>();
- private static LinkedBlockingQueue<TezEvent> eventsToBeProcessed =
- new LinkedBlockingQueue<TezEvent>();
private static AtomicLong requestCounter = new AtomicLong(0);
private static TezTaskAttemptID currentTaskAttemptID;
private static long amPollInterval;
@@ -149,86 +145,21 @@ public class YarnTezDagChild {
return heartbeatThread;
}
- private static Thread startRouterThread() {
- Thread eventRouterThread = new Thread(new Runnable() {
- public void run() {
- while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
- while (true) {
- try {
- taskLock.readLock().lock();
- if (currentTask != null) {
- break;
- }
- } finally {
- taskLock.readLock().unlock();
- }
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- if (!stopped.get()) {
- LOG.warn("Event Router thread interrupted. Returning.");
- }
- return;
- }
- }
- try {
- TezEvent e = eventsToBeProcessed.poll(10, TimeUnit.MILLISECONDS);
- if (e == null) {
- continue;
- }
- // TODO TODONEWTEZ
- try {
- taskLock.readLock().lock();
- if (currentTask != null) {
- try {
- currentTask.handleEvent(e);
- } catch (Throwable t) {
- LOG.warn("Failed to handle event", t);
- currentTask.setFatalError(t, "Failed to handle event");
- TezEvent taskAttemptFailedEvent = new TezEvent(
- new TaskAttemptFailedEvent(
- StringUtils.stringifyException(t)),
- new EventMetaData(EventProducerConsumerType.SYSTEM,
- "", "", currentTaskAttemptID));
- try {
- umbilical.taskAttemptFailed(currentTaskAttemptID,
- taskAttemptFailedEvent);
- } catch (IOException ioe) {
- // TODO Auto-generated catch block
- ioe.printStackTrace();
- // TODO NEWTEZ System exit?
- }
- }
- }
- } finally {
- taskLock.readLock().unlock();
- }
- } catch (InterruptedException e) {
- if (!stopped.get()) {
- LOG.warn("Event Router thread interrupted. Returning.");
- }
- return;
- }
- }
- }
- });
- eventRouterThread.setName("Tez Container Event Router Thread ["
- + containerIdStr + "]");
- eventRouterThread.start();
- return eventRouterThread;
- }
-
private static void heartbeat() throws TezException, IOException {
TezEvent updateEvent = null;
+ int eventCounter = 0;
+ int eventsRange = 0;
+ TezTaskAttemptID taskAttemptID = null;
try {
taskLock.readLock().lock();
- if (currentTask == null) {
- return;
- } else {
+ if (currentTask != null) {
+ taskAttemptID = currentTaskAttemptID;
+ eventCounter = currentTask.getEventCounter();
+ eventsRange = maxEventsToGet;
updateEvent = new TezEvent(new TaskStatusUpdateEvent(
currentTask.getCounters(), currentTask.getProgress()),
- new EventMetaData(EventProducerConsumerType.SYSTEM,
- "", "", currentTaskAttemptID));
+ new EventMetaData(EventProducerConsumerType.SYSTEM,
+ "", "", taskAttemptID));
}
} finally {
taskLock.readLock().unlock();
@@ -240,14 +171,20 @@ public class YarnTezDagChild {
eventsToSend.drainTo(events);
long reqId = requestCounter.incrementAndGet();
TezHeartbeatRequest request = new TezHeartbeatRequest(reqId, events,
- currentTaskAttemptID, eventCounter, maxEventsToGet);
+ taskAttemptID, eventCounter, eventsRange);
TezHeartbeatResponse response = umbilical.heartbeat(request);
if (response.getLastRequestId() != reqId) {
// TODO TODONEWTEZ
throw new TezException("AM and Task out of sync");
}
- eventCounter += response.getEvents().size();
- eventsToBeProcessed.addAll(response.getEvents());
+ try {
+ taskLock.readLock().lock();
+ if (currentTask != null) {
+ currentTask.handleEvents(response.getEvents());
+ }
+ } finally {
+ taskLock.readLock().unlock();
+ }
}
public static void main(String[] args) throws Throwable {
@@ -320,7 +257,6 @@ public class YarnTezDagChild {
});
Thread heartbeatThread = startHeartbeatThread();
- Thread eventRouterThread = startRouterThread();
TezUmbilical tezUmbilical = new TezUmbilical() {
@Override
@@ -364,7 +300,7 @@ public class YarnTezDagChild {
TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX,
TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT);
int taskCount = 0;
- TezVertexID currentVertexId = null;
+ TezVertexID lastVertexId = null;
EventMetaData currentSourceInfo = null;
try {
while (true) {
@@ -401,15 +337,15 @@ public class YarnTezDagChild {
TezVertexID newVertexId =
currentTaskAttemptID.getTaskID().getVertexID();
- if (currentVertexId != null) {
- if (!currentVertexId.equals(newVertexId)) {
+ if (lastVertexId != null) {
+ if (!lastVertexId.equals(newVertexId)) {
objectRegistry.clearCache(ObjectLifeCycle.VERTEX);
}
- if (!currentVertexId.getDAGId().equals(newVertexId.getDAGId())) {
+ if (!lastVertexId.getDAGId().equals(newVertexId.getDAGId())) {
objectRegistry.clearCache(ObjectLifeCycle.DAG);
}
}
- currentVertexId = newVertexId;
+ lastVertexId = newVertexId;
updateLoggers(currentTaskAttemptID);
currentTask = createLogicalTask(
taskSpec, defaultConf, tezUmbilical, jobToken);
@@ -458,7 +394,11 @@ public class YarnTezDagChild {
}
try {
taskLock.writeLock().lock();
+ if (currentTask != null) {
+ currentTask.cleanup();
+ }
currentTask = null;
+ currentTaskAttemptID = null;
} finally {
taskLock.writeLock().unlock();
}
@@ -487,8 +427,7 @@ public class YarnTezDagChild {
}
} finally {
stopped.set(true);
- eventRouterThread.join();
- heartbeatThread.join();
+ heartbeatThread.interrupt();
RPC.stopProxy(umbilical);
DefaultMetricsSystem.shutdown();
// Shutting down log4j of the child-vm...
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/62f65223/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
index f69ea2d..20392bf 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
@@ -21,17 +21,21 @@ package org.apache.tez.engine.newruntime;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -66,10 +70,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
private static final Log LOG = LogFactory
.getLog(LogicalIOProcessorRuntimeTask.class);
- private final TaskSpec taskSpec;
- private final Configuration tezConf;
- private final TezUmbilical tezUmbilical;
-
private final List<InputSpec> inputSpecs;
private final List<LogicalInput> inputs;
@@ -84,15 +84,17 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
private Map<String, LogicalInput> inputMap;
private Map<String, LogicalOutput> outputMap;
+ private AtomicBoolean stopped;
+ private LinkedBlockingQueue<TezEvent> eventsToBeProcessed;
+ private Thread eventRouterThread = null;
+
public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec,
Configuration tezConf, TezUmbilical tezUmbilical,
Token<JobTokenIdentifier> jobToken) throws IOException {
// TODO Remove jobToken from here post TEZ-421
+ super(taskSpec, tezConf, tezUmbilical);
LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: "
+ taskSpec);
- this.taskSpec = taskSpec;
- this.tezConf = tezConf;
- this.tezUmbilical = tezUmbilical;
this.inputSpecs = taskSpec.getInputs();
this.inputs = createInputs(inputSpecs);
this.outputSpecs = taskSpec.getOutputs();
@@ -102,6 +104,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
this.serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
this.serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
ShuffleUtils.convertJobTokenToBytes(jobToken));
+ this.stopped = new AtomicBoolean(false);
+ this.eventsToBeProcessed = new LinkedBlockingQueue<TezEvent>();
this.state = State.NEW;
}
@@ -130,6 +134,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
// Initialize processor.
initializeLogicalIOProcessor();
+ startRouterThread();
}
public void run() throws Exception {
@@ -143,29 +148,36 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
}
public void close() throws Exception {
- Preconditions.checkState(this.state == State.RUNNING,
- "Can only run while in RUNNING state. Current: " + this.state);
- this.state = State.CLOSED;
-
- // Close the Inputs.
- for (int i = 0; i < inputs.size(); i++) {
- String srcVertexName = inputSpecs.get(i).getSourceVertexName();
- List<Event> closeInputEvents = inputs.get(i).close();
- sendTaskGeneratedEvents(closeInputEvents,
- EventProducerConsumerType.INPUT, taskSpec.getVertexName(),
- srcVertexName, taskSpec.getTaskAttemptID());
- }
+ try {
+ Preconditions.checkState(this.state == State.RUNNING,
+ "Can only run while in RUNNING state. Current: " + this.state);
+ this.state = State.CLOSED;
+
+ // Close the Inputs.
+ for (int i = 0; i < inputs.size(); i++) {
+ String srcVertexName = inputSpecs.get(i).getSourceVertexName();
+ List<Event> closeInputEvents = inputs.get(i).close();
+ sendTaskGeneratedEvents(closeInputEvents,
+ EventProducerConsumerType.INPUT, taskSpec.getVertexName(),
+ srcVertexName, taskSpec.getTaskAttemptID());
+ }
- // Close the Processor.
- processor.close();
+ // Close the Processor.
+ processor.close();
- // Close the Outputs.
- for (int i = 0; i < outputs.size(); i++) {
- String destVertexName = outputSpecs.get(i).getDestinationVertexName();
- List<Event> closeOutputEvents = outputs.get(i).close();
- sendTaskGeneratedEvents(closeOutputEvents,
- EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(),
- destVertexName, taskSpec.getTaskAttemptID());
+ // Close the Outputs.
+ for (int i = 0; i < outputs.size(); i++) {
+ String destVertexName = outputSpecs.get(i).getDestinationVertexName();
+ List<Event> closeOutputEvents = outputs.get(i).close();
+ sendTaskGeneratedEvents(closeOutputEvents,
+ EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(),
+ destVertexName, taskSpec.getTaskAttemptID());
+ }
+ } finally {
+ stopped.set(true);
+ if (eventRouterThread != null) {
+ eventRouterThread.interrupt();
+ }
}
}
@@ -241,7 +253,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
+ " is not a sub-type of LogicalInput."
+ " Only LogicalInput sub-types supported by LogicalIOProcessor.");
}
-
}
return inputs;
}
@@ -290,34 +301,89 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
}
}
- public void handleEvent(TezEvent e) {
- switch (e.getDestinationInfo().getEventGenerator()) {
- case INPUT:
- LogicalInput input = inputMap.get(
- e.getDestinationInfo().getEdgeVertexName());
- if (input != null) {
- input.handleEvents(Collections.singletonList(e.getEvent()));
- } else {
- throw new TezUncheckedException("Unhandled event for invalid target: "
- + e);
+ private boolean handleEvent(TezEvent e) {
+ try {
+ switch (e.getDestinationInfo().getEventGenerator()) {
+ case INPUT:
+ LogicalInput input = inputMap.get(
+ e.getDestinationInfo().getEdgeVertexName());
+ if (input != null) {
+ input.handleEvents(Collections.singletonList(e.getEvent()));
+ } else {
+ throw new TezUncheckedException("Unhandled event for invalid target: "
+ + e);
+ }
+ break;
+ case OUTPUT:
+ LogicalOutput output = outputMap.get(
+ e.getDestinationInfo().getEdgeVertexName());
+ if (output != null) {
+ output.handleEvents(Collections.singletonList(e.getEvent()));
+ } else {
+ throw new TezUncheckedException("Unhandled event for invalid target: "
+ + e);
+ }
+ break;
+ case PROCESSOR:
+ processor.handleEvents(Collections.singletonList(e.getEvent()));
+ break;
+ case SYSTEM:
+ LOG.warn("Trying to send a System event in a Task: " + e);
+ break;
}
- break;
- case OUTPUT:
- LogicalOutput output = outputMap.get(
- e.getDestinationInfo().getEdgeVertexName());
- if (output != null) {
- output.handleEvents(Collections.singletonList(e.getEvent()));
- } else {
- throw new TezUncheckedException("Unhandled event for invalid target: "
- + e);
+ } catch (Throwable t) {
+ LOG.warn("Failed to handle event", t);
+ setFatalError(t, "Failed to handle event");
+ EventMetaData sourceInfo = new EventMetaData(
+ e.getDestinationInfo().getEventGenerator(),
+ taskSpec.getVertexName(), e.getDestinationInfo().getEdgeVertexName(),
+ getTaskAttemptID());
+ tezUmbilical.signalFatalError(getTaskAttemptID(),
+ StringUtils.stringifyException(t), sourceInfo);
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public synchronized void handleEvents(Collection<TezEvent> events) {
+ eventsToBeProcessed.addAll(events);
+ eventCounter.addAndGet(events.size());
+ }
+
+ private void startRouterThread() {
+ eventRouterThread = new Thread(new Runnable() {
+ public void run() {
+ while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+ try {
+ TezEvent e = eventsToBeProcessed.take();
+ if (e == null) {
+ continue;
+ }
+ // TODO TODONEWTEZ
+ if (!handleEvent(e)) {
+ LOG.warn("Stopping Event Router thread as failed to handle"
+ + " event: " + e);
+ break;
+ }
+ } catch (InterruptedException e) {
+ if (!stopped.get()) {
+ LOG.warn("Event Router thread interrupted. Returning.");
+ }
+ }
+ }
}
- break;
- case PROCESSOR:
- processor.handleEvents(Collections.singletonList(e.getEvent()));
- break;
- case SYSTEM:
- LOG.warn("Trying to send a System event in a Task: " + e);
- break;
+ });
+
+ eventRouterThread.setName("TezTaskEventRouter["
+ + taskSpec.getTaskAttemptID().toString() + "]");
+ eventRouterThread.start();
+ }
+
+ public synchronized void cleanup() {
+ stopped.set(true);
+ if (eventRouterThread != null) {
+ eventRouterThread.interrupt();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/62f65223/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
index 045d1c6..92840ae 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
@@ -18,9 +18,16 @@
package org.apache.tez.engine.newruntime;
+import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
+import org.apache.tez.engine.newapi.impl.TezEvent;
+import org.apache.tez.engine.newapi.impl.TezUmbilical;
public abstract class RuntimeTask {
@@ -28,7 +35,21 @@ public abstract class RuntimeTask {
protected Throwable fatalError = null;
protected String fatalErrorMessage = null;
protected float progress;
- protected final TezCounters tezCounters = new TezCounters();
+ protected final TezCounters tezCounters;
+ protected final TaskSpec taskSpec;
+ protected final Configuration tezConf;
+ protected final TezUmbilical tezUmbilical;
+ protected final AtomicInteger eventCounter;
+
+ protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf,
+ TezUmbilical tezUmbilical) {
+ this.taskSpec = taskSpec;
+ this.tezConf = tezConf;
+ this.tezUmbilical = tezUmbilical;
+ this.tezCounters = new TezCounters();
+ this.eventCounter = new AtomicInteger(0);
+ this.progress = 0.0f;
+ }
protected enum State {
NEW, INITED, RUNNING, CLOSED;
@@ -58,4 +79,14 @@ public abstract class RuntimeTask {
return this.tezCounters;
}
+ public TezTaskAttemptID getTaskAttemptID() {
+ return taskSpec.getTaskAttemptID();
+ }
+
+ public abstract void handleEvents(Collection<TezEvent> events);
+
+ public int getEventCounter() {
+ return eventCounter.get();
+ }
+
}