You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/28 23:55:58 UTC

git commit: TEZ-991. Fix a bug which could cause getReader to occasionally hang on ShuffledMergedInput. (sseth)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 6416102f4 -> f34c7f320


TEZ-991. Fix a bug which could cause getReader to occasionally hang on ShuffledMergedInput. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/f34c7f32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/f34c7f32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/f34c7f32

Branch: refs/heads/master
Commit: f34c7f320ae471b5a3957b85e58e3b38eab44717
Parents: 6416102
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Mar 28 15:55:18 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Mar 28 15:55:18 2014 -0700

----------------------------------------------------------------------
 .../library/input/ShuffledMergedInput.java      | 128 ++++++++++++-------
 .../input/ShuffledMergedInputLegacy.java        |   6 +-
 .../library/input/ShuffledUnorderedKVInput.java |  26 ++--
 .../library/output/OnFileUnorderedKVOutput.java |  13 +-
 4 files changed, 105 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f34c7f32/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index 4d62346..8a6de52 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -145,13 +145,22 @@ public class ShuffledMergedInput implements LogicalInput {
    * @throws IOException
    * @throws InterruptedException
    */
-  public synchronized void waitForInputReady() throws IOException, InterruptedException {
-    Preconditions.checkState(isStarted.get(), "Must start input before invoking this method");
-    if (this.numInputs == 0) {
-      return;
+  public void waitForInputReady() throws IOException, InterruptedException {
+    // Cannot synchronize entire method since this is called from user code and can block.
+    Shuffle localShuffleCopy = null;
+    synchronized (this) {
+      Preconditions.checkState(isStarted.get(), "Must start input before invoking this method");
+      if (this.numInputs == 0) {
+        return;
+      }
+      localShuffleCopy = shuffle;
+    }
+    
+    TezRawKeyValueIterator localRawIter = localShuffleCopy.waitForInput();
+    synchronized(this) {
+      rawIter = localRawIter;
+      createValuesIterator();
     }
-    rawIter = shuffle.waitForInput();
-    createValuesIterator();
   }
 
   @Override
@@ -175,26 +184,31 @@ public class ShuffledMergedInput implements LogicalInput {
    * @return a KVReader over the sorted input.
    */
   @Override
-  public synchronized KeyValuesReader getReader() throws IOException {
-    if (this.numInputs == 0) {
-      return new KeyValuesReader() {
-        @Override
-        public boolean next() throws IOException {
-          return false;
-        }
+  public KeyValuesReader getReader() throws IOException {
+    // Cannot synchronize entire method since this is called from user code and can block.
+    TezRawKeyValueIterator rawIterLocal;
+    synchronized (this) {
+      rawIterLocal = rawIter;
+      if (this.numInputs == 0) {
+        return new KeyValuesReader() {
+          @Override
+          public boolean next() throws IOException {
+            return false;
+          }
 
-        @Override
-        public Object getCurrentKey() throws IOException {
-          throw new RuntimeException("No data available in Input");
-        }
+          @Override
+          public Object getCurrentKey() throws IOException {
+            throw new RuntimeException("No data available in Input");
+          }
 
-        @Override
-        public Iterable<Object> getCurrentValues() throws IOException {
-          throw new RuntimeException("No data available in Input");
-        }
-      };
+          @Override
+          public Iterable<Object> getCurrentValues() throws IOException {
+            throw new RuntimeException("No data available in Input");
+          }
+        };
+      }
     }
-    if (rawIter == null) {
+    if (rawIterLocal == null) {
       try {
         waitForInputReady();
       } catch (InterruptedException e) {
@@ -202,35 +216,27 @@ public class ShuffledMergedInput implements LogicalInput {
         throw new IOException("Interrupted while waiting for input ready", e);
       }
     }
-    return new KeyValuesReader() {
-
-      @Override
-      public boolean next() throws IOException {
-        return vIter.moveToNext();
-      }
-
-      public Object getCurrentKey() throws IOException {
-        return vIter.getKey();
-      }
-      
-      @SuppressWarnings("unchecked")
-      public Iterable<Object> getCurrentValues() throws IOException {
-        return vIter.getValues();
-      }
-    };
+    @SuppressWarnings("rawtypes")
+    ValuesIterator valuesIter = null;
+    synchronized(this) {
+      valuesIter = vIter;
+    }
+    return new ShuffledMergedKeyValuesReader(valuesIter);
   }
 
   @Override
-  public synchronized void handleEvents(List<Event> inputEvents) {
-    if (numInputs == 0) {
-      throw new RuntimeException("No input events expected as numInputs is 0");
-    }
-    if (!isStarted.get()) {
-      if (firstEventReceivedTime == -1) {
-        firstEventReceivedTime = System.currentTimeMillis();
+  public void handleEvents(List<Event> inputEvents) {
+    synchronized (this) {
+      if (numInputs == 0) {
+        throw new RuntimeException("No input events expected as numInputs is 0");
+      }
+      if (!isStarted.get()) {
+        if (firstEventReceivedTime == -1) {
+          firstEventReceivedTime = System.currentTimeMillis();
+        }
+        pendingEvents.addAll(inputEvents);
+        return;
       }
-      pendingEvents.addAll(inputEvents);
-      return;
     }
     shuffle.handleEvents(inputEvents);
   }
@@ -241,7 +247,7 @@ public class ShuffledMergedInput implements LogicalInput {
   }
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
-  protected void createValuesIterator()
+  protected synchronized void createValuesIterator()
       throws IOException {
     // Not used by ReduceProcessor
     vIter = new ValuesIterator(rawIter,
@@ -250,4 +256,28 @@ public class ShuffledMergedInput implements LogicalInput {
         ConfigUtils.getIntermediateInputValueClass(conf), conf, inputKeyCounter, inputValueCounter);
 
   }
+
+  @SuppressWarnings("rawtypes")
+  private static class ShuffledMergedKeyValuesReader implements KeyValuesReader {
+
+    private final ValuesIterator valuesIter;
+
+    ShuffledMergedKeyValuesReader(ValuesIterator valuesIter) {
+      this.valuesIter = valuesIter;
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      return valuesIter.moveToNext();
+    }
+
+    public Object getCurrentKey() throws IOException {
+      return valuesIter.getKey();
+    }
+
+    @SuppressWarnings("unchecked")
+    public Iterable<Object> getCurrentValues() throws IOException {
+      return valuesIter.getValues();
+    }
+  };
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f34c7f32/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
index 3bcc957..9d89eec 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
@@ -34,6 +34,7 @@ public class ShuffledMergedInputLegacy extends ShuffledMergedInput {
   @Private
   public TezRawKeyValueIterator getIterator() throws IOException, InterruptedException {
     // wait for input so that iterator is available
+    synchronized(this) {
     if (this.numInputs == 0) {
       return new TezRawKeyValueIterator() {
         @Override
@@ -62,8 +63,11 @@ public class ShuffledMergedInputLegacy extends ShuffledMergedInput {
         }
       };
     }
+    }
 
     waitForInputReady();
-    return rawIter;
+    synchronized(this) {
+      return rawIter;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f34c7f32/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index a07159f..61c870f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -174,19 +174,21 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
   }
 
   @Override
-  public synchronized void handleEvents(List<Event> inputEvents) throws IOException {
-    if (numInputs == 0) {
-      throw new RuntimeException("No input events expected as numInputs is 0");
-    }
-    if (!isStarted.get()) {
-      if (firstEventReceivedTime == -1) {
-        firstEventReceivedTime = System.currentTimeMillis();
+  public void handleEvents(List<Event> inputEvents) throws IOException {
+    synchronized (this) {
+      if (numInputs == 0) {
+        throw new RuntimeException("No input events expected as numInputs is 0");
+      }
+      if (!isStarted.get()) {
+        if (firstEventReceivedTime == -1) {
+          firstEventReceivedTime = System.currentTimeMillis();
+        }
+        // This queue will keep growing if the Processor decides never to
+        // start the Input. The Input, however, has no idea whether start
+        // will be invoked or not.
+        pendingEvents.addAll(inputEvents);
+        return;
       }
-      // This queue will keep growing if the Processor decides never to
-      // start the event. The Input, however has no idea, on whether start
-      // will be invoked or not.
-      pendingEvents.addAll(inputEvents);
-      return;
     }
     inputEventHandler.handleEvents(inputEvents);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f34c7f32/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
index 91ea94a..68cfcae 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
@@ -62,7 +62,7 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
   }
 
   @Override
-  public List<Event> initialize(TezOutputContext outputContext)
+  public synchronized List<Event> initialize(TezOutputContext outputContext)
       throws Exception {
     this.outputContext = outputContext;
     this.conf = TezUtils.createConfFromUserPayload(outputContext
@@ -88,21 +88,22 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
   }
 
   @Override
-  public void start() {
+  public synchronized void start() {
   }
 
   @Override
-  public KeyValueWriter getWriter() throws Exception {
+  public synchronized KeyValueWriter getWriter() throws Exception {
+    // Eventually, disallow multiple invocations.
     return kvWriter;
   }
 
   @Override
-  public void handleEvents(List<Event> outputEvents) {
+  public synchronized void handleEvents(List<Event> outputEvents) {
     throw new TezUncheckedException("Not expecting any events");
   }
 
   @Override
-  public List<Event> close() throws Exception {
+  public synchronized List<Event> close() throws Exception {
     boolean outputGenerated = this.kvWriter.close();
     
     DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
@@ -150,7 +151,7 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
   }
 
   @Override
-  public void setNumPhysicalOutputs(int numOutputs) {
+  public synchronized void setNumPhysicalOutputs(int numOutputs) {
     Preconditions.checkArgument(numOutputs == 1,
         "Number of outputs can only be 1 for " + this.getClass().getName());
   }