You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/04/16 20:48:14 UTC

git commit: TEZ-919. Fix shutdown handling for Shuffle. (sseth)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 36e940c6f -> 6b1871970


TEZ-919. Fix shutdown handling for Shuffle. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/6b187197
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/6b187197
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/6b187197

Branch: refs/heads/master
Commit: 6b1871970a4cd21ae5ad96f2e0c60d922066aa53
Parents: 36e940c
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Apr 16 11:44:41 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Apr 16 11:44:41 2014 -0700

----------------------------------------------------------------------
 .../library/common/shuffle/impl/Fetcher.java    | 102 ++++++--
 .../library/common/shuffle/impl/Shuffle.java    | 237 +++++++++++++++----
 .../common/shuffle/impl/ShuffleScheduler.java   |   1 +
 .../exceptions/InputAlreadyClosedException.java |  38 +++
 .../library/input/ShuffledMergedInput.java      |   7 +-
 .../runtime/library/shuffle/common/Fetcher.java | 134 +++++++----
 .../shuffle/common/impl/ShuffleManager.java     | 109 +++++----
 7 files changed, 474 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6b187197/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index 32fad76..ac12f23 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -25,7 +25,6 @@ import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
 import java.net.URL;
-import java.net.URLConnection;
 import java.security.GeneralSecurityException;
 import java.util.Arrays;
 import java.util.LinkedHashSet;
@@ -80,6 +79,7 @@ class Fetcher extends Thread {
   private final ShuffleClientMetrics metrics;
   private final Shuffle shuffle;
   private final int id;
+  private final String logIdentifier;
   private static int nextId = 0;
   private int currentPartition = -1;
   
@@ -104,10 +104,16 @@ class Fetcher extends Thread {
 
   private boolean keepAlive = false;
 
+  volatile HttpURLConnection connection;
+  volatile DataInputStream input;
+
   public Fetcher(Configuration job, 
       ShuffleScheduler scheduler, MergeManager merger,
       ShuffleClientMetrics metrics,
-      Shuffle shuffle, SecretKey jobTokenSecret, boolean ifileReadAhead, int ifileReadAheadLength, CompressionCodec codec, TezInputContext inputContext) throws IOException {
+      Shuffle shuffle, SecretKey jobTokenSecret,
+      boolean ifileReadAhead, int ifileReadAheadLength, CompressionCodec codec,
+      TezInputContext inputContext) throws IOException {
+    setDaemon(true);
     this.scheduler = scheduler;
     this.merger = merger;
     this.metrics = metrics;
@@ -161,7 +167,8 @@ class Fetcher extends Thread {
     this.bufferSize = job.getInt(TezJobConfig.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE, 
             TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE);
 
-    setName("fetcher [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "] #" + id);
+    this.logIdentifier = "fetcher [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "] #" + id;
+    setName(logIdentifier);
     setDaemon(true);
 
     synchronized (Fetcher.class) {
@@ -195,6 +202,7 @@ class Fetcher extends Thread {
           // Shuffle
           copyFromHost(host);
         } finally {
+          cleanupCurrentConnection();
           if (host != null) {
             scheduler.freeHost(host);
             metrics.threadFree();            
@@ -211,6 +219,7 @@ class Fetcher extends Thread {
   public void shutDown() throws InterruptedException {
     this.stopped = true;
     interrupt();
+    cleanupCurrentConnection();
     try {
       join(5000);
     } catch (InterruptedException ie) {
@@ -221,6 +230,31 @@ class Fetcher extends Thread {
     }
   }
 
+  private Object cleanupLock = new Object();
+  private void cleanupCurrentConnection() {
+    // Synchronizing on cleanupLock to ensure we don't run into a parallel close
+    // Can't synchronize on the main class itself since that would cause the
+    // shutdown request to block
+    synchronized (cleanupLock) {
+      try {
+        if (input != null) {
+          LOG.info("Closing input on " + logIdentifier);
+          input.close();
+        }
+        if (connection != null) {
+          LOG.info("Closing connection on " + logIdentifier);
+          connection.disconnect();
+        }
+      } catch (IOException e) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Exception while shutting down fetcher " + logIdentifier, e);
+        } else {
+          LOG.info("Exception while shutting down fetcher " + logIdentifier + ": " + e.getMessage());
+        }
+      }
+    }
+  }
+
   @VisibleForTesting
   protected HttpURLConnection openConnection(URL url) throws IOException {
     HttpURLConnection conn = (HttpURLConnection) url.openConnection();
@@ -263,8 +297,6 @@ class Fetcher extends Thread {
     remaining = new LinkedHashSet<InputAttemptIdentifier>(srcAttempts);
     
     // Construct the url and connect
-    DataInputStream input;
-    HttpURLConnection connection = null;
     boolean connectSucceeded = false;
     
     try {
@@ -285,7 +317,14 @@ class Fetcher extends Thread {
           ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
       connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
           ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
-      connect(connection, connectionTimeout);
+      connect(connectionTimeout);
+      
+      if (stopped) {
+        LOG.info("Detected fetcher has been shutdown after connection establishment. Returning");
+        cleanupCurrentConnection();
+        putBackRemainingMapOutputs(host);
+        return;
+      }
       connectSucceeded = true;
       input = new DataInputStream(new BufferedInputStream(connection.getInputStream(), bufferSize));
 
@@ -313,6 +352,12 @@ class Fetcher extends Thread {
       SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
       LOG.info("for url="+msgToEncode+" sent hash and receievd reply");
     } catch (IOException ie) {
+      if (stopped) {
+        LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown");
+        cleanupCurrentConnection();
+        putBackRemainingMapOutputs(host);
+        return;
+      }
       if (keepAlive && connection != null) {
         //Read the response body in case of error. This helps in connection reuse.
         //Refer: http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
@@ -367,9 +412,9 @@ class Fetcher extends Thread {
           readErrorStream(connection.getErrorStream());
         }
       }
-      
-      IOUtils.cleanup(LOG, input);
-      
+
+      cleanupCurrentConnection();
+
       // Sanity check
       if (failedTasks == null && !remaining.isEmpty()) {
         throw new IOException("server didn't return all expected map outputs: "
@@ -503,6 +548,16 @@ class Fetcher extends Thread {
       metrics.successFetch();
       return null;
     } catch (IOException ioe) {
+      if (stopped) {
+        LOG.info("Not reporting fetch failure for exception during data copy: ["
+            + ioe.getClass().getName() + ", " + ioe.getMessage() + "]");
+        cleanupCurrentConnection();
+        if (mapOutput != null) {
+          mapOutput.abort(); // Release resources
+        }
+        // Don't need to put back - since that's handled by the invoker
+        return EMPTY_ATTEMPT_ID_ARRAY;
+      }
       ioErrs.increment(1);
       if (srcAttemptId == null || mapOutput == null) {
         LOG.info("fetcher#" + id + " failed to read map header" + 
@@ -610,8 +665,7 @@ class Fetcher extends Thread {
    * only on the last failure. Instead of connecting with a timeout of 
    * X, we try connecting with a timeout of x < X but multiple times. 
    */
-  private void connect(URLConnection connection, int connectionTimeout)
-  throws IOException {
+  private void connect(int connectionTimeout) throws IOException {
     int unit = 0;
     if (connectionTimeout < 0) {
       throw new IOException("Invalid timeout "
@@ -626,6 +680,14 @@ class Fetcher extends Thread {
         connection.connect();
         break;
       } catch (IOException ioe) {
+        
+        // Don't attempt another connect if already stopped.
+        if (stopped) {
+          LOG.info("Fetcher already shutdown. Not attempting to connect again. Last exception was: ["
+              + ioe.getClass().getName() + ", " + ioe.getMessage() + "]");
+          return;
+        }
+
         // update the total remaining connect-timeout
         connectionTimeout -= unit;
 
@@ -646,31 +708,31 @@ class Fetcher extends Thread {
   }
 
   private void shuffleToMemory(MapHost host, MapOutput mapOutput, 
-                               InputStream input, 
+                               InputStream localInput, 
                                int decompressedLength, 
                                int compressedLength) throws IOException {    
     IFileInputStream checksumIn = 
-      new IFileInputStream(input, compressedLength, ifileReadAhead, ifileReadAheadLength);
+      new IFileInputStream(localInput, compressedLength, ifileReadAhead, ifileReadAheadLength);
 
-    input = checksumIn;       
+    localInput = checksumIn;       
   
     // Are map-outputs compressed?
     if (codec != null) {
       decompressor.reset();
-      input = codec.createInputStream(input, decompressor);
+      localInput = codec.createInputStream(localInput, decompressor);
     }
   
     // Copy map-output into an in-memory buffer
     byte[] shuffleData = mapOutput.getMemory();
     
     try {
-      IOUtils.readFully(input, shuffleData, 0, shuffleData.length);
+      IOUtils.readFully(localInput, shuffleData, 0, shuffleData.length);
       metrics.inputBytes(shuffleData.length);
       LOG.info("Read " + shuffleData.length + " bytes from map-output for " +
                mapOutput.getAttemptIdentifier());
     } catch (IOException ioe) {      
       // Close the streams
-      IOUtils.cleanup(LOG, input);
+      IOUtils.cleanup(LOG, localInput);
 
       // Re-throw
       throw ioe;
@@ -679,7 +741,7 @@ class Fetcher extends Thread {
   }
   
   private void shuffleToDisk(MapHost host, MapOutput mapOutput, 
-                             InputStream input, 
+                             InputStream localInput, 
                              long compressedLength) 
   throws IOException {
     // Copy data to local-disk
@@ -689,7 +751,7 @@ class Fetcher extends Thread {
       final int BYTES_TO_READ = 64 * 1024;
       byte[] buf = new byte[BYTES_TO_READ];
       while (bytesLeft > 0) {
-        int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
+        int n = localInput.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
         if (n < 0) {
           throw new IOException("read past end of stream reading " + 
                                 mapOutput.getAttemptIdentifier());
@@ -706,7 +768,7 @@ class Fetcher extends Thread {
       output.close();
     } catch (IOException ioe) {
       // Close the streams
-      IOUtils.cleanup(LOG, input, output);
+      IOUtils.cleanup(LOG, localInput, output);
 
       // Re-throw
       throw ioe;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6b187197/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
index a27fa31..b055f16 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
@@ -18,10 +18,13 @@
 package org.apache.tez.runtime.library.common.shuffle.impl;
 
 import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.FutureTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.crypto.SecretKey;
 
@@ -42,16 +45,23 @@ import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.combine.Combiner;
 import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.runtime.library.exceptions.InputAlreadyClosedException;
 import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * Usage: Create instance, setInitialMemoryAllocated(long), run()
@@ -66,7 +76,6 @@ public class Shuffle implements ExceptionReporter {
   
   private final Configuration conf;
   private final TezInputContext inputContext;
-  private final int numInputs;
   
   private final ShuffleClientMetrics metrics;
 
@@ -78,22 +87,35 @@ public class Shuffle implements ExceptionReporter {
   private final CompressionCodec codec;
   private final boolean ifileReadAhead;
   private final int ifileReadAheadLength;
+  private final int numFetchers;
   
   private Throwable throwable = null;
   private String throwingThreadName = null;
 
-  private FutureTask<TezRawKeyValueIterator> runShuffleFuture;
+  private final RunShuffleCallable runShuffleCallable;
+  private volatile ListenableFuture<TezRawKeyValueIterator> runShuffleFuture;
+  private final ListeningExecutorService executor;
+  
+  private final String srcNameTrimmed;
+  
+  private final List<Fetcher> fetchers;
+  
+  private AtomicBoolean isShutDown = new AtomicBoolean(false);
+  private AtomicBoolean fetchersClosed = new AtomicBoolean(false);
+  private AtomicBoolean schedulerClosed = new AtomicBoolean(false);
+  private AtomicBoolean mergerClosed = new AtomicBoolean(false);
 
   public Shuffle(TezInputContext inputContext, Configuration conf, int numInputs,
       long initialMemoryAvailable) throws IOException {
     this.inputContext = inputContext;
     this.conf = conf;
-    this.numInputs = numInputs;
 
     this.metrics = new ShuffleClientMetrics(inputContext.getDAGName(),
         inputContext.getTaskVertexName(), inputContext.getTaskIndex(),
         this.conf, UserGroupInformation.getCurrentUser().getShortUserName());
     
+    this.srcNameTrimmed = TezUtils.cleanVertexName(inputContext.getSourceVertexName());
+    
     this.jobTokenSecret = ShuffleUtils
         .getJobTokenSecretFromTokenBytes(inputContext
             .getServiceConsumerMetaData(TezConfiguration.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
@@ -149,7 +171,7 @@ public class Shuffle implements ExceptionReporter {
     scheduler = new ShuffleScheduler(
           this.inputContext,
           this.conf,
-          this.numInputs,
+          numInputs,
           this,
           shuffledInputsCounter,
           reduceShuffleBytes,
@@ -174,24 +196,62 @@ public class Shuffle implements ExceptionReporter {
           codec,
           ifileReadAhead,
           ifileReadAheadLength);
+    
+    ExecutorService rawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
+        .setDaemon(true).setNameFormat("ShuffleAndMergeRunner [" + srcNameTrimmed + "]").build());
+
+    int configuredNumFetchers = 
+        conf.getInt(
+            TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES, 
+            TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES);
+    numFetchers = Math.min(configuredNumFetchers, numInputs);
+    LOG.info("Num fetchers being started: " + numFetchers);
+    fetchers = Lists.newArrayListWithCapacity(numFetchers);
+    
+    executor = MoreExecutors.listeningDecorator(rawExecutor);
+    runShuffleCallable = new RunShuffleCallable();
   }
 
   public void handleEvents(List<Event> events) {
-    eventHandler.handleEvents(events);
+    if (!isShutDown.get()) {
+      eventHandler.handleEvents(events);
+    } else {
+      LOG.info("Ignoring events since already shutdown. EventCount: " + events.size());
+    }
+
   }
   
   /**
    * Indicates whether the Shuffle and Merge processing is complete.
    * @return false if not complete, true if complete or if an error occurred.
+   * @throws InterruptedException 
+   * @throws IOException 
+   * @throws InputAlreadyClosedException 
    */
-  public boolean isInputReady() {
+  // ZZZ Deal with these methods.
+  public boolean isInputReady() throws IOException, InterruptedException {
+    if (isShutDown.get()) {
+      throw new InputAlreadyClosedException();
+    }
+    if (throwable != null) {
+      handleThrowable(throwable);
+    }
     if (runShuffleFuture == null) {
       return false;
     }
-    // TODO This may return true, followed by the reader throwing the actual Exception.
-    // Fix as part of TEZ-919.
+    // Don't need to check merge status, since runShuffleFuture will only
+    // complete once merge is complete.
     return runShuffleFuture.isDone();
-    //return scheduler.isDone() && merger.isMergeComplete();
+  }
+
+  private void handleThrowable(Throwable t) throws IOException, InterruptedException {
+    if (t instanceof IOException) {
+      throw (IOException) t;
+    } else if (t instanceof InterruptedException) {
+      throw (InterruptedException) t;
+    } else {
+      throw new UndeclaredThrowableException(t);
+    }
   }
 
   /**
@@ -200,56 +260,59 @@ public class Shuffle implements ExceptionReporter {
    * @throws IOException
    * @throws InterruptedException
    */
+  // ZZZ Deal with these methods.
   public TezRawKeyValueIterator waitForInput() throws IOException, InterruptedException {
     Preconditions.checkState(runShuffleFuture != null,
         "waitForInput can only be called after run");
-    TezRawKeyValueIterator kvIter;
+    TezRawKeyValueIterator kvIter = null;
     try {
       kvIter = runShuffleFuture.get();
     } catch (ExecutionException e) {
       Throwable cause = e.getCause();
-      if (cause instanceof IOException) {
-        throw (IOException) cause;
-      } else if (cause instanceof InterruptedException) {
-        throw (InterruptedException) cause;
-      } else {
-        throw new TezUncheckedException(
-            "Unexpected exception type while running Shuffle and Merge", cause);
-      }
+      handleThrowable(cause);
+    }
+    if (isShutDown.get()) {
+      throw new InputAlreadyClosedException();
+    }
+    if (throwable != null) {
+      handleThrowable(throwable);
     }
     return kvIter;
   }
 
   public void run() throws IOException {
     merger.configureAndStart();
-    RunShuffleCallable runShuffle = new RunShuffleCallable();
-    runShuffleFuture = new FutureTask<TezRawKeyValueIterator>(runShuffle);
-    new Thread(runShuffleFuture, "ShuffleMergeRunner ["
-        + TezUtils.cleanVertexName(inputContext.getSourceVertexName() + "]")).start();
+    runShuffleFuture = executor.submit(runShuffleCallable);
+    Futures.addCallback(runShuffleFuture, new ShuffleRunnerFutureCallback());
+    executor.shutdown();
   }
-  
+
+  public void shutdown() {
+    if (!isShutDown.getAndSet(true)) {
+      // Interrupt so that the scheduler / merger sees this interrupt.
+      LOG.info("Shutting down Shuffle for source: " + srcNameTrimmed);
+      runShuffleFuture.cancel(true);
+      cleanupIgnoreErrors();
+    }
+  }
+
   private class RunShuffleCallable implements Callable<TezRawKeyValueIterator> {
     @Override
     public TezRawKeyValueIterator call() throws IOException, InterruptedException {
-      // TODO NEWTEZ Limit # fetchers to number of inputs
-      final int numFetchers = 
-          conf.getInt(
-              TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES, 
-              TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES);
-      Fetcher[] fetchers = new Fetcher[numFetchers];
-      for (int i = 0; i < numFetchers; ++i) {
-        fetchers[i] = new Fetcher(conf, scheduler, merger, metrics,
-            Shuffle.this, jobTokenSecret, ifileReadAhead, ifileReadAheadLength,
-            codec, inputContext);
-        
-        fetchers[i].start();
+
+      synchronized (this) {
+        for (int i = 0; i < numFetchers; ++i) {
+          Fetcher fetcher = new Fetcher(conf, scheduler, merger, metrics, Shuffle.this, jobTokenSecret,
+              ifileReadAhead, ifileReadAheadLength, codec, inputContext);
+          fetchers.add(fetcher);
+          fetcher.start();
+        }
       }
+
       
       while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
         synchronized (this) {
           if (throwable != null) {
-            inputContext.fatalError(throwable, "error in shuffle from thread :"
-                + throwingThreadName);
             throw new ShuffleError("error in shuffle in " + throwingThreadName,
                                    throwable);
           }
@@ -257,13 +320,10 @@ public class Shuffle implements ExceptionReporter {
       }
       
       // Stop the map-output fetcher threads
-      for (Fetcher fetcher : fetchers) {
-        fetcher.shutDown();
-      }
-      fetchers = null;
+      cleanupFetchers(false);
       
       // stop the scheduler
-      scheduler.close();
+      cleanupShuffleScheduler(false);
 
 
       // Finish the on-going merges...
@@ -271,15 +331,12 @@ public class Shuffle implements ExceptionReporter {
       try {
         kvIter = merger.close();
       } catch (Throwable e) {
-        inputContext.fatalError(e, "Error while doing final merge ");
         throw new ShuffleError("Error while doing final merge " , e);
       }
       
       // Sanity check
       synchronized (Shuffle.this) {
         if (throwable != null) {
-          inputContext.fatalError(throwable, "error in shuffle from thread :"
-             + throwingThreadName);
           throw new ShuffleError("error in shuffle in " + throwingThreadName,
                                  throwable);
         }
@@ -291,6 +348,73 @@ public class Shuffle implements ExceptionReporter {
     }
   }
   
+  private synchronized void cleanupFetchers(boolean ignoreErrors) throws InterruptedException {
+    // Stop the fetcher threads
+    InterruptedException ie = null;
+    if (!fetchersClosed.getAndSet(true)) {
+      for (Fetcher fetcher : fetchers) {
+        try {
+          fetcher.shutDown();
+        } catch (InterruptedException e) {
+          if (ignoreErrors) {
+            LOG.info("Interrupted while shutting down fetchers. Ignoring.");
+          } else {
+            if (ie != null) {
+              ie = e;
+            } else {
+              LOG.warn("Ignoring exception while shutting down fetcher since a previous one was seen and will be thrown "
+                  + e);
+            }
+          }
+        }
+      }
+      fetchers.clear();
+      // throw only the first exception while attempting to shutdown.
+      if (ie != null) {
+        throw ie;
+      }
+    }
+  }
+
+  private void cleanupShuffleScheduler(boolean ignoreErrors) throws InterruptedException {
+
+    if (!schedulerClosed.getAndSet(true)) {
+      try {
+        scheduler.close();
+      } catch (InterruptedException e) {
+        if (ignoreErrors) {
+          LOG.info("Interrupted while attempting to close the scheduler during cleanup. Ignoring");
+        } else {
+          throw e;
+        }
+      }
+    }
+  }
+
+  private void cleanupMerger(boolean ignoreErrors) throws Throwable {
+    if (!mergerClosed.getAndSet(true)) {
+      try {
+        merger.close();
+      } catch (Throwable e) {
+        if (ignoreErrors) {
+          LOG.info("Exception while trying to shutdown merger, Ignoring", e);
+        } else {
+          throw e;
+        }
+      }
+    }
+  }
+
+  private void cleanupIgnoreErrors() {
+    try {
+      cleanupFetchers(true);
+      cleanupShuffleScheduler(true);
+      cleanupMerger(true);
+    } catch (Throwable t) {
+      // Ignore
+    }
+  }
+
   @Private
   public synchronized void reportException(Throwable t) {
     if (throwable == null) {
@@ -316,4 +440,23 @@ public class Shuffle implements ExceptionReporter {
   public static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) {
     return MergeManager.getInitialMemoryRequirement(conf, maxAvailableTaskMemory);
   }
+  
+  private class ShuffleRunnerFutureCallback implements FutureCallback<TezRawKeyValueIterator> {
+    @Override
+    public void onSuccess(TezRawKeyValueIterator result) {
+      LOG.info("Shuffle Runner thread complete");
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      // ZZZ Handle failures during shutdown.
+      if (isShutDown.get()) {
+        LOG.info("Already shutdown. Ignoring error: ",  t);
+      } else {
+        LOG.error("ShuffleRunner failed with error", t);
+        inputContext.fatalError(t, "Shuffle Runner Failed");
+        cleanupIgnoreErrors();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6b187197/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index 45a50ad..41ce489 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -570,6 +570,7 @@ class ShuffleScheduler {
   }
   
   public void close() throws InterruptedException {
+    /// ZZZ need to interrupt setlf ?
     referee.interrupt();
     referee.join();
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6b187197/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/exceptions/InputAlreadyClosedException.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/exceptions/InputAlreadyClosedException.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/exceptions/InputAlreadyClosedException.java
new file mode 100644
index 0000000..ee41066
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/exceptions/InputAlreadyClosedException.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.exceptions;
+
+
+public class InputAlreadyClosedException extends RuntimeException {
+
+  private static final long serialVersionUID = 5094990552896724803L;
+
+  public InputAlreadyClosedException() {
+    super("Input already closed");
+  }
+
+  public InputAlreadyClosedException(Throwable cause) {
+    super("Input already closed", cause);
+  }
+
+  public InputAlreadyClosedException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6b187197/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index 0c01f92..7045b5c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -127,8 +127,10 @@ public class ShuffledMergedInput extends AbstractLogicalInput {
    * @return true if the input is ready for consumption, or if an error occurred
    *         processing fetching the input. false if the shuffle and merge are
    *         still in progress
+   * @throws InterruptedException 
+   * @throws IOException 
    */
-  public synchronized boolean isInputReady() {
+  public synchronized boolean isInputReady() throws IOException, InterruptedException {
     Preconditions.checkState(isStarted.get(), "Must start input before invoking this method");
     if (getNumPhysicalInputs() == 0) {
       return true;
@@ -164,6 +166,9 @@ public class ShuffledMergedInput extends AbstractLogicalInput {
     if (this.getNumPhysicalInputs() != 0 && rawIter != null) {
       rawIter.close();
     }
+    if (shuffle != null) {
+      shuffle.shutdown();
+    }
     return Collections.emptyList();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6b187197/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
index b2ac9dd..9ecf590 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
@@ -23,8 +23,6 @@ import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
 import java.net.URL;
-import java.net.URLConnection;
-import java.security.GeneralSecurityException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -32,15 +30,13 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.crypto.SecretKey;
-import javax.net.ssl.HttpsURLConnection;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -76,6 +72,10 @@ public class Fetcher implements Callable<FetchResult> {
   private final FetcherCallback fetcherCallback;
   private final FetchedInputAllocator inputManager;
   private final ApplicationId appId;
+  
+  private final String logIdentifier;
+  
+  private final AtomicBoolean isShutDown = new AtomicBoolean(false);
 
   private static boolean sslShuffle = false;
   private static SSLFactory sslFactory;
@@ -94,12 +94,14 @@ public class Fetcher implements Callable<FetchResult> {
   private LinkedHashSet<InputAttemptIdentifier> remaining;
 
   private URL url;
+  private volatile HttpURLConnection connection;
+  private volatile DataInputStream input;
   private String encHash;
   private String msgToEncode;
 
   private Fetcher(FetcherCallback fetcherCallback,
       FetchedInputAllocator inputManager, ApplicationId appId, SecretKey shuffleSecret,
-      Configuration conf) {
+      String srcNameTrimmed) {
     this.fetcherCallback = fetcherCallback;
     this.inputManager = inputManager;
     this.shuffleSecret = shuffleSecret;
@@ -107,6 +109,7 @@ public class Fetcher implements Callable<FetchResult> {
     this.pathToAttemptMap = new HashMap<String, InputAttemptIdentifier>();
 
     this.fetcherIdentifier = fetcherIdGen.getAndIncrement();
+    this.logIdentifier = "fetcher [" + srcNameTrimmed +"] " + fetcherIdentifier;
     
     // TODO NEWTEZ Ideally, move this out from here into a static initializer block.
     // Re-enable when ssl shuffle support is needed.
@@ -141,40 +144,62 @@ public class Fetcher implements Callable<FetchResult> {
 
     remaining = new LinkedHashSet<InputAttemptIdentifier>(srcAttempts);
 
-    HttpURLConnection connection;
     try {
-      connection = connectToShuffleHandler(host, port, partition, srcAttempts);
+      connectToShuffleHandler(host, port, partition, srcAttempts);
     } catch (IOException e) {
       // ioErrs.increment(1);
       // If connect did not succeed, just mark all the maps as failed,
       // indirectly penalizing the host
-      for (Iterator<InputAttemptIdentifier> leftIter = remaining.iterator(); leftIter
-          .hasNext();) {
-        fetcherCallback.fetchFailed(host, leftIter.next(), true);
+      if (isShutDown.get()) {
+        LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown");
+      } else {
+        for (Iterator<InputAttemptIdentifier> leftIter = remaining.iterator(); leftIter
+            .hasNext();) {
+          fetcherCallback.fetchFailed(host, leftIter.next(), true);
+        }
       }
       return new FetchResult(host, port, partition, remaining);
     }
-
-    DataInputStream input;
+    if (isShutDown.get()) {
+      // shutdown would have no effect if in the process of establishing the connection.
+      shutdownInternal();
+      LOG.info("Detected fetcher has been shutdown after connection establishment. Returning");
+      return new FetchResult(host, port, partition, remaining);
+    }
 
     try {
       input = new DataInputStream(connection.getInputStream());
-      validateConnectionResponse(connection, url, msgToEncode, encHash);
+      validateConnectionResponse(msgToEncode, encHash);
     } catch (IOException e) {
       // ioErrs.increment(1);
       // If we got a read error at this stage, it implies there was a problem
       // with the first map, typically lost map. So, penalize only that map
       // and add the rest
-      InputAttemptIdentifier firstAttempt = srcAttempts.get(0);
-      LOG.warn("Fetch Failure from host while connecting: " + host
-          + ", attempt: " + firstAttempt + " Informing ShuffleManager: ", e);
-      fetcherCallback.fetchFailed(host, firstAttempt, false);
-      return new FetchResult(host, port, partition, remaining);
+      if (isShutDown.get()) {
+        LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown");
+      } else {
+        InputAttemptIdentifier firstAttempt = srcAttempts.get(0);
+        LOG.warn("Fetch Failure from host while connecting: " + host + ", attempt: " + firstAttempt
+            + " Informing ShuffleManager: ", e);
+        fetcherCallback.fetchFailed(host, firstAttempt, false);
+        return new FetchResult(host, port, partition, remaining);
+      }
     }
 
     // By this point, the connection is setup and the response has been
     // validated.
 
+    // Handle any shutdown which may have been invoked.
+    if (isShutDown.get()) {
+      // shutdown would have no effect if in the process of establishing the connection.
+      shutdownInternal();
+      LOG.info("Detected fetcher has been shutdown after opening stream. Returning");
+      return new FetchResult(host, port, partition, remaining);
+    }
+    // After this point, closing the stream and connection, should cause a
+    // SocketException,
+    // which will be ignored since shutdown has been invoked.
+
     // Loop through available map-outputs and fetch them
     // On any error, faildTasks is not null and we exit
     // after putting back the remaining maps to the
@@ -191,7 +216,7 @@ public class Fetcher implements Callable<FetchResult> {
       }
     }
 
-    IOUtils.cleanup(LOG, input);
+    shutdown();
 
     // Sanity check
     if (failedInputs == null && !remaining.isEmpty()) {
@@ -203,6 +228,36 @@ public class Fetcher implements Callable<FetchResult> {
 
   }
 
+  public void shutdown() {
+    if (!isShutDown.getAndSet(true)) {
+      shutdownInternal();
+    }
+  }
+
+  private void shutdownInternal() {
+    // Synchronizing on isShutDown to ensure we don't run into a parallel close
+    // Can't synchronize on the main class itself since that would cause the
+    // shutdown request to block
+    synchronized (isShutDown) {
+      try {
+        if (input != null) {
+          LOG.info("Closing input on " + logIdentifier);
+          input.close();
+        }
+        if (connection != null) {
+          LOG.info("Closing connection on " + logIdentifier);
+          connection.disconnect();
+        }
+      } catch (IOException e) {
+        LOG.info("Exception while shutting down fetcher on " + logIdentifier + " : "
+            + e.getMessage());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(e);
+        }
+      }
+    }
+  }
+
   private InputAttemptIdentifier[] fetchInputs(DataInputStream input) {
     FetchedInput fetchedInput = null;
     InputAttemptIdentifier srcAttemptId = null;
@@ -285,6 +340,8 @@ public class Fetcher implements Callable<FetchResult> {
       // metrics.successFetch();
       return null;
     } catch (IOException ioe) {
+      // ZZZ Add some shutdown code here
+      // ZZZ Make sure any assigned memory inputs are aborted
       // ioErrs.increment(1);
       if (srcAttemptId == null || fetchedInput == null) {
         LOG.info("fetcher" + " failed to read map header" + srcAttemptId
@@ -360,11 +417,11 @@ public class Fetcher implements Callable<FetchResult> {
     }
   }
 
-  private HttpURLConnection connectToShuffleHandler(String host, int port,
+  private void connectToShuffleHandler(String host, int port,
       int partition, List<InputAttemptIdentifier> inputs) throws IOException {
     try {
       this.url = constructInputURL(host, port, partition, inputs);
-      HttpURLConnection connection = openConnection(url);
+      this.connection = openConnection(url);
 
       // generate hash of the url
       this.msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
@@ -382,8 +439,7 @@ public class Fetcher implements Callable<FetchResult> {
       connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
           ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
 
-      connect(connection, connectionTimeout);
-      return connection;
+      connect(connectionTimeout);
     } catch (IOException e) {
       LOG.warn("Failed to connect to " + host + " with " + srcAttempts.size()
           + " inputs", e);
@@ -391,8 +447,7 @@ public class Fetcher implements Callable<FetchResult> {
     }
   }
 
-  private void validateConnectionResponse(HttpURLConnection connection,
-      URL url, String msgToEncode, String encHash) throws IOException {
+  private void validateConnectionResponse(String msgToEncode, String encHash) throws IOException {
     int rc = connection.getResponseCode();
     if (rc != HttpURLConnection.HTTP_OK) {
       throw new IOException("Got invalid response code " + rc + " from " + url
@@ -422,17 +477,8 @@ public class Fetcher implements Callable<FetchResult> {
   }
 
   protected HttpURLConnection openConnection(URL url) throws IOException {
-    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
-    if (sslShuffle) {
-      HttpsURLConnection httpsConn = (HttpsURLConnection) conn;
-      try {
-        httpsConn.setSSLSocketFactory(sslFactory.createSSLSocketFactory());
-      } catch (GeneralSecurityException ex) {
-        throw new IOException(ex);
-      }
-      httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier());
-    }
-    return conn;
+    HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+    return connection;
   }
 
   /**
@@ -440,8 +486,7 @@ public class Fetcher implements Callable<FetchResult> {
    * only on the last failure. Instead of connecting with a timeout of X, we try
    * connecting with a timeout of x < X but multiple times.
    */
-  private void connect(URLConnection connection, int connectionTimeout)
-      throws IOException {
+  private void connect(int connectionTimeout) throws IOException {
     int unit = 0;
     if (connectionTimeout < 0) {
       throw new IOException("Invalid timeout " + "[timeout = "
@@ -456,6 +501,13 @@ public class Fetcher implements Callable<FetchResult> {
         connection.connect();
         break;
       } catch (IOException ioe) {
+
+        // Check if already shutdown and abort subsequent connect attempts
+        if (isShutDown.get()) {
+          LOG.info("Fetcher already shutdown. Not attempting to connect again. Last exception was: ["
+              + ioe.getClass().getName() + ", " + ioe.getMessage() + "]");
+          return;
+        }
         // update the total remaining connect-timeout
         connectionTimeout -= unit;
 
@@ -503,9 +555,9 @@ public class Fetcher implements Callable<FetchResult> {
 
     public FetcherBuilder(FetcherCallback fetcherCallback,
         FetchedInputAllocator inputManager, ApplicationId appId,
-        SecretKey shuffleSecret, Configuration conf) {
+        SecretKey shuffleSecret, String srcNameTrimmed) {
       this.fetcher = new Fetcher(fetcherCallback, inputManager, appId,
-          shuffleSecret, conf);
+          shuffleSecret, srcNameTrimmed);
     }
 
     public FetcherBuilder setCompressionParameters(CompressionCodec codec) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6b187197/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
index 050c0c0..657feed 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
@@ -82,14 +82,12 @@ public class ShuffleManager implements FetcherCallback {
   private static final Log LOG = LogFactory.getLog(ShuffleManager.class);
   
   private final TezInputContext inputContext;
-  private final Configuration conf;
   private final int numInputs;
 
   private final FetchedInputAllocator inputManager;
 
   private final ListeningExecutorService fetcherExecutor;
 
-  private final ExecutorService schedulerRawExecutor;
   private final ListeningExecutorService schedulerExecutor;
   private final RunShuffleCallable schedulerCallable = new RunShuffleCallable();
   
@@ -99,6 +97,7 @@ public class ShuffleManager implements FetcherCallback {
   private final ConcurrentMap<String, InputHost> knownSrcHosts;
   private final BlockingQueue<InputHost> pendingHosts;
   private final Set<InputAttemptIdentifier> obsoletedInputs;
+  private Set<Fetcher> runningFetchers;
   
   private final AtomicInteger numCompletedInputs = new AtomicInteger(0);
   
@@ -110,7 +109,6 @@ public class ShuffleManager implements FetcherCallback {
   private final Condition wakeLoop = lock.newCondition();
   
   private final int numFetchers;
-  private final AtomicInteger numRunningFetchers = new AtomicInteger(0);
   
   // Parameters required by Fetchers
   private final SecretKey shuffleSecret;
@@ -122,8 +120,8 @@ public class ShuffleManager implements FetcherCallback {
   private final boolean ifileReadAhead;
   private final int ifileReadAheadLength;
   
-  private final FetchFutureCallback fetchFutureCallback = new FetchFutureCallback();
-
+  private final String srcNameTrimmed; 
+  
   private final AtomicBoolean isShutdown = new AtomicBoolean(false);
 
   private final TezCounter shuffledInputsCounter;
@@ -141,7 +139,6 @@ public class ShuffleManager implements FetcherCallback {
       int bufferSize, boolean ifileReadAheadEnabled, int ifileReadAheadLength,
       CompressionCodec codec, FetchedInputAllocator inputAllocator) throws IOException {
     this.inputContext = inputContext;
-    this.conf = conf;
     this.numInputs = numInputs;
     
     this.shuffledInputsCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
@@ -156,13 +153,16 @@ public class ShuffleManager implements FetcherCallback {
     this.ifileReadAheadLength = ifileReadAheadLength;
     this.codec = codec;
     this.inputManager = inputAllocator;
+    
+    this.srcNameTrimmed = TezUtils.cleanVertexName(inputContext.getSourceVertexName());
   
     completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
     completedInputs = new LinkedBlockingQueue<FetchedInput>(numInputs);
     knownSrcHosts = new ConcurrentHashMap<String, InputHost>();
     pendingHosts = new LinkedBlockingQueue<InputHost>();
     obsoletedInputs = Collections.newSetFromMap(new ConcurrentHashMap<InputAttemptIdentifier, Boolean>());
-    
+    runningFetchers = Collections.newSetFromMap(new ConcurrentHashMap<Fetcher, Boolean>());
+
     int maxConfiguredFetchers = 
         conf.getInt(
             TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES, 
@@ -172,20 +172,13 @@ public class ShuffleManager implements FetcherCallback {
     
     ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(
         numFetchers,
-        new ThreadFactoryBuilder()
-            .setDaemon(true)
-            .setNameFormat(
-                "Fetcher [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "] #%d")
-            .build());
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("Fetcher [" + srcNameTrimmed + "] #%d").build());
     this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
     
-    this.schedulerRawExecutor = Executors.newFixedThreadPool(
-        1,
-        new ThreadFactoryBuilder()
-            .setDaemon(true)
-            .setNameFormat(
-                "ShuffleRunner [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "]")
-            .build());
+    
+    ExecutorService schedulerRawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
+        .setDaemon(true).setNameFormat("ShuffleRunner [" + srcNameTrimmed + "]").build());
     this.schedulerExecutor = MoreExecutors.listeningDecorator(schedulerRawExecutor);
     
     this.startTime = System.currentTimeMillis();
@@ -224,7 +217,7 @@ public class ShuffleManager implements FetcherCallback {
       while (!isShutdown.get() && numCompletedInputs.get() < numInputs) {
         lock.lock();
         try {
-          if (numRunningFetchers.get() >= numFetchers || pendingHosts.size() == 0) {
+          if (runningFetchers.size() >= numFetchers || pendingHosts.isEmpty()) {
             if (numCompletedInputs.get() < numInputs) {
               wakeLoop.await();
             }
@@ -242,12 +235,12 @@ public class ShuffleManager implements FetcherCallback {
         if (LOG.isDebugEnabled()) {
           LOG.debug("NumCompletedInputs: " + numCompletedInputs);
         }
-        if (numCompletedInputs.get() < numInputs) {
+        if (numCompletedInputs.get() < numInputs && !isShutdown.get()) {
           lock.lock();
           try {
-            int maxFetchersToRun = numFetchers - numRunningFetchers.get();
+            int maxFetchersToRun = numFetchers - runningFetchers.size();
             int count = 0;
-            while (pendingHosts.peek() != null) {
+            while (pendingHosts.peek() != null && !isShutdown.get()) {
               InputHost inputHost = null;
               try {
                 inputHost = pendingHosts.take();
@@ -262,16 +255,16 @@ public class ShuffleManager implements FetcherCallback {
               if (LOG.isDebugEnabled()) {
                 LOG.debug("Processing pending host: " + inputHost.toDetailedString());
               }
-              if (inputHost.getNumPendingInputs() > 0) {
+              if (inputHost.getNumPendingInputs() > 0 && !isShutdown.get()) {
                 LOG.info("Scheduling fetch for inputHost: " + inputHost.getIdentifier());
                 Fetcher fetcher = constructFetcherForHost(inputHost);
-                numRunningFetchers.incrementAndGet();
+                runningFetchers.add(fetcher);
                 if (isShutdown.get()) {
                   LOG.info("hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
                 }
                 ListenableFuture<FetchResult> future = fetcherExecutor
                     .submit(fetcher);
-                Futures.addCallback(future, fetchFutureCallback);
+                Futures.addCallback(future, new FetchFutureCallback(fetcher));
                 if (++count >= maxFetchersToRun) {
                   break;
                 }
@@ -297,9 +290,8 @@ public class ShuffleManager implements FetcherCallback {
   }
   
   private Fetcher constructFetcherForHost(InputHost inputHost) {
-    FetcherBuilder fetcherBuilder = new FetcherBuilder(
-        ShuffleManager.this, inputManager,
-        inputContext.getApplicationId(), shuffleSecret, conf);
+    FetcherBuilder fetcherBuilder = new FetcherBuilder(ShuffleManager.this, inputManager,
+        inputContext.getApplicationId(), shuffleSecret, srcNameTrimmed);
     fetcherBuilder.setConnectionParameters(connectionTimeout, readTimeout);
     if (codec != null) {
       fetcherBuilder.setCompressionParameters(codec);
@@ -517,15 +509,29 @@ public class ShuffleManager implements FetcherCallback {
   /////////////////// End of Methods from FetcherCallbackHandler
 
   public void shutdown() throws InterruptedException {
-    isShutdown.set(true);
-    if (this.schedulerExecutor != null && !this.schedulerExecutor.isShutdown()) {
-      this.schedulerExecutor.shutdownNow(); // Interrupt all running fetchers
-    }
-    if (this.fetcherExecutor != null && !this.fetcherExecutor.isShutdown()) {
-      this.fetcherExecutor.shutdownNow(); // Interrupt all running fetchers
+    if (!isShutdown.getAndSet(true)) {
+      // Shut down any pending fetchers
+      LOG.info("Shutting down pending fetchers on source" + srcNameTrimmed + ": "
+          + runningFetchers.size());
+      lock.lock();
+      try {
+        wakeLoop.signal(); // signal the fetch-scheduler
+        for (Fetcher fetcher : runningFetchers) {
+          fetcher.shutdown(); // This could be parallelized.
+        }
+      } finally {
+        lock.unlock();
+      }
+
+      if (this.schedulerExecutor != null && !this.schedulerExecutor.isShutdown()) {
+        this.schedulerExecutor.shutdownNow();
+      }
+      if (this.fetcherExecutor != null && !this.fetcherExecutor.isShutdown()) {
+        this.fetcherExecutor.shutdownNow(); // Interrupts all running fetchers.
+      }
     }
   }
-  
+
   private void registerCompletedInput(FetchedInput fetchedInput) {
     lock.lock();
     try {
@@ -653,10 +659,16 @@ public class ShuffleManager implements FetcherCallback {
   
   private class FetchFutureCallback implements FutureCallback<FetchResult> {
 
+    private final Fetcher fetcher;
+    
+    public FetchFutureCallback(Fetcher fetcher) {
+      this.fetcher = fetcher;
+    }
+    
     private void doBookKeepingForFetcherComplete() {
-      numRunningFetchers.decrementAndGet();
       lock.lock();
       try {
+        runningFetchers.remove(fetcher);
         wakeLoop.signal();
       } finally {
         lock.unlock();
@@ -665,20 +677,27 @@ public class ShuffleManager implements FetcherCallback {
     
     @Override
     public void onSuccess(FetchResult result) {
-      Iterable<InputAttemptIdentifier> pendingInputs = result.getPendingInputs();
-      if (pendingInputs != null && pendingInputs.iterator().hasNext()) {
-        InputHost inputHost = knownSrcHosts.get(InputHost.createIdentifier(result.getHost(), result.getPort()));
-        assert inputHost != null;
-        for (InputAttemptIdentifier input : pendingInputs) {
-          inputHost.addKnownInput(input);
+      fetcher.shutdown();
+      if (isShutdown.get()) {
+        LOG.info("Already shutdown. Ignoring event from fetcher");
+      } else {
+        Iterable<InputAttemptIdentifier> pendingInputs = result.getPendingInputs();
+        if (pendingInputs != null && pendingInputs.iterator().hasNext()) {
+          InputHost inputHost = knownSrcHosts.get(InputHost.createIdentifier(result.getHost(), result.getPort()));
+          assert inputHost != null;
+          for (InputAttemptIdentifier input : pendingInputs) {
+            inputHost.addKnownInput(input);
+          }
+          pendingHosts.add(inputHost);
         }
-        pendingHosts.add(inputHost);
+        doBookKeepingForFetcherComplete();
       }
-      doBookKeepingForFetcherComplete();
     }
 
     @Override
     public void onFailure(Throwable t) {
+      // Unsuccessful - the fetcher may not have shutdown correctly. Try shutting it down.
+      fetcher.shutdown();
       if (isShutdown.get()) {
         LOG.info("Already shutdown. Ignoring error from fetcher: " + t);
       } else {