You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/25 03:25:37 UTC

[1/2] TEZ-879. Fix potential synchronization issues in runtime components. (sseth)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 908512fc3 -> 0bf327c28


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java
index d8682f3..8fadd89 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java
@@ -36,7 +36,6 @@ import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
 import org.apache.tez.runtime.library.shuffle.common.FetchedInputCallback;
 import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
 
-import com.google.common.base.Preconditions;
 
 /**
  * Usage: Create instance, setInitialMemoryAvailable(long), configureAndStart()
@@ -49,34 +48,29 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
   private static final Log LOG = LogFactory.getLog(SimpleFetchedInputAllocator.class);
   
   private final Configuration conf;
-  private final String uniqueIdentifier;
 
-  private TezTaskOutputFiles fileNameAllocator;
-  private LocalDirAllocator localDirAllocator;
+  private final TezTaskOutputFiles fileNameAllocator;
+  private final LocalDirAllocator localDirAllocator;
 
   // Configuration parameters
-  private long memoryLimit;
-  private long maxSingleShuffleLimit;
+  private final long memoryLimit;
+  private final long maxSingleShuffleLimit;
 
-  private volatile long usedMemory = 0;
+  private final long maxAvailableTaskMemory;
+  private final long initialMemoryAvailable;
   
-  private long maxAvailableTaskMemory;
-  private long initialMemoryAvailable =-1l;
+  private volatile long usedMemory = 0;
 
-  public SimpleFetchedInputAllocator(String uniqueIdentifier, Configuration conf, long maxTaskAvailableMemory) {
+  public SimpleFetchedInputAllocator(String uniqueIdentifier, Configuration conf,
+      long maxTaskAvailableMemory, long memoryAvailable) {
     this.conf = conf;    
-    this.uniqueIdentifier = uniqueIdentifier;
     this.maxAvailableTaskMemory = maxTaskAvailableMemory;
-  }
-
-  @Private
-  public void configureAndStart() {
-    Preconditions.checkState(initialMemoryAvailable != -1,
-        "Initial memory must be configured before starting");
+    this.initialMemoryAvailable = memoryAvailable;
+    
     this.fileNameAllocator = new TezTaskOutputFiles(conf,
         uniqueIdentifier);
     this.localDirAllocator = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
-
+    
     // Setup configuration
     final float maxInMemCopyUse = conf.getFloat(
         TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT,
@@ -86,8 +80,7 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
           + TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT + ": "
           + maxInMemCopyUse);
     }
-
-    // Allow unit tests to fix Runtime memory
+    
     long memReq = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
         Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE)) * maxInMemCopyUse);
     
@@ -98,7 +91,7 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
     }
 
     LOG.info("RequestedMem=" + memReq + ", Allocated: " + this.memoryLimit);
-
+    
     final float singleShuffleMemoryLimitPercent = conf.getFloat(
         TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
         TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT);
@@ -111,10 +104,10 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
 
     this.maxSingleShuffleLimit = (long) (memoryLimit * singleShuffleMemoryLimitPercent);
     
-    LOG.info("BroadcastInputManager -> " + "MemoryLimit: " + 
-    this.memoryLimit + ", maxSingleMemLimit: " + this.maxSingleShuffleLimit);
+    LOG.info("SimpleInputManager -> " + "MemoryLimit: " + 
+        this.memoryLimit + ", maxSingleMemLimit: " + this.maxSingleShuffleLimit);
   }
-  
+
   @Private
   public static long getInitialMemoryReq(Configuration conf, long maxAvailableTaskMemory) {
     final float maxInMemCopyUse = conf.getFloat(
@@ -130,11 +123,6 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
     return memReq;
   }
 
-  @Private
-  public void setInitialMemoryAvailable(long available) {
-    this.initialMemoryAvailable = available;
-  }
-
   @Override
   public synchronized FetchedInput allocate(long actualSize, long compressedSize,
       InputAttemptIdentifier inputAttemptIdentifier) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestSimpleFetchedInputAllocator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestSimpleFetchedInputAllocator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestSimpleFetchedInputAllocator.java
index 3ec839d..15bd05a 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestSimpleFetchedInputAllocator.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/shuffle/common/impl/TestSimpleFetchedInputAllocator.java
@@ -53,9 +53,7 @@ public class TestSimpleFetchedInputAllocator {
     LOG.info("InMemThreshold: " + inMemThreshold);
 
     SimpleFetchedInputAllocator inputManager = new SimpleFetchedInputAllocator(UUID.randomUUID().toString(),
-        conf, Runtime.getRuntime().maxMemory());
-    inputManager.setInitialMemoryAvailable(inMemThreshold);
-    inputManager.configureAndStart();
+        conf, Runtime.getRuntime().maxMemory(), inMemThreshold);
 
     long requestSize = (long) (0.4f * inMemThreshold);
     long compressedSize = 1l;


[2/2] git commit: TEZ-879. Fix potential synchronization issues in runtime components. (sseth)

Posted by ss...@apache.org.
TEZ-879. Fix potential synchronization issues in runtime components.
(sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/0bf327c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/0bf327c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/0bf327c2

Branch: refs/heads/master
Commit: 0bf327c288274b0fde677bdfc373e6110f7b86e2
Parents: 908512f
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Mar 24 19:24:59 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Mar 24 19:24:59 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/tez/common/TezUtils.java    |  21 +++
 .../org/apache/tez/common/TestTezUtils.java     |  11 ++
 .../runtime/api/impl/TezCountersDelegate.java   |  16 +-
 .../broadcast/output/FileBasedKVWriter.java     |  38 +++--
 .../library/common/InputAttemptIdentifier.java  |   2 +-
 .../common/MemoryUpdateCallbackHandler.java     |  43 ++++++
 .../library/common/shuffle/impl/Fetcher.java    |   3 +-
 .../common/shuffle/impl/MergeManager.java       | 125 +++++++--------
 .../common/shuffle/impl/MergeThread.java        |   6 +-
 .../library/common/shuffle/impl/Shuffle.java    |  57 ++++---
 .../common/shuffle/impl/ShuffleScheduler.java   |  10 +-
 .../common/sort/impl/ExternalSorter.java        | 112 +++++++-------
 .../common/sort/impl/PipelinedSorter.java       |  35 +++--
 .../common/sort/impl/dflt/DefaultSorter.java    |  23 +--
 .../library/input/ShuffledMergedInput.java      |  74 ++++-----
 .../library/input/ShuffledUnorderedKVInput.java | 152 +++++++++----------
 .../library/output/OnFileSortedOutput.java      |  56 ++++---
 .../shuffle/common/impl/ShuffleManager.java     | 100 +++++-------
 .../impl/SimpleFetchedInputAllocator.java       |  46 +++---
 .../impl/TestSimpleFetchedInputAllocator.java   |   4 +-
 20 files changed, 479 insertions(+), 455 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
index c7f2faf..2979d5f 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
@@ -27,6 +27,8 @@ import java.io.PrintStream;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.zip.DataFormatException;
 import java.util.zip.Deflater;
 import java.util.zip.DeflaterOutputStream;
@@ -36,6 +38,7 @@ import java.util.zip.InflaterInputStream;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Appender;
 import org.apache.log4j.Logger;
@@ -223,6 +226,7 @@ public class TezUtils {
     return output;
   }
 
+  @Private
   public static ByteString compressByteArrayToByteString(byte[] inBytes) throws IOException {
     ByteString.Output os = ByteString.newOutput();
     DeflaterOutputStream compressOs = new DeflaterOutputStream(os, new Deflater(
@@ -233,12 +237,29 @@ public class TezUtils {
     return byteString;
   }
   
+  @Private
   public static byte[] decompressByteStringToByteArray(ByteString byteString) throws IOException {
     InflaterInputStream in = new InflaterInputStream(byteString.newInput());
     byte[] bytes = IOUtils.toByteArray(in);
     return bytes;
   }
 
+  private static final Pattern pattern = Pattern.compile("\\W");
+  @Private
+  public static final int MAX_VERTEX_NAME_LENGTH = 40;
+
+  @Private
+  public static String cleanVertexName(String vertexName) {
+    return sanitizeString(vertexName).substring(0,
+        vertexName.length() > MAX_VERTEX_NAME_LENGTH ? MAX_VERTEX_NAME_LENGTH : vertexName.length());
+  }
+
+  private static String sanitizeString(String srcString) {
+    Matcher matcher = pattern.matcher(srcString);
+    String res = matcher.replaceAll("_");
+    return res; // Number starts allowed rightnow
+  }
+
   public static void updateLoggers(String addend) throws FileNotFoundException {
     String containerLogDir = null;
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
index 2d132a1..ef4efe1 100644
--- a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
+++ b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
@@ -49,6 +49,17 @@ public class TestTezUtils {
     Assert.assertEquals(conf.size(), 6);
     checkConf(conf);
   }
+  
+  @Test
+  public void testCleanVertexName() {
+    String testString = "special characters & spaces and longer than "
+        + TezUtils.MAX_VERTEX_NAME_LENGTH + " characters";
+    Assert.assertTrue(testString.length() > TezUtils.MAX_VERTEX_NAME_LENGTH);
+    String cleaned = TezUtils.cleanVertexName(testString);
+    Assert.assertTrue(cleaned.length() <= TezUtils.MAX_VERTEX_NAME_LENGTH);
+    Assert.assertFalse(cleaned.contains("\\s+"));
+    Assert.assertTrue(cleaned.matches("\\w+"));
+  }
 
   private Configuration getConf() {
     Configuration conf = new Configuration(false);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java
index 29beac0..34692c6 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezCountersDelegate.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.runtime.api.impl;
 
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
@@ -30,8 +31,8 @@ public class TezCountersDelegate extends TezCounters {
   public TezCountersDelegate(TezCounters original, String taskVertexName, String edgeVertexName,
       String type) {
     this.original = original;
-    this.groupModifier = cleanVertexName(taskVertexName) + "_" + type + "_"
-        + cleanVertexName(edgeVertexName);
+    this.groupModifier = TezUtils.cleanVertexName(taskVertexName) + "_" + type + "_"
+        + TezUtils.cleanVertexName(edgeVertexName);
   }
 
   // Should only be called while setting up Inputs / Outputs - rather than being
@@ -44,15 +45,4 @@ public class TezCountersDelegate extends TezCounters {
     String modifiedGroupName = groupName + "_" + this.groupModifier;
     return original.findCounter(modifiedGroupName, counterName);
   }
-
-
-  private static String cleanVertexName(String vertexName) {
-    return sanitizeString(vertexName).substring(0,
-        vertexName.length() > 40 ? 40 : vertexName.length());
-  }
-
-  private static String sanitizeString(String srcString) {
-    String res = srcString.replaceAll("[^A-za-z0-9_]", "_"); 
-    return res; // Number starts allowed rightnow
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
index c5e6cc1..880420a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
@@ -54,17 +54,17 @@ public class FileBasedKVWriter implements KeyValueWriter {
   private int numRecords = 0;
 
   @SuppressWarnings("rawtypes")
-  private Class keyClass;
+  private final Class keyClass;
   @SuppressWarnings("rawtypes")
-  private Class valClass;
-  private CompressionCodec codec;
-  private FileSystem rfs;
-  private IFile.Writer writer;
+  private final Class valClass;
+  private final CompressionCodec codec;
+  private final FileSystem rfs;
+  private final IFile.Writer writer;
 
-  private Path outputPath;
+  private final Path outputPath;
   private Path indexPath;
   
-  private TezTaskOutput ouputFileManager;
+  private final TezTaskOutput ouputFileManager;
   private boolean closed = false;
 
   // Number of output key-value pairs
@@ -110,7 +110,16 @@ public class FileBasedKVWriter implements KeyValueWriter {
     LOG.info("Created KVWriter -> " + "compressionCodec: " + (codec == null ? "NoCompressionCodec"
         : codec.getClass().getName()));
 
-    initWriter();
+    this.outputPath = ouputFileManager.getOutputFileForWrite();
+    LOG.info("Writing data file: " + outputPath);
+
+    // TODO NEWTEZ Consider making the buffer size configurable. Also consider
+    // setting up an in-memory buffer which is occasionally flushed to disk so
+    // that the output does not block.
+
+    // TODO NEWTEZ maybe use appropriate counter
+    this.writer = new IFile.Writer(conf, rfs, outputPath, keyClass, valClass,
+        codec, null, outputBytesCounter);
   }
 
   /**
@@ -141,19 +150,6 @@ public class FileBasedKVWriter implements KeyValueWriter {
     this.outputRecordsCounter.increment(1);
     numRecords++;
   }
-
-  public void initWriter() throws IOException {
-    this.outputPath = ouputFileManager.getOutputFileForWrite();
-    LOG.info("Writing data file: " + outputPath);
-
-    // TODO NEWTEZ Consider making the buffer size configurable. Also consider
-    // setting up an in-memory buffer which is occasionally flushed to disk so
-    // that the output does not block.
-
-    // TODO NEWTEZ maybe use appropriate counter
-    this.writer = new IFile.Writer(conf, rfs, outputPath, keyClass, valClass,
-        codec, null, outputBytesCounter);
-  }
   
   public long getRawLength() {
     Preconditions.checkState(closed, "Only available after the Writer has been closed");

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
index 946cb0f..4c9d525 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
@@ -31,7 +31,7 @@ public class InputAttemptIdentifier {
   private final int attemptNumber;
   private String pathComponent;
   
-  public static String PATH_PREFIX = "attempt";
+  public static final String PATH_PREFIX = "attempt";
   
   public InputAttemptIdentifier(int inputIndex, int attemptNumber) {
     this(new InputIdentifier(inputIndex), attemptNumber, null);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/MemoryUpdateCallbackHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/MemoryUpdateCallbackHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/MemoryUpdateCallbackHandler.java
new file mode 100644
index 0000000..b2dedee
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/MemoryUpdateCallbackHandler.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common;
+
+import org.apache.tez.runtime.api.MemoryUpdateCallback;
+
+import com.google.common.base.Preconditions;
+
+public class MemoryUpdateCallbackHandler implements MemoryUpdateCallback {
+
+  private long assignedMemory;
+  private boolean updated = false;
+
+  @Override
+  public synchronized void memoryAssigned(long assignedSize) {
+    updated = true;
+    this.assignedMemory = assignedSize;
+  }
+
+  public synchronized long getMemoryAssigned() {
+    return this.assignedMemory;
+  }
+
+  public synchronized void validateUpdateReceived() {
+    Preconditions.checkState(updated == true, "Iniital memory update not received");
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index db72e92..0433098 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
@@ -141,7 +142,7 @@ class Fetcher extends Thread {
     this.bufferSize = job.getInt(TezJobConfig.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE, 
             TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE);
 
-    setName("fetcher#" + id);
+    setName("fetcher [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "] #" + id);
     setDaemon(true);
 
     synchronized (Fetcher.class) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
index 50695c8..5a952ba 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
@@ -40,10 +40,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -60,7 +59,6 @@ import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
 import org.apache.tez.runtime.library.hadoop.compat.NullProgressable;
 
-import com.google.common.base.Preconditions;
 
 /**
  * Usage. Create instance. setInitialMemoryAvailable(long), configureAndStart()
@@ -82,28 +80,28 @@ public class MergeManager {
   private final Progressable nullProgressable = new NullProgressable();
   private final Combiner combiner;  
   
-  Set<MapOutput> inMemoryMergedMapOutputs = 
+  private final Set<MapOutput> inMemoryMergedMapOutputs = 
     new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
-  private IntermediateMemoryToMemoryMerger memToMemMerger;
+  private final IntermediateMemoryToMemoryMerger memToMemMerger;
 
-  Set<MapOutput> inMemoryMapOutputs = 
+  private final Set<MapOutput> inMemoryMapOutputs = 
     new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
-  private InMemoryMerger inMemoryMerger;
+  private final InMemoryMerger inMemoryMerger;
   
-  Set<Path> onDiskMapOutputs = new TreeSet<Path>();
-  private OnDiskMerger onDiskMerger;
+  private final Set<Path> onDiskMapOutputs = new TreeSet<Path>();
+  private final OnDiskMerger onDiskMerger;
   
-  private  long memoryLimit;
-  private int postMergeMemLimit;
+  private final long memoryLimit;
+  private final int postMergeMemLimit;
   private long usedMemory;
   private long commitMemory;
-  private int ioSortFactor;
-  private long maxSingleShuffleLimit;
+  private final int ioSortFactor;
+  private final long maxSingleShuffleLimit;
   
-  private int memToMemMergeOutputsThreshold; 
-  private long mergeThreshold;
+  private final int memToMemMergeOutputsThreshold; 
+  private final long mergeThreshold;
   
-  private long initialMemoryAvailable = -1;
+  private final long initialMemoryAvailable;
 
   private final ExceptionReporter exceptionReporter;
   
@@ -120,13 +118,13 @@ public class MergeManager {
   private final TezCounter additionalBytesWritten;
   private final TezCounter additionalBytesRead;
   
-  private CompressionCodec codec;
+  private final CompressionCodec codec;
   
   private volatile boolean finalMergeComplete = false;
   
-  private boolean ifileReadAhead;
-  private int ifileReadAheadLength;
-  private int ifileBufferSize;
+  private final boolean ifileReadAhead;
+  private final int ifileReadAheadLength;
+  private final int ifileBufferSize;
 
 
   /**
@@ -140,11 +138,16 @@ public class MergeManager {
                       TezCounter spilledRecordsCounter,
                       TezCounter reduceCombineInputCounter,
                       TezCounter mergedMapOutputsCounter,
-                      ExceptionReporter exceptionReporter) {
+                      ExceptionReporter exceptionReporter,
+                      long initialMemoryAvailable,
+                      CompressionCodec codec,
+                      boolean ifileReadAheadEnabled,
+                      int ifileReadAheadLength) {
     this.inputContext = inputContext;
     this.conf = conf;
     this.localDirAllocator = localDirAllocator;
     this.exceptionReporter = exceptionReporter;
+    this.initialMemoryAvailable = initialMemoryAvailable;
     
     this.combiner = combiner;
 
@@ -161,36 +164,16 @@ public class MergeManager {
     this.additionalBytesWritten = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
     this.additionalBytesRead = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
 
-  }
-  
-  void setInitialMemoryAvailable(long available) {
-    this.initialMemoryAvailable = available;
-  }
-  
-  @Private
-  void configureAndStart() {
-    Preconditions.checkState(initialMemoryAvailable != -1,
-        "Initial available memory must be configured before starting");
-    if (ConfigUtils.isIntermediateInputCompressed(conf)) {
-      Class<? extends CompressionCodec> codecClass =
-          ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
-      codec = ReflectionUtils.newInstance(codecClass, conf);
-    } else {
-      codec = null;
-    }
-    this.ifileReadAhead = conf.getBoolean(
-        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
-        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
+    this.codec = codec;
+    this.ifileReadAhead = ifileReadAheadEnabled;
     if (this.ifileReadAhead) {
-      this.ifileReadAheadLength = conf.getInt(
-          TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
-          TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
+      this.ifileReadAheadLength = ifileReadAheadLength;
     } else {
       this.ifileReadAheadLength = 0;
     }
     this.ifileBufferSize = conf.getInt("io.file.buffer.size",
         TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
-
+    
     // Figure out initial memory req start
     final float maxInMemCopyUse =
       conf.getFloat(
@@ -221,13 +204,13 @@ public class MergeManager {
     } else {
       this.memoryLimit = memLimit;
     }
-
+    
     if (this.initialMemoryAvailable < maxRedBuffer) {
       this.postMergeMemLimit = (int) this.initialMemoryAvailable;
     } else {
       this.postMergeMemLimit = maxRedBuffer;
     }
-
+    
     LOG.info("InitialRequest: ShuffleMem=" + memLimit + ", postMergeMem=" + maxRedBuffer
         + ", RuntimeTotalAvailable=" + this.initialMemoryAvailable + "Updated to: ShuffleMem="
         + this.memoryLimit + ", postMergeMem: " + this.postMergeMemLimit);
@@ -236,7 +219,7 @@ public class MergeManager {
         conf.getInt(
             TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, 
             TezJobConfig.DEFAULT_TEZ_RUNTIME_IO_SORT_FACTOR);
-
+    
     final float singleShuffleMemoryLimitPercent =
         conf.getFloat(
             TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
@@ -264,34 +247,40 @@ public class MergeManager {
              "mergeThreshold=" + mergeThreshold + ", " + 
              "ioSortFactor=" + ioSortFactor + ", " +
              "memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold);
-
+    
     if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
       throw new RuntimeException("Invlaid configuration: "
           + "maxSingleShuffleLimit should be less than mergeThreshold"
           + "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
           + "mergeThreshold: " + this.mergeThreshold);
     }
-
+    
     boolean allowMemToMemMerge = 
-      conf.getBoolean(
-          TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM, 
-          TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM);
-    if (allowMemToMemMerge) {
-      this.memToMemMerger = 
-        new IntermediateMemoryToMemoryMerger(this,
-                                             memToMemMergeOutputsThreshold);
-      this.memToMemMerger.start();
-    } else {
-      this.memToMemMerger = null;
+        conf.getBoolean(
+            TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM, 
+            TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM);
+      if (allowMemToMemMerge) {
+        this.memToMemMerger = 
+          new IntermediateMemoryToMemoryMerger(this,
+                                               memToMemMergeOutputsThreshold);
+      } else {
+        this.memToMemMerger = null;
+      }
+      
+      this.inMemoryMerger = new InMemoryMerger(this);
+      
+      this.onDiskMerger = new OnDiskMerger(this);
+  }
+
+  @Private
+  void configureAndStart() {
+    if (this.memToMemMerger != null) {
+      memToMemMerger.start();
     }
-    
-    this.inMemoryMerger = new InMemoryMerger(this);
     this.inMemoryMerger.start();
-    
-    this.onDiskMerger = new OnDiskMerger(this);
     this.onDiskMerger.start();
   }
-  
+
   /**
    * Exposing this to get an initial memory ask without instantiating the object.
    */
@@ -492,7 +481,7 @@ public class MergeManager {
     public IntermediateMemoryToMemoryMerger(MergeManager manager, 
                                             int mergeFactor) {
       super(manager, mergeFactor, exceptionReporter);
-      setName("MemToMemMerger [" + inputContext.getSourceVertexName() + "]");
+      setName("MemToMemMerger [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "]");
       setDaemon(true);
     }
 
@@ -547,7 +536,7 @@ public class MergeManager {
     
     public InMemoryMerger(MergeManager manager) {
       super(manager, Integer.MAX_VALUE, exceptionReporter);
-      setName("MemtoDiskMerger [" + inputContext.getSourceVertexName() + "]");
+      setName("MemtoDiskMerger [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "]");
       setDaemon(true);
     }
     
@@ -646,7 +635,7 @@ public class MergeManager {
     
     public OnDiskMerger(MergeManager manager) {
       super(manager, Integer.MAX_VALUE, exceptionReporter);
-      setName("DiskToDiskMerger [" + inputContext.getSourceVertexName() + "]");
+      setName("DiskToDiskMerger [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "]");
       setDaemon(true);
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeThread.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeThread.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeThread.java
index d8a7722..aed2628 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeThread.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeThread.java
@@ -31,7 +31,7 @@ abstract class MergeThread<T> extends Thread {
   private static final Log LOG = LogFactory.getLog(MergeThread.class);
 
   private volatile boolean inProgress = false;
-  private List<T> inputs = new ArrayList<T>();
+  private final List<T> inputs = new ArrayList<T>();
   protected final MergeManager manager;
   private final ExceptionReporter reporter;
   private boolean closed = false;
@@ -56,8 +56,8 @@ abstract class MergeThread<T> extends Thread {
   
   public synchronized void startMerge(Set<T> inputs) {
     if (!closed) {
+      this.inputs.clear();
       inProgress = true;
-      this.inputs = new ArrayList<T>();
       Iterator<T> iter=inputs.iterator();
       for (int ctr = 0; iter.hasNext() && ctr < mergeFactor; ++ctr) {
         this.inputs.add(iter.next());
@@ -95,7 +95,7 @@ abstract class MergeThread<T> extends Thread {
       } finally {
         synchronized (this) {
           // Clear inputs
-          inputs = null;
+          inputs.clear();
           inProgress = false;        
           notifyAll();
         }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
index 1bc3903..a27fa31 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
@@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -37,12 +38,12 @@ import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
@@ -58,7 +59,7 @@ import com.google.common.base.Preconditions;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class Shuffle implements ExceptionReporter, MemoryUpdateCallback {
+public class Shuffle implements ExceptionReporter {
   
   private static final Log LOG = LogFactory.getLog(Shuffle.class);
   private static final int PROGRESS_FREQUENCY = 2000;
@@ -67,35 +68,28 @@ public class Shuffle implements ExceptionReporter, MemoryUpdateCallback {
   private final TezInputContext inputContext;
   private final int numInputs;
   
-  private ShuffleClientMetrics metrics;
+  private final ShuffleClientMetrics metrics;
 
-  private ShuffleInputEventHandler eventHandler;
-  private ShuffleScheduler scheduler;
-  private MergeManager merger;
+  private final ShuffleInputEventHandler eventHandler;
+  private final ShuffleScheduler scheduler;
+  private final MergeManager merger;
+
+  private final SecretKey jobTokenSecret;
+  private final CompressionCodec codec;
+  private final boolean ifileReadAhead;
+  private final int ifileReadAheadLength;
+  
   private Throwable throwable = null;
   private String throwingThreadName = null;
-  
-  private SecretKey jobTokenSecret;
-  private CompressionCodec codec;
-  private boolean ifileReadAhead;
-  private int ifileReadAheadLength;
-  
-  private volatile long initialMemoryAvailable = -1;
 
   private FutureTask<TezRawKeyValueIterator> runShuffleFuture;
 
-  public Shuffle(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
+  public Shuffle(TezInputContext inputContext, Configuration conf, int numInputs,
+      long initialMemoryAvailable) throws IOException {
     this.inputContext = inputContext;
     this.conf = conf;
     this.numInputs = numInputs;
-    long initialMemRequested = MergeManager.getInitialMemoryRequirement(conf,
-        inputContext.getTotalMemoryAvailableToTask());
-    inputContext.requestInitialMemory(initialMemRequested, this);
-  }
 
-  private void configureAndStart() throws IOException {
-    Preconditions.checkState(initialMemoryAvailable != -1,
-        "Initial Available memory must be configured before starting");
     this.metrics = new ShuffleClientMetrics(inputContext.getDAGName(),
         inputContext.getTaskVertexName(), inputContext.getTaskIndex(),
         this.conf, UserGroupInformation.getCurrentUser().getShortUserName());
@@ -175,9 +169,11 @@ public class Shuffle implements ExceptionReporter, MemoryUpdateCallback {
           spilledRecordsCounter,
           reduceCombineInputCounter,
           mergedMapOutputsCounter,
-          this);
-    merger.setInitialMemoryAvailable(initialMemoryAvailable);
-    merger.configureAndStart();
+          this,
+          initialMemoryAvailable,
+          codec,
+          ifileReadAhead,
+          ifileReadAheadLength);
   }
 
   public void handleEvents(List<Event> events) {
@@ -225,10 +221,11 @@ public class Shuffle implements ExceptionReporter, MemoryUpdateCallback {
   }
 
   public void run() throws IOException {
-    configureAndStart();
+    merger.configureAndStart();
     RunShuffleCallable runShuffle = new RunShuffleCallable();
     runShuffleFuture = new FutureTask<TezRawKeyValueIterator>(runShuffle);
-    new Thread(runShuffleFuture, "ShuffleMergeRunner").start();
+    new Thread(runShuffleFuture, "ShuffleMergeRunner [" // "]" appended after cleanVertexName, like the other thread names
+        + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "]").start();
   }
   
   private class RunShuffleCallable implements Callable<TezRawKeyValueIterator> {
@@ -294,6 +291,7 @@ public class Shuffle implements ExceptionReporter, MemoryUpdateCallback {
     }
   }
   
+  @Private
   public synchronized void reportException(Throwable t) {
     if (throwable == null) {
       throwable = t;
@@ -314,9 +312,8 @@ public class Shuffle implements ExceptionReporter, MemoryUpdateCallback {
     }
   }
 
-  @Override
-  public void memoryAssigned(long assignedSize) {
-    this.initialMemoryAvailable = assignedSize;
+  @Private
+  public static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) {
+    return MergeManager.getInitialMemoryRequirement(conf, maxAvailableTaskMemory);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index d53c6a6..a5b79fb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -39,6 +39,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.runtime.api.Event;
@@ -73,7 +74,7 @@ class ShuffleScheduler {
   
   private final Random random = new Random(System.currentTimeMillis());
   private final DelayQueue<Penalty> penalties = new DelayQueue<Penalty>();
-  private final Referee referee = new Referee();
+  private final Referee referee;
   private final Map<InputAttemptIdentifier, IntWritable> failureCounts =
     new HashMap<InputAttemptIdentifier,IntWritable>(); 
   private final Map<String,IntWritable> hostFailures = 
@@ -116,6 +117,7 @@ class ShuffleScheduler {
     abortFailureLimit = Math.max(30, numberOfInputs / 10);
     remainingMaps = numberOfInputs;
     finishedMaps = new boolean[remainingMaps]; // default init to false
+    this.referee = new Referee();
     this.shuffle = shuffle;
     this.shuffledInputsCounter = shuffledInputsCounter;
     this.reduceShuffleBytes = reduceShuffleBytes;
@@ -159,8 +161,7 @@ class ShuffleScheduler {
                                          long milis,
                                          MapOutput output
                                          ) throws IOException {
-    String taskIdentifier = TezRuntimeUtils.getTaskAttemptIdentifier(srcAttemptIdentifier.getInputIdentifier().getInputIndex(), srcAttemptIdentifier.getAttemptNumber());
-    failureCounts.remove(taskIdentifier);
+    failureCounts.remove(srcAttemptIdentifier);
     if (host != null) {
       hostFailures.remove(host.getHostName());
     }
@@ -530,7 +531,8 @@ class ShuffleScheduler {
    */
   private class Referee extends Thread {
     public Referee() {
-      setName("ShufflePenaltyReferee");
+      setName("ShufflePenaltyReferee ["
+          + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "]");
       setDaemon(true);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index ab8a869..67e7337 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.library.api.Partitioner;
 import org.apache.tez.runtime.library.common.ConfigUtils;
@@ -56,7 +55,7 @@ import org.apache.tez.runtime.library.hadoop.compat.NullProgressable;
 import com.google.common.base.Preconditions;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
-public abstract class ExternalSorter implements MemoryUpdateCallback {
+public abstract class ExternalSorter {
 
   private static final Log LOG = LogFactory.getLog(ExternalSorter.class);
 
@@ -65,77 +64,80 @@ public abstract class ExternalSorter implements MemoryUpdateCallback {
   public abstract void flush() throws IOException;
 
   public abstract void write(Object key, Object value) throws IOException;
+
+  protected final Progressable nullProgressable = new NullProgressable();
+  protected final TezOutputContext outputContext;
+  protected final Combiner combiner;
+  protected final Partitioner partitioner;
+  protected final Configuration conf;
+  protected final FileSystem rfs;
+  protected final TezTaskOutput mapOutputFile;
+  protected final int partitions;
+  protected final Class keyClass;
+  protected final Class valClass;
+  protected final RawComparator comparator;
+  protected final SerializationFactory serializationFactory;
+  protected final Serializer keySerializer;
+  protected final Serializer valSerializer;
   
-  private int initialMemRequestMb;
-  protected Progressable nullProgressable = new NullProgressable();
-  protected TezOutputContext outputContext;
-  protected Combiner combiner;
-  protected Partitioner partitioner;
-  protected Configuration conf;
-  protected FileSystem rfs;
-  protected TezTaskOutput mapOutputFile;
-  protected int partitions;
-  protected Class keyClass;
-  protected Class valClass;
-  protected RawComparator comparator;
-  protected SerializationFactory serializationFactory;
-  protected Serializer keySerializer;
-  protected Serializer valSerializer;
-  
-  protected boolean ifileReadAhead;
-  protected int ifileReadAheadLength;
-  protected int ifileBufferSize;
+  protected final boolean ifileReadAhead;
+  protected final int ifileReadAheadLength;
+  protected final int ifileBufferSize;
 
-  protected volatile int availableMemoryMb;
+  protected final int availableMemoryMb;
 
-  protected IndexedSorter sorter;
+  protected final IndexedSorter sorter;
 
   // Compression for map-outputs
-  protected CompressionCodec codec;
+  protected final CompressionCodec codec;
 
   // Counters
   // MR compatilbity layer needs to rename counters back to what MR requries.
 
   // Represents final deserialized size of output (spills are not counted)
-  protected TezCounter mapOutputByteCounter;
+  protected final TezCounter mapOutputByteCounter;
   // Represents final number of records written (spills are not counted)
-  protected TezCounter mapOutputRecordCounter;
+  protected final TezCounter mapOutputRecordCounter;
   // Represents the size of the final output - with any overheads introduced by
   // the storage/serialization mechanism. This is an uncompressed data size.
-  protected TezCounter outputBytesWithOverheadCounter;
+  protected final TezCounter outputBytesWithOverheadCounter;
   // Represents the size of the final output - which will be transmitted over
   // the wire (spills are not counted). Factors in compression if it is enabled.
-  protected TezCounter fileOutputByteCounter;
+  protected final TezCounter fileOutputByteCounter;
   // Represents total number of records written to disk (includes spills. Min
   // value for this is equal to number of output records)
-  protected TezCounter spilledRecordsCounter;
+  protected final TezCounter spilledRecordsCounter;
   // Bytes written as a result of additional spills. The single spill for the
   // final output data is not considered. (This will be 0 if there's no
   // additional spills. Compressed size - so may not represent the size in the
   // sort buffer)
-  protected TezCounter additionalSpillBytesWritten;
+  protected final TezCounter additionalSpillBytesWritten;
   
-  protected TezCounter additionalSpillBytesRead;
+  protected final TezCounter additionalSpillBytesRead;
   // Number of additional spills. (This will be 0 if there's no additional
   // spills)
-  protected TezCounter numAdditionalSpills;
+  protected final TezCounter numAdditionalSpills;
 
-  @Private
-  public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
+  public ExternalSorter(TezOutputContext outputContext, Configuration conf, int numOutputs,
+      long initialMemoryAvailable) throws IOException {
     this.outputContext = outputContext;
     this.conf = conf;
     this.partitions = numOutputs;
 
     rfs = ((LocalFileSystem)FileSystem.getLocal(this.conf)).getRaw();
 
-    initialMemRequestMb = 
-        this.conf.getInt(
-            TezJobConfig.TEZ_RUNTIME_IO_SORT_MB, 
-            TezJobConfig.DEFAULT_TEZ_RUNTIME_IO_SORT_MB);
-    Preconditions.checkArgument(initialMemRequestMb != 0, "io.sort.mb should be larger than 0");
-    long reqBytes = initialMemRequestMb << 20;
-    outputContext.requestInitialMemory(reqBytes, this);
-    LOG.info("Requested SortBufferSize (io.sort.mb): " + initialMemRequestMb);
+    int assignedMb = (int) (initialMemoryAvailable >> 20);
+    if (assignedMb <= 0) {
+      if (initialMemoryAvailable > 0) { // Rounded down to 0MB - may be > 0 && < 1MB
+        this.availableMemoryMb = 1;
+        LOG.warn("initialAvailableMemory: " + initialMemoryAvailable
+            + " is too low. Rounding to 1 MB");
+      } else {
+        throw new RuntimeException("InitialMemoryAssigned is <= 0: " + initialMemoryAvailable);
+      }
+    } else {
+      this.availableMemoryMb = assignedMb;
+    }
 
     // sorter
     sorter = ReflectionUtils.newInstance(this.conf.getClass(
@@ -192,13 +194,6 @@ public abstract class ExternalSorter implements MemoryUpdateCallback {
     this.partitioner = TezRuntimeUtils.instantiatePartitioner(this.conf);
     this.combiner = TezRuntimeUtils.instantiateCombiner(this.conf, outputContext);
   }
-  
-  /**
-   * Used to start the actual Output. Typically, this involves allocating
-   * buffers, starting required threads, etc
-   */
-  @Private
-  public abstract void start() throws Exception;
 
   /**
    * Exception indicating that the allocated sort buffer is insufficient to hold
@@ -253,14 +248,15 @@ public abstract class ExternalSorter implements MemoryUpdateCallback {
   public ShuffleHeader getShuffleHeader(int reduce) {
     throw new UnsupportedOperationException("getShuffleHeader isn't supported!");
   }
-  
-  @Override
-  public void memoryAssigned(long assignedSize) {
-    this.availableMemoryMb = (int) (assignedSize >> 20);
-    if (this.availableMemoryMb == 0) {
-      LOG.warn("AssignedMemoryMB: " + this.availableMemoryMb
-          + " is too low. Falling back to initial ask: " + initialMemRequestMb);
-      this.availableMemoryMb = initialMemRequestMb;
-    }
+
+  public static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) {
+    int initialMemRequestMb = 
+        conf.getInt(
+            TezJobConfig.TEZ_RUNTIME_IO_SORT_MB, 
+            TezJobConfig.DEFAULT_TEZ_RUNTIME_IO_SORT_MB);
+    Preconditions.checkArgument(initialMemRequestMb > 0, "io.sort.mb should be larger than 0");
+    long reqBytes = ((long) initialMemRequestMb) << 20; // widen first: int << 20 overflows for io.sort.mb >= 2048
+    LOG.info("Requested SortBufferSize (io.sort.mb): " + initialMemRequestMb);
+    return reqBytes;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index fedbcb5..54a6e8e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -34,6 +34,7 @@ import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -43,10 +44,14 @@ import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.IndexedSorter;
 import org.apache.hadoop.util.Progress;
 import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
 import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class PipelinedSorter extends ExternalSorter {
   
@@ -58,12 +63,8 @@ public class PipelinedSorter extends ExternalSorter {
   public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
 
   private final static int APPROX_HEADER_LENGTH = 150;
-  
-  boolean ifileReadAhead;
-  int ifileReadAheadLength;
-  int ifileBufferSize;
-  
-  int partitionBits;
+
+  private final int partitionBits;
   
   private static final int PARTITION = 0;        // partition offset in acct
   private static final int KEYSTART = 1;         // key offset in acct
@@ -76,24 +77,25 @@ public class PipelinedSorter extends ExternalSorter {
   volatile Throwable sortSpillException = null;
 
   int numSpills = 0;
-  int minSpillsForCombine;
-  private HashComparator hasher;
+  private final int minSpillsForCombine;
+  private final HashComparator hasher;
   // SortSpans  
   private SortSpan span;
   private ByteBuffer largeBuffer;
   // Merger
-  private SpanMerger merger; 
-  private ExecutorService sortmaster;
+  private final SpanMerger merger; 
+  private final ExecutorService sortmaster;
 
-  final ArrayList<TezSpillRecord> indexCacheList =
+  private final ArrayList<TezSpillRecord> indexCacheList =
     new ArrayList<TezSpillRecord>();
   private int totalIndexCacheMemory;
   private int indexCacheMemoryLimit;
 
   // TODO Set additional countesr - total bytes written, spills etc.
-  
-  @Override
-  public void start() throws IOException {
+
+  public PipelinedSorter(TezOutputContext outputContext, Configuration conf, int numOutputs,
+      long initialMemoryAvailable) throws IOException {
+    super(outputContext, conf, numOutputs, initialMemoryAvailable);
     
     partitionBits = bitcount(partitions)+1;
    
@@ -126,7 +128,10 @@ public class PipelinedSorter extends ExternalSorter {
             this.conf.getInt(
                 TezJobConfig.TEZ_RUNTIME_SORT_THREADS, 
                 TezJobConfig.DEFAULT_TEZ_RUNTIME_SORT_THREADS);
-    sortmaster = Executors.newFixedThreadPool(sortThreads);
+    sortmaster = Executors.newFixedThreadPool(sortThreads,
+        new ThreadFactoryBuilder().setDaemon(true)
+        .setNameFormat("Sorter [" + TezUtils.cleanVertexName(outputContext.getDestinationVertexName()) + "] #%d")
+        .build());
 
     // k/v serialization    
     if(comparator instanceof HashComparator) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 5264554..912bf8b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -31,6 +31,7 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -39,6 +40,8 @@ import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
 import org.apache.tez.runtime.library.common.sort.impl.IFile;
@@ -64,7 +67,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
   private final static int APPROX_HEADER_LENGTH = 150;
 
   // k/v accounting
-  IntBuffer kvmeta; // metadata overlay on backing store
+  private final IntBuffer kvmeta; // metadata overlay on backing store
   int kvstart;            // marks origin of spill metadata
   int kvend;              // marks end of spill metadata
   int kvindex;            // marks end of fully serialized records
@@ -77,7 +80,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
   int bufvoid;            // marks the point where we should stop
                           // reading at the end of the buffer
 
-  byte[] kvbuffer;        // main output buffer
+  private final byte[] kvbuffer;        // main output buffer
   private final byte[] b0 = new byte[0];
 
   protected static final int VALSTART = 0;         // val offset in acct
@@ -88,14 +91,14 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
   protected static final int METASIZE = NMETA * 4; // size in bytes
 
   // spill accounting
-  int maxRec;
-  int softLimit;
+  final int maxRec;
+  final int softLimit;
   boolean spillInProgress;
   int bufferRemaining;
   volatile Throwable sortSpillException = null;
 
   int numSpills = 0;
-  int minSpillsForCombine;
+  final int minSpillsForCombine;
   final ReentrantLock spillLock = new ReentrantLock();
   final Condition spillDone = spillLock.newCondition();
   final Condition spillReady = spillLock.newCondition();
@@ -105,11 +108,12 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
 
   final ArrayList<TezSpillRecord> indexCacheList =
     new ArrayList<TezSpillRecord>();
+  private final int indexCacheMemoryLimit;
   private int totalIndexCacheMemory;
-  private int indexCacheMemoryLimit;
 
-  @Override
-  public void start() throws IOException {
+  public DefaultSorter(TezOutputContext outputContext, Configuration conf, int numOutputs,
+      long initialMemoryAvailable) throws IOException {
+    super(outputContext, conf, numOutputs, initialMemoryAvailable);
     // sanity checks
     final float spillper = this.conf.getFloat(
         TezJobConfig.TEZ_RUNTIME_SORT_SPILL_PERCENT,
@@ -156,7 +160,8 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
     spillInProgress = false;
     minSpillsForCombine = this.conf.getInt(TezJobConfig.TEZ_RUNTIME_COMBINE_MIN_SPILLS, 3);
     spillThread.setDaemon(true);
-    spillThread.setName("SpillThread");
+    spillThread.setName("SpillThread [" // "]" appended after cleanVertexName, like the other thread names
+        + TezUtils.cleanVertexName(outputContext.getDestinationVertexName()) + "]");
     spillLock.lock();
     try {
       spillThread.start();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index c55012f..4d62346 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -38,10 +38,13 @@ import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
 import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
 import org.apache.tez.runtime.library.common.ValuesIterator;
 import org.apache.tez.runtime.library.common.shuffle.impl.Shuffle;
 import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
 
+import com.google.common.base.Preconditions;
+
 
 /**
  * <code>ShuffleMergedInput</code> in a {@link LogicalInput} which shuffles
@@ -63,8 +66,9 @@ public class ShuffledMergedInput implements LogicalInput {
   protected Configuration conf;
   protected int numInputs = 0;
   protected Shuffle shuffle;
+  protected MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
   private final BlockingQueue<Event> pendingEvents = new LinkedBlockingQueue<Event>();
-  private volatile long firstEventReceivedTime = -1;
+  private long firstEventReceivedTime = -1;
   @SuppressWarnings("rawtypes")
   protected ValuesIterator vIter;
 
@@ -74,7 +78,7 @@ public class ShuffledMergedInput implements LogicalInput {
   private final AtomicBoolean isStarted = new AtomicBoolean(false);
 
   @Override
-  public List<Event> initialize(TezInputContext inputContext) throws IOException {
+  public synchronized List<Event> initialize(TezInputContext inputContext) throws IOException {
     this.inputContext = inputContext;
     this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
 
@@ -86,34 +90,38 @@ public class ShuffledMergedInput implements LogicalInput {
           + inputContext.getSourceVertexName());
       return Collections.emptyList();
     }
+    
+    long initialMemoryRequest = Shuffle.getInitialMemoryRequirement(conf,
+        inputContext.getTotalMemoryAvailableToTask());
+    this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
+    inputContext.requestInitialMemory(initialMemoryRequest, memoryUpdateCallbackHandler);
 
     this.inputKeyCounter = inputContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
     this.inputValueCounter = inputContext.getCounters().findCounter(
         TaskCounter.REDUCE_INPUT_RECORDS);
     this.conf.setStrings(TezJobConfig.LOCAL_DIRS, inputContext.getWorkDirs());
 
-    shuffle = new Shuffle(inputContext, this.conf, numInputs);
     return Collections.emptyList();
   }
-
+  
   @Override
-  public void start() throws IOException {
-    synchronized (this) {
-      if (!isStarted.get()) {
-        // Start the shuffle - copy and merge
-        shuffle.run();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Initialized the handlers in shuffle..Safe to start processing..");
-        }
-        List<Event> pending = new LinkedList<Event>();
-        pendingEvents.drainTo(pending);
-        if (pending.size() > 0) {
-          LOG.info("NoAutoStart delay in processing first event: "
-              + (System.currentTimeMillis() - firstEventReceivedTime));
-          shuffle.handleEvents(pending);
-        }
-        isStarted.set(true);
+  public synchronized void start() throws IOException {
+    if (!isStarted.get()) {
+      memoryUpdateCallbackHandler.validateUpdateReceived();
+      // Start the shuffle - copy and merge
+      shuffle = new Shuffle(inputContext, conf, numInputs, memoryUpdateCallbackHandler.getMemoryAssigned());
+      shuffle.run();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Initialized the handlers in shuffle..Safe to start processing..");
+      }
+      List<Event> pending = new LinkedList<Event>();
+      pendingEvents.drainTo(pending);
+      if (pending.size() > 0) {
+        LOG.info("NoAutoStart delay in processing first event: "
+            + (System.currentTimeMillis() - firstEventReceivedTime));
+        shuffle.handleEvents(pending);
       }
+      isStarted.set(true);
     }
   }
 
@@ -124,7 +132,8 @@ public class ShuffledMergedInput implements LogicalInput {
    *         processing fetching the input. false if the shuffle and merge are
    *         still in progress
    */
-  public boolean isInputReady() {
+  public synchronized boolean isInputReady() {
+    Preconditions.checkState(isStarted.get(), "Must start input before invoking this method");
     if (this.numInputs == 0) {
       return true;
     }
@@ -136,7 +145,8 @@ public class ShuffledMergedInput implements LogicalInput {
    * @throws IOException
    * @throws InterruptedException
    */
-  public void waitForInputReady() throws IOException, InterruptedException {
+  public synchronized void waitForInputReady() throws IOException, InterruptedException {
+    Preconditions.checkState(isStarted.get(), "Must start input before invoking this method");
     if (this.numInputs == 0) {
       return;
     }
@@ -145,7 +155,7 @@ public class ShuffledMergedInput implements LogicalInput {
   }
 
   @Override
-  public List<Event> close() throws IOException {
+  public synchronized List<Event> close() throws IOException {
     if (this.numInputs != 0 && rawIter != null) {
       rawIter.close();
     }
@@ -165,7 +175,7 @@ public class ShuffledMergedInput implements LogicalInput {
    * @return a KVReader over the sorted input.
    */
   @Override
-  public KeyValuesReader getReader() throws IOException {
+  public synchronized KeyValuesReader getReader() throws IOException {
     if (this.numInputs == 0) {
       return new KeyValuesReader() {
         @Override
@@ -211,26 +221,22 @@ public class ShuffledMergedInput implements LogicalInput {
   }
 
   @Override
-  public void handleEvents(List<Event> inputEvents) {
+  public synchronized void handleEvents(List<Event> inputEvents) {
     if (numInputs == 0) {
       throw new RuntimeException("No input events expected as numInputs is 0");
     }
     if (!isStarted.get()) {
-      synchronized (this) {
-        if (!isStarted.get()) {
-          if (firstEventReceivedTime == -1) {
-            firstEventReceivedTime = System.currentTimeMillis();
-          }
-          pendingEvents.addAll(inputEvents);
-          return;
-        }
+      if (firstEventReceivedTime == -1) {
+        firstEventReceivedTime = System.currentTimeMillis();
       }
+      pendingEvents.addAll(inputEvents);
+      return;
     }
     shuffle.handleEvents(inputEvents);
   }
 
   @Override
-  public void setNumPhysicalInputs(int numInputs) {
+  public synchronized void setNumPhysicalInputs(int numInputs) {
     this.numInputs = numInputs;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index bf0ed11..a07159f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -38,10 +38,10 @@ import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
 import org.apache.tez.runtime.library.common.readers.ShuffledUnorderedKVReader;
 import org.apache.tez.runtime.library.shuffle.common.ShuffleEventHandler;
 import org.apache.tez.runtime.library.shuffle.common.impl.ShuffleInputEventHandlerImpl;
@@ -49,7 +49,7 @@ import org.apache.tez.runtime.library.shuffle.common.impl.ShuffleManager;
 import org.apache.tez.runtime.library.shuffle.common.impl.SimpleFetchedInputAllocator;
 
 import com.google.common.base.Preconditions;
-public class ShuffledUnorderedKVInput implements LogicalInput, MemoryUpdateCallback {
+public class ShuffledUnorderedKVInput implements LogicalInput {
 
   private static final Log LOG = LogFactory.getLog(ShuffledUnorderedKVInput.class);
   
@@ -58,7 +58,8 @@ public class ShuffledUnorderedKVInput implements LogicalInput, MemoryUpdateCallb
   private TezInputContext inputContext;
   private ShuffleManager shuffleManager;
   private final BlockingQueue<Event> pendingEvents = new LinkedBlockingQueue<Event>();
-  private volatile long firstEventReceivedTime = -1;
+  private long firstEventReceivedTime = -1;
+  private MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
   @SuppressWarnings("rawtypes")
   private ShuffledUnorderedKVReader kvReader;
   
@@ -67,14 +68,12 @@ public class ShuffledUnorderedKVInput implements LogicalInput, MemoryUpdateCallb
   
   private SimpleFetchedInputAllocator inputManager;
   private ShuffleEventHandler inputEventHandler;
-  
-  private volatile long initialMemoryAvailable = -1;
-  
+
   public ShuffledUnorderedKVInput() {
   }
 
   @Override
-  public List<Event> initialize(TezInputContext inputContext) throws Exception {
+  public synchronized List<Event> initialize(TezInputContext inputContext) throws Exception {
     Preconditions.checkArgument(numInputs != -1, "Number of Inputs has not been set");
     this.inputContext = inputContext;
     this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
@@ -88,7 +87,8 @@ public class ShuffledUnorderedKVInput implements LogicalInput, MemoryUpdateCallb
       return Collections.emptyList();
     } else {
       long initalMemReq = getInitialMemoryReq();
-      this.inputContext.requestInitialMemory(initalMemReq, this);
+      memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
+      this.inputContext.requestInitialMemory(initalMemReq, memoryUpdateCallbackHandler);
     }
 
     this.conf.setStrings(TezJobConfig.LOCAL_DIRS, inputContext.getWorkDirs());
@@ -98,67 +98,60 @@ public class ShuffledUnorderedKVInput implements LogicalInput, MemoryUpdateCallb
   }
 
   @Override
-  public void start() throws IOException {
-    synchronized (this) {
-      if (!isStarted.get()) {
-        ////// Initial configuration
-        Preconditions.checkState(initialMemoryAvailable != -1,
-            "Initial memory available must be configured before starting");
-        CompressionCodec codec;
-        if (ConfigUtils.isIntermediateInputCompressed(conf)) {
-          Class<? extends CompressionCodec> codecClass = ConfigUtils
-              .getIntermediateInputCompressorClass(conf, DefaultCodec.class);
-          codec = ReflectionUtils.newInstance(codecClass, conf);
-        } else {
-          codec = null;
-        }
-        
-        boolean ifileReadAhead = conf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
-            TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
-        int ifileReadAheadLength = 0;
-        int ifileBufferSize = 0;
-
-        if (ifileReadAhead) {
-          ifileReadAheadLength = conf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
-              TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
-        }
-        ifileBufferSize = conf.getInt("io.file.buffer.size",
-            TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
-        
-        this.shuffleManager = new ShuffleManager(inputContext, conf, numInputs);
-        
-        this.inputManager = new SimpleFetchedInputAllocator(inputContext.getUniqueIdentifier(), conf,
-            inputContext.getTotalMemoryAvailableToTask());
-        inputManager.setInitialMemoryAvailable(initialMemoryAvailable);
-        inputManager.configureAndStart();
-        
-        this.inputEventHandler = new ShuffleInputEventHandlerImpl(
-            inputContext, shuffleManager, inputManager, codec, ifileReadAhead,
-            ifileReadAheadLength);
-
-        this.shuffleManager.setCompressionCodec(codec);
-        this.shuffleManager.setIfileParameters(ifileBufferSize, ifileReadAhead, ifileReadAheadLength);
-        this.shuffleManager.setFetchedInputAllocator(inputManager);
-        this.shuffleManager.setInputEventHandler(inputEventHandler);        
-        ////// End of Initial configuration
-
-        this.shuffleManager.run();
-        this.kvReader = createReader(inputRecordCounter, codec,
-            ifileBufferSize, ifileReadAhead, ifileReadAheadLength);
-        List<Event> pending = new LinkedList<Event>();
-        pendingEvents.drainTo(pending);
-        if (pending.size() > 0) {
-          LOG.info("NoAutoStart delay in processing first event: "
-              + (System.currentTimeMillis() - firstEventReceivedTime));
-          shuffleManager.handleEvents(pending);
-        }
-        isStarted.set(true);
+  public synchronized void start() throws IOException {
+    if (!isStarted.get()) {
+      ////// Initial configuration
+      memoryUpdateCallbackHandler.validateUpdateReceived();
+      CompressionCodec codec;
+      if (ConfigUtils.isIntermediateInputCompressed(conf)) {
+        Class<? extends CompressionCodec> codecClass = ConfigUtils
+            .getIntermediateInputCompressorClass(conf, DefaultCodec.class);
+        codec = ReflectionUtils.newInstance(codecClass, conf);
+      } else {
+        codec = null;
+      }
+
+      boolean ifileReadAhead = conf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+          TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
+      int ifileReadAheadLength = 0;
+      int ifileBufferSize = 0;
+
+      if (ifileReadAhead) {
+        ifileReadAheadLength = conf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+            TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT);
       }
+      ifileBufferSize = conf.getInt("io.file.buffer.size",
+          TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
+
+      this.inputManager = new SimpleFetchedInputAllocator(inputContext.getUniqueIdentifier(), conf,
+          inputContext.getTotalMemoryAvailableToTask(),
+          memoryUpdateCallbackHandler.getMemoryAssigned());
+
+      this.shuffleManager = new ShuffleManager(inputContext, conf, numInputs, ifileBufferSize,
+          ifileReadAhead, ifileReadAheadLength, codec, inputManager);
+
+      this.inputEventHandler = new ShuffleInputEventHandlerImpl(inputContext, shuffleManager,
+          inputManager, codec, ifileReadAhead, ifileReadAheadLength);
+
+      ////// End of Initial configuration
+
+      this.shuffleManager.run();
+      this.kvReader = createReader(inputRecordCounter, codec,
+          ifileBufferSize, ifileReadAhead, ifileReadAheadLength);
+      List<Event> pending = new LinkedList<Event>();
+      pendingEvents.drainTo(pending);
+      if (pending.size() > 0) {
+        LOG.info("NoAutoStart delay in processing first event: "
+            + (System.currentTimeMillis() - firstEventReceivedTime));
+        inputEventHandler.handleEvents(pending);
+      }
+      isStarted.set(true);
     }
   }
 
   @Override
-  public KeyValueReader getReader() throws Exception {
+  public synchronized KeyValueReader getReader() throws Exception {
+    Preconditions.checkState(isStarted.get(), "Must start input before invoking this method");
     if (numInputs == 0) {
       return new KeyValueReader() {
         @Override
@@ -181,29 +174,25 @@ public class ShuffledUnorderedKVInput implements LogicalInput, MemoryUpdateCallb
   }
 
   @Override
-  public void handleEvents(List<Event> inputEvents) throws IOException {
+  public synchronized void handleEvents(List<Event> inputEvents) throws IOException {
     if (numInputs == 0) {
       throw new RuntimeException("No input events expected as numInputs is 0");
     }
     if (!isStarted.get()) {
-      synchronized(this) {
-        if (!isStarted.get()) {
-          if (firstEventReceivedTime == -1) {
-            firstEventReceivedTime = System.currentTimeMillis();
-          }
-          // This queue will keep growing if the Processor decides never to
-          // start the event. The Input, however has no idea, on whether start
-          // will be invoked or not.
-          pendingEvents.addAll(inputEvents);
-          return;
-        }
+      if (firstEventReceivedTime == -1) {
+        firstEventReceivedTime = System.currentTimeMillis();
       }
+      // This queue will keep growing if the Processor decides never to
+      // start the event. The Input, however has no idea, on whether start
+      // will be invoked or not.
+      pendingEvents.addAll(inputEvents);
+      return;
     }
-    shuffleManager.handleEvents(inputEvents);
+    inputEventHandler.handleEvents(inputEvents);
   }
 
   @Override
-  public List<Event> close() throws Exception {
+  public synchronized List<Event> close() throws Exception {
     if (this.shuffleManager != null) {
       this.shuffleManager.shutdown();
     }
@@ -211,15 +200,10 @@ public class ShuffledUnorderedKVInput implements LogicalInput, MemoryUpdateCallb
   }
 
   @Override
-  public void setNumPhysicalInputs(int numInputs) {
+  public synchronized void setNumPhysicalInputs(int numInputs) {
     this.numInputs = numInputs;
   }
 
-  @Override
-  public void memoryAssigned(long assignedSize) {
-    this.initialMemoryAvailable = assignedSize;
-  }
-
   private long getInitialMemoryReq() {
     return SimpleFetchedInputAllocator.getInitialMemoryReq(conf,
         inputContext.getTotalMemoryAvailableToTask());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
index 3dd41e4..dfd238c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -37,6 +37,7 @@ import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
 import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
 import org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter;
 import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
@@ -46,6 +47,7 @@ import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 
@@ -61,13 +63,14 @@ public class OnFileSortedOutput implements LogicalOutput {
   protected Configuration conf;
   protected int numOutputs;
   protected TezOutputContext outputContext;
+  protected MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
   private long startTime;
   private long endTime;
   private boolean sendEmptyPartitionDetails;
   private final AtomicBoolean isStarted = new AtomicBoolean(false);
 
   @Override
-  public List<Event> initialize(TezOutputContext outputContext)
+  public synchronized List<Event> initialize(TezOutputContext outputContext)
       throws IOException {
     this.startTime = System.nanoTime();
     this.outputContext = outputContext;
@@ -76,30 +79,36 @@ public class OnFileSortedOutput implements LogicalOutput {
     // places (wherever LocalDirAllocator is used) - TezTaskOutputFiles,
     // TezMerger, etc.
     this.conf.setStrings(TezJobConfig.LOCAL_DIRS, outputContext.getWorkDirs());
-    
-    if (this.conf.getInt(TezJobConfig.TEZ_RUNTIME_SORT_THREADS,
-        TezJobConfig.DEFAULT_TEZ_RUNTIME_SORT_THREADS) > 1) {
-      sorter = new PipelinedSorter();
-    } else {
-      sorter = new DefaultSorter();
-    }
+    this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
+    outputContext.requestInitialMemory(
+        ExternalSorter.getInitialMemoryRequirement(conf,
+            outputContext.getTotalMemoryAvailableToTask()), memoryUpdateCallbackHandler);
 
     sendEmptyPartitionDetails = this.conf.getBoolean(
         TezJobConfig.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
         TezJobConfig.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED_DEFAULT);
-    sorter.initialize(outputContext, conf, numOutputs);
     return Collections.emptyList();
   }
 
   @Override
-  public void start() throws Exception {
-    if (!isStarted.getAndSet(true)) {
-      sorter.start();
+  public synchronized void start() throws Exception {
+    if (!isStarted.get()) {
+      memoryUpdateCallbackHandler.validateUpdateReceived();
+      if (this.conf.getInt(TezJobConfig.TEZ_RUNTIME_SORT_THREADS,
+          TezJobConfig.DEFAULT_TEZ_RUNTIME_SORT_THREADS) > 1) {
+        sorter = new PipelinedSorter(outputContext, conf, numOutputs,
+            memoryUpdateCallbackHandler.getMemoryAssigned());
+      } else {
+        sorter = new DefaultSorter(outputContext, conf, numOutputs,
+            memoryUpdateCallbackHandler.getMemoryAssigned());
+      }
+      isStarted.set(true);
     }
   }
 
   @Override
-  public KeyValueWriter getWriter() throws IOException {
+  public synchronized KeyValueWriter getWriter() throws IOException {
+    Preconditions.checkState(isStarted.get(), "Cannot get writer before starting the Output");
     return new KeyValueWriter() {
       @Override
       public void write(Object key, Object value) throws IOException {
@@ -109,22 +118,27 @@ public class OnFileSortedOutput implements LogicalOutput {
   }
 
   @Override
-  public void handleEvents(List<Event> outputEvents) {
+  public synchronized void handleEvents(List<Event> outputEvents) {
     // Not expecting any events.
   }
 
   @Override
-  public void setNumPhysicalOutputs(int numOutputs) {
+  public synchronized void setNumPhysicalOutputs(int numOutputs) {
     this.numOutputs = numOutputs;
   }
 
   @Override
-  public List<Event> close() throws IOException {
-    sorter.flush();
-    sorter.close();
-    this.endTime = System.nanoTime();
-
-   return generateEventsOnClose();
+  public synchronized List<Event> close() throws IOException {
+    if (sorter != null) {
+      sorter.flush();
+      sorter.close();
+      this.endTime = System.nanoTime();
+      return generateEventsOnClose();
+    } else {
+      LOG.warn("Attempting to close output " + outputContext.getDestinationVertexName()
+          + " before it was started");
+      return Collections.emptyList();
+    }
   }
   
   protected List<Event> generateEventsOnClose() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0bf327c2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
index 264e628..bde9bfe 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
@@ -44,6 +44,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -60,7 +61,6 @@ import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
 import org.apache.tez.runtime.library.shuffle.common.Fetcher;
 import org.apache.tez.runtime.library.shuffle.common.FetcherCallback;
 import org.apache.tez.runtime.library.shuffle.common.InputHost;
-import org.apache.tez.runtime.library.shuffle.common.ShuffleEventHandler;
 import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
 import org.apache.tez.runtime.library.shuffle.common.Fetcher.FetcherBuilder;
@@ -85,49 +85,45 @@ public class ShuffleManager implements FetcherCallback {
   private final Configuration conf;
   private final int numInputs;
 
-  private ShuffleEventHandler inputEventHandler;
-  private FetchedInputAllocator inputManager;
-  
-  private ExecutorService fetcherRawExecutor;
-  private ListeningExecutorService fetcherExecutor;
+  private final FetchedInputAllocator inputManager;
+
+  private final ListeningExecutorService fetcherExecutor;
 
-  private ExecutorService schedulerRawExecutor;
-  private ListeningExecutorService schedulerExecutor;
-  private RunShuffleCallable schedulerCallable = new RunShuffleCallable();
+  private final ExecutorService schedulerRawExecutor;
+  private final ListeningExecutorService schedulerExecutor;
+  private final RunShuffleCallable schedulerCallable = new RunShuffleCallable();
   
-  private BlockingQueue<FetchedInput> completedInputs;
-  private AtomicBoolean inputReadyNotificationSent = new AtomicBoolean(false);
-  private Set<InputIdentifier> completedInputSet;
-  private ConcurrentMap<String, InputHost> knownSrcHosts;
-  private BlockingQueue<InputHost> pendingHosts;
-  private Set<InputAttemptIdentifier> obsoletedInputs;
+  private final BlockingQueue<FetchedInput> completedInputs;
+  private final AtomicBoolean inputReadyNotificationSent = new AtomicBoolean(false);
+  private final Set<InputIdentifier> completedInputSet;
+  private final ConcurrentMap<String, InputHost> knownSrcHosts;
+  private final BlockingQueue<InputHost> pendingHosts;
+  private final Set<InputAttemptIdentifier> obsoletedInputs;
   
-  private AtomicInteger numCompletedInputs = new AtomicInteger(0);
+  private final AtomicInteger numCompletedInputs = new AtomicInteger(0);
   
-  private long startTime;
+  private final long startTime;
   private long lastProgressTime;
 
   // Required to be held when manipulating pendingHosts
-  private ReentrantLock lock = new ReentrantLock();
-  private Condition wakeLoop = lock.newCondition();
+  private final ReentrantLock lock = new ReentrantLock();
+  private final Condition wakeLoop = lock.newCondition();
   
-  private int numFetchers;
-  private AtomicInteger numRunningFetchers = new AtomicInteger(0);
+  private final int numFetchers;
+  private final AtomicInteger numRunningFetchers = new AtomicInteger(0);
   
   // Parameters required by Fetchers
-  private SecretKey shuffleSecret;
-  private int connectionTimeout;
-  private int readTimeout;
-  private CompressionCodec codec;
+  private final SecretKey shuffleSecret;
+  private final int connectionTimeout;
+  private final int readTimeout;
+  private final CompressionCodec codec;
   
-  private int ifileBufferSize;
-  private boolean ifileReadAhead;
-  private int ifileReadAheadLength;
+  private final int ifileBufferSize;
+  private final boolean ifileReadAhead;
+  private final int ifileReadAheadLength;
   
   private final FetchFutureCallback fetchFutureCallback = new FetchFutureCallback();
-  
-  private volatile Throwable shuffleError;
-  
+
   private final AtomicBoolean isShutdown = new AtomicBoolean(false);
 
   private final TezCounter shuffledInputsCounter;
@@ -137,9 +133,13 @@ public class ShuffleManager implements FetcherCallback {
   private final TezCounter bytesShuffledToDiskCounter;
   private final TezCounter bytesShuffledToMemCounter;
   
+  private volatile Throwable shuffleError;
+  
   // TODO More counters - FetchErrors, speed?
   
-  public ShuffleManager(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
+  public ShuffleManager(TezInputContext inputContext, Configuration conf, int numInputs,
+      int bufferSize, boolean ifileReadAheadEnabled, int ifileReadAheadLength,
+      CompressionCodec codec, FetchedInputAllocator inputAllocator) throws IOException {
     this.inputContext = inputContext;
     this.conf = conf;
     this.numInputs = numInputs;
@@ -150,28 +150,13 @@ public class ShuffleManager implements FetcherCallback {
     this.decompressedDataSizeCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED);
     this.bytesShuffledToDiskCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_DISK);
     this.bytesShuffledToMemCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_MEM);
-  }
   
-  public void setIfileParameters(int bufferSize, boolean readAhead, int readAheadLength) {
     this.ifileBufferSize = bufferSize;
-    this.ifileReadAhead = readAhead;
-    this.ifileReadAheadLength = readAheadLength;
-  }
-  
-  public void setCompressionCodec(CompressionCodec codec) {
+    this.ifileReadAhead = ifileReadAheadEnabled;
+    this.ifileReadAheadLength = ifileReadAheadLength;
     this.codec = codec;
-  }
-  
-  public void setInputEventHandler(ShuffleEventHandler eventHandler) {
-    this.inputEventHandler = eventHandler;
-  }
+    this.inputManager = inputAllocator;
   
-  public void setFetchedInputAllocator(FetchedInputAllocator allocator) {
-    this.inputManager = allocator;
-  }
-
-
-  private void configureAndStart() throws IOException {
     completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
     completedInputs = new LinkedBlockingQueue<FetchedInput>(numInputs);
     knownSrcHosts = new ConcurrentHashMap<String, InputHost>();
@@ -185,12 +170,12 @@ public class ShuffleManager implements FetcherCallback {
     
     this.numFetchers = Math.min(maxConfiguredFetchers, numInputs);
     
-    this.fetcherRawExecutor = Executors.newFixedThreadPool(
+    ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(
         numFetchers,
         new ThreadFactoryBuilder()
             .setDaemon(true)
             .setNameFormat(
-                "Fetcher [" + inputContext.getUniqueIdentifier() + "] #%d")
+                "Fetcher [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "] #%d")
             .build());
     this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
     
@@ -199,7 +184,7 @@ public class ShuffleManager implements FetcherCallback {
         new ThreadFactoryBuilder()
             .setDaemon(true)
             .setNameFormat(
-                "ShuffleRunner [" + inputContext.getUniqueIdentifier() + "]")
+                "ShuffleRunner [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "]")
             .build());
     this.schedulerExecutor = MoreExecutors.listeningDecorator(schedulerRawExecutor);
     
@@ -225,9 +210,7 @@ public class ShuffleManager implements FetcherCallback {
 
   public void run() throws IOException {
     Preconditions.checkState(inputManager != null, "InputManager must be configured");
-    Preconditions.checkState(inputEventHandler != null, "InputEventHandler must be configured");
-    
-    configureAndStart();
+
     ListenableFuture<Void> runShuffleFuture = schedulerExecutor.submit(schedulerCallable);
     Futures.addCallback(runShuffleFuture, new SchedulerFutureCallback());
     // Shutdown this executor once this task, and the callback complete.
@@ -444,11 +427,6 @@ public class ShuffleManager implements FetcherCallback {
     obsoletedInputs.add(srcAttemptIdentifier);
     // TODO NEWTEZ Maybe inform the fetcher about this. For now, this is used during the initial fetch list construction.
   }
-  
-  
-  public void handleEvents(List<Event> events) throws IOException {
-    inputEventHandler.handleEvents(events);
-  }
 
   /////////////////// End of Methods for InputEventHandler
   /////////////////// Methods from FetcherCallbackHandler