You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/14 01:03:04 UTC

git commit: TEZ-441. Move tez event handling thread into the LogicalTask from YarnTezDagChild. (hitesh)

Updated Branches:
  refs/heads/TEZ-398 057239907 -> 62f65223a


TEZ-441. Move tez event handling thread into the LogicalTask from YarnTezDagChild. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/62f65223
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/62f65223
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/62f65223

Branch: refs/heads/TEZ-398
Commit: 62f65223a7e46dff3937060967b5fdc26eec1475
Parents: 0572399
Author: Hitesh Shah <hi...@apache.org>
Authored: Fri Sep 13 16:02:46 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Fri Sep 13 16:02:46 2013 -0700

----------------------------------------------------------------------
 .../apache/hadoop/mapred/YarnTezDagChild.java   | 117 +++---------
 .../LogicalIOProcessorRuntimeTask.java          | 176 +++++++++++++------
 .../tez/engine/newruntime/RuntimeTask.java      |  33 +++-
 3 files changed, 181 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/62f65223/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 10170eb..2ebdb18 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -30,7 +30,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -103,12 +102,9 @@ public class YarnTezDagChild {
   private static AtomicBoolean stopped = new AtomicBoolean(false);
 
   private static String containerIdStr;
-  private static int eventCounter = 0;
   private static int maxEventsToGet = 0;
   private static LinkedBlockingQueue<TezEvent> eventsToSend =
       new LinkedBlockingQueue<TezEvent>();
-  private static LinkedBlockingQueue<TezEvent> eventsToBeProcessed =
-      new LinkedBlockingQueue<TezEvent>();
   private static AtomicLong requestCounter = new AtomicLong(0);
   private static TezTaskAttemptID currentTaskAttemptID;
   private static long amPollInterval;
@@ -149,86 +145,21 @@ public class YarnTezDagChild {
     return heartbeatThread;
   }
 
-  private static Thread startRouterThread() {
-    Thread eventRouterThread = new Thread(new Runnable() {
-      public void run() {
-        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
-          while (true) {
-            try {
-              taskLock.readLock().lock();
-              if (currentTask != null) {
-                break;
-              }
-            } finally {
-              taskLock.readLock().unlock();
-            }
-            try {
-              Thread.sleep(10);
-            } catch (InterruptedException e) {
-              if (!stopped.get()) {
-                LOG.warn("Event Router thread interrupted. Returning.");
-              }
-              return;
-            }
-          }
-          try {
-            TezEvent e = eventsToBeProcessed.poll(10, TimeUnit.MILLISECONDS);
-            if (e == null) {
-              continue;
-            }
-            // TODO TODONEWTEZ
-            try {
-              taskLock.readLock().lock();
-              if (currentTask != null) {
-                try {
-                  currentTask.handleEvent(e);
-                } catch (Throwable t) {
-                  LOG.warn("Failed to handle event", t);
-                  currentTask.setFatalError(t, "Failed to handle event");
-                  TezEvent taskAttemptFailedEvent = new TezEvent(
-                      new TaskAttemptFailedEvent(
-                          StringUtils.stringifyException(t)),
-                      new EventMetaData(EventProducerConsumerType.SYSTEM,
-                          "", "", currentTaskAttemptID));
-                  try {
-                    umbilical.taskAttemptFailed(currentTaskAttemptID,
-                        taskAttemptFailedEvent);
-                  } catch (IOException ioe) {
-                    // TODO Auto-generated catch block
-                    ioe.printStackTrace();
-                    // TODO NEWTEZ System exit?
-                  }
-                }
-              }
-            } finally {
-              taskLock.readLock().unlock();
-            }
-          } catch (InterruptedException e) {
-            if (!stopped.get()) {
-              LOG.warn("Event Router thread interrupted. Returning.");
-            }
-            return;
-          }
-        }
-      }
-    });
-    eventRouterThread.setName("Tez Container Event Router Thread ["
-        + containerIdStr + "]");
-    eventRouterThread.start();
-    return eventRouterThread;
-  }
-
   private static void heartbeat() throws TezException, IOException {
     TezEvent updateEvent = null;
+    int eventCounter = 0;
+    int eventsRange = 0;
+    TezTaskAttemptID taskAttemptID = null;
     try {
       taskLock.readLock().lock();
-      if (currentTask == null) {
-        return;
-      } else {
+      if (currentTask != null) {
+        taskAttemptID = currentTaskAttemptID;
+        eventCounter = currentTask.getEventCounter();
+        eventsRange = maxEventsToGet;
         updateEvent = new TezEvent(new TaskStatusUpdateEvent(
             currentTask.getCounters(), currentTask.getProgress()),
-            new EventMetaData(EventProducerConsumerType.SYSTEM,
-                "", "", currentTaskAttemptID));
+              new EventMetaData(EventProducerConsumerType.SYSTEM,
+                  "", "", taskAttemptID));
       }
     } finally {
       taskLock.readLock().unlock();
@@ -240,14 +171,20 @@ public class YarnTezDagChild {
     eventsToSend.drainTo(events);
     long reqId = requestCounter.incrementAndGet();
     TezHeartbeatRequest request = new TezHeartbeatRequest(reqId, events,
-        currentTaskAttemptID, eventCounter, maxEventsToGet);
+        taskAttemptID, eventCounter, eventsRange);
     TezHeartbeatResponse response = umbilical.heartbeat(request);
     if (response.getLastRequestId() != reqId) {
       // TODO TODONEWTEZ
       throw new TezException("AM and Task out of sync");
     }
-    eventCounter += response.getEvents().size();
-    eventsToBeProcessed.addAll(response.getEvents());
+    try {
+      taskLock.readLock().lock();
+      if (currentTask != null) {
+        currentTask.handleEvents(response.getEvents());
+      }
+    } finally {
+      taskLock.readLock().unlock();
+    }
   }
 
   public static void main(String[] args) throws Throwable {
@@ -320,7 +257,6 @@ public class YarnTezDagChild {
     });
 
     Thread heartbeatThread = startHeartbeatThread();
-    Thread eventRouterThread = startRouterThread();
 
     TezUmbilical tezUmbilical = new TezUmbilical() {
       @Override
@@ -364,7 +300,7 @@ public class YarnTezDagChild {
         TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX,
         TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT);
     int taskCount = 0;
-    TezVertexID currentVertexId = null;
+    TezVertexID lastVertexId = null;
     EventMetaData currentSourceInfo = null;
     try {
       while (true) {
@@ -401,15 +337,15 @@ public class YarnTezDagChild {
           TezVertexID newVertexId =
               currentTaskAttemptID.getTaskID().getVertexID();
 
-          if (currentVertexId != null) {
-            if (!currentVertexId.equals(newVertexId)) {
+          if (lastVertexId != null) {
+            if (!lastVertexId.equals(newVertexId)) {
               objectRegistry.clearCache(ObjectLifeCycle.VERTEX);
             }
-            if (!currentVertexId.getDAGId().equals(newVertexId.getDAGId())) {
+            if (!lastVertexId.getDAGId().equals(newVertexId.getDAGId())) {
               objectRegistry.clearCache(ObjectLifeCycle.DAG);
             }
           }
-          currentVertexId = newVertexId;
+          lastVertexId = newVertexId;
           updateLoggers(currentTaskAttemptID);
           currentTask = createLogicalTask(
               taskSpec, defaultConf, tezUmbilical, jobToken);
@@ -458,7 +394,11 @@ public class YarnTezDagChild {
             }
             try {
               taskLock.writeLock().lock();
+              if (currentTask != null) {
+                currentTask.cleanup();
+              }
               currentTask = null;
+              currentTaskAttemptID = null;
             } finally {
               taskLock.writeLock().unlock();
             }
@@ -487,8 +427,7 @@ public class YarnTezDagChild {
       }
     } finally {
       stopped.set(true);
-      eventRouterThread.join();
-      heartbeatThread.join();
+      heartbeatThread.interrupt();
       RPC.stopProxy(umbilical);
       DefaultMetricsSystem.shutdown();
       // Shutting down log4j of the child-vm...

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/62f65223/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
index f69ea2d..20392bf 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
@@ -21,17 +21,21 @@ package org.apache.tez.engine.newruntime;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -66,10 +70,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private static final Log LOG = LogFactory
       .getLog(LogicalIOProcessorRuntimeTask.class);
 
-  private final TaskSpec taskSpec;
-  private final Configuration tezConf;
-  private final TezUmbilical tezUmbilical;
-
   private final List<InputSpec> inputSpecs;
   private final List<LogicalInput> inputs;
 
@@ -84,15 +84,17 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private Map<String, LogicalInput> inputMap;
   private Map<String, LogicalOutput> outputMap;
 
+  private AtomicBoolean stopped;
+  private LinkedBlockingQueue<TezEvent> eventsToBeProcessed;
+  private Thread eventRouterThread = null;
+
   public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec,
       Configuration tezConf, TezUmbilical tezUmbilical,
       Token<JobTokenIdentifier> jobToken) throws IOException {
     // TODO Remove jobToken from here post TEZ-421
+    super(taskSpec, tezConf, tezUmbilical);
     LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: "
         + taskSpec);
-    this.taskSpec = taskSpec;
-    this.tezConf = tezConf;
-    this.tezUmbilical = tezUmbilical;
     this.inputSpecs = taskSpec.getInputs();
     this.inputs = createInputs(inputSpecs);
     this.outputSpecs = taskSpec.getOutputs();
@@ -102,6 +104,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     this.serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
     this.serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
         ShuffleUtils.convertJobTokenToBytes(jobToken));
+    this.stopped = new AtomicBoolean(false);
+    this.eventsToBeProcessed = new LinkedBlockingQueue<TezEvent>();    
     this.state = State.NEW;
   }
 
@@ -130,6 +134,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
 
     // Initialize processor.
     initializeLogicalIOProcessor();
+    startRouterThread();
   }
 
   public void run() throws Exception {
@@ -143,29 +148,36 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   }
 
   public void close() throws Exception {
-    Preconditions.checkState(this.state == State.RUNNING,
-        "Can only run while in RUNNING state. Current: " + this.state);
-    this.state = State.CLOSED;
-
-    // Close the Inputs.
-    for (int i = 0; i < inputs.size(); i++) {
-      String srcVertexName = inputSpecs.get(i).getSourceVertexName();
-      List<Event> closeInputEvents = inputs.get(i).close();
-      sendTaskGeneratedEvents(closeInputEvents,
-          EventProducerConsumerType.INPUT, taskSpec.getVertexName(),
-          srcVertexName, taskSpec.getTaskAttemptID());
-    }
+    try {
+      Preconditions.checkState(this.state == State.RUNNING,
+          "Can only run while in RUNNING state. Current: " + this.state);
+      this.state = State.CLOSED;
+
+      // Close the Inputs.
+      for (int i = 0; i < inputs.size(); i++) {
+        String srcVertexName = inputSpecs.get(i).getSourceVertexName();
+        List<Event> closeInputEvents = inputs.get(i).close();
+        sendTaskGeneratedEvents(closeInputEvents,
+            EventProducerConsumerType.INPUT, taskSpec.getVertexName(),
+            srcVertexName, taskSpec.getTaskAttemptID());
+      }
 
-    // Close the Processor.
-    processor.close();
+      // Close the Processor.
+      processor.close();
 
-    // Close the Outputs.
-    for (int i = 0; i < outputs.size(); i++) {
-      String destVertexName = outputSpecs.get(i).getDestinationVertexName();
-      List<Event> closeOutputEvents = outputs.get(i).close();
-      sendTaskGeneratedEvents(closeOutputEvents,
-          EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(),
-          destVertexName, taskSpec.getTaskAttemptID());
+      // Close the Outputs.
+      for (int i = 0; i < outputs.size(); i++) {
+        String destVertexName = outputSpecs.get(i).getDestinationVertexName();
+        List<Event> closeOutputEvents = outputs.get(i).close();
+        sendTaskGeneratedEvents(closeOutputEvents,
+            EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(),
+            destVertexName, taskSpec.getTaskAttemptID());
+      }
+    } finally {
+      stopped.set(true);
+      if (eventRouterThread != null) {
+        eventRouterThread.interrupt();
+      }
     }
   }
 
@@ -241,7 +253,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
             + " is not a sub-type of LogicalInput."
             + " Only LogicalInput sub-types supported by LogicalIOProcessor.");
       }
-
     }
     return inputs;
   }
@@ -290,34 +301,89 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     }
   }
 
-  public void handleEvent(TezEvent e) {
-    switch (e.getDestinationInfo().getEventGenerator()) {
-    case INPUT:
-      LogicalInput input = inputMap.get(
-          e.getDestinationInfo().getEdgeVertexName());
-      if (input != null) {
-        input.handleEvents(Collections.singletonList(e.getEvent()));
-      } else {
-        throw new TezUncheckedException("Unhandled event for invalid target: "
-            + e);
+  private boolean handleEvent(TezEvent e) {
+    try {
+      switch (e.getDestinationInfo().getEventGenerator()) {
+      case INPUT:
+        LogicalInput input = inputMap.get(
+            e.getDestinationInfo().getEdgeVertexName());
+        if (input != null) {
+          input.handleEvents(Collections.singletonList(e.getEvent()));
+        } else {
+          throw new TezUncheckedException("Unhandled event for invalid target: "
+              + e);
+        }
+        break;
+      case OUTPUT:
+        LogicalOutput output = outputMap.get(
+            e.getDestinationInfo().getEdgeVertexName());
+        if (output != null) {
+          output.handleEvents(Collections.singletonList(e.getEvent()));
+        } else {
+          throw new TezUncheckedException("Unhandled event for invalid target: "
+              + e);
+        }
+        break;
+      case PROCESSOR:
+        processor.handleEvents(Collections.singletonList(e.getEvent()));
+        break;
+      case SYSTEM:
+        LOG.warn("Trying to send a System event in a Task: " + e);
+        break;
       }
-      break;
-    case OUTPUT:
-      LogicalOutput output = outputMap.get(
-          e.getDestinationInfo().getEdgeVertexName());
-      if (output != null) {
-        output.handleEvents(Collections.singletonList(e.getEvent()));
-      } else {
-        throw new TezUncheckedException("Unhandled event for invalid target: "
-            + e);
+    } catch (Throwable t) {
+      LOG.warn("Failed to handle event", t);
+      setFatalError(t, "Failed to handle event");
+      EventMetaData sourceInfo = new EventMetaData(
+          e.getDestinationInfo().getEventGenerator(),
+          taskSpec.getVertexName(), e.getDestinationInfo().getEdgeVertexName(),
+          getTaskAttemptID());
+      tezUmbilical.signalFatalError(getTaskAttemptID(),
+          StringUtils.stringifyException(t), sourceInfo);
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public synchronized void handleEvents(Collection<TezEvent> events) {
+    eventsToBeProcessed.addAll(events);
+    eventCounter.addAndGet(events.size());
+  }
+
+  private void startRouterThread() {
+    eventRouterThread = new Thread(new Runnable() {
+      public void run() {
+        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+          try {
+            TezEvent e = eventsToBeProcessed.take();
+            if (e == null) {
+              continue;
+            }
+            // TODO TODONEWTEZ
+            if (!handleEvent(e)) {
+              LOG.warn("Stopping Event Router thread as failed to handle"
+                  + " event: " + e);
+              break;
+            }
+          } catch (InterruptedException e) {
+            if (!stopped.get()) {
+              LOG.warn("Event Router thread interrupted. Returning.");
+            }
+          }
+        }
       }
-      break;
-    case PROCESSOR:
-      processor.handleEvents(Collections.singletonList(e.getEvent()));
-      break;
-    case SYSTEM:
-      LOG.warn("Trying to send a System event in a Task: " + e);
-      break;
+    });
+
+    eventRouterThread.setName("TezTaskEventRouter["
+        + taskSpec.getTaskAttemptID().toString() + "]");
+    eventRouterThread.start();
+  }
+
+  public synchronized void cleanup() {
+    stopped.set(true);
+    if (eventRouterThread != null) {
+      eventRouterThread.interrupt();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/62f65223/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
index 045d1c6..92840ae 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
@@ -18,9 +18,16 @@
 
 package org.apache.tez.engine.newruntime;
 
+import java.util.Collection;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
+import org.apache.tez.engine.newapi.impl.TezEvent;
+import org.apache.tez.engine.newapi.impl.TezUmbilical;
 
 public abstract class RuntimeTask {
 
@@ -28,7 +35,21 @@ public abstract class RuntimeTask {
   protected Throwable fatalError = null;
   protected String fatalErrorMessage = null;
   protected float progress;
-  protected final TezCounters tezCounters = new TezCounters();
+  protected final TezCounters tezCounters;
+  protected final TaskSpec taskSpec;
+  protected final Configuration tezConf;
+  protected final TezUmbilical tezUmbilical;
+  protected final AtomicInteger eventCounter;
+
+  protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf,
+      TezUmbilical tezUmbilical) {
+    this.taskSpec = taskSpec;
+    this.tezConf = tezConf;
+    this.tezUmbilical = tezUmbilical;
+    this.tezCounters = new TezCounters();
+    this.eventCounter = new AtomicInteger(0);
+    this.progress = 0.0f;
+  }
 
   protected enum State {
     NEW, INITED, RUNNING, CLOSED;
@@ -58,4 +79,14 @@ public abstract class RuntimeTask {
     return this.tezCounters;
   }
 
+  public TezTaskAttemptID getTaskAttemptID() {
+    return taskSpec.getTaskAttemptID();
+  }
+
+  public abstract void handleEvents(Collection<TezEvent> events);
+
+  public int getEventCounter() {
+    return eventCounter.get();
+  }
+
 }