You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/04/04 23:40:59 UTC
[2/3] incubator-beam git commit: [BEAM-167] Fix custom source gzip
input to read concatenated gzip files
[BEAM-167] Fix custom source gzip input to read concatenated gzip files
This applies the patch by kirpichov@google.com from https://gist.github.com/jkff/d8d984a33a41ec607328cee8e418c174
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/abc397f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/abc397f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/abc397f9
Branch: refs/heads/master
Commit: abc397f9b9851599eff7f3e1ec5b5343005a0a94
Parents: fd049b5
Author: kirpichov <ki...@google.com>
Authored: Mon Apr 4 13:31:23 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Apr 4 14:40:07 2016 -0700
----------------------------------------------------------------------
.../cloud/dataflow/sdk/io/CompressedSource.java | 2 +-
.../dataflow/sdk/io/CompressedSourceTest.java | 40 ++++++++++++++++++++
2 files changed, 41 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/abc397f9/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java
index 4e3e9ca..15e6e29 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java
@@ -122,7 +122,7 @@ public class CompressedSource<T> extends FileBasedSource<T> {
byte zero = 0x00;
int header = Ints.fromBytes(zero, zero, headerBytes[1], headerBytes[0]);
if (header == GZIPInputStream.GZIP_MAGIC) {
- return Channels.newChannel(new GzipCompressorInputStream(stream));
+ return Channels.newChannel(new GzipCompressorInputStream(stream, true));
}
}
return Channels.newChannel(stream);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/abc397f9/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java
index 2dcddb4..f63a128 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java
@@ -47,16 +47,19 @@ import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
+import java.util.zip.GZIPOutputStream;
import javax.annotation.Nullable;
@@ -98,6 +101,43 @@ public class CompressedSourceTest {
runReadTest(input, CompressionMode.GZIP);
}
+ private static byte[] compressGzip(byte[] input) throws IOException { // Returns the gzip-compressed form of input as a new byte array.
+ ByteArrayOutputStream res = new ByteArrayOutputStream(); // In-memory sink that collects the compressed bytes.
+ try (GZIPOutputStream gzipStream = new GZIPOutputStream(res)) {
+ gzipStream.write(input);
+ } // try-with-resources closes gzipStream here, so the gzip output is finished before res is read.
+ return res.toByteArray();
+ }
+
+ private static byte[] concat(byte[] first, byte[] second) { // Returns a new array holding first followed by second; inputs are not modified.
+ byte[] res = new byte[first.length + second.length];
+ System.arraycopy(first, 0, res, 0, first.length);
+ System.arraycopy(second, 0, res, first.length, second.length); // second is placed immediately after the last byte of first.
+ return res;
+ }
+
+ @Test
+ public void testReadConcatenatedGzip() throws IOException { // Reads a file built from two gzip streams written back to back.
+ byte[] header = "a,b,c\n".getBytes(StandardCharsets.UTF_8);
+ byte[] body = "1,2,3\n4,5,6\n7,8,9\n".getBytes(StandardCharsets.UTF_8);
+ byte[] expected = concat(header, body); // Uncompressed bytes of both parts, header first.
+ byte[] totalGz = concat(compressGzip(header), compressGzip(body)); // Each part is compressed on its own, then the two results are joined.
+ File tmpFile = tmpFolder.newFile();
+ try (FileOutputStream os = new FileOutputStream(tmpFile)) {
+ os.write(totalGz);
+ }
+
+ Pipeline p = TestPipeline.create();
+
+ CompressedSource<Byte> source =
+ CompressedSource.from(new ByteSource(tmpFile.getAbsolutePath(), 1))
+ .withDecompression(CompressionMode.GZIP); // ByteSource is declared outside this view; NOTE(review): meaning of its second argument (1) is not visible here.
+ PCollection<Byte> output = p.apply(Read.from(source));
+
+ DataflowAssert.that(output).containsInAnyOrder(Bytes.asList(expected)); // Expects the bytes of both parts in the output; per the method name, order is not checked.
+ p.run();
+ }
+
/**
* Test reading empty input with bzip2.
*/