You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/08/02 18:47:54 UTC
[1/2] beam git commit: [BEAM-2708] Configure BZIP2 to read all "streams"
Repository: beam
Updated Branches:
refs/heads/master 339976c9f -> 9582840bc
[BEAM-2708] Configure BZIP2 to read all "streams"
Previously, CompressionMode.BZIP2 only supported "standard" bz2 files
containing a single stream. This change constructs BZip2CompressorInputStream
with decompressConcatenated=true, so BZIP2 also supports bz2 files containing
multiple concatenated streams, such as those produced by pbzip2.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a919f8e7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a919f8e7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a919f8e7
Branch: refs/heads/master
Commit: a919f8e7769cb4e10e380553ee9ce7feb6ab369f
Parents: 339976c
Author: bchambers <bc...@google.com>
Authored: Tue Aug 1 14:11:24 2017 -0700
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Aug 2 20:47:11 2017 +0200
----------------------------------------------------------------------
.../apache/beam/sdk/io/CompressedSource.java | 2 +-
.../beam/sdk/io/CompressedSourceTest.java | 44 +++++++++++++++++++-
2 files changed, 44 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a919f8e7/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index 4baac36..ad81b61 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -146,7 +146,7 @@ public class CompressedSource<T> extends FileBasedSource<T> {
public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
throws IOException {
return Channels.newChannel(
- new BZip2CompressorInputStream(Channels.newInputStream(channel)));
+ new BZip2CompressorInputStream(Channels.newInputStream(channel), true));
}
},
http://git-wip-us.apache.org/repos/asf/beam/blob/a919f8e7/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
index 3fff319..fa28e4b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
@@ -253,6 +253,39 @@ public class CompressedSourceTest {
}
/**
+ * Test a bzip2 file containing multiple streams is correctly decompressed.
+ *
+ * <p>A bzip2 file may contain multiple streams and should decompress as the concatenation of
+ * those streams.
+ */
+ @Test
+ @Category(NeedsRunner.class)
+ public void testReadMultiStreamBzip2() throws IOException {
+ CompressionMode mode = CompressionMode.BZIP2;
+ byte[] input1 = generateInput(5, 587973);
+ byte[] input2 = generateInput(5, 387374);
+
+ ByteArrayOutputStream stream1 = new ByteArrayOutputStream();
+ try (OutputStream os = getOutputStreamForMode(mode, stream1)) {
+ os.write(input1);
+ }
+
+ ByteArrayOutputStream stream2 = new ByteArrayOutputStream();
+ try (OutputStream os = getOutputStreamForMode(mode, stream2)) {
+ os.write(input2);
+ }
+
+ File tmpFile = tmpFolder.newFile();
+ try (OutputStream os = new FileOutputStream(tmpFile)) {
+ os.write(stream1.toByteArray());
+ os.write(stream2.toByteArray());
+ }
+
+ byte[] output = Bytes.concat(input1, input2);
+ verifyReadContents(output, tmpFile, mode);
+ }
+
+ /**
* Test reading empty input with bzip2.
*/
@Test
@@ -470,7 +503,16 @@ public class CompressedSourceTest {
*/
private byte[] generateInput(int size) {
// Arbitrary but fixed seed
- Random random = new Random(285930);
+ return generateInput(size, 285930);
+ }
+
+
+ /**
+ * Generate byte array of given size.
+ */
+ private byte[] generateInput(int size, int seed) {
+ // Arbitrary but fixed seed
+ Random random = new Random(seed);
byte[] buff = new byte[size];
random.nextBytes(buff);
return buff;
[2/2] beam git commit: [BEAM-2708] This closes #3669
Posted by jb...@apache.org.
[BEAM-2708] This closes #3669
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9582840b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9582840b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9582840b
Branch: refs/heads/master
Commit: 9582840bc339046605bc32993649c3a0ca6c420d
Parents: 339976c a919f8e
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Wed Aug 2 20:47:42 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Aug 2 20:47:42 2017 +0200
----------------------------------------------------------------------
.../apache/beam/sdk/io/CompressedSource.java | 2 +-
.../beam/sdk/io/CompressedSourceTest.java | 44 +++++++++++++++++++-
2 files changed, 44 insertions(+), 2 deletions(-)
----------------------------------------------------------------------