You are viewing a plain-text version of this content. The canonical link for it (a hyperlink labeled "here" in the original page) is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/04/04 23:40:59 UTC

[2/3] incubator-beam git commit: [BEAM-167] Fix custom source gzip input to read concatenated gzip files

[BEAM-167] Fix custom source gzip input to read concatenated gzip files

This applies the patch by kirpichov@google.com from https://gist.github.com/jkff/d8d984a33a41ec607328cee8e418c174


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/abc397f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/abc397f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/abc397f9

Branch: refs/heads/master
Commit: abc397f9b9851599eff7f3e1ec5b5343005a0a94
Parents: fd049b5
Author: kirpichov <ki...@google.com>
Authored: Mon Apr 4 13:31:23 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Apr 4 14:40:07 2016 -0700

----------------------------------------------------------------------
 .../cloud/dataflow/sdk/io/CompressedSource.java |  2 +-
 .../dataflow/sdk/io/CompressedSourceTest.java   | 40 ++++++++++++++++++++
 2 files changed, 41 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/abc397f9/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java
index 4e3e9ca..15e6e29 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java
@@ -122,7 +122,7 @@ public class CompressedSource<T> extends FileBasedSource<T> {
           byte zero = 0x00;
           int header = Ints.fromBytes(zero, zero, headerBytes[1], headerBytes[0]);
           if (header == GZIPInputStream.GZIP_MAGIC) {
-            return Channels.newChannel(new GzipCompressorInputStream(stream));
+            return Channels.newChannel(new GzipCompressorInputStream(stream, true));
           }
         }
         return Channels.newChannel(stream);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/abc397f9/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java
index 2dcddb4..f63a128 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java
@@ -47,16 +47,19 @@ import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Random;
+import java.util.zip.GZIPOutputStream;
 
 import javax.annotation.Nullable;
 
@@ -98,6 +101,43 @@ public class CompressedSourceTest {
     runReadTest(input, CompressionMode.GZIP);
   }
 
+  private static byte[] compressGzip(byte[] input) throws IOException {
+    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
+    try (OutputStream gzip = new GZIPOutputStream(compressed)) {
+      gzip.write(input);
+    } // close() finishes deflation and appends the gzip trailer: one complete member
+    return compressed.toByteArray();
+  }
+
+  /**
+   * Returns a new array holding the bytes of {@code first} followed by those of {@code second}.
+   */
+  private static byte[] concat(byte[] first, byte[] second) {
+    return Bytes.concat(first, second); // Guava Bytes, already used by this test class
+  }
+
+  /** Test reading a gzip file made of two independently compressed members. */
+  @Test
+  public void testReadConcatenatedGzip() throws IOException {
+    byte[] firstPart = "a,b,c\n".getBytes(StandardCharsets.UTF_8);
+    byte[] secondPart = "1,2,3\n4,5,6\n7,8,9\n".getBytes(StandardCharsets.UTF_8);
+    File inputFile = tmpFolder.newFile();
+    try (OutputStream out = new FileOutputStream(inputFile)) {
+      // Two complete gzip members back to back, as produced by `cat a.gz b.gz`.
+      out.write(concat(compressGzip(firstPart), compressGzip(secondPart)));
+    }
+
+    Pipeline pipeline = TestPipeline.create();
+    CompressedSource<Byte> gzipSource =
+        CompressedSource.from(new ByteSource(inputFile.getAbsolutePath(), 1))
+            .withDecompression(CompressionMode.GZIP);
+    PCollection<Byte> output = pipeline.apply(Read.from(gzipSource));
+    // Every byte of both members must be read, not only those of the first member.
+    byte[] expected = concat(firstPart, secondPart);
+    DataflowAssert.that(output).containsInAnyOrder(Bytes.asList(expected));
+    pipeline.run();
+  }
+
   /**
    * Test reading empty input with bzip2.
    */