You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2016/10/05 15:18:41 UTC
tez git commit: TEZ-3440. Shuffling to memory can get out-of-sync
when fetching multiple compressed map outputs (Nathan Roberts via jeagles)
Repository: tez
Updated Branches:
refs/heads/master ad1fb6216 -> 149db1b48
TEZ-3440. Shuffling to memory can get out-of-sync when fetching multiple compressed map outputs (Nathan Roberts via jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/149db1b4
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/149db1b4
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/149db1b4
Branch: refs/heads/master
Commit: 149db1b48e8be91455a383d1793ef46e789cfea6
Parents: ad1fb62
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Wed Oct 5 10:18:22 2016 -0500
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Wed Oct 5 10:18:22 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 3 ++
.../runtime/library/common/sort/impl/IFile.java | 9 ++++
.../library/common/sort/impl/TestIFile.java | 54 +++++++++++++++++++
.../TestIFile_concatenated_compressed.bin | Bin 0 -> 51913 bytes
4 files changed, 66 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/149db1b4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1c410e1..c85bc8e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3440. Shuffling to memory can get out-of-sync when fetching multiple compressed map outputs
TEZ-3429. Set reconfigureDoneTime on VertexConfigurationDoneEvent properly.
TEZ-3000. Fix TestContainerReuse.
TEZ-3436. Check input and output count before start in MapProcessor.
@@ -114,6 +115,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3440. Shuffling to memory can get out-of-sync when fetching multiple compressed map outputs
TEZ-3429. Set reconfigureDoneTime on VertexConfigurationDoneEvent properly.
TEZ-3000. Fix TestContainerReuse.
TEZ-3436. Check input and output count before start in MapProcessor.
@@ -604,6 +606,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3440. Shuffling to memory can get out-of-sync when fetching multiple compressed map outputs
TEZ-3426. Second AM attempt launched for session mode and recovery disabled for certain cases
TEZ-2852. TestVertexImpl fails due to race in AsyncDispatcher.
TEZ-3326. Display JVM system properties in AM and task logs.
http://git-wip-us.apache.org/repos/asf/tez/blob/149db1b4/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index a20182c..f49bc35 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -627,6 +627,15 @@ public class IFile {
}
try {
IOUtils.readFully(in, buffer, 0, buffer.length - IFile.HEADER.length);
+ /*
+ * We've gotten the amount of data we were expecting. Verify the
+ * decompressor has nothing more to offer. This action also forces the
+ * decompressor to read any trailing bytes that weren't critical for
+ * decompression, which is necessary to keep the stream in sync.
+ */
+ if (in.read() >= 0) {
+ throw new IOException("Unexpected extra bytes from input stream");
+ }
} catch (IOException ioe) {
if(in != null) {
try {
http://git-wip-us.apache.org/repos/asf/tez/blob/149db1b4/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
index 24acc40..25e916e 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
@@ -21,6 +21,8 @@ package org.apache.tez.runtime.library.common.sort.impl;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
@@ -243,6 +245,58 @@ public class TestIFile {
testWithDataBuffer(sortedData);
}
+  /**
+   * Reads concatenated zlib segments, as happens when several map outputs are
+   * shuffled back-to-back over one stream. The input is valid, but the
+   * decompressor can leave lingering bytes between segments; if those bytes
+   * are not consumed, the stream gets out-of-sync with the next segment.
+   */
+  @Test(timeout = 5000)
+  public void testConcatenatedZlibPadding()
+      throws IOException, URISyntaxException {
+    // Known raw (decompressed) and compressed lengths of each segment in the
+    // resource file, in the order the segments appear.
+    long[] raws = { 2392, 102314, 42576, 31432, 25090 };
+    long[] compressed = { 723, 25396, 10926, 8203, 6665 };
+    long compTotal = 0;
+
+    // The resource holds zlib data, read via Hadoop's DefaultCodec.
+    CompressionCodecFactory codecFactory = new CompressionCodecFactory(new
+        Configuration());
+    codec = codecFactory.getCodecByClassName("org.apache.hadoop.io.compress.DefaultCodec");
+
+    URL url = getClass().getClassLoader()
+        .getResource("TestIFile_concatenated_compressed.bin");
+    assertNotEquals("IFile input file must exist", null, url);
+    Path p = new Path(url.toURI());
+
+    // try-with-resources closes the stream even when an assertion fails.
+    try (FSDataInputStream inStream = localFs.open(p)) {
+      for (int i = 0; i < raws.length; i++) {
+        // Each segment must start exactly where the previous one ended; a
+        // mismatch indicates the previous readToMemory call left trailing
+        // compressed bytes unread.
+        assertEquals("Compressed stream out-of-sync", compTotal,
+            inStream.getPos());
+
+        // Decompress exactly one segment into a buffer of its raw size.
+        byte[] bytes = new byte[(int) raws[i]];
+        IFile.Reader.readToMemory(bytes, inStream, (int) compressed[i], codec,
+            false, -1);
+        compTotal += compressed[i];
+
+        // Iterate over every raw key/value pair to exercise the segment.
+        InMemoryReader inMemReader = new InMemoryReader(null,
+            new InputAttemptIdentifier(0, 0), bytes, 0, bytes.length);
+        DataInputBuffer keyIn = new DataInputBuffer();
+        DataInputBuffer valIn = new DataInputBuffer();
+        while (inMemReader.nextRawKey(keyIn)) {
+          inMemReader.nextRawValue(valIn);
+        }
+      }
+    }
+  }
+
@Test(timeout = 5000)
//Test InMemoryWriter
public void testInMemoryWriter() throws IOException {
http://git-wip-us.apache.org/repos/asf/tez/blob/149db1b4/tez-runtime-library/src/test/resources/TestIFile_concatenated_compressed.bin
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/resources/TestIFile_concatenated_compressed.bin b/tez-runtime-library/src/test/resources/TestIFile_concatenated_compressed.bin
new file mode 100644
index 0000000..395452e
Binary files /dev/null and b/tez-runtime-library/src/test/resources/TestIFile_concatenated_compressed.bin differ