You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/11 02:06:30 UTC

git commit: TEZ-429. Basic event router for Logical Runtime task. (hitesh)

Updated Branches:
  refs/heads/TEZ-398 0bd0f7941 -> d9fc91d56


TEZ-429. Basic event router for Logical Runtime task. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/d9fc91d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/d9fc91d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/d9fc91d5

Branch: refs/heads/TEZ-398
Commit: d9fc91d56587193f046c06d0984de7f56c4f7a61
Parents: 0bd0f79
Author: Hitesh Shah <hi...@apache.org>
Authored: Tue Sep 10 17:05:43 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue Sep 10 17:05:43 2013 -0700

----------------------------------------------------------------------
 .../apache/hadoop/mapred/YarnTezDagChild.java   | 54 ++++++++++++++------
 .../LogicalIOProcessorRuntimeTask.java          | 30 ++++++++++-
 2 files changed, 67 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d9fc91d5/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 3141910..462c30e 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -30,7 +30,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -73,6 +74,9 @@ import org.apache.tez.engine.common.objectregistry.ObjectRegistryImpl;
 import org.apache.tez.engine.common.objectregistry.ObjectRegistryModule;
 import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.common.security.TokenCache;
+import org.apache.tez.engine.newapi.events.TaskAttemptCompletedEvent;
+import org.apache.tez.engine.newapi.events.TaskAttemptFailedEvent;
+import org.apache.tez.engine.newapi.impl.EventMetaData;
 import org.apache.tez.engine.newapi.impl.InputSpec;
 import org.apache.tez.engine.newapi.impl.OutputSpec;
 import org.apache.tez.engine.newapi.impl.TaskSpec;
@@ -80,6 +84,7 @@ import org.apache.tez.engine.newapi.impl.TezEvent;
 import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
 import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
 import org.apache.tez.engine.newapi.impl.TezUmbilical;
+import org.apache.tez.engine.newapi.impl.EventMetaData.EventGenerator;
 import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
 import org.apache.tez.mapreduce.input.SimpleInput;
 import org.apache.tez.mapreduce.output.SimpleOutput;
@@ -100,10 +105,10 @@ public class YarnTezDagChild {
   private static String containerIdStr;
   private static int eventCounter = 0;
   private static int maxEventsToGet = 0;
-  private static LinkedList<TezEvent> eventsToSend =
-      new LinkedList<TezEvent>();
-  private static ConcurrentLinkedQueue<TezEvent> eventsToBeProcessed =
-      new ConcurrentLinkedQueue<TezEvent>();
+  private static LinkedBlockingQueue<TezEvent> eventsToSend =
+      new LinkedBlockingQueue<TezEvent>();
+  private static LinkedBlockingQueue<TezEvent> eventsToBeProcessed =
+      new LinkedBlockingQueue<TezEvent>();
   private static AtomicLong requestCounter = new AtomicLong(0);
   private static TezTaskAttemptID currentTaskAttemptID;
   private static long amPollInterval;
@@ -168,9 +173,8 @@ public class YarnTezDagChild {
             }
           }
           try {
-            TezEvent e = eventsToBeProcessed.poll();
+            TezEvent e = eventsToBeProcessed.poll(10, TimeUnit.MILLISECONDS);
             if (e == null) {
-              eventsToBeProcessed.wait();
               continue;
             }
             // TODO TODONEWTEZ
@@ -182,7 +186,12 @@ public class YarnTezDagChild {
             try {
               taskLock.readLock().lock();
               if (currentTask != null) {
-                currentTask.handleEvent(e);
+                try {
+                  currentTask.handleEvent(e);
+                } catch (Throwable t) {
+                  LOG.warn("Failed to handle event", t);
+                  // TODONEWTEZ
+                }
               }
             } finally {
               taskLock.readLock().unlock();
@@ -213,7 +222,7 @@ public class YarnTezDagChild {
     }
     synchronized (eventLock) {
       List<TezEvent> events = new ArrayList<TezEvent>();
-      events.addAll(eventsToSend);
+      eventsToSend.drainTo(events);
       long reqId = requestCounter.incrementAndGet();
       TezHeartbeatRequest request = new TezHeartbeatRequest(reqId, events,
           currentTaskAttemptID, eventCounter, maxEventsToGet);
@@ -222,10 +231,8 @@ public class YarnTezDagChild {
         // TODO TODONEWTEZ
         throw new TezException("AM and Task out of sync");
       }
-      eventsToSend.clear();
       eventCounter += response.getEvents().size();
       eventsToBeProcessed.addAll(response.getEvents());
-      eventsToBeProcessed.notifyAll();
     }
   }
 
@@ -314,7 +321,6 @@ public class YarnTezDagChild {
     if (LOG.isDebugEnabled()) {
       LOG.debug("PID, containerId: " + pid + ", " + containerIdentifier);
     }
-    TaskSpec taskSpec = null;
     ContainerTask containerTask = null;
     UserGroupInformation childUGI = null;
     ContainerContext containerContext = new ContainerContext(
@@ -347,7 +353,7 @@ public class YarnTezDagChild {
           return;
         }
         taskCount++;
-        taskSpec = containerTask.getTaskSpec();
+        final TaskSpec taskSpec = containerTask.getTaskSpec();
         if (LOG.isDebugEnabled()) {
           LOG.debug("New container task context:"
               + taskSpec.toString());
@@ -386,9 +392,25 @@ public class YarnTezDagChild {
         childUGI.doAs(new PrivilegedExceptionAction<Object>() {
           @Override
           public Object run() throws Exception {
-            currentTask.initialize();
-            currentTask.run();
-            currentTask.close();
+            EventMetaData sourceInfo = new EventMetaData(EventGenerator.SYSTEM,
+                taskSpec.getVertexName(), "", currentTaskAttemptID);
+            try {
+              currentTask.initialize();
+              currentTask.run();
+              currentTask.close();
+              // TODONEWTEZ check if task had a fatal error before
+              // sending completed event
+              TezEvent taskCompletedEvent =
+                  new TezEvent(new TaskAttemptCompletedEvent(), sourceInfo);
+              umbilical.taskAttemptCompleted(currentTaskAttemptID,
+                  taskCompletedEvent);
+            } catch (Throwable t) {
+              TezEvent taskAttemptFailedEvent =
+                  new TezEvent(new TaskAttemptFailedEvent(t.getMessage()),
+                      sourceInfo);
+              umbilical.taskAttemptCompleted(currentTaskAttemptID,
+                  taskAttemptFailedEvent);
+            }
             try {
               taskLock.writeLock().lock();
               currentTask = null;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d9fc91d5/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
index f077831..f0ac36e 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
@@ -20,6 +20,7 @@ package org.apache.tez.engine.newruntime;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -290,6 +291,33 @@ public class LogicalIOProcessorRuntimeTask {
   }
 
   public void handleEvent(TezEvent e) {
-    // TODO TODONEWTEZ
+    switch (e.getDestinationInfo().getEventGenerator()) {
+    case INPUT:
+      LogicalInput input = inputMap.get(
+          e.getDestinationInfo().getEdgeVertexName());
+      if (input != null) {
+        input.handleEvents(Collections.singletonList(e.getEvent()));
+      } else {
+        throw new TezUncheckedException("Unhandled event for invalid target: "
+            + e);
+      }
+      break;
+    case OUTPUT:
+      LogicalOutput output = outputMap.get(
+          e.getDestinationInfo().getEdgeVertexName());
+      if (output != null) {
+        output.handleEvents(Collections.singletonList(e.getEvent()));
+      } else {
+        throw new TezUncheckedException("Unhandled event for invalid target: "
+            + e);
+      }
+      break;
+    case PROCESSOR:
+      processor.handleEvents(Collections.singletonList(e.getEvent()));
+      break;
+    case SYSTEM:
+      LOG.warn("Trying to send a System event in a Task: " + e);
+      break;
+    }
   }
 }