You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2016/10/05 15:18:41 UTC

tez git commit: TEZ-3440. Shuffling to memory can get out-of-sync when fetching multiple compressed map outputs (Nathan Roberts via jeagles)

Repository: tez
Updated Branches:
  refs/heads/master ad1fb6216 -> 149db1b48


TEZ-3440. Shuffling to memory can get out-of-sync when fetching multiple compressed map outputs (Nathan Roberts via jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/149db1b4
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/149db1b4
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/149db1b4

Branch: refs/heads/master
Commit: 149db1b48e8be91455a383d1793ef46e789cfea6
Parents: ad1fb62
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Wed Oct 5 10:18:22 2016 -0500
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Wed Oct 5 10:18:22 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 ++
 .../runtime/library/common/sort/impl/IFile.java |   9 ++++
 .../library/common/sort/impl/TestIFile.java     |  54 +++++++++++++++++++
 .../TestIFile_concatenated_compressed.bin       | Bin 0 -> 51913 bytes
 4 files changed, 66 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/149db1b4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1c410e1..c85bc8e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3440. Shuffling to memory can get out-of-sync when fetching multiple compressed map outputs
   TEZ-3429. Set reconfigureDoneTime on VertexConfigurationDoneEvent properly.
   TEZ-3000. Fix TestContainerReuse.
   TEZ-3436. Check input and output count before start in MapProcessor.
@@ -114,6 +115,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3440. Shuffling to memory can get out-of-sync when fetching multiple compressed map outputs
   TEZ-3429. Set reconfigureDoneTime on VertexConfigurationDoneEvent properly.
   TEZ-3000. Fix TestContainerReuse.
   TEZ-3436. Check input and output count before start in MapProcessor.
@@ -604,6 +606,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3440. Shuffling to memory can get out-of-sync when fetching multiple compressed map outputs
   TEZ-3426. Second AM attempt launched for session mode and recovery disabled for certain cases
   TEZ-2852. TestVertexImpl fails due to race in AsyncDispatcher.
   TEZ-3326. Display JVM system properties in AM and task logs.

http://git-wip-us.apache.org/repos/asf/tez/blob/149db1b4/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index a20182c..f49bc35 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -627,6 +627,15 @@ public class IFile {
       }
       try {
         IOUtils.readFully(in, buffer, 0, buffer.length - IFile.HEADER.length);
+        /*
+         * We've gotten the amount of data we were expecting. Verify the
+         * decompressor has nothing more to offer. This action also forces the
+         * decompressor to read any trailing bytes that weren't critical for
+         * decompression, which is necessary to keep the stream in sync.
+         */
+        if (in.read() >= 0) {
+          throw new IOException("Unexpected extra bytes from input stream");
+        }
       } catch (IOException ioe) {
         if(in != null) {
           try {

http://git-wip-us.apache.org/repos/asf/tez/blob/149db1b4/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
index 24acc40..25e916e 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
@@ -21,6 +21,8 @@ package org.apache.tez.runtime.library.common.sort.impl;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedList;
@@ -243,6 +245,58 @@ public class TestIFile {
     testWithDataBuffer(sortedData);
   }
 
+  // Tests reading concatenated zlib segments, as when a shuffle fetches
+  // multiple map outputs. This specific input is valid, but the decompressor
+  // can leave lingering bytes between segments. If those lingering bytes are
+  // not handled correctly, the stream will get out-of-sync.
+  @Test(timeout = 5000)
+  public void testConcatenatedZlibPadding()
+      throws IOException, URISyntaxException {
+    byte[] bytes;
+    long compTotal = 0; // running sum of the compressed lengths consumed so far
+    // Known raw and compressed lengths of each of the five input segments
+    long raws[] = { 2392, 102314, 42576, 31432, 25090 };
+    long compressed[] = { 723, 25396, 10926, 8203, 6665 };
+
+    CompressionCodecFactory codecFactory = new CompressionCodecFactory(new
+        Configuration());
+    codec = codecFactory.getCodecByClassName("org.apache.hadoop.io.compress.DefaultCodec");
+
+    URL url = getClass().getClassLoader()
+        .getResource("TestIFile_concatenated_compressed.bin");
+    assertNotEquals("IFileinput file must exist", null, url);
+    Path p = new Path(url.toURI());
+    FSDataInputStream inStream = localFs.open(p);
+
+    for (int i = 0; i < 5; i++) { // one iteration per segment
+      bytes = new byte[(int) raws[i]];
+      assertEquals("Compressed stream out-of-sync", inStream.getPos(), compTotal);
+      IFile.Reader.readToMemory(bytes, inStream, (int) compressed[i], codec,
+          false, -1);
+      compTotal += compressed[i];
+
+      // Now read the data back from the in-memory buffer, record by record
+      InMemoryReader inMemReader = new InMemoryReader(null,
+          new InputAttemptIdentifier(0, 0), bytes, 0, bytes.length);
+
+      DataInputBuffer keyIn = new DataInputBuffer();
+      DataInputBuffer valIn = new DataInputBuffer();
+      Deserializer<Text> keyDeserializer;
+      Deserializer<IntWritable> valDeserializer;
+      SerializationFactory serializationFactory =
+          new SerializationFactory(defaultConf);
+      keyDeserializer = serializationFactory.getDeserializer(Text.class);
+      valDeserializer = serializationFactory.getDeserializer(IntWritable.class);
+      keyDeserializer.open(keyIn);
+      valDeserializer.open(valIn);
+
+      while (inMemReader.nextRawKey(keyIn)) {
+        inMemReader.nextRawValue(valIn);
+      }
+    }
+    inStream.close(); // NOTE(review): not reached if an assertion or read above throws
+  }
+
   @Test(timeout = 5000)
   //Test InMemoryWriter
   public void testInMemoryWriter() throws IOException {

http://git-wip-us.apache.org/repos/asf/tez/blob/149db1b4/tez-runtime-library/src/test/resources/TestIFile_concatenated_compressed.bin
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/resources/TestIFile_concatenated_compressed.bin b/tez-runtime-library/src/test/resources/TestIFile_concatenated_compressed.bin
new file mode 100644
index 0000000..395452e
Binary files /dev/null and b/tez-runtime-library/src/test/resources/TestIFile_concatenated_compressed.bin differ