You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/02/21 06:41:27 UTC

git commit: TEZ-863 Addendum. Queue events for relevant inputs untill the Input has been started. Fixes a potential NPE in case of no auto start. (Contributed by Rajesh Balamohan)

Repository: incubator-tez
Updated Branches:
  refs/heads/master e4c2a8253 -> 238255b69


TEZ-863 Addendum. Queue events for relevant inputs untill the Input has
been started. Fixes a potential NPE in case of no auto start.
(Contributed by Rajesh Balamohan)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/238255b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/238255b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/238255b6

Branch: refs/heads/master
Commit: 238255b6941fa645bb5aab32ab78515cf6994c38
Parents: e4c2a82
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Feb 20 21:40:17 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Feb 20 21:40:17 2014 -0800

----------------------------------------------------------------------
 .../apache/tez/runtime/library/input/ShuffledMergedInput.java  | 6 +++++-
 .../tez/runtime/library/input/ShuffledUnorderedKVInput.java    | 3 ++-
 2 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/238255b6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index d70924a..d2d4e56 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -95,9 +95,12 @@ public class ShuffledMergedInput implements LogicalInput {
   @Override
   public void start() throws IOException {
     synchronized (this) {
-      if (!isStarted.getAndSet(true)) {
+      if (!isStarted.get()) {
         // Start the shuffle - copy and merge
         shuffle.run();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Initialized the handlers in shuffle..Safe to start processing..");
+        }
         List<Event> pending = new LinkedList<Event>();
         pendingEvents.drainTo(pending);
         if (pending.size() > 0) {
@@ -105,6 +108,7 @@ public class ShuffledMergedInput implements LogicalInput {
               + (System.currentTimeMillis() - firstEventReceivedTime));
           shuffle.handleEvents(pending);
         }
+        isStarted.set(true);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/238255b6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index f00ef3d..1241606 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -74,7 +74,7 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
   @Override
   public void start() throws IOException {
     synchronized (this) {
-      if (!isStarted.getAndSet(true)) {
+      if (!isStarted.get()) {
         this.shuffleManager.run();
         this.kvReader = this.shuffleManager.createReader();
         List<Event> pending = new LinkedList<Event>();
@@ -84,6 +84,7 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
               + (System.currentTimeMillis() - firstEventReceivedTime));
           shuffleManager.handleEvents(pending);
         }
+        isStarted.set(true);
       }
     }
   }