You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/02/21 00:13:24 UTC

git commit: TEZ-863. Queue events for relevant inputs untill the Input has been started. Fixes a potential NPE in case of no auto start. (sseth)

Repository: incubator-tez
Updated Branches:
  refs/heads/master be0d38988 -> 7e1d11010


TEZ-863. Queue events for relevant inputs untill the Input has been
started. Fixes a potential NPE in case of no auto start. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/7e1d1101
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/7e1d1101
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/7e1d1101

Branch: refs/heads/master
Commit: 7e1d11010802050182835f3cc7a3b86619ee2cb1
Parents: be0d389
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Feb 20 15:12:21 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Feb 20 15:12:21 2014 -0800

----------------------------------------------------------------------
 .../library/input/ShuffledMergedInput.java      | 34 +++++++++++++++++---
 .../library/input/ShuffledUnorderedKVInput.java | 31 ++++++++++++++++--
 2 files changed, 58 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7e1d1101/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index 35c1ab5..d70924a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -19,7 +19,10 @@ package org.apache.tez.runtime.library.input;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
@@ -60,6 +63,9 @@ public class ShuffledMergedInput implements LogicalInput {
   protected Configuration conf;
   protected int numInputs = 0;
   protected Shuffle shuffle;
+  private final BlockingQueue<Event> pendingEvents = new LinkedBlockingQueue<Event>();
+  private volatile long firstEventReceivedTime = -1;
+  // ZZZ LOG THIS TIME
   @SuppressWarnings("rawtypes")
   protected ValuesIterator vIter;
 
@@ -88,9 +94,18 @@ public class ShuffledMergedInput implements LogicalInput {
 
   @Override
   public void start() throws IOException {
-    if (!isStarted.getAndSet(true)) {
-      // Start the shuffle - copy and merge
-      shuffle.run();
+    synchronized (this) {
+      if (!isStarted.getAndSet(true)) {
+        // Start the shuffle - copy and merge
+        shuffle.run();
+        List<Event> pending = new LinkedList<Event>();
+        pendingEvents.drainTo(pending);
+        if (pending.size() > 0) {
+          LOG.info("NoAutoStart delay in processing first event: "
+              + (System.currentTimeMillis() - firstEventReceivedTime));
+          shuffle.handleEvents(pending);
+        }
+      }
     }
   }
 
@@ -123,7 +138,7 @@ public class ShuffledMergedInput implements LogicalInput {
 
   @Override
   public List<Event> close() throws IOException {
-    if (this.numInputs != 0) {
+    if (this.numInputs != 0 && rawIter != null) {
       rawIter.close();
     }
     return Collections.emptyList();
@@ -192,6 +207,17 @@ public class ShuffledMergedInput implements LogicalInput {
     if (numInputs == 0) {
       throw new RuntimeException("No input events expected as numInputs is 0");
     }
+    if (!isStarted.get()) {
+      synchronized (this) {
+        if (!isStarted.get()) {
+          if (firstEventReceivedTime == -1) {
+            firstEventReceivedTime = System.currentTimeMillis();
+          }
+          pendingEvents.addAll(inputEvents);
+          return;
+        }
+      }
+    }
     shuffle.handleEvents(inputEvents);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7e1d1101/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index 9123345..f00ef3d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -20,7 +20,10 @@ package org.apache.tez.runtime.library.input;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
@@ -43,6 +46,8 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
   private Configuration conf;
   private int numInputs = -1;
   private BroadcastShuffleManager shuffleManager;
+  private final BlockingQueue<Event> pendingEvents = new LinkedBlockingQueue<Event>();
+  private volatile long firstEventReceivedTime = -1;
   @SuppressWarnings("rawtypes")
   private BroadcastKVReader kvReader;
   
@@ -68,9 +73,18 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
 
   @Override
   public void start() throws IOException {
-    if (!isStarted.getAndSet(true)) {
-      this.shuffleManager.run();
-      this.kvReader = this.shuffleManager.createReader();
+    synchronized (this) {
+      if (!isStarted.getAndSet(true)) {
+        this.shuffleManager.run();
+        this.kvReader = this.shuffleManager.createReader();
+        List<Event> pending = new LinkedList<Event>();
+        pendingEvents.drainTo(pending);
+        if (pending.size() > 0) {
+          LOG.info("NoAutoStart delay in processing first event: "
+              + (System.currentTimeMillis() - firstEventReceivedTime));
+          shuffleManager.handleEvents(pending);
+        }
+      }
     }
   }
 
@@ -102,6 +116,17 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
     if (numInputs == 0) {
       throw new RuntimeException("No input events expected as numInputs is 0");
     }
+    if (!isStarted.get()) {
+      synchronized(this) {
+        if (!isStarted.get()) {
+          if (firstEventReceivedTime == -1) {
+            firstEventReceivedTime = System.currentTimeMillis();
+          }
+          pendingEvents.addAll(inputEvents);
+          return;
+        }
+      }
+    }
     shuffleManager.handleEvents(inputEvents);
   }