You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/02/13 05:09:18 UTC

git commit: TEZ-815. Split initialize and start implementations for the various Inputs and Outputs. (sseth)

Updated Branches:
  refs/heads/master 924ce6611 -> 3a63d9b1a


TEZ-815. Split initialize and start implementations for the various
Inputs and Outputs. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/3a63d9b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/3a63d9b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/3a63d9b1

Branch: refs/heads/master
Commit: 3a63d9b1a0f83cf366c7273760baa600755bc8c1
Parents: 924ce66
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Feb 12 20:08:33 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Feb 12 20:08:33 2014 -0800

----------------------------------------------------------------------
 .../tez/runtime/api/MergedLogicalInput.java     |   9 +-
 .../apache/tez/runtime/api/TezTaskContext.java  |  13 +++
 .../org/apache/tez/mapreduce/input/MRInput.java |   7 +-
 .../apache/tez/mapreduce/output/MROutput.java   |   4 +-
 .../runtime/api/impl/TezTaskContextImpl.java    |  26 +++--
 .../broadcast/input/BroadcastInputManager.java  |  65 ++++++++---
 .../input/BroadcastShuffleManager.java          |  86 +++++++++-----
 .../common/shuffle/impl/MergeManager.java       | 114 +++++++++++++------
 .../library/common/shuffle/impl/Shuffle.java    |  50 +++++---
 .../shuffle/impl/ShuffleInputEventHandler.java  |   2 -
 .../common/shuffle/impl/ShuffleScheduler.java   |   2 +-
 .../common/sort/impl/ExternalSorter.java        |  31 +++--
 .../common/sort/impl/PipelinedSorter.java       |   8 +-
 .../common/sort/impl/dflt/DefaultSorter.java    |   8 +-
 .../runtime/library/input/LocalMergedInput.java |   7 +-
 .../library/input/ShuffledMergedInput.java      |  22 +---
 .../library/input/ShuffledUnorderedKVInput.java |  13 +--
 .../library/output/LocalOnFileSorterOutput.java |   2 +-
 .../library/output/OnFileSortedOutput.java      |   6 +-
 .../library/output/OnFileUnorderedKVOutput.java |   6 +-
 .../input/TestBroadcastInputManager.java        |  12 +-
 .../java/org/apache/tez/test/TestInput.java     |   8 +-
 .../java/org/apache/tez/test/TestOutput.java    |   8 +-
 23 files changed, 333 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
index d91a863..405e0d8 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
@@ -19,6 +19,7 @@
 package org.apache.tez.runtime.api;
 
 import java.util.List;
+import com.google.common.collect.Lists;
 
 /**
  * A LogicalInput that is used to merge the data from multiple inputs and provide a 
@@ -45,10 +46,14 @@ public abstract class MergedLogicalInput implements LogicalInput {
 
   @Override
   public List<Event> start() throws Exception {
+    List<Event> events = Lists.newLinkedList();
     for (Input input : inputs) {
-      input.start();
+      List<Event> inputEvents = input.start();
+      if (inputEvents != null) {
+        events.addAll(inputEvents);
+      }
     }
-    return null;
+    return events;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java
index 70ef41d..6a7154c 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java
@@ -147,10 +147,23 @@ public interface TezTaskContext {
    * This method can be called only once by any component. Calling it multiple
    * times from within the same component will result in an error.
    * 
+   * Each Input / Output must request memory. For Inputs / Outputs which do not
+   * have a specific ask, a null callback handler can be specified with a
+   * request size of 0.
+   * 
    * @param size
    *          request size in bytes.
    * @param callbackHandler
    *          the callback handler to be invoked once memory is assigned
    */
   public void requestInitialMemory(long size, MemoryUpdateCallback callbackHandler);
+  
+  /**
+   * Gets the total memory available to all components of the running task. This
+   * values will always be constant, and does not factor in any allocations.
+   * 
+   * @return the total available memory for all components of the task
+   */
+  public long getTotalMemoryAvailableToTask();
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index f8f99b7..7108d37 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -18,6 +18,7 @@
 package org.apache.tez.mapreduce.input;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
@@ -120,8 +121,7 @@ public class MRInput implements LogicalInput {
   @Override
   public List<Event> initialize(TezInputContext inputContext) throws IOException {
     this.inputContext = inputContext;
-    // TEZ 815. Fix this. Not used until there's a separation between init and start.
-    inputContext.requestInitialMemory(0l, null);
+    this.inputContext.requestInitialMemory(0l, null); //mandatory call
     MRInputUserPayloadProto mrUserPayload =
       MRHelpers.parseMRInputPayload(inputContext.getUserPayload());
     Preconditions.checkArgument(mrUserPayload.hasSplits() == false,
@@ -160,8 +160,7 @@ public class MRInput implements LogicalInput {
   
   @Override
   public List<Event> start() {
-    // TODO TEZ-815 To be fixed in a subsequent jira if required.
-    return null;
+    return Collections.emptyList();
   }
 
   @Private

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 66d4566..2d7b48a 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -90,8 +90,7 @@ public class MROutput implements LogicalOutput {
   public List<Event> initialize(TezOutputContext outputContext)
       throws IOException, InterruptedException {
     LOG.info("Initializing Simple Output");
-    // TEZ 815. Fix this. Not used until there's a separation between init and start.
-    outputContext.requestInitialMemory(0l, null);
+    outputContext.requestInitialMemory(0l, null); //mandatory call
     taskNumberFormat.setMinimumIntegerDigits(5);
     taskNumberFormat.setGroupingUsed(false);
     nonTaskNumberFormat.setMinimumIntegerDigits(3);
@@ -193,7 +192,6 @@ public class MROutput implements LogicalOutput {
   
   @Override
   public List<Event> start() {
-    // TODO TEZ-815 To be fixed in a subsequent jira if required.
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
index c31e903..e15f6f2 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
@@ -37,6 +37,8 @@ import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.TezTaskContext;
 import org.apache.tez.runtime.common.resources.MemoryDistributor;
 
+import com.google.common.base.Preconditions;
+
 public abstract class TezTaskContextImpl implements TezTaskContext {
 
   private static final AtomicInteger ID_GEN = new AtomicInteger(10000);
@@ -148,15 +150,25 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
 
   @Override
   public void requestInitialMemory(long size, MemoryUpdateCallback callbackHandler) {
-    this.initialMemoryDistributor.requestMemory(size, new MemoryUpdateCallback() {
-      @Override
-      public void memoryAssigned(long assignedSize) {
-        // Filler for this patch.
-        // TODO TEZ-815 implement this properly.
-      }
-    }, this, descriptor);
+    // Nulls allowed since all IOs have to make this call.
+    if (callbackHandler == null) {
+      Preconditions.checkArgument(size == 0,
+          "A Null callback handler can only be used with a request size of 0");
+      callbackHandler = new MemoryUpdateCallback() {
+        @Override
+        public void memoryAssigned(long assignedSize) {
+          
+        }
+      };
+    }
+    this.initialMemoryDistributor.requestMemory(size, callbackHandler, this, this.descriptor);
   }
 
+  @Override
+  public long getTotalMemoryAvailableToTask() {
+    return Runtime.getRuntime().maxMemory();
+  }
+  
   protected void signalFatalError(Throwable t, String message,
       EventMetaData sourceInfo) {
     runtimeTask.setFatalError(t, message);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
index 6c5a3da..4af4404 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
@@ -22,11 +22,11 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
@@ -36,25 +36,43 @@ import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
 import org.apache.tez.runtime.library.shuffle.common.FetchedInputCallback;
 import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
 
+import com.google.common.base.Preconditions;
+
+/**
+ * Usage: Create instance, setInitialMemoryAvailable(long), configureAndStart()
+ *
+ */
+@Private
 public class BroadcastInputManager implements FetchedInputAllocator,
     FetchedInputCallback {
 
   private static final Log LOG = LogFactory.getLog(BroadcastInputManager.class);
   
   private final Configuration conf;
+  private final String uniqueIdentifier;
 
-  private final TezTaskOutputFiles fileNameAllocator;
-  private final LocalDirAllocator localDirAllocator;
+  private TezTaskOutputFiles fileNameAllocator;
+  private LocalDirAllocator localDirAllocator;
 
   // Configuration parameters
-  private final long memoryLimit;
-  private final long maxSingleShuffleLimit;
+  private long memoryLimit;
+  private long maxSingleShuffleLimit;
 
   private volatile long usedMemory = 0;
+  
+  private long maxAvailableTaskMemory;
+  private long initialMemoryAvailable =-1l;
 
-  public BroadcastInputManager(String uniqueIdentifier, Configuration conf, TezInputContext inputContext) {
-    this.conf = conf;
+  public BroadcastInputManager(String uniqueIdentifier, Configuration conf, long maxTaskAvailableMemory) {
+    this.conf = conf;    
+    this.uniqueIdentifier = uniqueIdentifier;
+    this.maxAvailableTaskMemory = maxTaskAvailableMemory;
+  }
 
+  @Private
+  void configureAndStart() {
+    Preconditions.checkState(initialMemoryAvailable != -1,
+        "Initial memory must be configured before starting");
     this.fileNameAllocator = new TezTaskOutputFiles(conf,
         uniqueIdentifier);
     this.localDirAllocator = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
@@ -71,13 +89,14 @@ public class BroadcastInputManager implements FetchedInputAllocator,
 
     // Allow unit tests to fix Runtime memory
     long memReq = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
-        Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE)) * maxInMemCopyUse);
+        Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE)) * maxInMemCopyUse);
     
-    // TEZ 815. Fix this. Not used until there's a separation between init and start.
-    inputContext.requestInitialMemory(memReq, null);
-    long memAlloc = memReq;
-    
-    this.memoryLimit = memAlloc;
+    if (memReq <= this.initialMemoryAvailable) {
+      this.memoryLimit = memReq;
+    } else {
+      this.memoryLimit = initialMemoryAvailable;
+    }
+
     LOG.info("RequestedMem=" + memReq + ", Allocated: " + this.memoryLimit);
 
     final float singleShuffleMemoryLimitPercent = conf.getFloat(
@@ -95,6 +114,26 @@ public class BroadcastInputManager implements FetchedInputAllocator,
     LOG.info("BroadcastInputManager -> " + "MemoryLimit: " + 
     this.memoryLimit + ", maxSingleMemLimit: " + this.maxSingleShuffleLimit);
   }
+  
+  @Private
+  static long getInitialMemoryReq(Configuration conf, long maxAvailableTaskMemory) {
+    final float maxInMemCopyUse = conf.getFloat(
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT,
+        TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT);
+    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
+      throw new IllegalArgumentException("Invalid value for "
+          + TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT + ": "
+          + maxInMemCopyUse);
+    }
+    long memReq = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
+        Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE)) * maxInMemCopyUse);
+    return memReq;
+  }
+
+  @Private
+  void setInitialMemoryAvailable(long available) {
+    this.initialMemoryAvailable = available;
+  }
 
   @Override
   public synchronized FetchedInput allocate(long actualSize, long compressedSize,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
index e4c31cf..bc17496 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
@@ -49,6 +49,7 @@ import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.library.common.ConfigUtils;
@@ -64,6 +65,7 @@ import org.apache.tez.runtime.library.shuffle.common.FetcherCallback;
 import org.apache.tez.runtime.library.shuffle.common.InputHost;
 import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -72,51 +74,51 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
-public class BroadcastShuffleManager implements FetcherCallback {
+public class BroadcastShuffleManager implements FetcherCallback, MemoryUpdateCallback {
 
   private static final Log LOG = LogFactory.getLog(BroadcastShuffleManager.class);
   
-  private TezInputContext inputContext;
-  private int numInputs;
-  private Configuration conf;
+  private final TezInputContext inputContext;
+  private final Configuration conf;
+  private final int numInputs;
   
-  private final BroadcastShuffleInputEventHandler inputEventHandler;
-  private final FetchedInputAllocator inputManager;
+  private BroadcastShuffleInputEventHandler inputEventHandler;
+  private FetchedInputAllocator inputManager;
   
-  private final ExecutorService fetcherRawExecutor;
-  private final ListeningExecutorService fetcherExecutor;
+  private ExecutorService fetcherRawExecutor;
+  private ListeningExecutorService fetcherExecutor;
 
-  private final ExecutorService schedulerRawExecutor;
-  private final ListeningExecutorService schedulerExecutor;
-  private final RunBroadcastShuffleCallable schedulerCallable = new RunBroadcastShuffleCallable();
+  private ExecutorService schedulerRawExecutor;
+  private ListeningExecutorService schedulerExecutor;
+  private RunBroadcastShuffleCallable schedulerCallable = new RunBroadcastShuffleCallable();
   
-  private final BlockingQueue<FetchedInput> completedInputs;
-  private final Set<InputIdentifier> completedInputSet;
-  private final ConcurrentMap<String, InputHost> knownSrcHosts;
-  private final BlockingQueue<InputHost> pendingHosts;
-  private final Set<InputAttemptIdentifier> obsoletedInputs;
+  private BlockingQueue<FetchedInput> completedInputs;
+  private Set<InputIdentifier> completedInputSet;
+  private ConcurrentMap<String, InputHost> knownSrcHosts;
+  private BlockingQueue<InputHost> pendingHosts;
+  private Set<InputAttemptIdentifier> obsoletedInputs;
   
-  private final AtomicInteger numCompletedInputs = new AtomicInteger(0);
+  private AtomicInteger numCompletedInputs = new AtomicInteger(0);
   
-  private final long startTime;
+  private long startTime;
   private long lastProgressTime;
 
   // Required to be held when manipulating pendingHosts
   private ReentrantLock lock = new ReentrantLock();
   private Condition wakeLoop = lock.newCondition();
   
-  private final int numFetchers;
-  private final AtomicInteger numRunningFetchers = new AtomicInteger(0);
+  private int numFetchers;
+  private AtomicInteger numRunningFetchers = new AtomicInteger(0);
   
   // Parameters required by Fetchers
-  private final SecretKey shuffleSecret;
-  private final int connectionTimeout;
-  private final int readTimeout;
-  private final CompressionCodec codec;
+  private SecretKey shuffleSecret;
+  private int connectionTimeout;
+  private int readTimeout;
+  private CompressionCodec codec;
   
-  private final boolean ifileReadAhead;
-  private final int ifileReadAheadLength;
-  private final int ifileBufferSize;
+  private boolean ifileReadAhead;
+  private int ifileReadAheadLength;
+  private int ifileBufferSize;
   
   private final FetchFutureCallback fetchFutureCallback = new FetchFutureCallback();
   
@@ -124,13 +126,21 @@ public class BroadcastShuffleManager implements FetcherCallback {
   
   private final AtomicBoolean isShutdown = new AtomicBoolean(false);
   
+  private volatile long initialMemoryAvailable = -1l;
+  
   // TODO NEWTEZ Add counters.
   
   public BroadcastShuffleManager(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
     this.inputContext = inputContext;
     this.conf = conf;
     this.numInputs = numInputs;
+    long initalMemReq = getInitialMemoryReq();
+    this.inputContext.requestInitialMemory(initalMemReq, this);
+  }
 
+  private void configureAndStart() throws IOException {
+    Preconditions.checkState(initialMemoryAvailable != -1,
+        "Initial memory available must be configured before starting");
     if (ConfigUtils.isIntermediateInputCompressed(conf)) {
       Class<? extends CompressionCodec> codecClass = ConfigUtils
           .getIntermediateInputCompressorClass(conf, DefaultCodec.class);
@@ -152,7 +162,10 @@ public class BroadcastShuffleManager implements FetcherCallback {
     this.ifileBufferSize = conf.getInt("io.file.buffer.size",
         TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
     
-    this.inputManager = new BroadcastInputManager(inputContext.getUniqueIdentifier(), conf, inputContext);
+    this.inputManager = new BroadcastInputManager(inputContext.getUniqueIdentifier(), conf,
+        inputContext.getTotalMemoryAvailableToTask());
+    ((BroadcastInputManager)this.inputManager).setInitialMemoryAvailable(initialMemoryAvailable);
+    ((BroadcastInputManager)this.inputManager).configureAndStart();
     this.inputEventHandler = new BroadcastShuffleInputEventHandler(
         inputContext, this, this.inputManager, codec, ifileReadAhead,
         ifileReadAheadLength);
@@ -208,7 +221,17 @@ public class BroadcastShuffleManager implements FetcherCallback {
         .getName()) + ", numFetchers: " + numFetchers);
   }
   
-  public void run() {
+  private long getInitialMemoryReq() {
+    return BroadcastInputManager.getInitialMemoryReq(conf,
+        inputContext.getTotalMemoryAvailableToTask());
+  }
+  
+  public void setInitialMemoryAvailable(long available) {
+    this.initialMemoryAvailable = available;
+  }
+
+  public void run() throws IOException {
+    configureAndStart();
     ListenableFuture<Void> runShuffleFuture = schedulerExecutor.submit(schedulerCallable);
     Futures.addCallback(runShuffleFuture, new SchedulerFutureCallback());
     // Shutdown this executor once this task, and the callback complete.
@@ -673,4 +696,9 @@ public class BroadcastShuffleManager implements FetcherCallback {
       }
     }
   }
+
+  @Override
+  public void memoryAssigned(long assignedSize) {
+    this.initialMemoryAvailable = assignedSize;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
index 37808c3..051806c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
@@ -59,6 +59,12 @@ import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
 import org.apache.tez.runtime.library.hadoop.compat.NullProgressable;
 
+import com.google.common.base.Preconditions;
+
+/**
+ * Usage. Create instance. setInitialMemoryAvailable(long), configureAndStart()
+ *
+ */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 @SuppressWarnings(value={"rawtypes"})
@@ -77,24 +83,26 @@ public class MergeManager {
   
   Set<MapOutput> inMemoryMergedMapOutputs = 
     new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
-  private final IntermediateMemoryToMemoryMerger memToMemMerger;
+  private IntermediateMemoryToMemoryMerger memToMemMerger;
 
   Set<MapOutput> inMemoryMapOutputs = 
     new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
-  private final InMemoryMerger inMemoryMerger;
+  private InMemoryMerger inMemoryMerger;
   
   Set<Path> onDiskMapOutputs = new TreeSet<Path>();
-  private final OnDiskMerger onDiskMerger;
+  private OnDiskMerger onDiskMerger;
   
-  private final long memoryLimit;
+  private  long memoryLimit;
+  private int postMergeMemLimit;
   private long usedMemory;
   private long commitMemory;
-  private final long maxSingleShuffleLimit;
+  private int ioSortFactor;
+  private long maxSingleShuffleLimit;
   
-  private final int memToMemMergeOutputsThreshold; 
-  private final long mergeThreshold;
+  private int memToMemMergeOutputsThreshold; 
+  private long mergeThreshold;
   
-  private final int ioSortFactor;
+  private long initialMemoryAvailable = -1;
 
   private final ExceptionReporter exceptionReporter;
   
@@ -106,16 +114,18 @@ public class MergeManager {
 
   private final TezCounter mergedMapOutputsCounter;
   
-  private final CompressionCodec codec;
+  private CompressionCodec codec;
   
   private volatile boolean finalMergeComplete = false;
   
-  private final boolean ifileReadAhead;
-  private final int ifileReadAheadLength;
-  private final int ifileBufferSize;
+  private boolean ifileReadAhead;
+  private int ifileReadAheadLength;
+  private int ifileBufferSize;
 
-  private final int postMergeMemLimit;
-  
+
+  /**
+   * Construct the MergeManager. Must call start before it becomes usable.
+   */
   public MergeManager(Configuration conf, 
                       FileSystem localFS,
                       LocalDirAllocator localDirAllocator,  
@@ -140,6 +150,16 @@ public class MergeManager {
     this.localFS = localFS;
     this.rfs = ((LocalFileSystem)localFS).getRaw();
 
+  }
+  
+  void setInitialMemoryAvailable(long available) {
+    this.initialMemoryAvailable = available;
+  }
+  
+  @Private
+  void configureAndStart() {
+    Preconditions.checkState(initialMemoryAvailable != -1,
+        "Initial available memory must be configured before starting");
     if (ConfigUtils.isIntermediateInputCompressed(conf)) {
       Class<? extends CompressionCodec> codecClass =
           ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
@@ -160,6 +180,7 @@ public class MergeManager {
     this.ifileBufferSize = conf.getInt("io.file.buffer.size",
         TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
 
+    // Figure out initial memory req start
     final float maxInMemCopyUse =
       conf.getFloat(
           TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 
@@ -172,9 +193,7 @@ public class MergeManager {
 
     // Allow unit tests to fix Runtime memory
     long memLimit = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
-        Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE)) * maxInMemCopyUse);
-    
-    LOG.info("Shuffle Memory Required: " + memLimit + ", based on INPUT_BUFFER_factor: " + maxInMemCopyUse);
+        Math.min(inputContext.getTotalMemoryAvailableToTask(), Integer.MAX_VALUE)) * maxInMemCopyUse);
 
     float maxRedPer = conf.getFloat(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT,
         TezJobConfig.DEFAULT_TEZ_RUNTIME_INPUT_BUFFER_PERCENT);
@@ -182,29 +201,25 @@ public class MergeManager {
       throw new TezUncheckedException(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT + maxRedPer);
     }
     // TODO maxRedBuffer should be a long.
-    int maxRedBuffer = (int) Math.min(Runtime.getRuntime().maxMemory() * maxRedPer,
+    int maxRedBuffer = (int) Math.min(inputContext.getTotalMemoryAvailableToTask() * maxRedPer,
         Integer.MAX_VALUE);
-    LOG.info("Memory required for final merged output: " + maxRedBuffer + ", using factor: " + maxRedPer);
-
-    long reqMem = Math.max(maxRedBuffer, memLimit);
-    // TEZ 815. Fix this. Not used until there's a separation between init and start.
-    inputContext.requestInitialMemory(reqMem, null);
-    long availableMem = reqMem;
+    // Figure out initial memory req end
     
-    if (availableMem < memLimit) {
-      this.memoryLimit = availableMem;
+    if (this.initialMemoryAvailable < memLimit) {
+      this.memoryLimit = this.initialMemoryAvailable;
     } else {
       this.memoryLimit = memLimit;
     }
-    
-    if (availableMem < maxRedBuffer) {
-      this.postMergeMemLimit = (int) availableMem;
+
+    if (this.initialMemoryAvailable < maxRedBuffer) {
+      this.postMergeMemLimit = (int) this.initialMemoryAvailable;
     } else {
       this.postMergeMemLimit = maxRedBuffer;
     }
-    
-    LOG.info("FinalMemoryAllocation: ShuffleMemory=" + this.memoryLimit + ", postMergeMem: "
-        + this.postMergeMemLimit);
+
+    LOG.info("InitialRequest: ShuffleMem=" + memLimit + ", postMergeMem=" + maxRedBuffer
+        + ", RuntimeTotalAvailable=" + this.initialMemoryAvailable + "Updated to: ShuffleMem="
+        + this.memoryLimit + ", postMergeMem: " + this.postMergeMemLimit);
 
     this.ioSortFactor = 
         conf.getInt(
@@ -265,6 +280,41 @@ public class MergeManager {
     this.onDiskMerger = new OnDiskMerger(this);
     this.onDiskMerger.start();
   }
+  
+  /**
+   * Exposing this to get an initial memory ask without instantiating the object.
+   */
+  @Private
+  static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) {
+    final float maxInMemCopyUse =
+        conf.getFloat(
+            TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, 
+            TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT);
+      if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
+        throw new IllegalArgumentException("Invalid value for " +
+            TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT + ": " +
+            maxInMemCopyUse);
+      }
+
+      // Allow unit tests to fix Runtime memory
+      long memLimit = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
+          Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE)) * maxInMemCopyUse);
+      
+      LOG.info("Initial Shuffle Memory Required: " + memLimit + ", based on INPUT_BUFFER_factor: " + maxInMemCopyUse);
+
+      float maxRedPer = conf.getFloat(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT,
+          TezJobConfig.DEFAULT_TEZ_RUNTIME_INPUT_BUFFER_PERCENT);
+      if (maxRedPer > 1.0 || maxRedPer < 0.0) {
+        throw new TezUncheckedException(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT + maxRedPer);
+      }
+      // TODO maxRedBuffer should be a long.
+      int maxRedBuffer = (int) Math.min(maxAvailableTaskMemory * maxRedPer,
+          Integer.MAX_VALUE);
+      LOG.info("Initial Memory required for final merged output: " + maxRedBuffer + ", using factor: " + maxRedPer);
+
+      long reqMem = Math.max(maxRedBuffer, memLimit);
+      return reqMem;
+  }
 
   public void waitForInMemoryMerge() throws InterruptedException {
     inMemoryMerger.waitForMerge();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
index 80f6627..1959533 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
@@ -42,6 +42,7 @@ import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
@@ -51,38 +52,53 @@ import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 
 import com.google.common.base.Preconditions;
 
+/**
+ * Usage: Create instance, setInitialMemoryAllocated(long), run()
+ *
+ */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class Shuffle implements ExceptionReporter {
+public class Shuffle implements ExceptionReporter, MemoryUpdateCallback {
   
   private static final Log LOG = LogFactory.getLog(Shuffle.class);
   private static final int PROGRESS_FREQUENCY = 2000;
   
   private final Configuration conf;
   private final TezInputContext inputContext;
-  private final ShuffleClientMetrics metrics;
+  private final int numInputs;
+  
+  private ShuffleClientMetrics metrics;
 
-  private final ShuffleInputEventHandler eventHandler;
-  private final ShuffleScheduler scheduler;
-  private final MergeManager merger;
+  private ShuffleInputEventHandler eventHandler;
+  private ShuffleScheduler scheduler;
+  private MergeManager merger;
   private Throwable throwable = null;
   private String throwingThreadName = null;
-  private final int numInputs;
-  private final SecretKey jobTokenSecret;
-  private final CompressionCodec codec;
-  private final boolean ifileReadAhead;
-  private final int ifileReadAheadLength;
+  
+  private SecretKey jobTokenSecret;
+  private CompressionCodec codec;
+  private boolean ifileReadAhead;
+  private int ifileReadAheadLength;
+  
+  private volatile long initialMemoryAvailable = -1;
 
   private FutureTask<TezRawKeyValueIterator> runShuffleFuture;
 
   public Shuffle(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
     this.inputContext = inputContext;
     this.conf = conf;
+    this.numInputs = numInputs;
+    long initialMemRequested = MergeManager.getInitialMemoryRequirement(conf,
+        inputContext.getTotalMemoryAvailableToTask());
+    inputContext.requestInitialMemory(initialMemRequested, this);
+  }
+
+  private void configureAndStart() throws IOException {
+    Preconditions.checkState(initialMemoryAvailable != -1,
+        "Initial Available memory must be configured before starting");
     this.metrics = new ShuffleClientMetrics(inputContext.getDAGName(),
         inputContext.getTaskVertexName(), inputContext.getTaskIndex(),
         this.conf, UserGroupInformation.getCurrentUser().getShortUserName());
-            
-    this.numInputs = numInputs;
     
     this.jobTokenSecret = ShuffleUtils
         .getJobTokenSecretFromTokenBytes(inputContext
@@ -151,6 +167,8 @@ public class Shuffle implements ExceptionReporter {
           reduceCombineInputCounter,
           mergedMapOutputsCounter,
           this);
+    merger.setInitialMemoryAvailable(initialMemoryAvailable);
+    merger.configureAndStart();
   }
 
   public void handleEvents(List<Event> events) {
@@ -195,7 +213,8 @@ public class Shuffle implements ExceptionReporter {
     return kvIter;
   }
 
-  public void run() {
+  public void run() throws IOException {
+    configureAndStart();
     RunShuffleCallable runShuffle = new RunShuffleCallable();
     runShuffleFuture = new FutureTask<TezRawKeyValueIterator>(runShuffle);
     new Thread(runShuffleFuture, "ShuffleMergeRunner").start();
@@ -276,4 +295,9 @@ public class Shuffle implements ExceptionReporter {
     }
   }
 
+  @Override
+  public void memoryAssigned(long assignedSize) {
+    this.initialMemoryAvailable = assignedSize;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
index 3319752..2dcabe1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -64,8 +64,6 @@ public class ShuffleInputEventHandler {
   }
 
   private void processDataMovementEvent(DataMovementEvent dmEvent) {
-    // FIXME TODO NEWTEZ
-    // Preconditions.checkState(shuffleRangeSet == true, "Shuffle Range must be set before a DataMovementEvent is processed");
     DataMovementEventPayloadProto shufflePayload;
     try {
       shufflePayload = DataMovementEventPayloadProto.parseFrom(dmEvent.getUserPayload());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index db4c794..3ff7d6b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -113,9 +113,9 @@ class ShuffleScheduler {
     this.failedShuffleCounter = failedShuffleCounter;
     this.startTime = System.currentTimeMillis();
     this.lastProgressTime = startTime;
-    referee.start();
     this.maxFailedUniqueFetches = Math.min(numberOfInputs,
         this.maxFailedUniqueFetches);
+    referee.start();
     this.maxFetchFailuresBeforeReporting = 
         conf.getInt(
             TezJobConfig.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES, 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index e8a78fa..78e55c1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.library.api.Partitioner;
 import org.apache.tez.runtime.library.common.ConfigUtils;
@@ -53,7 +54,7 @@ import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
 import org.apache.tez.runtime.library.hadoop.compat.NullProgressable;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
-public abstract class ExternalSorter {
+public abstract class ExternalSorter implements MemoryUpdateCallback {
 
   private static final Log LOG = LogFactory.getLog(ExternalSorter.class);
 
@@ -81,8 +82,8 @@ public abstract class ExternalSorter {
   protected boolean ifileReadAhead;
   protected int ifileReadAheadLength;
   protected int ifileBufferSize;
-  
-  protected int availableMemory;
+
+  protected volatile int availableMemoryMb;
 
   protected IndexedSorter sorter;
 
@@ -96,6 +97,7 @@ public abstract class ExternalSorter {
   protected TezCounter fileOutputByteCounter;
   protected TezCounter spilledRecordsCounter;
 
+  @Private
   public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
     this.outputContext = outputContext;
     this.conf = conf;
@@ -108,14 +110,9 @@ public abstract class ExternalSorter {
             TezJobConfig.TEZ_RUNTIME_IO_SORT_MB, 
             TezJobConfig.DEFAULT_TEZ_RUNTIME_IO_SORT_MB);
     long reqBytes = reqMemory << 20;
-    // TEZ 815. Fix this. Not used until there's a separation between init and start.
-    outputContext.requestInitialMemory(reqBytes, null);
-    long availBytes = reqBytes;
-    
-    this.availableMemory = (int) (availBytes >> 20);
-    
-    LOG.info("io.sort.mb requested: " + reqMemory + ", Allocated: " + availableMemory);
-    
+    outputContext.requestInitialMemory(reqBytes, this);
+    LOG.info("Requested SortBufferSize (io.sort.mb): " + reqMemory);
+
     // sorter
     sorter = ReflectionUtils.newInstance(this.conf.getClass(
         TezJobConfig.TEZ_RUNTIME_INTERNAL_SORTER_CLASS, QuickSort.class,
@@ -170,6 +167,13 @@ public abstract class ExternalSorter {
     this.partitioner = TezRuntimeUtils.instantiatePartitioner(this.conf);
     this.combiner = TezRuntimeUtils.instantiateCombiner(this.conf, outputContext);
   }
+  
+  /**
+   * Used to start the actual Output. Typically, this involves allocating
+   * buffers, starting required threads, etc
+   */
+  @Private
+  public abstract void start() throws Exception;
 
   /**
    * Exception indicating that the allocated sort buffer is insufficient to hold
@@ -224,4 +228,9 @@ public abstract class ExternalSorter {
   public ShuffleHeader getShuffleHeader(int reduce) {
     throw new UnsupportedOperationException("getShuffleHeader isn't supported!");
   }
+  
+  @Override
+  public void memoryAssigned(long assignedSize) {
+    this.availableMemoryMb = (int) (assignedSize >> 20);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 3237522..6bfa098 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -34,7 +34,6 @@ import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -44,7 +43,6 @@ import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.IndexedSorter;
 import org.apache.hadoop.util.Progress;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
 import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
@@ -92,8 +90,8 @@ public class PipelinedSorter extends ExternalSorter {
   private int totalIndexCacheMemory;
   private int indexCacheMemoryLimit;
 
-  public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
-    super.initialize(outputContext, conf, numOutputs);
+  @Override
+  public void start() throws IOException {
     
     partitionBits = bitcount(partitions)+1;
    
@@ -102,7 +100,7 @@ public class PipelinedSorter extends ExternalSorter {
       this.conf.getFloat(
           TezJobConfig.TEZ_RUNTIME_SORT_SPILL_PERCENT, 
           TezJobConfig.DEFAULT_TEZ_RUNTIME_SORT_SPILL_PERCENT);
-    final int sortmb = this.availableMemory;
+    final int sortmb = this.availableMemoryMb;
     indexCacheMemoryLimit = this.conf.getInt(TezJobConfig.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES,
                                        TezJobConfig.DEFAULT_TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES);
     if (spillper > (float)1.0 || spillper <= (float)0.0) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index e831dec..9079473 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -31,7 +31,6 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -40,7 +39,6 @@ import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
 import org.apache.tez.runtime.library.common.sort.impl.IFile;
@@ -111,14 +109,12 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
   private int indexCacheMemoryLimit;
 
   @Override
-  public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException { 
-    super.initialize(outputContext, conf, numOutputs);
-    
+  public void start() throws IOException {
     // sanity checks
     final float spillper = this.conf.getFloat(
         TezJobConfig.TEZ_RUNTIME_SORT_SPILL_PERCENT,
         TezJobConfig.DEFAULT_TEZ_RUNTIME_SORT_SPILL_PERCENT);
-    final int sortmb = this.availableMemory;
+    final int sortmb = this.availableMemoryMb;
     if (spillper > (float) 1.0 || spillper <= (float) 0.0) {
       throw new IOException("Invalid \""
           + TezJobConfig.TEZ_RUNTIME_SORT_SPILL_PERCENT + "\": " + spillper);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
index b8191db..4d2441c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
@@ -36,7 +36,7 @@ public class LocalMergedInput extends ShuffledMergedInputLegacy {
   @Override
   public List<Event> initialize(TezInputContext inputContext) throws IOException {
     this.inputContext = inputContext;
-    this.inputContext.requestInitialMemory(0l, null); // mandatory call. Fix in TEZ-815
+    this.inputContext.requestInitialMemory(0l, null); // mandatory call.
     this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
 
     if (numInputs == 0) {
@@ -48,6 +48,11 @@ public class LocalMergedInput extends ShuffledMergedInputLegacy {
     createValuesIterator();
     return Collections.emptyList();
   }
+  
+  @Override
+  public List<Event> start() throws IOException {
+    return Collections.emptyList();
+  }
 
   @Override
   public List<Event> close() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index 9b9cfaf..bee7e45 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -38,6 +38,7 @@ import org.apache.tez.runtime.library.common.ValuesIterator;
 import org.apache.tez.runtime.library.common.shuffle.impl.Shuffle;
 import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
 
+
 /**
  * <code>ShuffleMergedInput</code> in a {@link LogicalInput} which shuffles
  * intermediate sorted data, merges them and provides key/<values> to the
@@ -78,17 +79,15 @@ public class ShuffledMergedInput implements LogicalInput {
     this.conf.setStrings(TezJobConfig.LOCAL_DIRS,
         inputContext.getWorkDirs());
 
-    // Start the shuffle - copy and merge.
     shuffle = new Shuffle(inputContext, this.conf, numInputs);
-    shuffle.run();
-
     return Collections.emptyList();
   }
 
   @Override
-  public List<Event> start() {
-    // TODO TEZ-815 To be fixed in a subsequent jira if required.
-    return null;
+  public List<Event> start() throws IOException {
+    // Start the shuffle - copy and merge
+    shuffle.run();
+    return Collections.emptyList();
   }
 
   /**
@@ -207,15 +206,4 @@ public class ShuffledMergedInput implements LogicalInput {
 
   }
 
-  // This functionality is currently broken. If there's inputs which need to be
-  // written to disk, there's a possibility that inputs from the different
-  // sources could clobber each others' output. Also the current structures do
-  // not have adequate information to de-dupe these (vertex name)
-//  public void mergeWith(ShuffledMergedInput other) {
-//    this.numInputs += other.getNumPhysicalInputs();
-//  }
-//
-//  public int getNumPhysicalInputs() {
-//    return this.numInputs;
-//  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index 8535c58..61dff6f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -19,6 +19,7 @@
 package org.apache.tez.runtime.library.input;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -34,7 +35,6 @@ import org.apache.tez.runtime.library.broadcast.input.BroadcastKVReader;
 import org.apache.tez.runtime.library.broadcast.input.BroadcastShuffleManager;
 
 import com.google.common.base.Preconditions;
-
 public class ShuffledUnorderedKVInput implements LogicalInput {
 
   private static final Log LOG = LogFactory.getLog(ShuffledUnorderedKVInput.class);
@@ -60,15 +60,14 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
 
     this.shuffleManager = new BroadcastShuffleManager(inputContext, conf,
         numInputs);
-    this.shuffleManager.run();
-    this.kvReader = this.shuffleManager.createReader();
-    return null;
+    return Collections.emptyList();
   }
 
   @Override
-  public List<Event> start() {
-    // TODO TEZ-815 To be fixed in a subsequent jira if required.
-    return null;
+  public List<Event> start() throws IOException {
+    this.shuffleManager.run();
+    this.kvReader = this.shuffleManager.createReader();
+    return Collections.emptyList();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
index c8a8233..d7e017a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
@@ -32,7 +32,7 @@ public class LocalOnFileSorterOutput extends OnFileSortedOutput {
 
   private static final Log LOG = LogFactory.getLog(LocalOnFileSorterOutput.class);
 
-  
+
 
   @Override
   public List<Event> close() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
index c9a12b2..38f6bf9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -80,9 +80,9 @@ public class OnFileSortedOutput implements LogicalOutput {
   }
 
   @Override
-  public List<Event> start() {
-    // TODO TEZ-815 To be fixed in a subsequent jira if required.
-    return null;
+  public List<Event> start() throws Exception {
+    sorter.start();
+    return Collections.emptyList();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
index eb80940..04d638f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
@@ -69,8 +69,7 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
     this.conf.setStrings(TezJobConfig.LOCAL_DIRS,
         outputContext.getWorkDirs());
 
-    // TEZ 815. Fix this. Not used until there's a separation between init and start.
-    this.outputContext.requestInitialMemory(0l, null);
+    this.outputContext.requestInitialMemory(0l, null); // mandatory call
 
     this.dataViaEventsEnabled = conf.getBoolean(
         TezJobConfig.TEZ_RUNTIME_BROADCAST_DATA_VIA_EVENTS_ENABLED,
@@ -89,8 +88,7 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
 
   @Override
   public List<Event> start() {
-    // TODO TEZ-815 To be fixed in a subsequent jira if required.
-    return null;
+    return Collections.emptyList();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
index b502b1d..f07c5ac 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
@@ -19,7 +19,6 @@
 package org.apache.tez.runtime.library.broadcast.input;
 
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
 import java.util.UUID;
@@ -28,7 +27,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
 import org.junit.Test;
@@ -52,12 +50,12 @@ public class TestBroadcastInputManager {
     
     long inMemThreshold = (long) (bufferPercent * jvmMax);
     LOG.info("InMemThreshold: " + inMemThreshold);
-    
-    TezInputContext mockInputContext = mock(TezInputContext.class);
+
     BroadcastInputManager inputManager = new BroadcastInputManager(UUID.randomUUID().toString(),
-        conf, mockInputContext);
-    
-    
+        conf, Runtime.getRuntime().maxMemory());
+    inputManager.setInitialMemoryAvailable(inMemThreshold);
+    inputManager.configureAndStart();
+
     long requestSize = (long) (0.4f * inMemThreshold);
     long compressedSize = 1l;
     LOG.info("RequestSize: " + requestSize);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index 3063ec2..5ed9faa 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.test;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -188,7 +189,7 @@ public class TestInput implements LogicalInput {
   @Override
   public List<Event> initialize(TezInputContext inputContext) throws Exception {
     this.inputContext = inputContext;
-    this.inputContext.requestInitialMemory(0l, null); //Mandatory call. Fix null in TEZ-815.
+    this.inputContext.requestInitialMemory(0l, null); //Mandatory call.
     if (inputContext.getUserPayload() != null) {
       String vName = inputContext.getTaskVertexName();
       conf = MRHelpers.createConfFromUserPayload(inputContext.getUserPayload());
@@ -220,13 +221,12 @@ public class TestInput implements LogicalInput {
         }
       }
     }
-    return null;
+    return Collections.emptyList();
   }
 
   @Override
   public List<Event> start() {
-    // TODO TEZ-815 To be fixed in a subsequent jira if required.
-    return null;
+    return Collections.emptyList();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3a63d9b1/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
index 5f2edae..ddd2a29 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.test;
 
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -46,14 +47,13 @@ public class TestOutput implements LogicalOutput {
   public List<Event> initialize(TezOutputContext outputContext)
       throws Exception {
     this.outputContext = outputContext;
-    this.outputContext.requestInitialMemory(0l, null); //Mandatory call. Fix null in TEZ-815.
-    return null;
+    this.outputContext.requestInitialMemory(0l, null); //Mandatory call
+    return Collections.emptyList();
   }
 
   @Override
   public List<Event> start() {
-    // TODO TEZ-815 To be fixed in a subsequent jira if required.
-    return null;
+    return Collections.emptyList();
   }
 
   @Override