You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/02/21 00:13:24 UTC
git commit: TEZ-863. Queue events for relevant inputs untill the
Input has been started. Fixes a potential NPE in case of no auto start.
(sseth)
Repository: incubator-tez
Updated Branches:
refs/heads/master be0d38988 -> 7e1d11010
TEZ-863. Queue events for relevant inputs untill the Input has been
started. Fixes a potential NPE in case of no auto start. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/7e1d1101
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/7e1d1101
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/7e1d1101
Branch: refs/heads/master
Commit: 7e1d11010802050182835f3cc7a3b86619ee2cb1
Parents: be0d389
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Feb 20 15:12:21 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Feb 20 15:12:21 2014 -0800
----------------------------------------------------------------------
.../library/input/ShuffledMergedInput.java | 34 +++++++++++++++++---
.../library/input/ShuffledUnorderedKVInput.java | 31 ++++++++++++++++--
2 files changed, 58 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7e1d1101/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index 35c1ab5..d70924a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -19,7 +19,10 @@ package org.apache.tez.runtime.library.input;
import java.io.IOException;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
@@ -60,6 +63,9 @@ public class ShuffledMergedInput implements LogicalInput {
protected Configuration conf;
protected int numInputs = 0;
protected Shuffle shuffle;
+ private final BlockingQueue<Event> pendingEvents = new LinkedBlockingQueue<Event>();
+ private volatile long firstEventReceivedTime = -1;
+ // ZZZ LOG THIS TIME
@SuppressWarnings("rawtypes")
protected ValuesIterator vIter;
@@ -88,9 +94,18 @@ public class ShuffledMergedInput implements LogicalInput {
@Override
public void start() throws IOException {
- if (!isStarted.getAndSet(true)) {
- // Start the shuffle - copy and merge
- shuffle.run();
+ synchronized (this) {
+ if (!isStarted.getAndSet(true)) {
+ // Start the shuffle - copy and merge
+ shuffle.run();
+ List<Event> pending = new LinkedList<Event>();
+ pendingEvents.drainTo(pending);
+ if (pending.size() > 0) {
+ LOG.info("NoAutoStart delay in processing first event: "
+ + (System.currentTimeMillis() - firstEventReceivedTime));
+ shuffle.handleEvents(pending);
+ }
+ }
}
}
@@ -123,7 +138,7 @@ public class ShuffledMergedInput implements LogicalInput {
@Override
public List<Event> close() throws IOException {
- if (this.numInputs != 0) {
+ if (this.numInputs != 0 && rawIter != null) {
rawIter.close();
}
return Collections.emptyList();
@@ -192,6 +207,17 @@ public class ShuffledMergedInput implements LogicalInput {
if (numInputs == 0) {
throw new RuntimeException("No input events expected as numInputs is 0");
}
+ if (!isStarted.get()) {
+ synchronized (this) {
+ if (!isStarted.get()) {
+ if (firstEventReceivedTime == -1) {
+ firstEventReceivedTime = System.currentTimeMillis();
+ }
+ pendingEvents.addAll(inputEvents);
+ return;
+ }
+ }
+ }
shuffle.handleEvents(inputEvents);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7e1d1101/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index 9123345..f00ef3d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -20,7 +20,10 @@ package org.apache.tez.runtime.library.input;
import java.io.IOException;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
@@ -43,6 +46,8 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
private Configuration conf;
private int numInputs = -1;
private BroadcastShuffleManager shuffleManager;
+ private final BlockingQueue<Event> pendingEvents = new LinkedBlockingQueue<Event>();
+ private volatile long firstEventReceivedTime = -1;
@SuppressWarnings("rawtypes")
private BroadcastKVReader kvReader;
@@ -68,9 +73,18 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
@Override
public void start() throws IOException {
- if (!isStarted.getAndSet(true)) {
- this.shuffleManager.run();
- this.kvReader = this.shuffleManager.createReader();
+ synchronized (this) {
+ if (!isStarted.getAndSet(true)) {
+ this.shuffleManager.run();
+ this.kvReader = this.shuffleManager.createReader();
+ List<Event> pending = new LinkedList<Event>();
+ pendingEvents.drainTo(pending);
+ if (pending.size() > 0) {
+ LOG.info("NoAutoStart delay in processing first event: "
+ + (System.currentTimeMillis() - firstEventReceivedTime));
+ shuffleManager.handleEvents(pending);
+ }
+ }
}
}
@@ -102,6 +116,17 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
if (numInputs == 0) {
throw new RuntimeException("No input events expected as numInputs is 0");
}
+ if (!isStarted.get()) {
+ synchronized(this) {
+ if (!isStarted.get()) {
+ if (firstEventReceivedTime == -1) {
+ firstEventReceivedTime = System.currentTimeMillis();
+ }
+ pendingEvents.addAll(inputEvents);
+ return;
+ }
+ }
+ }
shuffleManager.handleEvents(inputEvents);
}