You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/09/18 21:50:10 UTC

[25/25] git commit: TEZ-1157. Optimize broadcast shuffle to download data only once per host. (gopalv)

TEZ-1157. Optimize broadcast shuffle to download data only once per host. (gopalv)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/625450cf
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/625450cf
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/625450cf

Branch: refs/heads/TEZ-8
Commit: 625450cf11454fa9697a902ba70367de00cdc170
Parents: e328055
Author: Gopal V <go...@apache.org>
Authored: Wed Sep 17 20:53:11 2014 -0700
Committer: Gopal V <go...@apache.org>
Committed: Wed Sep 17 20:53:11 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   4 +-
 .../library/api/TezRuntimeConfiguration.java    |  12 +
 .../library/common/InputAttemptIdentifier.java  |  26 +-
 .../shuffle/common/DiskFetchedInput.java        |   7 +
 .../library/shuffle/common/FetchedInput.java    |   3 +-
 .../shuffle/common/FetchedInputAllocator.java   |   3 +
 .../runtime/library/shuffle/common/Fetcher.java | 318 +++++++++++++++++--
 .../impl/ShuffleInputEventHandlerImpl.java      |  18 +-
 .../shuffle/common/impl/ShuffleManager.java     |  42 ++-
 .../impl/SimpleFetchedInputAllocator.java       |  17 +
 10 files changed, 412 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/625450cf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bd5569c..5e2c2cd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -19,8 +19,8 @@ ALL CHANGES:
   TEZ-1580. Change TestOrderedWordCount to optionally use MR configs.
   TEZ-1524. Resolve user group information only if ACLs are enabled.
   TEZ-1581. GroupByOrderByMRRTest no longer functional.
-  TEZ-1563. TezClient.submitDAGSession alters DAG local resources regardless
-  of DAG submission
+  TEZ-1563. TezClient.submitDAGSession alters DAG local resources regardless of DAG submission.
+  TEZ-1157. Optimize broadcast shuffle to download data only once per host. 
 
 Release 0.5.1: Unreleased
 

http://git-wip-us.apache.org/repos/asf/tez/blob/625450cf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index 077ce8e..cb61109 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -280,6 +280,17 @@ public class TezRuntimeConfiguration {
    */
   public static final boolean TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT = false;
 
+  /**
+   * Share data fetched between tasks running on the same host if applicable
+   */
+  public static final String TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH = TEZ_RUNTIME_PREFIX
+      + "optimize.shared.fetch";
+
+  /**
+   * shared mode bypassing the http fetch is not enabled by default till we have unit tests in.
+   */
+  public static final boolean TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH_DEFAULT = false;
+
   // TODO TEZ-1233 - allow this property to be set per vertex
   // TODO TEZ-1231 - move these properties out since they are not relevant for Inputs / Outputs
 
@@ -333,6 +344,7 @@ public class TezRuntimeConfiguration {
     tezRuntimeKeys.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE);
     tezRuntimeKeys.add(TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS);
     tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH);
+    tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH);
 
     defaultConf.addResource("core-default.xml");
     defaultConf.addResource("core-site.xml");

http://git-wip-us.apache.org/repos/asf/tez/blob/625450cf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
index 7c8a23b..9987d26 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
@@ -29,28 +29,38 @@ public class InputAttemptIdentifier {
 
   private final InputIdentifier inputIdentifier;
   private final int attemptNumber;
-  private String pathComponent;
-  
+  private final String pathComponent;
+  private final boolean shared;
+
   public static final String PATH_PREFIX = "attempt";
-  
+
   public InputAttemptIdentifier(int inputIndex, int attemptNumber) {
     this(new InputIdentifier(inputIndex), attemptNumber, null);
   }
-  
+
   public InputAttemptIdentifier(InputIdentifier inputIdentifier, int attemptNumber, String pathComponent) {
+    this(inputIdentifier, attemptNumber, pathComponent, false);
+  }
+
+  public InputAttemptIdentifier(InputIdentifier inputIdentifier, int attemptNumber, String pathComponent, boolean shared) {
     this.inputIdentifier = inputIdentifier;
     this.attemptNumber = attemptNumber;
     this.pathComponent = pathComponent;
+    this.shared = shared;
     if (pathComponent != null && !pathComponent.startsWith(PATH_PREFIX)) {
       throw new TezUncheckedException(
           "Path component must start with: " + PATH_PREFIX + " " + this);
     }
   }
-  
+
   public InputAttemptIdentifier(int taskIndex, int attemptNumber, String pathComponent) {
     this(new InputIdentifier(taskIndex), attemptNumber, pathComponent);
   }
 
+  public InputAttemptIdentifier(int taskIndex, int attemptNumber, String pathComponent, boolean shared) {
+    this(new InputIdentifier(taskIndex), attemptNumber, pathComponent, shared);
+  }
+
   public InputIdentifier getInputIdentifier() {
     return this.inputIdentifier;
   }
@@ -63,7 +73,11 @@ public class InputAttemptIdentifier {
     return pathComponent;
   }
 
-  // PathComponent does not need to be part of the hashCode and equals computation.
+  public boolean isShared() {
+    return this.shared;
+  }
+
+  // PathComponent & shared does not need to be part of the hashCode and equals computation.
   @Override
   public int hashCode() {
     final int prime = 31;

http://git-wip-us.apache.org/repos/asf/tez/blob/625450cf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
index 1d26c6e..b0b911b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
@@ -65,6 +65,13 @@ public class DiskFetchedInput extends FetchedInput {
   public InputStream getInputStream() throws IOException {
     return localFS.open(outputPath);
   }
+
+  public final Path getInputPath() {
+    if (state == State.COMMITTED) {
+      return this.outputPath;
+    }
+    return this.tmpOutputPath;
+  }
   
   @Override
   public void commit() throws IOException {

http://git-wip-us.apache.org/repos/asf/tez/blob/625450cf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
index fde19b7..0a83dc9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
@@ -110,7 +110,8 @@ public abstract class FetchedInput {
 
   /**
    * Return an input stream to be used to read the previously fetched data.
-   * Users are expected to close the InputStream when they're done
+   * All calls to getInputStream() produce new reset streams for reading.
+   * Users are expected to close the InputStream when they're done.
    */
   public abstract InputStream getInputStream() throws IOException;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/625450cf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputAllocator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputAllocator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputAllocator.java
index 1707ab7..288df6d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputAllocator.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputAllocator.java
@@ -21,11 +21,14 @@ package org.apache.tez.runtime.library.shuffle.common;
 import java.io.IOException;
 
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
 
 public interface FetchedInputAllocator {
 
   public FetchedInput allocate(long actualSize, long compresedSize,
       InputAttemptIdentifier inputAttemptIdentifier) throws IOException;
   
+  public FetchedInput allocateType(Type type, long actualSize, long compresedSize,
+      InputAttemptIdentifier inputAttemptIdentifier) throws IOException;
   
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/625450cf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
index 9cb8617..e25124b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
@@ -19,10 +19,17 @@
 package org.apache.tez.runtime.library.shuffle.common;
 
 import java.io.DataInputStream;
+import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
 import java.net.URL;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.channels.OverlappingFileLockException;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -34,12 +41,19 @@ import java.util.concurrent.atomic.AtomicInteger;
 import javax.crypto.SecretKey;
 
 import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
@@ -54,6 +68,7 @@ import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
 import org.apache.tez.runtime.library.shuffle.common.HttpConnection.HttpConnectionParams;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 
 /**
  * Responsible for fetching inputs served by the ShuffleHandler for a single
@@ -101,10 +116,22 @@ public class Fetcher implements Callable<FetchResult> {
   private HttpConnectionParams httpConnectionParams;
 
   private final boolean localDiskFetchEnabled;
+  private final boolean sharedFetchEnabled;
+
+  private final LocalDirAllocator localDirAllocator;
+  private final Path lockPath;
+  private final RawLocalFileSystem localFs;
+
+  private final boolean isDebugEnabled = LOG.isDebugEnabled();
 
   private Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params,
-      FetchedInputAllocator inputManager, ApplicationId appId, SecretKey shuffleSecret,
-      String srcNameTrimmed, Configuration conf, boolean localDiskFetchEnabled) {
+      FetchedInputAllocator inputManager, ApplicationId appId,
+      SecretKey shuffleSecret, String srcNameTrimmed, Configuration conf,
+      RawLocalFileSystem localFs,
+      LocalDirAllocator localDirAllocator,
+      Path lockPath,
+      boolean localDiskFetchEnabled,
+      boolean sharedFetchEnabled) {
     this.fetcherCallback = fetcherCallback;
     this.inputManager = inputManager;
     this.shuffleSecret = shuffleSecret;
@@ -114,19 +141,42 @@ public class Fetcher implements Callable<FetchResult> {
     this.conf = conf;
 
     this.localDiskFetchEnabled = localDiskFetchEnabled;
+    this.sharedFetchEnabled = sharedFetchEnabled;
 
     this.fetcherIdentifier = fetcherIdGen.getAndIncrement();
-    this.logIdentifier = "fetcher [" + srcNameTrimmed +"] " + fetcherIdentifier;
+    this.logIdentifier = " fetcher [" + srcNameTrimmed +"] " + fetcherIdentifier;
+
+    this.localFs = localFs;
+    this.localDirAllocator = localDirAllocator;
+    this.lockPath = lockPath;
+
+    try {
+      if (this.sharedFetchEnabled) {
+        this.localFs.mkdirs(this.lockPath);
+      }
+    } catch (Exception e) {
+      LOG.warn("Error initializing local dirs for shared transfer " + e);
+    }
   }
 
   @Override
   public FetchResult call() throws Exception {
+    boolean multiplex = (this.sharedFetchEnabled && this.localDiskFetchEnabled);
+
     if (srcAttempts.size() == 0) {
       return new FetchResult(host, port, partition, srcAttempts);
     }
 
     for (InputAttemptIdentifier in : srcAttempts) {
       pathToAttemptMap.put(in.getPathComponent(), in);
+      // do only if all of them are shared fetches
+      multiplex &= in.isShared();
+    }
+
+    if (multiplex) {
+      Preconditions.checkArgument(partition == 0,
+          "Shared fetches cannot be done for partitioned input"
+              + "- partition is non-zero (%d)", partition);
     }
 
     remaining = new LinkedHashSet<InputAttemptIdentifier>(srcAttempts);
@@ -136,7 +186,9 @@ public class Fetcher implements Callable<FetchResult> {
     if (localDiskFetchEnabled &&
         host.equals(System.getenv(ApplicationConstants.Environment.NM_HOST.toString()))) {
       hostFetchResult = setupLocalDiskFetch();
-    } else {
+    } else if (multiplex) {
+      hostFetchResult = doSharedFetch();
+    } else{
       hostFetchResult = doHttpFetch();
     }
 
@@ -151,15 +203,197 @@ public class Fetcher implements Callable<FetchResult> {
 
     // Sanity check
     if (hostFetchResult.failedInputs == null && !remaining.isEmpty()) {
-      throw new IOException("server didn't return all expected map outputs: "
-          + remaining.size() + " left.");
+      if (!multiplex) {
+        throw new IOException("server didn't return all expected map outputs: "
+            + remaining.size() + " left.");
+      } else {
+        LOG.info("Shared fetch failed to return " + remaining.size() + " inputs on this try");
+      }
     }
 
     return hostFetchResult.fetchResult;
   }
 
+  private final class CachingCallBack {
+    // this is a closure object wrapping this in an inner class
+    public void cache(String host,
+        InputAttemptIdentifier srcAttemptId, FetchedInput fetchedInput,
+        long compressedLength, long decompressedLength) {
+      try {
+        // this breaks badly on partitioned input - please use responsibly
+        Preconditions.checkArgument(partition == 0, "Partition == 0");
+        final String tmpSuffix = "." + System.currentTimeMillis() + ".tmp";
+        final String finalOutput = getMapOutputFile(srcAttemptId.getPathComponent());
+        final Path outputPath = localDirAllocator.getLocalPathForWrite(finalOutput, compressedLength, conf);
+        final TezSpillRecord spillRec = new TezSpillRecord(1);
+        final TezIndexRecord indexRec;
+        Path tmpIndex = outputPath.suffix(Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING+tmpSuffix);
+
+        if (localFs.exists(tmpIndex)) {
+          LOG.warn("Found duplicate instance of input index file " + tmpIndex);
+          return;
+        }
+
+        Path tmpPath = null;
+
+        switch (fetchedInput.getType()) {
+        case DISK: {
+          DiskFetchedInput input = (DiskFetchedInput) fetchedInput;
+          indexRec = new TezIndexRecord(0, decompressedLength, compressedLength);
+          localFs.mkdirs(outputPath.getParent());
+          // avoid pit-falls of speculation
+          tmpPath = outputPath.suffix(tmpSuffix);
+          // JDK7 - TODO: use Files implementation to speed up this process
+          localFs.copyFromLocalFile(input.getInputPath(), tmpPath);
+          // rename is atomic
+          boolean renamed = localFs.rename(tmpPath, outputPath);
+          if(!renamed) {
+            LOG.warn("Could not rename to cached file name " + outputPath);
+            localFs.delete(tmpPath, false);
+            return;
+          }
+        }
+        break;
+        default:
+          LOG.warn("Incorrect use of CachingCallback for " + srcAttemptId);
+          return;
+        }
+
+        spillRec.putIndex(indexRec, 0);
+        spillRec.writeToFile(tmpIndex, conf);
+        // everything went well so far - rename it
+        boolean renamed = localFs.rename(tmpIndex, outputPath
+            .suffix(Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING));
+        if (!renamed) {
+          localFs.delete(tmpIndex, false);
+          if (outputPath != null) {
+            // invariant: outputPath was renamed from tmpPath
+            localFs.delete(outputPath, false);
+          }
+          LOG.warn("Could not rename the index file to "
+              + outputPath
+                  .suffix(Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING));
+          return;
+        }
+      } catch (IOException ioe) {
+        // do mostly nothing
+        LOG.warn("Cache threw an error " + ioe);
+      }
+    }
+  }
+
+  private int findInputs() throws IOException {
+    int k = 0;
+    for (InputAttemptIdentifier src : srcAttempts) {
+      try {
+        if (getShuffleInputFileName(src.getPathComponent(),
+            Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING) != null) {
+          k++;
+        }
+      } catch (DiskErrorException de) {
+        // missing file, ignore
+      }
+    }
+    return k;
+  }
+
+  private FileLock getLock() throws OverlappingFileLockException, InterruptedException, IOException {
+    File lockFile = localFs.pathToFile(new Path(lockPath, host + ".lock"));
+
+    final boolean created = lockFile.createNewFile();
+
+    if (created == false && !lockFile.exists()) {
+      // bail-out cleanly
+      return null;
+    }
+
+    // invariant - file created (winner writes to this file)
+    // caveat: closing lockChannel does close the file (do not double close)
+    // JDK7 - TODO: use AsynchronousFileChannel instead of RandomAccessFile
+    FileChannel lockChannel = new RandomAccessFile(lockFile, "rws")
+        .getChannel();
+    FileLock xlock = null;
+
+    xlock = lockChannel.tryLock(0, Long.MAX_VALUE, false);
+    if (xlock != null) {
+      return xlock;
+    }
+    lockChannel.close();
+    return null;
+  }
+
+  private void releaseLock(FileLock lock) throws IOException {
+    if (lock != null && lock.isValid()) {
+      FileChannel lockChannel = lock.channel();
+      lock.release();
+      lockChannel.close();
+    }
+  }
+
+  protected HostFetchResult doSharedFetch() throws IOException {
+    int inputs = findInputs();
+
+    if (inputs == srcAttempts.size()) {
+      if (isDebugEnabled) {
+        LOG.debug("Using the copies found locally");
+      }
+      return doLocalDiskFetch(true);
+    }
+
+    if (inputs > 0) {
+      if (isDebugEnabled) {
+        LOG.debug("Found " + input
+            + " local fetches right now, using them first");
+      }
+      return doLocalDiskFetch(false);
+    }
+
+    FileLock lock = null;
+    try {
+      lock = getLock();
+      if (lock == null) {
+        // re-queue until we get a lock
+        LOG.info("Requeuing " + host + ":" + port
+            + " downloads because we didn't get a lock");
+        return new HostFetchResult(new FetchResult(host, port, partition,
+            remaining), null, false);
+      } else {
+        if (findInputs() == srcAttempts.size()) {
+          // double checked after lock
+          releaseLock(lock);
+          lock = null;
+          return doLocalDiskFetch(true);
+        }
+        // cache data if possible
+        return doHttpFetch(new CachingCallBack());
+      }
+    } catch (OverlappingFileLockException jvmCrossLock) {
+      // fall back to HTTP fetch below
+      LOG.warn("Double locking detected for " + host);
+    } catch (InterruptedException sleepInterrupted) {
+      // fall back to HTTP fetch below
+      LOG.warn("Lock was interrupted for " + host);
+    } finally {
+      releaseLock(lock);
+    }
+
+    if (isShutDown.get()) {
+      // if any exception was due to shut-down don't bother firing any more
+      // requests
+      return new HostFetchResult(new FetchResult(host, port, partition,
+          remaining), null, false);
+    }
+    // no more caching
+    return doHttpFetch();
+  }
+
   @VisibleForTesting
   protected HostFetchResult doHttpFetch() {
+    return doHttpFetch(null);
+  }
+
+  @VisibleForTesting
+  protected HostFetchResult doHttpFetch(CachingCallBack callback) {
     try {
       StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host,
           port, partition, appId.toString(), httpConnectionParams.isSSLShuffleEnabled());
@@ -227,7 +461,7 @@ public class Fetcher implements Callable<FetchResult> {
     // yet_to_be_fetched list and marking the failed tasks.
     InputAttemptIdentifier[] failedInputs = null;
     while (!remaining.isEmpty() && failedInputs == null) {
-      failedInputs = fetchInputs(input);
+      failedInputs = fetchInputs(input, callback);
     }
 
     return new HostFetchResult(new FetchResult(host, port, partition, remaining), failedInputs,
@@ -236,6 +470,11 @@ public class Fetcher implements Callable<FetchResult> {
 
   @VisibleForTesting
   protected HostFetchResult setupLocalDiskFetch() {
+    return doLocalDiskFetch(true);
+  }
+
+  @VisibleForTesting
+  private HostFetchResult doLocalDiskFetch(boolean failMissing) {
 
     Iterator<InputAttemptIdentifier> iterator = remaining.iterator();
     while (iterator.hasNext()) {
@@ -246,6 +485,7 @@ public class Fetcher implements Callable<FetchResult> {
       FetchedInput fetchedInput = null;
       try {
         TezIndexRecord idxRecord;
+        // for missing files, this will throw an exception
         idxRecord = getTezIndexRecord(srcAttemptId);
 
         fetchedInput = new LocalDiskFetchedInput(idxRecord.getStartOffset(),
@@ -283,8 +523,10 @@ public class Fetcher implements Callable<FetchResult> {
     }
 
     InputAttemptIdentifier[] failedFetches = null;
-    if (remaining.size() > 0) {
+    if (failMissing && remaining.size() > 0) {
       failedFetches = remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
+    } else {
+      // nothing needs to be done to requeue remaining entries
     }
     return new HostFetchResult(new FetchResult(host, port, partition, remaining),
         failedFetches, false);
@@ -296,19 +538,24 @@ public class Fetcher implements Callable<FetchResult> {
     TezIndexRecord idxRecord;
     Path indexFile = getShuffleInputFileName(srcAttemptId.getPathComponent(),
         Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+
     TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf);
     idxRecord = spillRecord.getIndex(partition);
     return idxRecord;
   }
 
+  private static final String getMapOutputFile(String pathComponent) {
+    return Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR
+        + pathComponent + Path.SEPARATOR
+        + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING;
+  }
+
   @VisibleForTesting
-  protected Path getShuffleInputFileName(String pathComponent, String suffix) throws IOException {
-    LocalDirAllocator localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+  protected Path getShuffleInputFileName(String pathComponent, String suffix)
+      throws IOException {
     suffix = suffix != null ? suffix : "";
 
-    String pathFromLocalDir = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR + pathComponent +
-        Path.SEPARATOR + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING + suffix;
-
+    String pathFromLocalDir = getMapOutputFile(pathComponent) + suffix;
     return localDirAllocator.getLocalPathToRead(pathFromLocalDir, conf);
   }
 
@@ -350,7 +597,7 @@ public class Fetcher implements Callable<FetchResult> {
     }
   }
 
-  private InputAttemptIdentifier[] fetchInputs(DataInputStream input) {
+  private InputAttemptIdentifier[] fetchInputs(DataInputStream input, CachingCallBack callback) {
     FetchedInput fetchedInput = null;
     InputAttemptIdentifier srcAttemptId = null;
     long decompressedLength = -1;
@@ -392,12 +639,16 @@ public class Fetcher implements Callable<FetchResult> {
         LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength
             + ", decomp len: " + decompressedLength);
       }
-
-      // Get the location for the map output - either in-memory or on-disk
       
       // TODO TEZ-957. handle IOException here when Broadcast has better error checking
-      fetchedInput = inputManager.allocate(decompressedLength, compressedLength, srcAttemptId);
-
+      if (srcAttemptId.isShared() && callback != null) {
+        // force disk if input is being shared
+        fetchedInput = inputManager.allocateType(Type.DISK, decompressedLength,
+            compressedLength, srcAttemptId);
+      } else {
+        fetchedInput = inputManager.allocate(decompressedLength,
+            compressedLength, srcAttemptId);
+      }
       // TODO NEWTEZ No concept of WAIT at the moment.
       // // Check if we can shuffle *now* ...
       // if (fetchedInput.getType() == FetchedInput.WAIT) {
@@ -427,6 +678,14 @@ public class Fetcher implements Callable<FetchResult> {
             fetchedInput);
       }
 
+      // offer the fetched input for caching
+      if (srcAttemptId.isShared() && callback != null) {
+        // this has to be before the fetchSucceeded, because that goes across
+        // threads into the reader thread and can potentially shutdown this thread
+        // while it is still caching.
+        callback.cache(host, srcAttemptId, fetchedInput, compressedLength, decompressedLength);
+      }
+
       // Inform the shuffle scheduler
       long endTime = System.currentTimeMillis();
       fetcherCallback.fetchSucceeded(host, srcAttemptId, fetchedInput,
@@ -434,6 +693,7 @@ public class Fetcher implements Callable<FetchResult> {
 
       // Note successful shuffle
       remaining.remove(srcAttemptId);
+
       // metrics.successFetch();
       return null;
     } catch (IOException ioe) {
@@ -521,12 +781,24 @@ public class Fetcher implements Callable<FetchResult> {
     private Fetcher fetcher;
     private boolean workAssigned = false;
 
-    public FetcherBuilder(FetcherCallback fetcherCallback, HttpConnectionParams params,
-                          FetchedInputAllocator inputManager, ApplicationId appId,
-                          SecretKey shuffleSecret, String srcNameTrimmed, Configuration conf,
-                          boolean localDiskFetchEnabled) {
+    public FetcherBuilder(FetcherCallback fetcherCallback,
+        HttpConnectionParams params, FetchedInputAllocator inputManager,
+        ApplicationId appId, SecretKey shuffleSecret, String srcNameTrimmed,
+        Configuration conf, boolean localDiskFetchEnabled) {
+      this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
+          shuffleSecret, srcNameTrimmed, conf, null, null, null, localDiskFetchEnabled,
+          false);
+    }
+
+    public FetcherBuilder(FetcherCallback fetcherCallback,
+        HttpConnectionParams params, FetchedInputAllocator inputManager,
+        ApplicationId appId, SecretKey shuffleSecret, String srcNameTrimmed,
+        Configuration conf, RawLocalFileSystem localFs,
+        LocalDirAllocator localDirAllocator, Path lockPath,
+        boolean localDiskFetchEnabled, boolean sharedFetchEnabled) {
       this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
-          shuffleSecret, srcNameTrimmed, conf, localDiskFetchEnabled);
+          shuffleSecret, srcNameTrimmed, conf, localFs, localDirAllocator,
+          lockPath, localDiskFetchEnabled, sharedFetchEnabled);
     }
 
     public FetcherBuilder setHttpConnectionParameters(HttpConnectionParams httpParams) {

http://git-wip-us.apache.org/repos/asf/tez/blob/625450cf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
index 9d621e8..2ac45d4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
@@ -24,6 +24,7 @@ import java.util.BitSet;
 import java.util.List;
 
 import com.google.protobuf.ByteString;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -53,6 +54,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
   private final CompressionCodec codec;
   private final boolean ifileReadAhead;
   private final int ifileReadAheadLength;
+  private final boolean useSharedInputs;
 
   public ShuffleInputEventHandlerImpl(InputContext inputContext,
                                       ShuffleManager shuffleManager,
@@ -63,6 +65,9 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
     this.codec = codec;
     this.ifileReadAhead = ifileReadAhead;
     this.ifileReadAheadLength = ifileReadAheadLength;
+    // this currently relies on a user to enable the flag
+    // expand on idea based on vertex parallelism and num inputs
+    this.useSharedInputs = (inputContext.getTaskAttemptNumber() == 0);
   }
 
   @Override
@@ -109,9 +114,12 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
         shuffleManager.addCompletedInputWithNoData(srcAttemptIdentifier);
         return;
       }
-    } 
-    InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dme.getTargetIndex(),
-        dme.getVersion(), shufflePayload.getPathComponent());
+    }
+
+    InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(
+        dme.getTargetIndex(), dme.getVersion(),
+        shufflePayload.getPathComponent(), (useSharedInputs && srcIndex == 0));
+
     if (shufflePayload.hasData()) {
       DataProto dataProto = shufflePayload.getData();
       FetchedInput fetchedInput = inputAllocator.allocate(dataProto.getRawLength(),
@@ -119,8 +127,8 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
       moveDataToFetchedInput(dataProto, fetchedInput, hostIdentifier);
       shuffleManager.addCompletedInputWithData(srcAttemptIdentifier, fetchedInput);
     } else {
-      shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(),
-              srcAttemptIdentifier, srcIndex);
+      shuffleManager.addKnownInput(shufflePayload.getHost(),
+          shufflePayload.getPort(), srcAttemptIdentifier, srcIndex);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/625450cf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
index 4e1a06c..8aa6582 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.text.DecimalFormat;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -43,7 +44,12 @@ import javax.crypto.SecretKey;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
@@ -68,7 +74,9 @@ import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
 import org.apache.tez.runtime.library.shuffle.common.Fetcher.FetcherBuilder;
 
+import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -120,6 +128,7 @@ public class ShuffleManager implements FetcherCallback {
   private final SecretKey shuffleSecret;
   private final CompressionCodec codec;
   private final boolean localDiskFetchEnabled;
+  private final boolean sharedFetchEnabled;
   
   private final int ifileBufferSize;
   private final boolean ifileReadAhead;
@@ -140,6 +149,11 @@ public class ShuffleManager implements FetcherCallback {
   private volatile Throwable shuffleError;
   private final HttpConnectionParams httpConnectionParams;
   
+
+  private final LocalDirAllocator localDirAllocator;
+  private final RawLocalFileSystem localFs;
+  private final Path[] localDisks;
+
   // TODO More counters - FetchErrors, speed?
   
   public ShuffleManager(InputContext inputContext, Configuration conf, int numInputs,
@@ -163,6 +177,8 @@ public class ShuffleManager implements FetcherCallback {
     this.inputManager = inputAllocator;
     this.localDiskFetchEnabled = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
         TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT);
+    this.sharedFetchEnabled = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH,
+        TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH_DEFAULT);
     
     this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());
   
@@ -199,10 +215,23 @@ public class ShuffleManager implements FetcherCallback {
             .getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
     httpConnectionParams =
         ShuffleUtils.constructHttpShuffleConnectionParams(conf);
+
+    this.localFs = (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
+
+    this.localDirAllocator = new LocalDirAllocator(
+        TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+
+    this.localDisks = Iterables.toArray(
+        localDirAllocator.getAllLocalPathsToRead(".", conf), Path.class);
+
+    Arrays.sort(this.localDisks);
+
     LOG.info(this.getClass().getSimpleName() + " : numInputs=" + numInputs + ", compressionCodec="
         + (codec == null ? "NoCompressionCodec" : codec.getClass().getName()) + ", numFetchers="
         + numFetchers + ", ifileBufferSize=" + ifileBufferSize + ", ifileReadAheadEnabled="
         + ifileReadAhead + ", ifileReadAheadLength=" + ifileReadAheadLength +", "
+        + "localDiskFetchEnabled=" + localDiskFetchEnabled + ", "
+        + "sharedFetchEnabled=" + sharedFetchEnabled + ", "
         + httpConnectionParams.toString());
   }
 
@@ -301,9 +330,20 @@ public class ShuffleManager implements FetcherCallback {
   }
   
   private Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf) {
+
+    Path lockDisk = null;
+
+    if (sharedFetchEnabled) {
+      // pick a single lock disk from the edge name's hashcode + host hashcode
+      final int h = Math.abs(Objects.hashCode(this.srcNameTrimmed, inputHost.getHost()));
+      lockDisk = new Path(this.localDisks[h % this.localDisks.length], "locks");
+    }
+
     FetcherBuilder fetcherBuilder = new FetcherBuilder(ShuffleManager.this,
       httpConnectionParams, inputManager, inputContext.getApplicationId(),
-      shuffleSecret, srcNameTrimmed, conf, localDiskFetchEnabled);
+        shuffleSecret, srcNameTrimmed, conf, localFs, localDirAllocator,
+        lockDisk, localDiskFetchEnabled, sharedFetchEnabled);
+
     if (codec != null) {
       fetcherBuilder.setCompressionParameters(codec);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/625450cf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java
index d365aa4..20ee665 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java
@@ -33,6 +33,7 @@ import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
 import org.apache.tez.runtime.library.shuffle.common.DiskFetchedInput;
 import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
 import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
 import org.apache.tez.runtime.library.shuffle.common.FetchedInputCallback;
 import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
@@ -140,10 +141,26 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
   }
 
   @Override
+  public synchronized FetchedInput allocateType(Type type, long actualSize,
+      long compressedSize, InputAttemptIdentifier inputAttemptIdentifier)
+      throws IOException {
+
+    switch (type) {
+    case DISK:
+      return new DiskFetchedInput(actualSize, compressedSize,
+          inputAttemptIdentifier, this, conf, localDirAllocator,
+          fileNameAllocator);
+    default:
+      return allocate(actualSize, compressedSize, inputAttemptIdentifier);
+    }
+  }
+
+  @Override
   public synchronized void fetchComplete(FetchedInput fetchedInput) {
     switch (fetchedInput.getType()) {
     // Not tracking anything here.
     case DISK:
+    case DISK_DIRECT:
     case MEMORY:
       break;
     default: