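You are viewing a plain-text version of this message. The canonical version is in the commits@tez.apache.org mailing-list archive (the original link was lost in the plain-text conversion).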
Posted to commits@tez.apache.org by jl...@apache.org on 2016/05/20 17:53:33 UTC
tez git commit: TEZ-3237. Corrupted shuffle transfers to disk are not detected during transfer (jlowe) (cherry picked from commit 5dd47c6858028ac0187c4fab76b66217bf0d6c56)
Repository: tez
Updated Branches:
refs/heads/branch-0.8 86fdc0a86 -> fa65f3599
TEZ-3237. Corrupted shuffle transfers to disk are not detected during transfer (jlowe)
(cherry picked from commit 5dd47c6858028ac0187c4fab76b66217bf0d6c56)
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fa65f359
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fa65f359
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fa65f359
Branch: refs/heads/branch-0.8
Commit: fa65f3599e82c901ee43cca1a01df0b875ce0ef1
Parents: 86fdc0a
Author: Jason Lowe <jl...@apache.org>
Authored: Fri May 20 17:53:10 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri May 20 17:53:10 2016 +0000
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../library/api/TezRuntimeConfiguration.java | 12 +++++
.../runtime/library/common/shuffle/Fetcher.java | 17 ++++---
.../library/common/shuffle/ShuffleUtils.java | 29 +++++++-----
.../common/shuffle/impl/ShuffleManager.java | 6 ++-
.../orderedgrouped/FetcherOrderedGrouped.java | 9 +++-
.../orderedgrouped/ShuffleScheduler.java | 8 +++-
.../runtime/library/common/sort/impl/IFile.java | 49 ++++++++++++++++++--
.../common/sort/impl/IFileInputStream.java | 7 ++-
.../library/input/OrderedGroupedKVInput.java | 1 +
.../runtime/library/input/UnorderedKVInput.java | 1 +
.../library/common/shuffle/TestFetcher.java | 15 +++---
.../common/shuffle/TestShuffleUtils.java | 27 +++++++++++
.../shuffle/orderedgrouped/TestFetcher.java | 27 +++++++----
.../library/common/sort/impl/TestIFile.java | 40 ++++++++++++++++
15 files changed, 207 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 68cf9f6..794d06e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3237. Corrupted shuffle transfers to disk are not detected during transfer
TEZ-3246. Improve diagnostics when DAG killed by user
TEZ-3258. Jvm Checker does not ignore DisableExplicitGC when checking JVM GC options.
TEZ-3256. [Backport HADOOP-11032] Remove Guava Stopwatch dependency
@@ -450,6 +451,7 @@ Release 0.7.2: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3237. Corrupted shuffle transfers to disk are not detected during transfer
TEZ-3258. Jvm Checker does not ignore DisableExplicitGC when checking JVM GC options.
TEZ-3256. [Backport HADOOP-11032] Remove Guava Stopwatch dependency
TEZ-2342. Reduce bytearray copy with TezEvent Serialization and deserialization
http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index caad6ef..08f76f2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -373,6 +373,17 @@ public class TezRuntimeConfiguration {
"shuffle.ssl.enable";
public static final boolean TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT = false;
+ /**
+ * Controls verification of data checksums when fetching data directly to
+ * disk. Enabling verification allows the fetcher to detect corrupted data
+ * and report the failure against the upstream task before the data reaches
+ * the Processor and causes the fetching task to fail.
+ */
+ @ConfigurationProperty(type = "boolean")
+ public static final String TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM =
+ TEZ_RUNTIME_PREFIX + "shuffle.fetch.verify-disk-checksum";
+ public static final boolean TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM_DEFAULT = true;
+
@ConfigurationProperty(type = "float")
public static final String TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT = TEZ_RUNTIME_PREFIX +
"shuffle.fetch.buffer.percent";
@@ -541,6 +552,7 @@ public class TezRuntimeConfiguration {
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_ENABLE_SSL);
+ tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT);
http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 261f2e7..a0705fc 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -128,6 +128,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
private final boolean asyncHttp;
+ private final boolean verifyDiskChecksum;
+
private final boolean isDebugEnabled = LOG.isDebugEnabled();
private Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params,
@@ -139,8 +141,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
boolean localDiskFetchEnabled,
boolean sharedFetchEnabled,
String localHostname,
- int shufflePort, boolean asyncHttp) {
+ int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum) {
this.asyncHttp = asyncHttp;
+ this.verifyDiskChecksum = verifyDiskChecksum;
this.fetcherCallback = fetcherCallback;
this.inputManager = inputManager;
this.jobTokenSecretMgr = jobTokenSecretManager;
@@ -804,7 +807,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
} else if (fetchedInput.getType() == Type.DISK) {
ShuffleUtils.shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(),
(host +":" +port), input, compressedLength, decompressedLength, LOG,
- fetchedInput.getInputAttemptIdentifier().toString());
+ fetchedInput.getInputAttemptIdentifier().toString(),
+ ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum);
} else {
throw new TezUncheckedException("Bad fetchedInput type while fetching shuffle data " +
fetchedInput);
@@ -971,10 +975,10 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
HttpConnectionParams params, FetchedInputAllocator inputManager,
ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
Configuration conf, boolean localDiskFetchEnabled, String localHostname, int shufflePort,
- boolean asyncHttp) {
+ boolean asyncHttp, boolean verifyDiskChecksum) {
this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier,
jobTokenSecretMgr, srcNameTrimmed, conf, null, null, null, localDiskFetchEnabled,
- false, localHostname, shufflePort, asyncHttp);
+ false, localHostname, shufflePort, asyncHttp, verifyDiskChecksum);
}
public FetcherBuilder(FetcherCallback fetcherCallback,
@@ -983,10 +987,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
Configuration conf, RawLocalFileSystem localFs,
LocalDirAllocator localDirAllocator, Path lockPath,
boolean localDiskFetchEnabled, boolean sharedFetchEnabled,
- String localHostname, int shufflePort, boolean asyncHttp) {
+ String localHostname, int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum) {
this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier,
jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
- lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp);
+ lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp,
+ verifyDiskChecksum);
}
public FetcherBuilder setHttpConnectionParameters(HttpConnectionParams httpParams) {
http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 685503c..ae646ea 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -140,22 +140,27 @@ public class ShuffleUtils {
}
public static void shuffleToDisk(OutputStream output, String hostIdentifier,
- InputStream input, long compressedLength, long decompressedLength, Logger LOG, String identifier)
- throws IOException {
+ InputStream input, long compressedLength, long decompressedLength, Logger LOG, String identifier,
+ boolean ifileReadAhead, int ifileReadAheadLength, boolean verifyChecksum) throws IOException {
// Copy data to local-disk
long bytesLeft = compressedLength;
try {
- final int BYTES_TO_READ = 64 * 1024;
- byte[] buf = new byte[BYTES_TO_READ];
- while (bytesLeft > 0) {
- int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
- if (n < 0) {
- throw new IOException("read past end of stream reading "
- + identifier);
+ if (verifyChecksum) {
+ bytesLeft -= IFile.Reader.readToDisk(output, input, compressedLength,
+ ifileReadAhead, ifileReadAheadLength);
+ } else {
+ final int BYTES_TO_READ = 64 * 1024;
+ byte[] buf = new byte[BYTES_TO_READ];
+ while (bytesLeft > 0) {
+ int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
+ if (n < 0) {
+ throw new IOException("read past end of stream reading "
+ + identifier);
+ }
+ output.write(buf, 0, n);
+ bytesLeft -= n;
+ // metrics.inputBytes(n);
}
- output.write(buf, 0, n);
- bytesLeft -= n;
- // metrics.inputBytes(n);
}
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index b82098e..4f7d348 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -136,6 +136,7 @@ public class ShuffleManager implements FetcherCallback {
private final CompressionCodec codec;
private final boolean localDiskFetchEnabled;
private final boolean sharedFetchEnabled;
+ private final boolean verifyDiskChecksum;
private final int ifileBufferSize;
private final boolean ifileReadAhead;
@@ -198,6 +199,9 @@ public class ShuffleManager implements FetcherCallback {
TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT);
this.sharedFetchEnabled = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH,
TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH_DEFAULT);
+ this.verifyDiskChecksum = conf.getBoolean(
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM,
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM_DEFAULT);
this.shufflePhaseTime = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_PHASE_TIME);
this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED);
@@ -404,7 +408,7 @@ public class ShuffleManager implements FetcherCallback {
httpConnectionParams, inputManager, inputContext.getApplicationId(), inputContext.getDagIdentifier(),
jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
lockDisk, localDiskFetchEnabled, sharedFetchEnabled,
- localhostName, shufflePort, asyncHttp);
+ localhostName, shufflePort, asyncHttp, verifyDiskChecksum);
if (codec != null) {
fetcherBuilder.setCompressionParameters(codec);
http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 51bdf68..bcb75d2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -59,6 +59,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
private final Configuration conf;
private final boolean localDiskFetchEnabled;
+ private final boolean verifyDiskChecksum;
private final TezCounter connectionErrs;
private final TezCounter ioErrs;
@@ -125,7 +126,8 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
String applicationId,
int dagId,
boolean asyncHttp,
- boolean sslShuffle) {
+ boolean sslShuffle,
+ boolean verifyDiskChecksum) {
this.scheduler = scheduler;
this.allocator = allocator;
this.metrics = metrics;
@@ -159,6 +161,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
this.localDiskFetchEnabled = localDiskFetchEnabled;
this.sslShuffle = sslShuffle;
+ this.verifyDiskChecksum = verifyDiskChecksum;
this.logIdentifier = "fetcher [" + srcNameTrimmed + "] #" + id;
}
@@ -504,7 +507,9 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier().toString());
} else if (mapOutput.getType() == Type.DISK) {
ShuffleUtils.shuffleToDisk(mapOutput.getDisk(), host.getHostIdentifier(),
- input, compressedLength, decompressedLength, LOG, mapOutput.getAttemptIdentifier().toString());
+ input, compressedLength, decompressedLength, LOG,
+ mapOutput.getAttemptIdentifier().toString(),
+ ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum);
} else {
throw new IOException("Unknown mapOutput type while fetching shuffle data:" +
mapOutput.getType());
http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 2f6e490..c017efb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -235,6 +235,7 @@ class ShuffleScheduler {
private final float minReqProgressFraction;
private final float maxAllowedFailedFetchFraction;
private final boolean checkFailedFetchSinceLastCompletion;
+ private final boolean verifyDiskChecksum;
private volatile Thread shuffleSchedulerThread = null;
@@ -388,6 +389,10 @@ class ShuffleScheduler {
conf.getBoolean(
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR,
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR_DEFAULT);
+ this.verifyDiskChecksum = conf.getBoolean(
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM,
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM_DEFAULT);
+
/**
* Setting to very high val can lead to Http 400 error. Cap it to 75; every attempt id would
* be approximately 48 bytes; 48 * 75 = 3600 which should give some room for other info in URL.
@@ -1347,7 +1352,8 @@ class ShuffleScheduler {
shuffleMetrics, exceptionReporter, jobTokenSecretManager, ifileReadAhead, ifileReadAheadLength,
codec, conf, localDiskFetchEnabled, localHostname, shufflePort, srcNameTrimmed, mapHost,
ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter,
- connectionErrsCounter, wrongReduceErrsCounter, applicationId, dagId, asyncHttp, sslShuffle);
+ connectionErrsCounter, wrongReduceErrsCounter, applicationId, dagId, asyncHttp, sslShuffle,
+ verifyDiskChecksum);
}
private class FetchFutureCallback implements FutureCallback<Void> {
http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index 6d8992e..a20182c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -645,6 +646,43 @@ public class IFile {
}
}
+ /**
+ * Read entire IFile content to disk.
+ *
+ * @param out the output stream that will receive the data
+ * @param in the input stream containing the IFile data
+ * @param length the amount of data to read from the input
+ * @return the number of bytes copied
+ * @throws IOException
+ */
+ public static long readToDisk(OutputStream out, InputStream in, long length,
+ boolean ifileReadAhead, int ifileReadAheadLength)
+ throws IOException {
+ final int BYTES_TO_READ = 64 * 1024;
+ byte[] buf = new byte[BYTES_TO_READ];
+
+ // copy the IFile header
+ if (length < HEADER.length) {
+ throw new IOException("Missing IFile header");
+ }
+ IOUtils.readFully(in, buf, 0, HEADER.length);
+ verifyHeaderMagic(buf);
+ out.write(buf, 0, HEADER.length);
+ long bytesLeft = length - HEADER.length;
+ @SuppressWarnings("resource")
+ IFileInputStream ifInput = new IFileInputStream(in, bytesLeft,
+ ifileReadAhead, ifileReadAheadLength);
+ while (bytesLeft > 0) {
+ int n = ifInput.readWithChecksum(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
+ if (n < 0) {
+ throw new IOException("read past end of stream");
+ }
+ out.write(buf, 0, n);
+ bytesLeft -= n;
+ }
+ return length - bytesLeft;
+ }
+
public long getLength() {
return fileLength - checksumIn.getSize();
}
@@ -784,14 +822,17 @@ public class IFile {
++numRecordsRead;
}
- public static boolean isCompressedFlagEnabled(InputStream in) throws IOException {
- byte[] header = new byte[HEADER.length];
- IOUtils.readFully(in, header, 0, HEADER.length);
-
+ private static void verifyHeaderMagic(byte[] header) throws IOException {
if (!(header[0] == 'T' && header[1] == 'I'
&& header[2] == 'F')) {
throw new IOException("Not a valid ifile header");
}
+ }
+
+ public static boolean isCompressedFlagEnabled(InputStream in) throws IOException {
+ byte[] header = new byte[HEADER.length];
+ IOUtils.readFully(in, header, 0, HEADER.length);
+ verifyHeaderMagic(header);
return (header[3] == 1);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
index d116242..c5853d4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
@@ -125,7 +125,7 @@ public class IFileInputStream extends InputStream {
if (curReadahead != null) {
curReadahead.cancel();
}
- if (currentOffset < dataLength) {
+ if (currentOffset < dataLength && !disableChecksumValidation) {
byte[] t = new byte[Math.min((int)
(Integer.MAX_VALUE & (dataLength - currentOffset)), 32 * 1024)];
while (currentOffset < dataLength) {
@@ -300,7 +300,10 @@ public class IFileInputStream extends InputStream {
return result;
}
- void disableChecksumValidation() {
+ /**
+ * Disable checksum validation when reading the stream
+ */
+ public void disableChecksumValidation() {
disableChecksumValidation = true;
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index 5e367cf..9a2a23e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -354,6 +354,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL);
+ confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT);
http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index dbbe23ff..ec9a191 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -263,6 +263,7 @@ public class UnorderedKVInput extends AbstractLogicalInput {
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL);
+ confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT);
http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
index 0aa112e..bd0ea0f 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
@@ -71,7 +71,7 @@ public class TestFetcher {
Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST,
- PORT, false);
+ PORT, false, true);
builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts));
Fetcher fetcher = spy(builder.build());
@@ -89,7 +89,7 @@ public class TestFetcher {
// when enabled and hostname does not match use http fetch.
builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
ApplicationId.newInstance(0, 1), -1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST,
- PORT, false);
+ PORT, false, true);
builder.assignWork(HOST + "_OTHER", PORT, 0, Arrays.asList(srcAttempts));
fetcher = spy(builder.build());
@@ -104,7 +104,8 @@ public class TestFetcher {
// when enabled and port does not match use http fetch.
builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
- ApplicationId.newInstance(0, 1), -1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, PORT, false);
+ ApplicationId.newInstance(0, 1), -1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST,
+ PORT, false, true);
builder.assignWork(HOST, PORT + 1, 0, Arrays.asList(srcAttempts));
fetcher = spy(builder.build());
@@ -121,7 +122,7 @@ public class TestFetcher {
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, DISABLE_LOCAL_FETCH, HOST,
- PORT, false);
+ PORT, false, true);
builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts));
fetcher = spy(builder.build());
@@ -154,7 +155,8 @@ public class TestFetcher {
int partition = 42;
FetcherCallback callback = mock(FetcherCallback.class);
Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null,
- ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT, false);
+ ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT,
+ false, true);
builder.assignWork(HOST, PORT, partition, Arrays.asList(srcAttempts));
Fetcher fetcher = spy(builder.build());
@@ -272,7 +274,8 @@ public class TestFetcher {
int partition = 42;
FetcherCallback callback = mock(FetcherCallback.class);
Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null,
- ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT, false);
+ ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT,
+ false, true);
builder.assignWork(HOST, PORT, partition, Arrays.asList(srcAttempts));
Fetcher fetcher = spy(builder.build());
fetcher.populateRemainingMap(new LinkedList<InputAttemptIdentifier>(Arrays.asList(srcAttempts)));
http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
index 4ac1bca..c542030 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
@@ -4,6 +4,7 @@ import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -23,6 +24,7 @@ import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.sort.impl.IFileOutputStream;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
@@ -33,9 +35,12 @@ import org.junit.Test;
import org.slf4j.Logger;
import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.Random;
@@ -283,4 +288,26 @@ public class TestShuffleUtils {
Assert.assertTrue(e.getMessage().contains(codecErrorMsg));
}
}
+
+ @Test
+ public void testShuffleToDiskChecksum() throws Exception {
+ // verify sending a stream of zeroes without checksum validation
+ // does not trigger an exception
+ byte[] bogusData = new byte[1000];
+ Arrays.fill(bogusData, (byte) 0);
+ ByteArrayInputStream in = new ByteArrayInputStream(bogusData);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ShuffleUtils.shuffleToDisk(baos, "somehost", in,
+ bogusData.length, 2000, mock(Logger.class), "identifier", false, 0, false);
+ Assert.assertArrayEquals(bogusData, baos.toByteArray());
+
+ // verify sending same stream of zeroes with validation generates an exception
+ in.reset();
+ try {
+ ShuffleUtils.shuffleToDisk(mock(OutputStream.class), "somehost", in,
+ bogusData.length, 2000, mock(Logger.class), "identifier", false, 0, true);
+ Assert.fail("shuffle was supposed to throw!");
+ } catch (IOException e) {
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index 89d35f4..310f1b2 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -127,7 +127,8 @@ public class TestFetcher {
new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
+ false, false, true);
fetcher.call();
verify(scheduler).getMapsForHost(mapHost);
@@ -155,7 +156,8 @@ public class TestFetcher {
new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
+ false, false, true);
// when local mode is enabled and host and port matches use local fetch
FetcherOrderedGrouped spyFetcher = spy(fetcher);
@@ -172,7 +174,8 @@ public class TestFetcher {
new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
+ false, false ,true);
spyFetcher = spy(fetcher);
doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
@@ -187,7 +190,8 @@ public class TestFetcher {
new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
+ false, false, true);
spyFetcher = spy(fetcher);
doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
@@ -201,7 +205,8 @@ public class TestFetcher {
fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
null, conf, DISABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
+ false, false, true);
spyFetcher = spy(fetcher);
doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
@@ -225,7 +230,8 @@ public class TestFetcher {
MapHost host = new MapHost(HOST, PORT, 1);
FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
+ false, false, true);
FetcherOrderedGrouped spyFetcher = spy(fetcher);
@@ -369,7 +375,8 @@ public class TestFetcher {
final MapHost host = new MapHost(HOST, PORT, 1);
FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
+ false, false, true);
final FetcherOrderedGrouped fetcher = spy(mockFetcher);
@@ -460,7 +467,8 @@ public class TestFetcher {
false, 0,
null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, true, false);
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
+ true, false, true);
final FetcherOrderedGrouped fetcher = spy(mockFetcher);
fetcher.remaining = new LinkedHashMap<String, InputAttemptIdentifier>();
final List<InputAttemptIdentifier> srcAttempts = Arrays.asList(
@@ -527,7 +535,8 @@ public class TestFetcher {
new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false);
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
+ false, false, true);
fetcher.populateRemainingMap(new LinkedList<InputAttemptIdentifier>(Arrays.asList(srcAttempts)));
Assert.assertEquals(expectedSrcAttempts.length, fetcher.remaining.size());
Iterator<Entry<String, InputAttemptIdentifier>> iterator = fetcher.remaining.entrySet().iterator();
http://git-wip-us.apache.org/repos/asf/tez/blob/fa65f359/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
index 1c269bd..24acc40 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
@@ -19,8 +19,10 @@
package org.apache.tez.runtime.library.common.sort.impl;
import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
@@ -28,6 +30,7 @@ import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -375,6 +378,43 @@ public class TestIFile {
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
}
+ @Test(timeout = 20000)
+ public void testReadToDisk() throws IOException {
+ // case 1: a raw stream of zeroes (no IFile header in front) must be rejected with an IOException
+ byte[] zeroData = new byte[1000];
+ Arrays.fill(zeroData, (byte) 0);
+ ByteArrayInputStream in = new ByteArrayInputStream(zeroData);
+ try {
+ IFile.Reader.readToDisk(new ByteArrayOutputStream(), in, zeroData.length, false, 0);
+ fail("Exception should have been thrown");
+ } catch (IOException e) { // expected: any IOException subtype is accepted for this case
+ }
+
+ // case 2: the same zeroes preceded by a valid IFile.HEADER must still fail, and specifically
+ // with a ChecksumException (presumably the checksum over the zero payload cannot verify)
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ baos.write(IFile.HEADER);
+ baos.write(zeroData);
+ try {
+ IFile.Reader.readToDisk(new ByteArrayOutputStream(),
+ new ByteArrayInputStream(baos.toByteArray()), zeroData.length, false, 0);
+ fail("Exception should have been thrown");
+ } catch (IOException e) {
+ assertTrue(e instanceof ChecksumException);
+ }
+
+ // case 3: a file produced by writeTestFile is copied by readToDisk, then re-read and verified
+ List<KVPair> data = KVDataGen.generateTestData(true, 0);
+ Writer writer = writeTestFile(false, false, data, codec);
+ baos.reset(); // reuse the buffer as the copy destination
+ IFile.Reader.readToDisk(baos, localFs.open(outputPath), writer.getCompressedLength(),
+ false, 0); // NOTE(review): the stream from localFs.open is never closed here; confirm readToDisk closes it, else use try-with-resources
+ byte[] diskData = baos.toByteArray();
+ Reader reader = new Reader(new ByteArrayInputStream(diskData), diskData.length,
+ codec, null, null, false, 0, 1024);
+ verifyData(reader, data); // copied bytes must decode to exactly the generated KV pairs
+ reader.close();
+ }
/**
* Test different options (RLE, repeat keys, compression) on reader/writer