You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/28 23:55:58 UTC
git commit: TEZ-991. Fix a bug which could cause getReader to
occasionally hang on ShuffledMergedInput. (sseth)
Repository: incubator-tez
Updated Branches:
refs/heads/master 6416102f4 -> f34c7f320
TEZ-991. Fix a bug which could cause getReader to occasionally hang on ShuffledMergedInput. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/f34c7f32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/f34c7f32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/f34c7f32
Branch: refs/heads/master
Commit: f34c7f320ae471b5a3957b85e58e3b38eab44717
Parents: 6416102
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Mar 28 15:55:18 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Mar 28 15:55:18 2014 -0700
----------------------------------------------------------------------
.../library/input/ShuffledMergedInput.java | 128 ++++++++++++-------
.../input/ShuffledMergedInputLegacy.java | 6 +-
.../library/input/ShuffledUnorderedKVInput.java | 26 ++--
.../library/output/OnFileUnorderedKVOutput.java | 13 +-
4 files changed, 105 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f34c7f32/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index 4d62346..8a6de52 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -145,13 +145,22 @@ public class ShuffledMergedInput implements LogicalInput {
* @throws IOException
* @throws InterruptedException
*/
- public synchronized void waitForInputReady() throws IOException, InterruptedException {
- Preconditions.checkState(isStarted.get(), "Must start input before invoking this method");
- if (this.numInputs == 0) {
- return;
+ public void waitForInputReady() throws IOException, InterruptedException {
+ // Cannot synchronize entire method since this is called from user code and can block.
+ Shuffle localShuffleCopy = null;
+ synchronized (this) {
+ Preconditions.checkState(isStarted.get(), "Must start input before invoking this method");
+ if (this.numInputs == 0) {
+ return;
+ }
+ localShuffleCopy = shuffle;
+ }
+
+ TezRawKeyValueIterator localRawIter = localShuffleCopy.waitForInput();
+ synchronized(this) {
+ rawIter = localRawIter;
+ createValuesIterator();
}
- rawIter = shuffle.waitForInput();
- createValuesIterator();
}
@Override
@@ -175,26 +184,31 @@ public class ShuffledMergedInput implements LogicalInput {
* @return a KVReader over the sorted input.
*/
@Override
- public synchronized KeyValuesReader getReader() throws IOException {
- if (this.numInputs == 0) {
- return new KeyValuesReader() {
- @Override
- public boolean next() throws IOException {
- return false;
- }
+ public KeyValuesReader getReader() throws IOException {
+ // Cannot synchronize entire method since this is called from user code and can block.
+ TezRawKeyValueIterator rawIterLocal;
+ synchronized (this) {
+ rawIterLocal = rawIter;
+ if (this.numInputs == 0) {
+ return new KeyValuesReader() {
+ @Override
+ public boolean next() throws IOException {
+ return false;
+ }
- @Override
- public Object getCurrentKey() throws IOException {
- throw new RuntimeException("No data available in Input");
- }
+ @Override
+ public Object getCurrentKey() throws IOException {
+ throw new RuntimeException("No data available in Input");
+ }
- @Override
- public Iterable<Object> getCurrentValues() throws IOException {
- throw new RuntimeException("No data available in Input");
- }
- };
+ @Override
+ public Iterable<Object> getCurrentValues() throws IOException {
+ throw new RuntimeException("No data available in Input");
+ }
+ };
+ }
}
- if (rawIter == null) {
+ if (rawIterLocal == null) {
try {
waitForInputReady();
} catch (InterruptedException e) {
@@ -202,35 +216,27 @@ public class ShuffledMergedInput implements LogicalInput {
throw new IOException("Interrupted while waiting for input ready", e);
}
}
- return new KeyValuesReader() {
-
- @Override
- public boolean next() throws IOException {
- return vIter.moveToNext();
- }
-
- public Object getCurrentKey() throws IOException {
- return vIter.getKey();
- }
-
- @SuppressWarnings("unchecked")
- public Iterable<Object> getCurrentValues() throws IOException {
- return vIter.getValues();
- }
- };
+ @SuppressWarnings("rawtypes")
+ ValuesIterator valuesIter = null;
+ synchronized(this) {
+ valuesIter = vIter;
+ }
+ return new ShuffledMergedKeyValuesReader(valuesIter);
}
@Override
- public synchronized void handleEvents(List<Event> inputEvents) {
- if (numInputs == 0) {
- throw new RuntimeException("No input events expected as numInputs is 0");
- }
- if (!isStarted.get()) {
- if (firstEventReceivedTime == -1) {
- firstEventReceivedTime = System.currentTimeMillis();
+ public void handleEvents(List<Event> inputEvents) {
+ synchronized (this) {
+ if (numInputs == 0) {
+ throw new RuntimeException("No input events expected as numInputs is 0");
+ }
+ if (!isStarted.get()) {
+ if (firstEventReceivedTime == -1) {
+ firstEventReceivedTime = System.currentTimeMillis();
+ }
+ pendingEvents.addAll(inputEvents);
+ return;
}
- pendingEvents.addAll(inputEvents);
- return;
}
shuffle.handleEvents(inputEvents);
}
@@ -241,7 +247,7 @@ public class ShuffledMergedInput implements LogicalInput {
}
@SuppressWarnings({ "rawtypes", "unchecked" })
- protected void createValuesIterator()
+ protected synchronized void createValuesIterator()
throws IOException {
// Not used by ReduceProcessor
vIter = new ValuesIterator(rawIter,
@@ -250,4 +256,28 @@ public class ShuffledMergedInput implements LogicalInput {
ConfigUtils.getIntermediateInputValueClass(conf), conf, inputKeyCounter, inputValueCounter);
}
+
+ @SuppressWarnings("rawtypes")
+ private static class ShuffledMergedKeyValuesReader implements KeyValuesReader {
+
+ private final ValuesIterator valuesIter;
+
+ ShuffledMergedKeyValuesReader(ValuesIterator valuesIter) {
+ this.valuesIter = valuesIter;
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ return valuesIter.moveToNext();
+ }
+
+ public Object getCurrentKey() throws IOException {
+ return valuesIter.getKey();
+ }
+
+ @SuppressWarnings("unchecked")
+ public Iterable<Object> getCurrentValues() throws IOException {
+ return valuesIter.getValues();
+ }
+ };
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f34c7f32/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
index 3bcc957..9d89eec 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
@@ -34,6 +34,7 @@ public class ShuffledMergedInputLegacy extends ShuffledMergedInput {
@Private
public TezRawKeyValueIterator getIterator() throws IOException, InterruptedException {
// wait for input so that iterator is available
+ synchronized(this) {
if (this.numInputs == 0) {
return new TezRawKeyValueIterator() {
@Override
@@ -62,8 +63,11 @@ public class ShuffledMergedInputLegacy extends ShuffledMergedInput {
}
};
}
+ }
waitForInputReady();
- return rawIter;
+ synchronized(this) {
+ return rawIter;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f34c7f32/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index a07159f..61c870f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -174,19 +174,21 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
}
@Override
- public synchronized void handleEvents(List<Event> inputEvents) throws IOException {
- if (numInputs == 0) {
- throw new RuntimeException("No input events expected as numInputs is 0");
- }
- if (!isStarted.get()) {
- if (firstEventReceivedTime == -1) {
- firstEventReceivedTime = System.currentTimeMillis();
+ public void handleEvents(List<Event> inputEvents) throws IOException {
+ synchronized (this) {
+ if (numInputs == 0) {
+ throw new RuntimeException("No input events expected as numInputs is 0");
+ }
+ if (!isStarted.get()) {
+ if (firstEventReceivedTime == -1) {
+ firstEventReceivedTime = System.currentTimeMillis();
+ }
+ // This queue will keep growing if the Processor decides never to
+ // start the Input. The Input, however, has no idea whether start
+ // will be invoked or not.
+ pendingEvents.addAll(inputEvents);
+ return;
}
- // This queue will keep growing if the Processor decides never to
- // start the event. The Input, however has no idea, on whether start
- // will be invoked or not.
- pendingEvents.addAll(inputEvents);
- return;
}
inputEventHandler.handleEvents(inputEvents);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f34c7f32/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
index 91ea94a..68cfcae 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
@@ -62,7 +62,7 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
}
@Override
- public List<Event> initialize(TezOutputContext outputContext)
+ public synchronized List<Event> initialize(TezOutputContext outputContext)
throws Exception {
this.outputContext = outputContext;
this.conf = TezUtils.createConfFromUserPayload(outputContext
@@ -88,21 +88,22 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
}
@Override
- public void start() {
+ public synchronized void start() {
}
@Override
- public KeyValueWriter getWriter() throws Exception {
+ public synchronized KeyValueWriter getWriter() throws Exception {
+ // Eventually, disallow multiple invocations.
return kvWriter;
}
@Override
- public void handleEvents(List<Event> outputEvents) {
+ public synchronized void handleEvents(List<Event> outputEvents) {
throw new TezUncheckedException("Not expecting any events");
}
@Override
- public List<Event> close() throws Exception {
+ public synchronized List<Event> close() throws Exception {
boolean outputGenerated = this.kvWriter.close();
DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
@@ -150,7 +151,7 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
}
@Override
- public void setNumPhysicalOutputs(int numOutputs) {
+ public synchronized void setNumPhysicalOutputs(int numOutputs) {
Preconditions.checkArgument(numOutputs == 1,
"Number of outputs can only be 1 for " + this.getClass().getName());
}