You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/06/23 07:49:00 UTC

[1/2] incubator-beam git commit: Closes #400

Repository: incubator-beam
Updated Branches:
  refs/heads/master 748b0c8da -> f2d2ce5f4


Closes #400


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f2d2ce5f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f2d2ce5f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f2d2ce5f

Branch: refs/heads/master
Commit: f2d2ce5f49be2c484efd106500149e88547604bd
Parents: 748b0c8 49b4847
Author: Dan Halperin <dh...@google.com>
Authored: Thu Jun 23 00:48:39 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jun 23 00:48:39 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/io/CompressedSource.java    |  64 +++++++
 .../java/org/apache/beam/sdk/io/TextIO.java     |  11 +-
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 173 +++++++++++++++++++
 3 files changed, 247 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[2/2] incubator-beam git commit: [BEAM-314] Add zip compression support in TextIO

Posted by dh...@apache.org.
[BEAM-314] Add zip compression support in TextIO


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/49b48472
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/49b48472
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/49b48472

Branch: refs/heads/master
Commit: 49b484725afe378f2f042a4da003dbf25debc198
Parents: 748b0c8
Author: Jean-Baptiste Onofr� <jb...@apache.org>
Authored: Tue May 31 14:23:27 2016 +0200
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jun 23 00:48:39 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/io/CompressedSource.java    |  64 +++++++
 .../java/org/apache/beam/sdk/io/TextIO.java     |  11 +-
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 173 +++++++++++++++++++
 3 files changed, 247 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49b48472/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index 75bfc8f..48e0b7a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -30,6 +30,7 @@ import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.PushbackInputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
@@ -37,6 +38,8 @@ import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.util.NoSuchElementException;
 import java.util.zip.GZIPInputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
 
 import javax.annotation.concurrent.GuardedBy;
 
@@ -149,9 +152,70 @@ public class CompressedSource<T> extends FileBasedSource<T> {
         return Channels.newChannel(
             new BZip2CompressorInputStream(Channels.newInputStream(channel)));
       }
+    },
+
+    /**
+     * Reads a byte channel assuming it is compressed with zip.
+     * If the zip file contains multiple entries, files in the zip are concatenated all together.
+     */
+    ZIP {
+      @Override
+      public boolean matches(String fileName) {
+        return fileName.toLowerCase().endsWith(".zip");
+      }
+
+      public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
+        throws IOException {
+        FullZipInputStream zip = new FullZipInputStream(Channels.newInputStream(channel));
+        return Channels.newChannel(zip);
+      }
     };
 
     /**
+     * Extend of {@link ZipInputStream} to automatically read all entries in the zip.
+     */
+    private static class FullZipInputStream extends InputStream {
+
+      private ZipInputStream zipInputStream;
+      private ZipEntry currentEntry;
+
+      public FullZipInputStream(InputStream is) throws IOException {
+        super();
+        zipInputStream = new ZipInputStream(is);
+        currentEntry = zipInputStream.getNextEntry();
+      }
+
+      @Override
+      public int read() throws IOException {
+        int result = zipInputStream.read();
+        while (result == -1) {
+          currentEntry = zipInputStream.getNextEntry();
+          if (currentEntry == null) {
+            return -1;
+          } else {
+            result = zipInputStream.read();
+          }
+        }
+        return result;
+      }
+
+      @Override
+      public int read(byte[] b, int off, int len) throws IOException {
+        int result = zipInputStream.read(b, off, len);
+        while (result == -1) {
+          currentEntry = zipInputStream.getNextEntry();
+          if (currentEntry == null) {
+            return -1;
+          } else {
+            result = zipInputStream.read(b, off, len);
+          }
+        }
+        return result;
+      }
+
+    }
+
+    /**
      * Returns {@code true} if the given file name implies that the contents are compressed
      * according to the compression embodied by this factory.
      */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49b48472/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index dc50a8c..a7e5e29 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -328,6 +328,11 @@ public class TextIO {
                 CompressedSource.from(new TextSource<T>(filepattern, coder))
                                 .withDecompression(CompressedSource.CompressionMode.GZIP));
             break;
+          case ZIP:
+            read = org.apache.beam.sdk.io.Read.from(
+                CompressedSource.from(new TextSource<T>(filepattern, coder))
+                                .withDecompression(CompressedSource.CompressionMode.ZIP));
+            break;
           default:
             throw new IllegalArgumentException("Unknown compression mode: " + compressionType);
         }
@@ -721,7 +726,11 @@ public class TextIO {
     /**
      * BZipped.
      */
-    BZIP2(".bz2");
+    BZIP2(".bz2"),
+    /**
+     * Zipped.
+     */
+    ZIP(".zip");
 
     private String filenameSuffix;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49b48472/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 9a762d1..c3a5084 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -81,6 +81,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.zip.GZIPOutputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import javax.annotation.Nullable;
 
 /**
  * Tests for TextIO Read and Write transforms.
@@ -419,6 +423,175 @@ public class TextIOTest {
     p.run();
   }
 
+  /**
+   * Create a zip file with the given lines.
+   *
+   * @param expected A list of expected lines, populated in the zip file.
+   * @param filename Optionally zip file name (can be null).
+   * @param fieldsEntries Fields to write in zip entries.
+   * @return The zip filename.
+   * @throws Exception In case of a failure during zip file creation.
+   */
+  private String createZipFile(List<String> expected, @Nullable String filename, String[]
+      ...
+      fieldsEntries)
+      throws Exception {
+    File tmpFile;
+    if (filename != null) {
+      tmpFile = tmpFolder.newFile(filename);
+    } else {
+      tmpFile = tmpFolder.newFile();
+    }
+    String tmpFileName = tmpFile.getPath();
+
+    ZipOutputStream out = new ZipOutputStream(new FileOutputStream(tmpFile));
+    PrintStream writer = new PrintStream(out, true /* auto-flush on write */);
+
+    int index = 0;
+    for (String[] entry : fieldsEntries) {
+      out.putNextEntry(new ZipEntry(Integer.toString(index)));
+      for (String field : entry) {
+        writer.println(field);
+        expected.add(field);
+      }
+      out.closeEntry();
+      index++;
+    }
+
+    writer.close();
+    out.close();
+
+    return tmpFileName;
+  }
+
+  /**
+   * Read a zip compressed file. The user provides the ZIP compression type.
+   * We expect a PCollection with the lines.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testZipCompressedRead() throws Exception {
+    String[] lines = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"};
+    List<String> expected = new ArrayList<>();
+
+    String filename = createZipFile(expected, null, lines);
+
+    Pipeline p = TestPipeline.create();
+
+    TextIO.Read.Bound<String> read =
+        TextIO.Read.from(filename).withCompressionType(CompressionType.ZIP);
+    PCollection<String> output = p.apply(read);
+
+    PAssert.that(output).containsInAnyOrder(expected);
+    p.run();
+  }
+
+  /**
+   * Read a zip compressed file. The ZIP compression type is auto-detected based on the
+   * file extension. We expect a PCollection with the lines.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testZipCompressedAutoDetected() throws Exception {
+    String[] lines = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"};
+    List<String> expected = new ArrayList<>();
+
+    String filename = createZipFile(expected, "testZipCompressedAutoDetected.zip", lines);
+
+    // test with auto-detect ZIP based on extension.
+    Pipeline p = TestPipeline.create();
+
+    TextIO.Read.Bound<String> read = TextIO.Read.from(filename);
+    PCollection<String> output = p.apply(read);
+
+    PAssert.that(output).containsInAnyOrder(expected);
+    p.run();
+  }
+
+  /**
+   * Read a ZIP compressed empty file. We expect an empty PCollection.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testZipCompressedReadWithEmptyFile() throws Exception {
+    String filename = createZipFile(new ArrayList<String>(), null);
+
+    Pipeline p = TestPipeline.create();
+
+    PCollection<String> output = p.apply(TextIO.Read.from(filename).withCompressionType
+        (CompressionType.ZIP));
+    PAssert.that(output).empty();
+
+    p.run();
+  }
+
+  /**
+   * Read a ZIP compressed file containing an unique empty entry. We expect an empty PCollection.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testZipCompressedReadWithEmptyEntry() throws Exception {
+    String filename = createZipFile(new ArrayList<String>(), null, new String[]{ });
+
+    Pipeline p = TestPipeline.create();
+
+    PCollection<String> output = p.apply(TextIO.Read.from(filename).withCompressionType
+        (CompressionType.ZIP));
+    PAssert.that(output).empty();
+
+    p.run();
+  }
+
+  /**
+   * Read a ZIP compressed file with multiple entries. We expect a PCollection containing
+   * lines from all entries.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testZipCompressedReadWithMultiEntriesFile() throws Exception {
+    String[] entry0 = new String[]{ "first", "second", "three" };
+    String[] entry1 = new String[]{ "four", "five", "six" };
+    String[] entry2 = new String[]{ "seven", "eight", "nine" };
+
+    List<String> expected = new ArrayList<>();
+
+    String filename = createZipFile(expected, null, entry0, entry1, entry2);
+
+    Pipeline p = TestPipeline.create();
+
+    TextIO.Read.Bound<String> read =
+        TextIO.Read.from(filename).withCompressionType(CompressionType.ZIP);
+    PCollection<String> output = p.apply(read);
+
+    PAssert.that(output).containsInAnyOrder(expected);
+    p.run();
+  }
+
+  /**
+   * Read a ZIP compressed file containing data, multiple empty entries, and then more data. We
+   * expect just the data back.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testZipCompressedReadWithComplexEmptyAndPresentEntries() throws Exception {
+    String filename = createZipFile(
+        new ArrayList<String>(),
+        null,
+        new String[] {"cat"},
+        new String[] {},
+        new String[] {},
+        new String[] {"dog"});
+    List<String> expected = ImmutableList.of("cat", "dog");
+
+    Pipeline p = TestPipeline.create();
+
+    PCollection<String> output =
+        p.apply(TextIO.Read.from(filename).withCompressionType(CompressionType.ZIP));
+    PAssert.that(output).containsInAnyOrder(expected);
+
+    p.run();
+  }
+
   @Test
   @Category(NeedsRunner.class)
   public void testGZIPReadWhenUncompressed() throws Exception {