You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/06/23 07:49:00 UTC
[1/2] incubator-beam git commit: Closes #400
Repository: incubator-beam
Updated Branches:
refs/heads/master 748b0c8da -> f2d2ce5f4
Closes #400
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f2d2ce5f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f2d2ce5f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f2d2ce5f
Branch: refs/heads/master
Commit: f2d2ce5f49be2c484efd106500149e88547604bd
Parents: 748b0c8 49b4847
Author: Dan Halperin <dh...@google.com>
Authored: Thu Jun 23 00:48:39 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jun 23 00:48:39 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/io/CompressedSource.java | 64 +++++++
.../java/org/apache/beam/sdk/io/TextIO.java | 11 +-
.../java/org/apache/beam/sdk/io/TextIOTest.java | 173 +++++++++++++++++++
3 files changed, 247 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
[2/2] incubator-beam git commit: [BEAM-314] Add zip compression support in TextIO
Posted by dh...@apache.org.
[BEAM-314] Add zip compression support in TextIO
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/49b48472
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/49b48472
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/49b48472
Branch: refs/heads/master
Commit: 49b484725afe378f2f042a4da003dbf25debc198
Parents: 748b0c8
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Tue May 31 14:23:27 2016 +0200
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jun 23 00:48:39 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/io/CompressedSource.java | 64 +++++++
.../java/org/apache/beam/sdk/io/TextIO.java | 11 +-
.../java/org/apache/beam/sdk/io/TextIOTest.java | 173 +++++++++++++++++++
3 files changed, 247 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49b48472/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index 75bfc8f..48e0b7a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -30,6 +30,7 @@ import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.PushbackInputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
@@ -37,6 +38,8 @@ import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.NoSuchElementException;
import java.util.zip.GZIPInputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
import javax.annotation.concurrent.GuardedBy;
@@ -149,9 +152,70 @@ public class CompressedSource<T> extends FileBasedSource<T> {
return Channels.newChannel(
new BZip2CompressorInputStream(Channels.newInputStream(channel)));
}
+ },
+
+    /**
+     * Reads a byte channel assuming it contains a zip archive.
+     *
+     * <p>If the archive contains multiple entries, their decompressed contents are
+     * concatenated in the order the entries appear in the archive.
+     */
+    ZIP {
+      @Override
+      public boolean matches(String fileName) {
+        return fileName.toLowerCase().endsWith(".zip");
+      }
+
+      @Override
+      public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
+          throws IOException {
+        // Present every entry of the archive as one continuous stream of bytes.
+        FullZipInputStream zip = new FullZipInputStream(Channels.newInputStream(channel));
+        return Channels.newChannel(zip);
+      }
+    }
};
/**
+   * Wraps a {@link ZipInputStream} so that the contents of every entry in the archive are
+   * read back to back, in archive order, as a single {@link InputStream}.
+   *
+   * <p>Closing this stream closes the wrapped {@link ZipInputStream}, which in turn closes the
+   * stream passed to the constructor.
+   */
+  private static class FullZipInputStream extends InputStream {
+
+    private final ZipInputStream zipInputStream;
+    private ZipEntry currentEntry;
+
+    /**
+     * Creates a stream over all entries of the zip archive read from {@code is}, positioned at
+     * the first entry (if the archive has any).
+     */
+    public FullZipInputStream(InputStream is) throws IOException {
+      super();
+      zipInputStream = new ZipInputStream(is);
+      currentEntry = zipInputStream.getNextEntry();
+    }
+
+    @Override
+    public int read() throws IOException {
+      int result = zipInputStream.read();
+      // -1 from ZipInputStream only marks the end of the current entry: keep advancing to the
+      // next entry until a byte is read or there are no entries left.
+      while (result == -1) {
+        currentEntry = zipInputStream.getNextEntry();
+        if (currentEntry == null) {
+          return -1;
+        }
+        result = zipInputStream.read();
+      }
+      return result;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      int result = zipInputStream.read(b, off, len);
+      // Same entry-advancing logic as read(): skip over exhausted (or empty) entries.
+      while (result == -1) {
+        currentEntry = zipInputStream.getNextEntry();
+        if (currentEntry == null) {
+          return -1;
+        }
+        result = zipInputStream.read(b, off, len);
+      }
+      return result;
+    }
+
+    /**
+     * Closes the wrapped {@link ZipInputStream}. Without this override the inherited
+     * {@link InputStream#close()} is a no-op, so closing the decompressing channel would leak
+     * the underlying source.
+     */
+    @Override
+    public void close() throws IOException {
+      zipInputStream.close();
+    }
+  }
+
+ /**
* Returns {@code true} if the given file name implies that the contents are compressed
* according to the compression embodied by this factory.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49b48472/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index dc50a8c..a7e5e29 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -328,6 +328,11 @@ public class TextIO {
CompressedSource.from(new TextSource<T>(filepattern, coder))
.withDecompression(CompressedSource.CompressionMode.GZIP));
break;
+ case ZIP:
+ read = org.apache.beam.sdk.io.Read.from(
+ CompressedSource.from(new TextSource<T>(filepattern, coder))
+ .withDecompression(CompressedSource.CompressionMode.ZIP));
+ break;
default:
throw new IllegalArgumentException("Unknown compression mode: " + compressionType);
}
@@ -721,7 +726,11 @@ public class TextIO {
/**
* BZipped.
*/
- BZIP2(".bz2");
+ BZIP2(".bz2"),
+ /**
+ * Zipped.
+ */
+ ZIP(".zip");
private String filenameSuffix;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49b48472/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 9a762d1..c3a5084 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -81,6 +81,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPOutputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import javax.annotation.Nullable;
/**
* Tests for TextIO Read and Write transforms.
@@ -419,6 +423,175 @@ public class TextIOTest {
p.run();
}
+  /**
+   * Creates a zip file whose entries hold the given lines, one entry per array in
+   * {@code fieldsEntries} (entries are named "0", "1", ...).
+   *
+   * @param expected A list that every written line is appended to, in write order.
+   * @param filename Optional zip file name; if null, a generated temporary name is used.
+   * @param fieldsEntries The lines to write, one array per zip entry.
+   * @return The path of the created zip file.
+   * @throws Exception In case of a failure during zip file creation.
+   */
+  private String createZipFile(List<String> expected, @Nullable String filename,
+      String[]... fieldsEntries) throws Exception {
+    File tmpFile = (filename != null) ? tmpFolder.newFile(filename) : tmpFolder.newFile();
+
+    // Write UTF-8 explicitly so the content matches TextIO's default UTF-8 decoding regardless
+    // of the platform charset. try-with-resources guarantees the archive is finished and the
+    // file handle released even if writing fails.
+    try (ZipOutputStream out = new ZipOutputStream(new FileOutputStream(tmpFile));
+        PrintStream writer = new PrintStream(out, true /* auto-flush on write */, "UTF-8")) {
+      int index = 0;
+      for (String[] entry : fieldsEntries) {
+        out.putNextEntry(new ZipEntry(Integer.toString(index)));
+        for (String field : entry) {
+          writer.println(field);
+          expected.add(field);
+        }
+        out.closeEntry();
+        index++;
+      }
+    }
+
+    return tmpFile.getPath();
+  }
+
+  /**
+   * Reads a zip archive holding a single entry, with the ZIP compression type set explicitly,
+   * and expects every line of that entry in the output.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testZipCompressedRead() throws Exception {
+    List<String> expected = new ArrayList<>();
+    String filename = createZipFile(
+        expected, null, new String[] {"Irritable eagle", "Optimistic jay", "Fanciful hawk"});
+
+    Pipeline p = TestPipeline.create();
+    PCollection<String> output =
+        p.apply(TextIO.Read.from(filename).withCompressionType(CompressionType.ZIP));
+
+    PAssert.that(output).containsInAnyOrder(expected);
+    p.run();
+  }
+
+  /**
+   * Reads a zip archive without naming a compression type: ZIP must be inferred from the
+   * ".zip" file extension, and every line of the archive must appear in the output.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testZipCompressedAutoDetected() throws Exception {
+    List<String> expected = new ArrayList<>();
+    String[] entry = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"};
+    String filename = createZipFile(expected, "testZipCompressedAutoDetected.zip", entry);
+
+    // No withCompressionType(...): rely on extension-based auto-detection.
+    Pipeline p = TestPipeline.create();
+    PCollection<String> output = p.apply(TextIO.Read.from(filename));
+
+    PAssert.that(output).containsInAnyOrder(expected);
+    p.run();
+  }
+
+  /**
+   * Reads a zip archive that contains no entries at all. We expect an empty PCollection.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testZipCompressedReadWithEmptyFile() throws Exception {
+    // No entry arrays are passed, so the archive has zero entries.
+    String filename = createZipFile(new ArrayList<String>(), null);
+
+    Pipeline p = TestPipeline.create();
+    TextIO.Read.Bound<String> read =
+        TextIO.Read.from(filename).withCompressionType(CompressionType.ZIP);
+    PCollection<String> output = p.apply(read);
+
+    PAssert.that(output).empty();
+    p.run();
+  }
+
+  /**
+   * Reads a zip archive containing a single entry that has no content. We expect an empty
+   * PCollection.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testZipCompressedReadWithEmptyEntry() throws Exception {
+    String[] emptyEntry = new String[] {};
+    String filename = createZipFile(new ArrayList<String>(), null, emptyEntry);
+
+    Pipeline p = TestPipeline.create();
+    TextIO.Read.Bound<String> read =
+        TextIO.Read.from(filename).withCompressionType(CompressionType.ZIP);
+    PCollection<String> output = p.apply(read);
+
+    PAssert.that(output).empty();
+    p.run();
+  }
+
+  /**
+   * Reads a zip archive with three non-empty entries. We expect the lines of all entries,
+   * combined, in the output.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testZipCompressedReadWithMultiEntriesFile() throws Exception {
+    List<String> expected = new ArrayList<>();
+    String filename = createZipFile(
+        expected,
+        null,
+        new String[] {"first", "second", "three"},
+        new String[] {"four", "five", "six"},
+        new String[] {"seven", "eight", "nine"});
+
+    Pipeline p = TestPipeline.create();
+    PCollection<String> output =
+        p.apply(TextIO.Read.from(filename).withCompressionType(CompressionType.ZIP));
+
+    PAssert.that(output).containsInAnyOrder(expected);
+    p.run();
+  }
+
+  /**
+   * Reads a zip archive laid out as: an entry with data, two empty entries, then another entry
+   * with data. The empty entries must be skipped so that only the data lines come back.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testZipCompressedReadWithComplexEmptyAndPresentEntries() throws Exception {
+    String[] withCat = {"cat"};
+    String[] empty = {};
+    String[] withDog = {"dog"};
+    String filename =
+        createZipFile(new ArrayList<String>(), null, withCat, empty, new String[] {}, withDog);
+
+    Pipeline p = TestPipeline.create();
+    PCollection<String> output =
+        p.apply(TextIO.Read.from(filename).withCompressionType(CompressionType.ZIP));
+
+    List<String> expected = ImmutableList.of("cat", "dog");
+    PAssert.that(output).containsInAnyOrder(expected);
+    p.run();
+  }
+
@Test
@Category(NeedsRunner.class)
public void testGZIPReadWhenUncompressed() throws Exception {