You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/02/23 01:24:30 UTC

git commit: TEZ-844. Processors should have a mechanism to know when an Input is ready for consumption. (sseth)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 1752d553f -> e17e750c9


TEZ-844. Processors should have a mechanism to know when an Input is
ready for consumption. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/e17e750c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/e17e750c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/e17e750c

Branch: refs/heads/master
Commit: e17e750c9527b801969e36a54ba445ab6fef737d
Parents: 1752d55
Author: Siddharth Seth <ss...@apache.org>
Authored: Sat Feb 22 16:24:11 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Sat Feb 22 16:24:11 2014 -0800

----------------------------------------------------------------------
 .../java/org/apache/tez/runtime/api/Input.java  |  12 +-
 .../tez/runtime/api/InputReadyCallback.java     |  32 ++
 .../tez/runtime/api/MergedLogicalInput.java     |  30 +-
 .../java/org/apache/tez/runtime/api/Output.java |   4 +
 .../apache/tez/runtime/api/TezInputContext.java |   9 +
 .../tez/runtime/api/TezProcessorContext.java    |  32 ++
 .../org/apache/tez/mapreduce/input/MRInput.java |   4 +-
 .../processor/reduce/ReduceProcessor.java       |  25 +-
 .../apache/tez/runtime/InputReadyTracker.java   | 161 +++++++++
 .../runtime/LogicalIOProcessorRuntimeTask.java  |  23 +-
 .../runtime/api/impl/TezInputContextImpl.java   |  13 +-
 .../api/impl/TezProcessorContextImpl.java       |  26 +-
 .../tez/runtime/TestInputReadyTracker.java      | 325 +++++++++++++++++++
 .../TestLogicalIOProcessorRuntimeTask.java      |   1 +
 .../input/BroadcastShuffleManager.java          |   6 +-
 .../library/common/shuffle/impl/Shuffle.java    |   2 +
 .../input/ConcatenatedMergedKeyValueInput.java  |   7 +-
 .../input/ConcatenatedMergedKeyValuesInput.java |   7 +-
 .../runtime/library/input/LocalMergedInput.java |   4 +-
 .../library/input/ShuffledMergedInput.java      |   7 +-
 .../library/input/ShuffledUnorderedKVInput.java |   4 +-
 .../java/org/apache/tez/test/TestInput.java     |   2 +-
 22 files changed, 693 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java b/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
index 27f66d0..f397114 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
@@ -23,10 +23,18 @@ import java.util.List;
 /**
  * Represents an input through which a TezProcessor receives data on an edge.
  * </p>
- *
+ * 
  * <code>Input</code> classes must have a 0 argument public constructor for Tez
  * to construct the <code>Input</code>. Tez will take care of initializing and
  * closing the Input after a {@link Processor} completes. </p>
+ * 
+ * During initialization, Inputs must specify an initial memory requirement via
+ * {@link TezInputContext}.requestInitialMemory
+ * 
+ * Inputs must also inform the framework once they are ready to be consumed.
+ * This typically means that the Processor will not block when reading from the
+ * corresponding Input. This is done via {@link TezInputContext}.inputIsReady.
+ * Inputs choose the policy on when they are ready.
  */
 public interface Input {
 
@@ -59,7 +67,7 @@ public interface Input {
    * @throws Exception
    */
   public void start() throws Exception;
-  
+
   /**
    * Gets an instance of the {@link Reader} for this <code>Output</code>
    *

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-api/src/main/java/org/apache/tez/runtime/api/InputReadyCallback.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputReadyCallback.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputReadyCallback.java
new file mode 100644
index 0000000..0514651
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputReadyCallback.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+/**
+ * Used temporarily until MergedInputs have access to a context. Remove after
+ * TEZ-866
+ */
+@Private
+public interface InputReadyCallback {
+
+  public void setInputReady(Input input);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
index 1e82aca..2513589 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.runtime.api;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -29,14 +30,17 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public abstract class MergedLogicalInput implements LogicalInput {
 
+  // TODO Remove with TEZ-866
+  private volatile InputReadyCallback inputReadyCallback;
+  private AtomicBoolean notifiedInputReady = new AtomicBoolean(false);
   private List<Input> inputs;
   private final AtomicBoolean isStarted = new AtomicBoolean(false);
-  
+
   public final void initialize(List<Input> inputs) {
-    this.inputs = inputs;
+    this.inputs = Collections.unmodifiableList(inputs);
   }
   
-  protected List<Input> getInputs() {
+  public final List<Input> getInputs() {
     return inputs;
   }
   
@@ -69,4 +73,24 @@ public abstract class MergedLogicalInput implements LogicalInput {
     throw new UnsupportedOperationException();
   }
 
+  // TODO Remove with TEZ-866
+  public void setInputReadyCallback(InputReadyCallback callback) {
+    this.inputReadyCallback =  callback;
+  }
+  
+  /**
+   * Used by the actual MergedInput to notify that it's ready for consumption.
+   * TBD eventually via the context.
+   */
+  protected final void informInputReady() {
+    // TODO Fix with TEZ-866
+    if (!notifiedInputReady.getAndSet(true)) {
+      inputReadyCallback.setInputReady(this);
+    }
+  }
+
+  /**
+   * Used by the framework to inform the MergedInput that one of its constituent Inputs is ready.
+   */
+  public abstract void setConstituentInputIsReady(Input input);
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java b/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java
index e986c34..76af6f6 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java
@@ -27,6 +27,10 @@ import java.util.List;
  * <code>Output</code> implementations must have a 0 argument public constructor
  * for Tez to construct the <code>Output</code>. Tez will take care of
  * initializing and closing the Input after a {@link Processor} completes. </p>
+ *
+ * During initialization, Outputs must specify an initial memory requirement via
+ * {@link TezOutputContext}.requestInitialMemory
+ * 
  */
 public interface Output {
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java
index 0e37030..4c9b6c8 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java
@@ -35,4 +35,13 @@ public interface TezInputContext extends TezTaskContext {
    * @return index
    */
   public int getInputIndex();
+  
+  /**
+   * Inform the framework that the specific Input is ready for consumption. This
+   * method will typically be invoked as a result of an
+   * Input.inputReadyNotificationRequired invocation.
+   * 
+   * This method can be invoked multiple times.
+   */
+  public void inputIsReady();
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-api/src/main/java/org/apache/tez/runtime/api/TezProcessorContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TezProcessorContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TezProcessorContext.java
index 001461b..913e6df 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TezProcessorContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TezProcessorContext.java
@@ -19,6 +19,7 @@
 package org.apache.tez.runtime.api;
 
 import java.io.IOException;
+import java.util.Collection;
 
 /**
  * Context handle for the Processor to initialize itself.
@@ -38,4 +39,35 @@ public interface TezProcessorContext extends TezTaskContext {
    */
   public boolean canCommit() throws IOException;
 
+  /**
+   * Blocking call which returns when any of the specified Inputs is ready for
+   * consumption.
+   * 
+   * There can be multiple parallel invocations of this function - where each
+   * invocation blocks on the Inputs that it specifies.
+   * 
+   * If multiple Inputs are ready, any one of them may be returned by this
+   * method - including an Input which may have been returned in a previous
+   * call. If invoking this method multiple times, it's recommended to remove
+   * previously completed Inputs from the invocation list.
+   * 
+   * @param inputs
+   *          the list of Inputs to monitor
+   * @return the Input which is ready for consumption
+   * @throws InterruptedException
+   */
+  public Input waitForAnyInputReady(Collection<Input> inputs) throws InterruptedException;
+  
+  /**
+   * Blocking call which returns only after all of the specified Inputs are
+   * ready for consumption.
+   * 
+   * There can be multiple parallel invocations of this function - where each
+   * invocation blocks on the Inputs that it specifies.
+   * 
+   * @param inputs
+   *          the list of Inputs to monitor
+   * @throws InterruptedException
+   */
+  public void waitForAllInputsReady(Collection<Input> inputs) throws InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index edd8e92..706f4fe 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -121,6 +121,7 @@ public class MRInput implements LogicalInput {
   public List<Event> initialize(TezInputContext inputContext) throws IOException {
     this.inputContext = inputContext;
     this.inputContext.requestInitialMemory(0l, null); //mandatory call
+    this.inputContext.inputIsReady();
     MRInputUserPayloadProto mrUserPayload =
       MRHelpers.parseMRInputPayload(inputContext.getUserPayload());
     Preconditions.checkArgument(mrUserPayload.hasSplits() == false,
@@ -576,5 +577,6 @@ public class MRInput implements LogicalInput {
         return value;
       }
     }
-  };
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index f98e34f..41dff33 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -19,6 +19,7 @@ package org.apache.tez.mapreduce.processor.reduce;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -41,12 +42,13 @@ import org.apache.tez.mapreduce.output.MROutputLegacy;
 import org.apache.tez.mapreduce.processor.MRTask;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.TezProcessorContext;
-import org.apache.tez.runtime.library.api.KeyValuesReader;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
 import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
@@ -54,9 +56,7 @@ import org.apache.tez.runtime.library.output.OnFileSortedOutput;
 
 
 @SuppressWarnings({ "unchecked", "rawtypes" })
-public class ReduceProcessor
-extends MRTask
-implements LogicalIOProcessor {
+public class ReduceProcessor extends MRTask implements LogicalIOProcessor {
 
   private static final Log LOG = LogFactory.getLog(ReduceProcessor.class);
 
@@ -94,13 +94,6 @@ implements LogicalIOProcessor {
       Map<String, LogicalOutput> outputs) throws Exception {
 
     LOG.info("Running reduce: " + processorContext.getUniqueIdentifier());
-    
-    for (LogicalInput input : inputs.values()) {
-      input.start();
-    }
-    for (LogicalOutput output : outputs.values()) {
-      output.start();
-    }
 
     if (outputs.size() <= 0 || outputs.size() > 1) {
       throw new IOException("Invalid number of outputs"
@@ -113,7 +106,15 @@ implements LogicalIOProcessor {
     }
 
     LogicalInput in = inputs.values().iterator().next();
+    in.start();
+
+    List<Input> pendingInputs = new LinkedList<Input>();
+    pendingInputs.add(in);
+    processorContext.waitForAllInputsReady(pendingInputs);
+    LOG.info("Input is ready for consumption. Starting Output");
+
     LogicalOutput out = outputs.values().iterator().next();
+    out.start();
 
     initTask(out);
 
@@ -364,4 +365,4 @@ implements LogicalIOProcessor {
     return processorContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
   }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-runtime-internals/src/main/java/org/apache/tez/runtime/InputReadyTracker.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/InputReadyTracker.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/InputReadyTracker.java
new file mode 100644
index 0000000..e242ec0
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/InputReadyTracker.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.InputReadyCallback;
+import org.apache.tez.runtime.api.MergedLogicalInput;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class InputReadyTracker implements InputReadyCallback {
+
+  private final ConcurrentMap<Input, Boolean> readyInputs;
+  
+  private ConcurrentMap<Input, List<MergedLogicalInput>> inputToGroupMap;
+  
+  private final ReentrantLock lock = new ReentrantLock();
+  private final Condition condition = lock.newCondition();
+
+  public InputReadyTracker() {
+    readyInputs = Maps.newConcurrentMap();
+  }
+
+  // Called by the InputContext once it's ready.
+  public void setInputIsReady(Input input) {
+    lock.lock();
+    try {
+      Boolean old = readyInputs.putIfAbsent(input, true);
+      if (old == null) {
+        informGroupedInputs(input);
+        condition.signalAll();
+      } else {
+        // Ignore duplicate inputReady from the same Input
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+
+  private void informGroupedInputs(Input input) {
+    if (inputToGroupMap != null) {
+      List<MergedLogicalInput> mergedInputList = inputToGroupMap.get(input);
+      if (mergedInputList != null) {
+        for (MergedLogicalInput mergedInput : mergedInputList) {
+          mergedInput.setConstituentInputIsReady(input);
+        }
+      }
+    }
+  }
+
+  public Input waitForAnyInputReady(Collection<Input> inputs) throws InterruptedException {
+    Preconditions.checkArgument(inputs != null && inputs.size() > 0,
+        "At least one input should be specified");
+    InputReadyMonitor inputReadyMonitor = new InputReadyMonitor(inputs, true);
+    return inputReadyMonitor.awaitCondition();
+  }
+
+  public void waitForAllInputsReady(Collection<Input> inputs) throws InterruptedException {
+    Preconditions.checkArgument(inputs != null && inputs.size() > 0,
+        "At least one input should be specified");
+    InputReadyMonitor inputReadyMonitor = new InputReadyMonitor(inputs, false);
+    inputReadyMonitor.awaitCondition();
+  }
+
+  private class InputReadyMonitor {
+
+    private final Set<Input> pendingInputs;
+    private final boolean selectOne;
+
+    public InputReadyMonitor(Collection<Input> inputs, boolean anyOne) {
+      pendingInputs = Collections.newSetFromMap(new ConcurrentHashMap<Input, Boolean>());
+      pendingInputs.addAll(inputs);
+      this.selectOne = anyOne;
+    }
+
+    public Input awaitCondition() throws InterruptedException {
+      lock.lock();
+      try {
+        while (pendingInputs.size() > 0) {
+          Iterator<Input> inputIter = pendingInputs.iterator();
+          while (inputIter.hasNext()) {
+            Input input = inputIter.next();
+            if (readyInputs.containsKey(input)) {
+              inputIter.remove();
+              // Return early in case of an ANY request
+              if (selectOne) {
+                return input;
+              }
+            }
+          }
+          if (pendingInputs.size() > 0) {
+            condition.await();
+          }
+        }
+      } finally {
+        lock.unlock();
+      }
+      return null;
+    }
+  }
+
+  @Override   // TODO Remove with TEZ-866
+  public void setInputReady(Input input) {
+    setInputIsReady(input);
+  }
+
+  public void setGroupedInputs(Collection<MergedLogicalInput> inputGroups) {
+    lock.lock();
+    try {
+      if (inputGroups != null) {
+        inputToGroupMap = Maps.newConcurrentMap();
+        for (MergedLogicalInput mergedInput : inputGroups) {
+          mergedInput.setInputReadyCallback(this);
+          for (Input dest : mergedInput.getInputs()) {
+            // Check already ready Inputs - may have become ready during initialize
+            if (readyInputs.containsKey(dest)) {
+              mergedInput.setConstituentInputIsReady(dest);
+            }
+            List<MergedLogicalInput> mergedList = inputToGroupMap.get(dest);
+            if (mergedList == null) {
+              mergedList = Lists.newArrayList();
+              inputToGroupMap.put(dest, mergedList);
+            }
+            mergedList.add(mergedInput);
+          }
+        }
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index f5b414d..ae30b70 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -93,7 +93,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private final ConcurrentHashMap<String, TezOutputContext> outputContextMap;
   
   private final List<GroupInputSpec> groupInputSpecs;
-  private ConcurrentHashMap<String, LogicalInput> groupInputsMap;
+  private ConcurrentHashMap<String, MergedLogicalInput> groupInputsMap;
   
   private final ProcessorDescriptor processorDescriptor;
   private final LogicalIOProcessor processor;
@@ -116,6 +116,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private Thread eventRouterThread = null;
 
   private final int appAttemptNumber;
+  
+  private final InputReadyTracker inputReadyTracker;
 
   public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber,
       Configuration tezConf, TezUmbilical tezUmbilical,
@@ -154,6 +156,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     this.groupInputSpecs = taskSpec.getGroupInputs();
     initialMemoryDistributor = new MemoryDistributor(numInputs, numOutputs, tezConf);
     this.startedInputsMap = startedInputsMap;
+    this.inputReadyTracker = new InputReadyTracker();
   }
 
   /**
@@ -201,6 +204,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     LOG.info("All initializers finished");
     // group inputs depend on inputs beings initialized. So must be done after.
     initializeGroupInputs();
+    // Register the groups so that appropriate calls can be made.
+    this.inputReadyTracker
+        .setGroupedInputs(groupInputsMap == null ? null : groupInputsMap.values());
     // Grouped input start will be controlled by the start of the GroupedInput
     
     // Construct the set of groupedInputs up front so that start is not invoked on them.
@@ -266,6 +272,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         }
       }
     }
+    LOG.info("AutoStartComplete");
 
     
 
@@ -348,7 +355,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       LOG.info("Initializing Input using InputSpec: " + inputSpec);
       String edgeName = inputSpec.getSourceVertexName();
       LogicalInput input = createInput(inputSpec);
-      TezInputContext inputContext = createInputContext(inputSpec, inputIndex);
+      TezInputContext inputContext = createInputContext(input, inputSpec, inputIndex);
       inputsMap.put(edgeName, input);
       inputContextMap.put(edgeName, inputContext);
 
@@ -407,7 +414,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         ((LogicalOutput) output).setNumPhysicalOutputs(outputSpec
             .getPhysicalEdgeCount());
       }
-      LOG.info("Initializing Input with dest edge: " + edgeName);
+      LOG.info("Initializing Output with dest edge: " + edgeName);
       List<Event> events = output.initialize(outputContext);
       sendTaskGeneratedEvents(events, EventProducerConsumerType.OUTPUT,
           outputContext.getTaskVertexName(),
@@ -426,8 +433,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   }
 
   private void initializeGroupInputs() {
-    if (groupInputSpecs != null && !groupInputSpecs.isEmpty()) {
-     groupInputsMap = new ConcurrentHashMap<String, LogicalInput>(groupInputSpecs.size());
+    if (groupInputSpecs != null && !groupInputSpecs.isEmpty()) {      
+     groupInputsMap = new ConcurrentHashMap<String, MergedLogicalInput>(groupInputSpecs.size());
      for (GroupInputSpec groupInputSpec : groupInputSpecs) {
         LOG.info("Initializing GroupInput using GroupInputSpec: " + groupInputSpec);
         MergedLogicalInput groupInput = (MergedLogicalInput) createInputFromDescriptor(
@@ -452,7 +459,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         + processorDescriptor.getClassName());
   }
 
-  private TezInputContext createInputContext(InputSpec inputSpec, int inputIndex) {
+  private TezInputContext createInputContext(Input input, InputSpec inputSpec, int inputIndex) {
     TezInputContext inputContext = new TezInputContextImpl(tezConf,
         appAttemptNumber, tezUmbilical, taskSpec.getVertexName(),
         inputSpec.getSourceVertexName(), taskSpec.getTaskAttemptID(),
@@ -461,7 +468,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
             .getProcessorDescriptor().getUserPayload() : inputSpec
             .getInputDescriptor().getUserPayload(), this,
         serviceConsumerMetadata, System.getenv(), initialMemoryDistributor,
-        inputSpec.getInputDescriptor());
+        inputSpec.getInputDescriptor(), input, inputReadyTracker);
     return inputContext;
   }
 
@@ -483,7 +490,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         appAttemptNumber, tezUmbilical, taskSpec.getVertexName(), taskSpec.getTaskAttemptID(),
         tezCounters, processorDescriptor.getUserPayload(), this,
         serviceConsumerMetadata, System.getenv(), initialMemoryDistributor,
-        processorDescriptor);
+        processorDescriptor, inputReadyTracker);
     return processorContext;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index eeb7df0..d4b07d1 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -28,8 +28,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.InputReadyTracker;
 import org.apache.tez.runtime.RuntimeTask;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.runtime.common.resources.MemoryDistributor;
@@ -41,6 +43,8 @@ public class TezInputContextImpl extends TezTaskContextImpl
   private final String sourceVertexName;
   private final EventMetaData sourceInfo;
   private final int inputIndex;
+  private final Input input;
+  private final InputReadyTracker inputReadyTracker;
 
   @Private
   public TezInputContextImpl(Configuration conf, int appAttemptNumber,
@@ -49,7 +53,7 @@ public class TezInputContextImpl extends TezTaskContextImpl
       TezCounters counters, int inputIndex, byte[] userPayload,
       RuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata,
       Map<String, String> auxServiceEnv, MemoryDistributor memDist,
-      InputDescriptor inputDescriptor) {
+      InputDescriptor inputDescriptor,  Input input, InputReadyTracker inputReadyTracker) {
     super(conf, appAttemptNumber, taskVertexName, taskAttemptID,
         counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
         auxServiceEnv, memDist, inputDescriptor);
@@ -59,6 +63,8 @@ public class TezInputContextImpl extends TezTaskContextImpl
     this.sourceInfo = new EventMetaData(
         EventProducerConsumerType.INPUT, taskVertexName, sourceVertexName,
         taskAttemptID);
+    this.input = input;
+    this.inputReadyTracker = inputReadyTracker;
   }
 
   @Override
@@ -90,4 +96,9 @@ public class TezInputContextImpl extends TezTaskContextImpl
   public void fatalError(Throwable exception, String message) {
     super.signalFatalError(exception, message, sourceInfo);
   }
+
+  @Override
+  public void inputIsReady() {
+    inputReadyTracker.setInputIsReady(input);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index c25802a..106251c 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -18,27 +18,34 @@
 
 package org.apache.tez.runtime.api.impl;
 
-import java.nio.ByteBuffer;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.InputReadyTracker;
 import org.apache.tez.runtime.RuntimeTask;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.TezProcessorContext;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.runtime.common.resources.MemoryDistributor;
 
-public class TezProcessorContextImpl extends TezTaskContextImpl
-  implements TezProcessorContext {
+public class TezProcessorContextImpl extends TezTaskContextImpl implements TezProcessorContext {
 
+  private static final Log LOG = LogFactory.getLog(TezProcessorContextImpl.class);
+  
   private final byte[] userPayload;
   private final EventMetaData sourceInfo;
+  private final InputReadyTracker inputReadyTracker;
 
   public TezProcessorContextImpl(Configuration conf, int appAttemptNumber,
       TezUmbilical tezUmbilical, String vertexName,
@@ -46,13 +53,14 @@ public class TezProcessorContextImpl extends TezTaskContextImpl
       byte[] userPayload, RuntimeTask runtimeTask,
       Map<String, ByteBuffer> serviceConsumerMetadata,
       Map<String, String> auxServiceEnv, MemoryDistributor memDist,
-      ProcessorDescriptor processorDescriptor) {
+      ProcessorDescriptor processorDescriptor, InputReadyTracker inputReadyTracker) {
     super(conf, appAttemptNumber, vertexName, taskAttemptID,
         counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
         auxServiceEnv, memDist, processorDescriptor);
     this.userPayload = userPayload;
     this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,
         taskVertexName, "", taskAttemptID);
+    this.inputReadyTracker = inputReadyTracker;
   }
 
   @Override
@@ -84,4 +92,14 @@ public class TezProcessorContextImpl extends TezTaskContextImpl
   public boolean canCommit() throws IOException {
     return tezUmbilical.canCommit(this.taskAttemptID);
   }
+
+  @Override
+  public Input waitForAnyInputReady(Collection<Input> inputs) throws InterruptedException {
+    return inputReadyTracker.waitForAnyInputReady(inputs); // pure delegation; the tracker picks the result
+  }
+
+  @Override
+  public void waitForAllInputsReady(Collection<Input> inputs) throws InterruptedException {
+    inputReadyTracker.waitForAllInputsReady(inputs); // pure delegation to the shared tracker
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
new file mode 100644
index 0000000..6f5aa2f
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.MergedLogicalInput;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+
+public class TestInputReadyTracker {
+
+  private static final long SLEEP_TIME = 500l;
+  
+  @Test(timeout = 5000)
+  public void testWithoutGrouping1() throws InterruptedException {
+    InputReadyTracker inputReadyTracker = new InputReadyTracker();
+
+    ImmediatelyReadyInputForTest input1 = new ImmediatelyReadyInputForTest(inputReadyTracker);
+    ControlledReadyInputForTest input2 = new ControlledReadyInputForTest(inputReadyTracker);
+
+    // Two ungrouped inputs: input1 reported ready in its constructor, input2 has not been set ready.
+    List<Input> requestList;
+    long startTime = 0l;
+    long readyTime = 0l;
+    requestList = new ArrayList<Input>();
+    requestList.add(input1);
+    requestList.add(input2);
+    Input readyInput = inputReadyTracker.waitForAnyInputReady(requestList);
+    assertTrue(input1.isReady);
+    assertFalse(input2.isReady);
+    assertEquals(input1, readyInput);
+    
+    startTime = System.currentTimeMillis();
+    setDelayedInputReady(input2); // input2 is set ready from a helper thread after SLEEP_TIME ms
+    inputReadyTracker.waitForAllInputsReady(requestList);
+    readyTime = System.currentTimeMillis();
+    // input2 moves into the ready state only when its setInputIsReady() method is invoked.
+    // Ensure waitForAllInputsReady() returned no earlier than SLEEP_TIME ms after the wait began.
+    assertTrue(input2.isReady);
+    assertTrue(readyTime >= startTime + SLEEP_TIME);
+    assertTrue(input1.isReady);
+  }
+
+  @Test(timeout = 5000)
+  public void testWithoutGrouping2() throws InterruptedException {
+    InputReadyTracker inputReadyTracker = new InputReadyTracker();
+
+    ControlledReadyInputForTest input1 = new ControlledReadyInputForTest(inputReadyTracker);
+    ControlledReadyInputForTest input2 = new ControlledReadyInputForTest(inputReadyTracker);
+    ControlledReadyInputForTest input3 = new ControlledReadyInputForTest(inputReadyTracker);
+
+    // Three ungrouped inputs, none ready at the start; each is set ready in turn by a delayed thread.
+    List<Input> requestList;
+    long startTime = 0l;
+    long readyTime = 0l;
+    
+    requestList = new ArrayList<Input>();
+    requestList.add(input1);
+    requestList.add(input2);
+    requestList.add(input3);
+    
+    startTime = System.currentTimeMillis();
+    setDelayedInputReady(input2); // round 1: only input2 is set ready
+    Input readyInput = inputReadyTracker.waitForAnyInputReady(requestList);
+    assertEquals(input2, readyInput);
+    readyTime = System.currentTimeMillis();
+    // input2 moves into the ready state only when its setInputIsReady() method is invoked.
+    // Ensure waitForAnyInputReady() returned no earlier than SLEEP_TIME ms after the wait began.
+    assertTrue(input2.isReady);
+    assertTrue(readyTime >= startTime + SLEEP_TIME);
+    assertFalse(input1.isReady);
+    assertFalse(input3.isReady);
+    
+    requestList = new ArrayList<Input>();
+    requestList.add(input1);
+    requestList.add(input3);
+    startTime = System.currentTimeMillis();
+    setDelayedInputReady(input1); // round 2: wait on the two remaining inputs, input1 is set ready
+    readyInput = inputReadyTracker.waitForAnyInputReady(requestList);
+    assertEquals(input1, readyInput);
+    readyTime = System.currentTimeMillis();
+    // input1 moves into the ready state only when its setInputIsReady() method is invoked.
+    // Ensure waitForAnyInputReady() returned no earlier than SLEEP_TIME ms after the wait began.
+    assertTrue(input1.isReady);
+    assertTrue(readyTime >= startTime + SLEEP_TIME);
+    assertTrue(input2.isReady);
+    assertFalse(input3.isReady);
+    
+    requestList = new ArrayList<Input>();
+    requestList.add(input3);
+    startTime = System.currentTimeMillis();
+    setDelayedInputReady(input3); // round 3: only input3 is left to wait on
+    readyInput = inputReadyTracker.waitForAnyInputReady(requestList);
+    assertEquals(input3, readyInput);
+    readyTime = System.currentTimeMillis();
+    // input3 moves into the ready state only when its setInputIsReady() method is invoked.
+    // Ensure waitForAnyInputReady() returned no earlier than SLEEP_TIME ms after the wait began.
+    assertTrue(input3.isReady);
+    assertTrue(readyTime >= startTime + SLEEP_TIME);
+    assertTrue(input1.isReady);
+    assertTrue(input2.isReady);
+  }
+
+  @Test(timeout = 5000)
+  public void testGrouped() throws InterruptedException {
+    InputReadyTracker inputReadyTracker = new InputReadyTracker();
+
+    ImmediatelyReadyInputForTest input1 = new ImmediatelyReadyInputForTest(inputReadyTracker);
+    ControlledReadyInputForTest input2 = new ControlledReadyInputForTest(inputReadyTracker);
+    
+    ImmediatelyReadyInputForTest input3 = new ImmediatelyReadyInputForTest(inputReadyTracker);
+    ControlledReadyInputForTest input4 = new ControlledReadyInputForTest(inputReadyTracker);
+    
+    AnyOneMergedInputForTest group1 = new AnyOneMergedInputForTest(); // ready when any constituent is
+    AllMergedInputForTest group2 = new AllMergedInputForTest(); // ready when all constituents are
+    
+    List<Input> group1Inputs = new ArrayList<Input>();
+    group1Inputs.add(input1);
+    group1Inputs.add(input2);
+    
+    List<Input> group2Inputs = new ArrayList<Input>();
+    group2Inputs.add(input3);
+    group2Inputs.add(input4);
+
+    group1.initialize(group1Inputs);
+    group2.initialize(group2Inputs);
+    
+    // Register groups with tracker
+    List<MergedLogicalInput> groups = Lists.newArrayList(group1, group2);
+    inputReadyTracker.setGroupedInputs(groups);
+
+    // Wait on the merged groups themselves rather than on their constituent inputs.
+    List<Input> requestList;
+    long startTime = 0l;
+    long readyTime = 0l;
+    requestList = new ArrayList<Input>();
+    requestList.add(group1);
+    Input readyInput = inputReadyTracker.waitForAnyInputReady(requestList);
+    assertTrue(group1.isReady);
+    assertTrue(input1.isReady);
+    assertFalse(input2.isReady);
+    assertEquals(group1, readyInput);
+    
+    
+    requestList = new ArrayList<Input>();
+    requestList.add(group2);
+    
+    
+    startTime = System.currentTimeMillis();
+    setDelayedInputReady(input4); // input4 is the only constituent of group2 not yet reported ready
+    inputReadyTracker.waitForAllInputsReady(requestList);
+    readyTime = System.currentTimeMillis();
+    // input4 moves into the ready state only when its setInputIsReady() method is invoked.
+    // Ensure waitForAllInputsReady() returned no earlier than SLEEP_TIME ms after the wait began.
+    assertTrue(group2.isReady);
+    assertTrue(input3.isReady);
+    assertTrue(input4.isReady);
+    assertTrue(readyTime >= startTime + SLEEP_TIME);
+    
+  }
+  
+  private long setDelayedInputReady(final ControlledReadyInputForTest input) {
+    long startTime = System.currentTimeMillis(); // captured before the helper thread is started
+    new Thread() {
+      public void run() {
+        try {
+          Thread.sleep(SLEEP_TIME); // delay so the calling test can check that its wait really blocked
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e); // thrown on the helper thread, not on the test thread
+        }
+        input.setInputIsReady(); // flips the input's flag and notifies its tracker
+      }
+    }.start();
+    return startTime; // NOTE(review): callers in this test class ignore the returned value
+  }
+
+  private static class ImmediatelyReadyInputForTest implements LogicalInput {
+    // Test stub that reports itself ready from its constructor; the LogicalInput methods do nothing.
+    private volatile boolean isReady = false;
+    
+    ImmediatelyReadyInputForTest(InputReadyTracker inputReadyTracker) {
+      isReady = true;
+      inputReadyTracker.setInputIsReady(this); // 'this' is handed to the tracker during construction
+    }
+
+    @Override
+    public List<Event> initialize(TezInputContext inputContext) throws Exception {
+      return null;
+    }
+
+    @Override
+    public void start() throws Exception {
+    }
+
+    @Override
+    public Reader getReader() throws Exception {
+      return null;
+    }
+
+    @Override
+    public void handleEvents(List<Event> inputEvents) throws Exception {
+    }
+
+    @Override
+    public List<Event> close() throws Exception {
+      return null;
+    }
+
+    @Override
+    public void setNumPhysicalInputs(int numInputs) {
+    }
+  }
+  
+  private static class ControlledReadyInputForTest implements LogicalInput {
+    // Test stub that stays not-ready until setInputIsReady() is called; the LogicalInput methods do nothing.
+    private volatile boolean isReady = false;
+    private InputReadyTracker inputReadyTracker;
+    
+    ControlledReadyInputForTest(InputReadyTracker inputReadyTracker) {
+      this.inputReadyTracker = inputReadyTracker; // kept so setInputIsReady() can notify it later
+    }
+
+    @Override
+    public List<Event> initialize(TezInputContext inputContext) throws Exception {
+      return null;
+    }
+
+    @Override
+    public void start() throws Exception {      
+    }
+
+    @Override
+    public Reader getReader() throws Exception {
+      return null;
+    }
+
+    @Override
+    public void handleEvents(List<Event> inputEvents) throws Exception {
+    }
+
+    @Override
+    public List<Event> close() throws Exception {
+      return null;
+    }
+
+    @Override
+    public void setNumPhysicalInputs(int numInputs) {
+    }
+    
+    // Used by the test to control when this input will be ready: sets the flag, then notifies the tracker
+    public void setInputIsReady() {
+      isReady = true;
+      inputReadyTracker.setInputIsReady(this);
+    }
+  }
+
+  private static class AnyOneMergedInputForTest extends MergedLogicalInput {
+    // Test group that marks itself ready as soon as any one constituent input reports ready.
+    private volatile boolean isReady = false;
+    
+    @Override
+    public Reader getReader() throws Exception {
+      return null;
+    }
+
+    @Override
+    public void setConstituentInputIsReady(Input input) {
+      isReady = true; // no per-input bookkeeping: the first notification is enough
+      informInputReady(); // called on every notification, not only the first
+    }
+  }
+
+  private static class AllMergedInputForTest extends MergedLogicalInput {
+    // Test group that marks itself ready only once every input from getInputs() has reported ready.
+    private volatile boolean isReady = false;
+    private Set<Input> readyInputs = Sets.newHashSet(); // distinct constituents seen so far
+    
+    @Override
+    public Reader getReader() throws Exception {
+      return null;
+    }
+
+    @Override
+    public void setConstituentInputIsReady(Input input) {
+      synchronized (this) { // the lock covers only the add()
+        readyInputs.add(input);
+      }
+      if (readyInputs.size() == getInputs().size()) { // NOTE(review): size() is read outside the lock - confirm safe
+        isReady = true;
+        informInputReady();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
index a1d2fa2..c2dd68b 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -175,6 +175,7 @@ public class TestLogicalIOProcessorRuntimeTask {
     @Override
     public List<Event> initialize(TezInputContext inputContext) throws Exception {
       inputContext.requestInitialMemory(0, null);
+      inputContext.inputIsReady();
       return null;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
index bc17496..54f80e4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
@@ -93,6 +93,7 @@ public class BroadcastShuffleManager implements FetcherCallback, MemoryUpdateCal
   private RunBroadcastShuffleCallable schedulerCallable = new RunBroadcastShuffleCallable();
   
   private BlockingQueue<FetchedInput> completedInputs;
+  private AtomicBoolean inputReadyNotificationSent = new AtomicBoolean(false);
   private Set<InputIdentifier> completedInputSet;
   private ConcurrentMap<String, InputHost> knownSrcHosts;
   private BlockingQueue<InputHost> pendingHosts;
@@ -127,7 +128,7 @@ public class BroadcastShuffleManager implements FetcherCallback, MemoryUpdateCal
   private final AtomicBoolean isShutdown = new AtomicBoolean(false);
   
   private volatile long initialMemoryAvailable = -1l;
-  
+
   // TODO NEWTEZ Add counters.
   
   public BroadcastShuffleManager(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
@@ -541,6 +542,9 @@ public class BroadcastShuffleManager implements FetcherCallback, MemoryUpdateCal
     try {
       completedInputSet.add(fetchedInput.getInputAttemptIdentifier().getInputIdentifier());
       completedInputs.add(fetchedInput);
+      if (!inputReadyNotificationSent.getAndSet(true)) {
+        inputContext.inputIsReady();
+      }
       numCompletedInputs.incrementAndGet();
     } finally {
       lock.unlock();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
index 1959533..8653b44 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
@@ -271,6 +271,8 @@ public class Shuffle implements ExceptionReporter, MemoryUpdateCallback {
                                  throwable);
         }
       }
+
+      inputContext.inputIsReady();
       return kvIter;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
index 004701d..f142ee9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
@@ -21,6 +21,7 @@ package org.apache.tez.runtime.library.input;
 import java.io.IOException;
 
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.MergedLogicalInput;
 import org.apache.tez.runtime.api.Reader;
 import org.apache.tez.runtime.library.api.KeyValueReader;
@@ -69,4 +70,8 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput {
     return new ConcatenatedMergedKeyValueReader();
   }
 
-}
+  @Override
+  public void setConstituentInputIsReady(Input input) {
+    informInputReady(); // called unconditionally for every constituent notification; 'input' is not inspected
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
index 29cd490..8affa14 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
@@ -21,6 +21,7 @@ package org.apache.tez.runtime.library.input;
 import java.io.IOException;
 
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.MergedLogicalInput;
 import org.apache.tez.runtime.api.Reader;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
@@ -69,4 +70,8 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
     return new ConcatenatedMergedKeyValuesReader();
   }
 
-}
+  @Override
+  public void setConstituentInputIsReady(Input input) {
+    informInputReady(); // called unconditionally for every constituent notification; 'input' is not inspected
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
index bbeae52..d298e8f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
@@ -37,6 +37,7 @@ public class LocalMergedInput extends ShuffledMergedInputLegacy {
   public List<Event> initialize(TezInputContext inputContext) throws IOException {
     this.inputContext = inputContext;
     this.inputContext.requestInitialMemory(0l, null); // mandatory call.
+    this.inputContext.inputIsReady();
     this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
 
     if (numInputs == 0) {
@@ -60,5 +61,4 @@ public class LocalMergedInput extends ShuffledMergedInputLegacy {
     }
     return Collections.emptyList();
   }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index d2d4e56..de707a5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -84,9 +84,9 @@ public class ShuffledMergedInput implements LogicalInput {
     }
 
     this.inputKeyCounter = inputContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
-    this.inputValueCounter = inputContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS);
-    this.conf.setStrings(TezJobConfig.LOCAL_DIRS,
-        inputContext.getWorkDirs());
+    this.inputValueCounter = inputContext.getCounters().findCounter(
+        TaskCounter.REDUCE_INPUT_RECORDS);
+    this.conf.setStrings(TezJobConfig.LOCAL_DIRS, inputContext.getWorkDirs());
 
     shuffle = new Shuffle(inputContext, this.conf, numInputs);
     return Collections.emptyList();
@@ -239,5 +239,4 @@ public class ShuffledMergedInput implements LogicalInput {
         ConfigUtils.getIntermediateInputValueClass(conf), conf, inputKeyCounter, inputValueCounter);
 
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index 1241606..d216eeb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -66,8 +66,7 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
       return null;
     }
 
-    this.shuffleManager = new BroadcastShuffleManager(inputContext, conf,
-        numInputs);
+    this.shuffleManager = new BroadcastShuffleManager(inputContext, conf, numInputs);
     return Collections.emptyList();
   }
 
@@ -143,4 +142,5 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
   public void setNumPhysicalInputs(int numInputs) {
     this.numInputs = numInputs;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e17e750c/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index da4ea16..90dadb6 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -223,6 +223,7 @@ public class TestInput implements LogicalInput {
   public List<Event> initialize(TezInputContext inputContext) throws Exception {
     this.inputContext = inputContext;
     this.inputContext.requestInitialMemory(0l, null); //Mandatory call.
+    this.inputContext.inputIsReady();
     if (inputContext.getUserPayload() != null) {
       String vName = inputContext.getTaskVertexName();
       conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
@@ -324,5 +325,4 @@ public class TestInput implements LogicalInput {
       this.inputValues[i] = -1;
     }
   }
-
 }