You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/09/18 21:50:10 UTC
[25/25] git commit: TEZ-1157. Optimize broadcast shuffle to download
data only once per host. (gopalv)
TEZ-1157. Optimize broadcast shuffle to download data only once per host. (gopalv)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/625450cf
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/625450cf
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/625450cf
Branch: refs/heads/TEZ-8
Commit: 625450cf11454fa9697a902ba70367de00cdc170
Parents: e328055
Author: Gopal V <go...@apache.org>
Authored: Wed Sep 17 20:53:11 2014 -0700
Committer: Gopal V <go...@apache.org>
Committed: Wed Sep 17 20:53:11 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 4 +-
.../library/api/TezRuntimeConfiguration.java | 12 +
.../library/common/InputAttemptIdentifier.java | 26 +-
.../shuffle/common/DiskFetchedInput.java | 7 +
.../library/shuffle/common/FetchedInput.java | 3 +-
.../shuffle/common/FetchedInputAllocator.java | 3 +
.../runtime/library/shuffle/common/Fetcher.java | 318 +++++++++++++++++--
.../impl/ShuffleInputEventHandlerImpl.java | 18 +-
.../shuffle/common/impl/ShuffleManager.java | 42 ++-
.../impl/SimpleFetchedInputAllocator.java | 17 +
10 files changed, 412 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/625450cf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bd5569c..5e2c2cd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -19,8 +19,8 @@ ALL CHANGES:
TEZ-1580. Change TestOrderedWordCount to optionally use MR configs.
TEZ-1524. Resolve user group information only if ACLs are enabled.
TEZ-1581. GroupByOrderByMRRTest no longer functional.
- TEZ-1563. TezClient.submitDAGSession alters DAG local resources regardless
- of DAG submission
+ TEZ-1563. TezClient.submitDAGSession alters DAG local resources regardless of DAG submission.
+ TEZ-1157. Optimize broadcast shuffle to download data only once per host.
Release 0.5.1: Unreleased
http://git-wip-us.apache.org/repos/asf/tez/blob/625450cf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index 077ce8e..cb61109 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -280,6 +280,17 @@ public class TezRuntimeConfiguration {
*/
public static final boolean TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT = false;
+ /**
+ * Share data fetched between tasks running on the same host if applicable
+ */
+ public static final String TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH = TEZ_RUNTIME_PREFIX
+ + "optimize.shared.fetch";
+
+ /**
+ * Default for {@link #TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH}: shared fetch (which bypasses the HTTP fetch) stays disabled until it is covered by unit tests.
+ */
+ public static final boolean TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH_DEFAULT = false;
+
// TODO TEZ-1233 - allow this property to be set per vertex
// TODO TEZ-1231 - move these properties out since they are not relevant for Inputs / Outputs
@@ -333,6 +344,7 @@ public class TezRuntimeConfiguration {
tezRuntimeKeys.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE);
tezRuntimeKeys.add(TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS);
tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH);
+ tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH);
defaultConf.addResource("core-default.xml");
defaultConf.addResource("core-site.xml");
http://git-wip-us.apache.org/repos/asf/tez/blob/625450cf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
index 7c8a23b..9987d26 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java
@@ -29,28 +29,38 @@ public class InputAttemptIdentifier {
private final InputIdentifier inputIdentifier;
private final int attemptNumber;
- private String pathComponent;
-
+ private final String pathComponent;
+ private final boolean shared;
+
public static final String PATH_PREFIX = "attempt";
-
+
public InputAttemptIdentifier(int inputIndex, int attemptNumber) {
this(new InputIdentifier(inputIndex), attemptNumber, null);
}
-
+
public InputAttemptIdentifier(InputIdentifier inputIdentifier, int attemptNumber, String pathComponent) {
+ this(inputIdentifier, attemptNumber, pathComponent, false);
+ }
+
+ public InputAttemptIdentifier(InputIdentifier inputIdentifier, int attemptNumber, String pathComponent, boolean shared) {
this.inputIdentifier = inputIdentifier;
this.attemptNumber = attemptNumber;
this.pathComponent = pathComponent;
+ this.shared = shared;
if (pathComponent != null && !pathComponent.startsWith(PATH_PREFIX)) {
throw new TezUncheckedException(
"Path component must start with: " + PATH_PREFIX + " " + this);
}
}
-
+
public InputAttemptIdentifier(int taskIndex, int attemptNumber, String pathComponent) {
this(new InputIdentifier(taskIndex), attemptNumber, pathComponent);
}
+  /**
+   * Convenience constructor that wraps {@code taskIndex} in a new
+   * {@link InputIdentifier} and delegates to the constructor taking an
+   * {@link InputIdentifier}.
+   *
+   * @param taskIndex index used to build the {@link InputIdentifier}
+   * @param attemptNumber attempt number of the input
+   * @param pathComponent path component of the input, passed through unchanged
+   * @param shared value later reported by {@code isShared()}
+   */
+  public InputAttemptIdentifier(int taskIndex, int attemptNumber, String pathComponent, boolean shared) {
+    this(new InputIdentifier(taskIndex), attemptNumber, pathComponent, shared);
+  }
+
public InputIdentifier getInputIdentifier() {
return this.inputIdentifier;
}
@@ -63,7 +73,11 @@ public class InputAttemptIdentifier {
return pathComponent;
}
- // PathComponent does not need to be part of the hashCode and equals computation.
+  /**
+   * @return the {@code shared} flag this identifier was constructed with
+   */
+  public boolean isShared() {
+    return shared;
+  }
+
+ // PathComponent and shared do not need to be part of the hashCode and equals computation.
@Override
public int hashCode() {
final int prime = 31;
http://git-wip-us.apache.org/repos/asf/tez/blob/625450cf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
index 1d26c6e..b0b911b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
@@ -65,6 +65,13 @@ public class DiskFetchedInput extends FetchedInput {
public InputStream getInputStream() throws IOException {
return localFS.open(outputPath);
}
+
+  /**
+   * Returns the path currently backing this input: the final output path once
+   * the state is {@code COMMITTED}, the temporary output path otherwise.
+   */
+  public final Path getInputPath() {
+    final boolean committed = (state == State.COMMITTED);
+    return committed ? this.outputPath : this.tmpOutputPath;
+  }
@Override
public void commit() throws IOException {
http://git-wip-us.apache.org/repos/asf/tez/blob/625450cf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
index fde19b7..0a83dc9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
@@ -110,7 +110,8 @@ public abstract class FetchedInput {
/**
* Return an input stream to be used to read the previously fetched data.
- * Users are expected to close the InputStream when they're done
+ * All calls to getInputStream() produce new reset streams for reading.
+ * Users are expected to close the InputStream when they're done.
*/
public abstract InputStream getInputStream() throws IOException;
http://git-wip-us.apache.org/repos/asf/tez/blob/625450cf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputAllocator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputAllocator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputAllocator.java
index 1707ab7..288df6d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputAllocator.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputAllocator.java
@@ -21,11 +21,14 @@ package org.apache.tez.runtime.library.shuffle.common;
import java.io.IOException;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
public interface FetchedInputAllocator {
public FetchedInput allocate(long actualSize, long compresedSize,
InputAttemptIdentifier inputAttemptIdentifier) throws IOException;
+ public FetchedInput allocateType(Type type, long actualSize, long compresedSize,
+ InputAttemptIdentifier inputAttemptIdentifier) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/625450cf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
index 9cb8617..e25124b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
@@ -19,10 +19,17 @@
package org.apache.tez.runtime.library.shuffle.common;
import java.io.DataInputStream;
+import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
import java.net.URL;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.channels.OverlappingFileLockException;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
@@ -34,12 +41,19 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.crypto.SecretKey;
import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
@@ -54,6 +68,7 @@ import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
import org.apache.tez.runtime.library.shuffle.common.HttpConnection.HttpConnectionParams;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
/**
* Responsible for fetching inputs served by the ShuffleHandler for a single
@@ -101,10 +116,22 @@ public class Fetcher implements Callable<FetchResult> {
private HttpConnectionParams httpConnectionParams;
private final boolean localDiskFetchEnabled;
+ private final boolean sharedFetchEnabled;
+
+ private final LocalDirAllocator localDirAllocator;
+ private final Path lockPath;
+ private final RawLocalFileSystem localFs;
+
+ private final boolean isDebugEnabled = LOG.isDebugEnabled();
private Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params,
- FetchedInputAllocator inputManager, ApplicationId appId, SecretKey shuffleSecret,
- String srcNameTrimmed, Configuration conf, boolean localDiskFetchEnabled) {
+ FetchedInputAllocator inputManager, ApplicationId appId,
+ SecretKey shuffleSecret, String srcNameTrimmed, Configuration conf,
+ RawLocalFileSystem localFs,
+ LocalDirAllocator localDirAllocator,
+ Path lockPath,
+ boolean localDiskFetchEnabled,
+ boolean sharedFetchEnabled) {
this.fetcherCallback = fetcherCallback;
this.inputManager = inputManager;
this.shuffleSecret = shuffleSecret;
@@ -114,19 +141,42 @@ public class Fetcher implements Callable<FetchResult> {
this.conf = conf;
this.localDiskFetchEnabled = localDiskFetchEnabled;
+ this.sharedFetchEnabled = sharedFetchEnabled;
this.fetcherIdentifier = fetcherIdGen.getAndIncrement();
- this.logIdentifier = "fetcher [" + srcNameTrimmed +"] " + fetcherIdentifier;
+ this.logIdentifier = " fetcher [" + srcNameTrimmed +"] " + fetcherIdentifier;
+
+ this.localFs = localFs;
+ this.localDirAllocator = localDirAllocator;
+ this.lockPath = lockPath;
+
+ try {
+ if (this.sharedFetchEnabled) {
+ this.localFs.mkdirs(this.lockPath);
+ }
+ } catch (Exception e) {
+ LOG.warn("Error initializing local dirs for shared transfer " + e);
+ }
}
@Override
public FetchResult call() throws Exception {
+ boolean multiplex = (this.sharedFetchEnabled && this.localDiskFetchEnabled);
+
if (srcAttempts.size() == 0) {
return new FetchResult(host, port, partition, srcAttempts);
}
for (InputAttemptIdentifier in : srcAttempts) {
pathToAttemptMap.put(in.getPathComponent(), in);
+ // do only if all of them are shared fetches
+ multiplex &= in.isShared();
+ }
+
+ if (multiplex) {
+ Preconditions.checkArgument(partition == 0,
+ "Shared fetches cannot be done for partitioned input"
+ + "- partition is non-zero (%d)", partition);
}
remaining = new LinkedHashSet<InputAttemptIdentifier>(srcAttempts);
@@ -136,7 +186,9 @@ public class Fetcher implements Callable<FetchResult> {
if (localDiskFetchEnabled &&
host.equals(System.getenv(ApplicationConstants.Environment.NM_HOST.toString()))) {
hostFetchResult = setupLocalDiskFetch();
- } else {
+ } else if (multiplex) {
+ hostFetchResult = doSharedFetch();
+ } else{
hostFetchResult = doHttpFetch();
}
@@ -151,15 +203,197 @@ public class Fetcher implements Callable<FetchResult> {
// Sanity check
if (hostFetchResult.failedInputs == null && !remaining.isEmpty()) {
- throw new IOException("server didn't return all expected map outputs: "
- + remaining.size() + " left.");
+ if (!multiplex) {
+ throw new IOException("server didn't return all expected map outputs: "
+ + remaining.size() + " left.");
+ } else {
+ LOG.info("Shared fetch failed to return " + remaining.size() + " inputs on this try");
+ }
}
return hostFetchResult.fetchResult;
}
+  /**
+   * Callback used by the HTTP fetch path to keep a local copy of a fetched
+   * DISK input. The copy consists of the data file plus a single-entry index
+   * file, both written under temporary names and then renamed into place.
+   * Caching is best-effort: failures are logged and never propagated as
+   * IOExceptions to the fetch itself.
+   */
+  private final class CachingCallBack {
+    // this is a closure object wrapping this in an inner class
+
+    /**
+     * Copies {@code fetchedInput} into the local directories and writes its index.
+     *
+     * @param host source host of the input (not used by the caching logic)
+     * @param srcAttemptId identifier whose path component names the cached output
+     * @param fetchedInput fetched input; anything but a DISK input is rejected with a warning
+     * @param compressedLength compressed length, recorded in the index and used to pick a local dir
+     * @param decompressedLength decompressed length, recorded in the index
+     */
+    public void cache(String host,
+        InputAttemptIdentifier srcAttemptId, FetchedInput fetchedInput,
+        long compressedLength, long decompressedLength) {
+      // temp files owned by this call; removed again if an IOException escapes
+      Path tmpPath = null;
+      Path tmpIndex = null;
+      try {
+        // this breaks badly on partitioned input - please use responsibly
+        Preconditions.checkArgument(partition == 0, "Partition == 0");
+        final String tmpSuffix = "." + System.currentTimeMillis() + ".tmp";
+        final String finalOutput = getMapOutputFile(srcAttemptId.getPathComponent());
+        final Path outputPath = localDirAllocator.getLocalPathForWrite(finalOutput, compressedLength, conf);
+        final Path indexPath = outputPath.suffix(Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+        final Path pendingIndex = outputPath.suffix(Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING + tmpSuffix);
+
+        if (localFs.exists(pendingIndex)) {
+          // not ours: leave the file alone
+          LOG.warn("Found duplicate instance of input index file " + pendingIndex);
+          return;
+        }
+
+        if (fetchedInput.getType() != Type.DISK) {
+          LOG.warn("Incorrect use of CachingCallback for " + srcAttemptId);
+          return;
+        }
+
+        final DiskFetchedInput diskInput = (DiskFetchedInput) fetchedInput;
+        final TezIndexRecord indexRec = new TezIndexRecord(0, decompressedLength, compressedLength);
+        localFs.mkdirs(outputPath.getParent());
+        // avoid pit-falls of speculation
+        tmpPath = outputPath.suffix(tmpSuffix);
+        // JDK7 - TODO: use Files implementation to speed up this process
+        localFs.copyFromLocalFile(diskInput.getInputPath(), tmpPath);
+        // rename is atomic
+        if (!localFs.rename(tmpPath, outputPath)) {
+          LOG.warn("Could not rename to cached file name " + outputPath);
+          localFs.delete(tmpPath, false);
+          return;
+        }
+        // the data now lives at outputPath, so there is no temp data file left
+        tmpPath = null;
+
+        final TezSpillRecord spillRec = new TezSpillRecord(1);
+        spillRec.putIndex(indexRec, 0);
+        tmpIndex = pendingIndex;
+        spillRec.writeToFile(tmpIndex, conf);
+        // everything went well so far - rename it
+        if (!localFs.rename(tmpIndex, indexPath)) {
+          localFs.delete(tmpIndex, false);
+          // invariant: outputPath was renamed from tmpPath
+          localFs.delete(outputPath, false);
+          LOG.warn("Could not rename the index file to " + indexPath);
+          return;
+        }
+      } catch (IOException ioe) {
+        // do mostly nothing
+        LOG.warn("Cache threw an error " + ioe);
+        // do not leave half-written temp files behind
+        discardQuietly(tmpPath);
+        discardQuietly(tmpIndex);
+      }
+    }
+
+    // Best-effort, non-recursive delete of a leftover temp file; null is ignored.
+    private void discardQuietly(Path leftover) {
+      if (leftover == null) {
+        return;
+      }
+      try {
+        localFs.delete(leftover, false);
+      } catch (IOException ignored) {
+        // already on a failure path; nothing more can be done here
+      }
+    }
+  }
+
+  /**
+   * Counts how many of the source attempts have an index file that can be
+   * resolved through {@code getShuffleInputFileName}.
+   *
+   * @return number of source attempts whose index file was found
+   * @throws IOException if the lookup fails for a reason other than a DiskErrorException
+   */
+  private int findInputs() throws IOException {
+    int found = 0;
+    for (InputAttemptIdentifier attempt : srcAttempts) {
+      final Path indexFile;
+      try {
+        indexFile = getShuffleInputFileName(attempt.getPathComponent(),
+            Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+      } catch (DiskErrorException missing) {
+        // missing file, ignore
+        continue;
+      }
+      if (indexFile != null) {
+        found++;
+      }
+    }
+    return found;
+  }
+
+  /**
+   * Tries, without blocking, to take an exclusive lock on the lock file
+   * {@code <host>.lock} under {@code lockPath}.
+   *
+   * @return the held lock, or null if the lock file could not be created or
+   *         the lock could not be acquired
+   * @throws OverlappingFileLockException propagated from {@code tryLock}
+   * @throws InterruptedException declared for callers; not thrown by this body
+   * @throws IOException if creating, opening, locking or closing the lock file fails
+   */
+  private FileLock getLock() throws OverlappingFileLockException, InterruptedException, IOException {
+    File lockFile = localFs.pathToFile(new Path(lockPath, host + ".lock"));
+
+    final boolean created = lockFile.createNewFile();
+
+    if (!created && !lockFile.exists()) {
+      // bail-out cleanly
+      return null;
+    }
+
+    // invariant - file created (winner writes to this file)
+    // caveat: closing lockChannel does close the file (do not double close)
+    // JDK7 - TODO: use AsynchronousFileChannel instead of RandomAccessFile
+    FileChannel lockChannel = new RandomAccessFile(lockFile, "rws")
+        .getChannel();
+    FileLock xlock = null;
+    try {
+      xlock = lockChannel.tryLock(0, Long.MAX_VALUE, false);
+    } finally {
+      if (xlock == null) {
+        // no lock was obtained, either because tryLock returned null or because
+        // it threw; close the channel so the file descriptor is not leaked
+        lockChannel.close();
+      }
+    }
+    return xlock;
+  }
+
+  /**
+   * Releases {@code lock} and closes the channel it was acquired on.
+   * A null or no-longer-valid lock is ignored.
+   *
+   * @param lock lock to release, may be null
+   * @throws IOException if releasing the lock or closing its channel fails
+   */
+  private void releaseLock(FileLock lock) throws IOException {
+    if (lock == null || !lock.isValid()) {
+      return;
+    }
+    final FileChannel heldChannel = lock.channel();
+    lock.release();
+    heldChannel.close();
+  }
+
+  /**
+   * Fetch path used when the inputs of this host may already have been cached
+   * locally.
+   * <ul>
+   * <li>All inputs found locally: read them from disk, failing any that are missing.</li>
+   * <li>Some inputs found locally: read those from disk and leave the rest in
+   * {@code remaining}.</li>
+   * <li>None found: try to take the per-host lock. Without the lock the result
+   * carries the remaining inputs back to the caller; with the lock the local
+   * copies are re-checked and otherwise an HTTP fetch with a caching callback
+   * is done.</li>
+   * <li>If locking throws, fall back to a plain HTTP fetch unless the fetcher
+   * has been shut down.</li>
+   * </ul>
+   *
+   * @return the result of whichever fetch path was taken
+   * @throws IOException if looking up local inputs or handling the lock fails
+   */
+  protected HostFetchResult doSharedFetch() throws IOException {
+    int inputs = findInputs();
+
+    if (inputs == srcAttempts.size()) {
+      if (isDebugEnabled) {
+        LOG.debug("Using the copies found locally");
+      }
+      return doLocalDiskFetch(true);
+    }
+
+    if (inputs > 0) {
+      if (isDebugEnabled) {
+        // report the number of locally available inputs counted above
+        LOG.debug("Found " + inputs
+            + " local fetches right now, using them first");
+      }
+      return doLocalDiskFetch(false);
+    }
+
+    FileLock lock = null;
+    try {
+      lock = getLock();
+      if (lock == null) {
+        // re-queue until we get a lock
+        LOG.info("Requeuing " + host + ":" + port
+            + " downloads because we didn't get a lock");
+        return new HostFetchResult(new FetchResult(host, port, partition,
+            remaining), null, false);
+      } else {
+        if (findInputs() == srcAttempts.size()) {
+          // double checked after lock
+          releaseLock(lock);
+          lock = null;
+          return doLocalDiskFetch(true);
+        }
+        // cache data if possible
+        return doHttpFetch(new CachingCallBack());
+      }
+    } catch (OverlappingFileLockException jvmCrossLock) {
+      // fall back to HTTP fetch below
+      LOG.warn("Double locking detected for " + host);
+    } catch (InterruptedException sleepInterrupted) {
+      // fall back to HTTP fetch below
+      LOG.warn("Lock was interrupted for " + host);
+    } finally {
+      // releaseLock ignores null, so this is safe on every path above
+      releaseLock(lock);
+    }
+
+    if (isShutDown.get()) {
+      // if any exception was due to shut-down don't bother firing any more
+      // requests
+      return new HostFetchResult(new FetchResult(host, port, partition,
+          remaining), null, false);
+    }
+    // no more caching
+    return doHttpFetch();
+  }
+
@VisibleForTesting
protected HostFetchResult doHttpFetch() {
+ return doHttpFetch(null);
+ }
+
+ @VisibleForTesting
+ protected HostFetchResult doHttpFetch(CachingCallBack callback) {
try {
StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host,
port, partition, appId.toString(), httpConnectionParams.isSSLShuffleEnabled());
@@ -227,7 +461,7 @@ public class Fetcher implements Callable<FetchResult> {
// yet_to_be_fetched list and marking the failed tasks.
InputAttemptIdentifier[] failedInputs = null;
while (!remaining.isEmpty() && failedInputs == null) {
- failedInputs = fetchInputs(input);
+ failedInputs = fetchInputs(input, callback);
}
return new HostFetchResult(new FetchResult(host, port, partition, remaining), failedInputs,
@@ -236,6 +470,11 @@ public class Fetcher implements Callable<FetchResult> {
@VisibleForTesting
protected HostFetchResult setupLocalDiskFetch() {
+ return doLocalDiskFetch(true);
+ }
+
+ @VisibleForTesting
+ private HostFetchResult doLocalDiskFetch(boolean failMissing) {
Iterator<InputAttemptIdentifier> iterator = remaining.iterator();
while (iterator.hasNext()) {
@@ -246,6 +485,7 @@ public class Fetcher implements Callable<FetchResult> {
FetchedInput fetchedInput = null;
try {
TezIndexRecord idxRecord;
+ // for missing files, this will throw an exception
idxRecord = getTezIndexRecord(srcAttemptId);
fetchedInput = new LocalDiskFetchedInput(idxRecord.getStartOffset(),
@@ -283,8 +523,10 @@ public class Fetcher implements Callable<FetchResult> {
}
InputAttemptIdentifier[] failedFetches = null;
- if (remaining.size() > 0) {
+ if (failMissing && remaining.size() > 0) {
failedFetches = remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
+ } else {
+ // nothing needs to be done to requeue remaining entries
}
return new HostFetchResult(new FetchResult(host, port, partition, remaining),
failedFetches, false);
@@ -296,19 +538,24 @@ public class Fetcher implements Callable<FetchResult> {
TezIndexRecord idxRecord;
Path indexFile = getShuffleInputFileName(srcAttemptId.getPathComponent(),
Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+
TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf);
idxRecord = spillRecord.getIndex(partition);
return idxRecord;
}
+  /**
+   * Builds the relative path of an output file:
+   * task output dir, {@code pathComponent} and the output file name, joined
+   * with {@code Path.SEPARATOR}.
+   */
+  private static final String getMapOutputFile(String pathComponent) {
+    final StringBuilder relativePath = new StringBuilder();
+    relativePath.append(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR);
+    relativePath.append(Path.SEPARATOR).append(pathComponent);
+    relativePath.append(Path.SEPARATOR).append(Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING);
+    return relativePath.toString();
+  }
+
@VisibleForTesting
- protected Path getShuffleInputFileName(String pathComponent, String suffix) throws IOException {
- LocalDirAllocator localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+ protected Path getShuffleInputFileName(String pathComponent, String suffix)
+ throws IOException {
suffix = suffix != null ? suffix : "";
- String pathFromLocalDir = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR + pathComponent +
- Path.SEPARATOR + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING + suffix;
-
+ String pathFromLocalDir = getMapOutputFile(pathComponent) + suffix;
return localDirAllocator.getLocalPathToRead(pathFromLocalDir, conf);
}
@@ -350,7 +597,7 @@ public class Fetcher implements Callable<FetchResult> {
}
}
- private InputAttemptIdentifier[] fetchInputs(DataInputStream input) {
+ private InputAttemptIdentifier[] fetchInputs(DataInputStream input, CachingCallBack callback) {
FetchedInput fetchedInput = null;
InputAttemptIdentifier srcAttemptId = null;
long decompressedLength = -1;
@@ -392,12 +639,16 @@ public class Fetcher implements Callable<FetchResult> {
LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength
+ ", decomp len: " + decompressedLength);
}
-
- // Get the location for the map output - either in-memory or on-disk
// TODO TEZ-957. handle IOException here when Broadcast has better error checking
- fetchedInput = inputManager.allocate(decompressedLength, compressedLength, srcAttemptId);
-
+ if (srcAttemptId.isShared() && callback != null) {
+ // force disk if input is being shared
+ fetchedInput = inputManager.allocateType(Type.DISK, decompressedLength,
+ compressedLength, srcAttemptId);
+ } else {
+ fetchedInput = inputManager.allocate(decompressedLength,
+ compressedLength, srcAttemptId);
+ }
// TODO NEWTEZ No concept of WAIT at the moment.
// // Check if we can shuffle *now* ...
// if (fetchedInput.getType() == FetchedInput.WAIT) {
@@ -427,6 +678,14 @@ public class Fetcher implements Callable<FetchResult> {
fetchedInput);
}
+ // offer the fetched input for caching
+ if (srcAttemptId.isShared() && callback != null) {
+ // this has to be before the fetchSucceeded, because that goes across
+ // threads into the reader thread and can potentially shutdown this thread
+ // while it is still caching.
+ callback.cache(host, srcAttemptId, fetchedInput, compressedLength, decompressedLength);
+ }
+
// Inform the shuffle scheduler
long endTime = System.currentTimeMillis();
fetcherCallback.fetchSucceeded(host, srcAttemptId, fetchedInput,
@@ -434,6 +693,7 @@ public class Fetcher implements Callable<FetchResult> {
// Note successful shuffle
remaining.remove(srcAttemptId);
+
// metrics.successFetch();
return null;
} catch (IOException ioe) {
@@ -521,12 +781,24 @@ public class Fetcher implements Callable<FetchResult> {
private Fetcher fetcher;
private boolean workAssigned = false;
- public FetcherBuilder(FetcherCallback fetcherCallback, HttpConnectionParams params,
- FetchedInputAllocator inputManager, ApplicationId appId,
- SecretKey shuffleSecret, String srcNameTrimmed, Configuration conf,
- boolean localDiskFetchEnabled) {
+    /**
+     * Creates a builder for a fetcher with shared fetch disabled. No local
+     * file system or lock path is supplied.
+     *
+     * @param fetcherCallback callback passed to the fetcher
+     * @param params HTTP connection parameters
+     * @param inputManager allocator for fetched inputs
+     * @param appId application id
+     * @param shuffleSecret shuffle secret key
+     * @param srcNameTrimmed source name used in the fetcher's log identifier
+     * @param conf configuration passed to the fetcher
+     * @param localDiskFetchEnabled whether local disk fetch is enabled
+     */
+    public FetcherBuilder(FetcherCallback fetcherCallback,
+        HttpConnectionParams params, FetchedInputAllocator inputManager,
+        ApplicationId appId, SecretKey shuffleSecret, String srcNameTrimmed,
+        Configuration conf, boolean localDiskFetchEnabled) {
+      // getShuffleInputFileName() resolves paths through the fetcher's
+      // LocalDirAllocator, so local disk fetch needs a real allocator here even
+      // though the shared-fetch file system and lock path stay unset
+      this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
+          shuffleSecret, srcNameTrimmed, conf, null,
+          new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS), null,
+          localDiskFetchEnabled, false);
+    }
+
+ public FetcherBuilder(FetcherCallback fetcherCallback,
+ HttpConnectionParams params, FetchedInputAllocator inputManager,
+ ApplicationId appId, SecretKey shuffleSecret, String srcNameTrimmed,
+ Configuration conf, RawLocalFileSystem localFs,
+ LocalDirAllocator localDirAllocator, Path lockPath,
+ boolean localDiskFetchEnabled, boolean sharedFetchEnabled) {
this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
- shuffleSecret, srcNameTrimmed, conf, localDiskFetchEnabled);
+ shuffleSecret, srcNameTrimmed, conf, localFs, localDirAllocator,
+ lockPath, localDiskFetchEnabled, sharedFetchEnabled);
}
public FetcherBuilder setHttpConnectionParameters(HttpConnectionParams httpParams) {
http://git-wip-us.apache.org/repos/asf/tez/blob/625450cf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
index 9d621e8..2ac45d4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
@@ -24,6 +24,7 @@ import java.util.BitSet;
import java.util.List;
import com.google.protobuf.ByteString;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -53,6 +54,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
private final CompressionCodec codec;
private final boolean ifileReadAhead;
private final int ifileReadAheadLength;
+ private final boolean useSharedInputs;
public ShuffleInputEventHandlerImpl(InputContext inputContext,
ShuffleManager shuffleManager,
@@ -63,6 +65,9 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
this.codec = codec;
this.ifileReadAhead = ifileReadAhead;
this.ifileReadAheadLength = ifileReadAheadLength;
+ // this currently relies on a user to enable the flag
+ // expand on idea based on vertex parallelism and num inputs
+ this.useSharedInputs = (inputContext.getTaskAttemptNumber() == 0);
}
@Override
@@ -109,9 +114,12 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
shuffleManager.addCompletedInputWithNoData(srcAttemptIdentifier);
return;
}
- }
- InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dme.getTargetIndex(),
- dme.getVersion(), shufflePayload.getPathComponent());
+ }
+
+ InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(
+ dme.getTargetIndex(), dme.getVersion(),
+ shufflePayload.getPathComponent(), (useSharedInputs && srcIndex == 0));
+
if (shufflePayload.hasData()) {
DataProto dataProto = shufflePayload.getData();
FetchedInput fetchedInput = inputAllocator.allocate(dataProto.getRawLength(),
@@ -119,8 +127,8 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
moveDataToFetchedInput(dataProto, fetchedInput, hostIdentifier);
shuffleManager.addCompletedInputWithData(srcAttemptIdentifier, fetchedInput);
} else {
- shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(),
- srcAttemptIdentifier, srcIndex);
+ shuffleManager.addKnownInput(shufflePayload.getHost(),
+ shufflePayload.getPort(), srcAttemptIdentifier, srcIndex);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/625450cf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
index 4e1a06c..8aa6582 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.DecimalFormat;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -43,7 +44,12 @@ import javax.crypto.SecretKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
@@ -68,7 +74,9 @@ import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
import org.apache.tez.runtime.library.shuffle.common.Fetcher.FetcherBuilder;
+import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@@ -120,6 +128,7 @@ public class ShuffleManager implements FetcherCallback {
private final SecretKey shuffleSecret;
private final CompressionCodec codec;
private final boolean localDiskFetchEnabled;
+ private final boolean sharedFetchEnabled;
private final int ifileBufferSize;
private final boolean ifileReadAhead;
@@ -140,6 +149,11 @@ public class ShuffleManager implements FetcherCallback {
private volatile Throwable shuffleError;
private final HttpConnectionParams httpConnectionParams;
+
+ private final LocalDirAllocator localDirAllocator;
+ private final RawLocalFileSystem localFs;
+ private final Path[] localDisks;
+
// TODO More counters - FetchErrors, speed?
public ShuffleManager(InputContext inputContext, Configuration conf, int numInputs,
@@ -163,6 +177,8 @@ public class ShuffleManager implements FetcherCallback {
this.inputManager = inputAllocator;
this.localDiskFetchEnabled = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT);
+ this.sharedFetchEnabled = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH,
+ TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH_DEFAULT);
this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());
@@ -199,10 +215,23 @@ public class ShuffleManager implements FetcherCallback {
.getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
httpConnectionParams =
ShuffleUtils.constructHttpShuffleConnectionParams(conf);
+
+ this.localFs = (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
+
+ this.localDirAllocator = new LocalDirAllocator(
+ TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+
+ this.localDisks = Iterables.toArray(
+ localDirAllocator.getAllLocalPathsToRead(".", conf), Path.class);
+
+ Arrays.sort(this.localDisks);
+
LOG.info(this.getClass().getSimpleName() + " : numInputs=" + numInputs + ", compressionCodec="
+ (codec == null ? "NoCompressionCodec" : codec.getClass().getName()) + ", numFetchers="
+ numFetchers + ", ifileBufferSize=" + ifileBufferSize + ", ifileReadAheadEnabled="
+ ifileReadAhead + ", ifileReadAheadLength=" + ifileReadAheadLength +", "
+ + "localDiskFetchEnabled=" + localDiskFetchEnabled + ", "
+ + "sharedFetchEnabled=" + sharedFetchEnabled + ", "
+ httpConnectionParams.toString());
}
@@ -301,9 +330,20 @@ public class ShuffleManager implements FetcherCallback {
}
private Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf) {
+
+ Path lockDisk = null;
+
+ if (sharedFetchEnabled) {
+ // pick a single lock disk from the edge name's hashcode + host hashcode
+ final int h = Objects.hashCode(this.srcNameTrimmed, inputHost.getHost());
+ // take the remainder first, then abs: Math.abs(Integer.MIN_VALUE) is still negative and would index out of bounds
+ lockDisk = new Path(this.localDisks[Math.abs(h % this.localDisks.length)], "locks");
+
FetcherBuilder fetcherBuilder = new FetcherBuilder(ShuffleManager.this,
httpConnectionParams, inputManager, inputContext.getApplicationId(),
- shuffleSecret, srcNameTrimmed, conf, localDiskFetchEnabled);
+ shuffleSecret, srcNameTrimmed, conf, localFs, localDirAllocator,
+ lockDisk, localDiskFetchEnabled, sharedFetchEnabled);
+
if (codec != null) {
fetcherBuilder.setCompressionParameters(codec);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/625450cf/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java
index d365aa4..20ee665 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/SimpleFetchedInputAllocator.java
@@ -33,6 +33,7 @@ import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
import org.apache.tez.runtime.library.shuffle.common.DiskFetchedInput;
import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
import org.apache.tez.runtime.library.shuffle.common.FetchedInputCallback;
import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
@@ -140,10 +141,26 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator,
}
@Override // allocateType: returns a FetchedInput for a storage type requested by the caller
+ public synchronized FetchedInput allocateType(Type type, long actualSize, // synchronized on this allocator instance
+ long compressedSize, InputAttemptIdentifier inputAttemptIdentifier)
+ throws IOException {
+
+ switch (type) {
+ case DISK: // build the disk-backed input here instead of going through allocate()
+ return new DiskFetchedInput(actualSize, compressedSize,
+ inputAttemptIdentifier, this, conf, localDirAllocator, // passes this allocator itself plus its conf and dir allocator
+ fileNameAllocator);
+ default: // every other Type value is delegated to allocate() with the same three arguments
+ return allocate(actualSize, compressedSize, inputAttemptIdentifier);
+ }
+ }
+
+ @Override
public synchronized void fetchComplete(FetchedInput fetchedInput) {
switch (fetchedInput.getType()) {
// Not tracking anything here.
case DISK:
+ case DISK_DIRECT:
case MEMORY:
break;
default: