You are viewing a plain-text version of this content. (The link to the canonical version was a hyperlink that did not survive the plain-text conversion.)
Posted to commits@beam.apache.org by jb...@apache.org on 2017/08/02 18:47:54 UTC

[1/2] beam git commit: [BEAM-2708] Configure BZIP2 to read all "streams"

Repository: beam
Updated Branches:
  refs/heads/master 339976c9f -> 9582840bc


[BEAM-2708] Configure BZIP2 to read all "streams"

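Without this change, CompressionMode.BZIP2 stops decompressing after the first
bzip2 stream, so it only fully supports "standard" bz2 files containing a single
stream. With this change (passing decompressConcatenated=true to
BZip2CompressorInputStream), BZIP2 also supports bz2 files containing multiple
streams, such as those produced by pbzip2.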


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a919f8e7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a919f8e7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a919f8e7

Branch: refs/heads/master
Commit: a919f8e7769cb4e10e380553ee9ce7feb6ab369f
Parents: 339976c
Author: bchambers <bc...@google.com>
Authored: Tue Aug 1 14:11:24 2017 -0700
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Aug 2 20:47:11 2017 +0200

----------------------------------------------------------------------
 .../apache/beam/sdk/io/CompressedSource.java    |  2 +-
 .../beam/sdk/io/CompressedSourceTest.java       | 44 +++++++++++++++++++-
 2 files changed, 44 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a919f8e7/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index 4baac36..ad81b61 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -146,7 +146,7 @@ public class CompressedSource<T> extends FileBasedSource<T> {
      public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
          throws IOException {
        return Channels.newChannel(
-            new BZip2CompressorInputStream(Channels.newInputStream(channel)));
+            new BZip2CompressorInputStream(Channels.newInputStream(channel), true)); // all streams
      }
     },
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a919f8e7/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
index 3fff319..fa28e4b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
@@ -253,6 +253,39 @@ public class CompressedSourceTest {
   }
 
   /**
+   * Tests that a bzip2 file containing multiple streams is correctly decompressed.
+   *
+   * <p>A bzip2 file may contain several concatenated streams (pbzip2, for example, writes such
+   * files); it should decompress to the concatenation of each stream's decompressed contents.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testReadMultiStreamBzip2() throws IOException {
+    CompressionMode mode = CompressionMode.BZIP2;
+    byte[] input1 = generateInput(5, 587973);
+    byte[] input2 = generateInput(5, 387374);
+    // Compress each input separately so that each becomes an independent bzip2 stream.
+    ByteArrayOutputStream stream1 = new ByteArrayOutputStream();
+    try (OutputStream os = getOutputStreamForMode(mode, stream1)) {
+      os.write(input1);
+    }
+
+    ByteArrayOutputStream stream2 = new ByteArrayOutputStream();
+    try (OutputStream os = getOutputStreamForMode(mode, stream2)) {
+      os.write(input2);
+    }
+    // Write the two streams back to back to form a single multi-stream bzip2 file.
+    File tmpFile = tmpFolder.newFile();
+    try (OutputStream os = new FileOutputStream(tmpFile)) {
+      os.write(stream1.toByteArray());
+      os.write(stream2.toByteArray());
+    }
+    // Expected decompressed contents: input1 followed by input2.
+    byte[] output = Bytes.concat(input1, input2);
+    verifyReadContents(output, tmpFile, mode);
+  }
+
+  /**
    * Test reading empty input with bzip2.
    */
   @Test
@@ -470,7 +503,16 @@ public class CompressedSourceTest {
    */
   private byte[] generateInput(int size) {
     // Arbitrary but fixed seed
-    Random random = new Random(285930);
+    return generateInput(size, 285930);
+  }
+
+
+    /**
+     * Generate byte array of given size.
+     */
+  private byte[] generateInput(int size, int seed) {
+    // Arbitrary but fixed seed
+    Random random = new Random(seed);
     byte[] buff = new byte[size];
     random.nextBytes(buff);
     return buff;


[2/2] beam git commit: [BEAM-2708] This closes #3669

Posted by jb...@apache.org.
[BEAM-2708] This closes #3669


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9582840b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9582840b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9582840b

Branch: refs/heads/master
Commit: 9582840bc339046605bc32993649c3a0ca6c420d
Parents: 339976c a919f8e
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Wed Aug 2 20:47:42 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Aug 2 20:47:42 2017 +0200

----------------------------------------------------------------------
 .../apache/beam/sdk/io/CompressedSource.java    |  2 +-
 .../beam/sdk/io/CompressedSourceTest.java       | 44 +++++++++++++++++++-
 2 files changed, 44 insertions(+), 2 deletions(-)
----------------------------------------------------------------------