Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/21 18:10:02 UTC

[3/3] incubator-beam git commit: TextIO/CompressedSource: split AUTO mode files into bundles

TextIO/CompressedSource: split AUTO mode files into bundles


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/370d5924
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/370d5924
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/370d5924

Branch: refs/heads/master
Commit: 370d5924f393346115a22c23e5487f094847a783
Parents: 1ceb12a
Author: Dan Halperin <dh...@google.com>
Authored: Mon Sep 19 22:46:26 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Sep 21 11:09:50 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/io/CompressedSource.java    | 10 +--
 .../java/org/apache/beam/sdk/io/TextIO.java     | 44 +++++-----
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 85 +++++++++++++++++++-
 3 files changed, 108 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
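
For context, here is a minimal usage sketch of the behavior this commit enables (the file paths and transform names below are illustrative, not part of the commit). With the AUTO compression type, which is the default, a plain-text file read through TextIO is now backed by a splittable source, while a gzip file is still read as exactly one bundle per file:

    // Sketch only; the pipeline is constructed here but not run.
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;

    public class AutoModeReadSketch {
      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(options);

        // AUTO (the default): an uncompressed file yields a splittable source,
        // so a runner may read it in parallel bundles.
        PCollection<String> txtLines =
            p.apply("ReadTxt", TextIO.Read.from("/tmp/input.txt"));

        // A .gz file under AUTO is still read as a single bundle per file,
        // since gzip streams cannot be split.
        PCollection<String> gzLines =
            p.apply("ReadGz", TextIO.Read.from("/tmp/input.gz"));
      }
    }

The new tests in TextIOTest.java below verify both behaviors at the source level.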


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/370d5924/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index 3cd097c..8a5fedd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -314,11 +314,11 @@ public class CompressedSource<T> extends FileBasedSource<T> {
       DecompressingChannelFactory channelFactory, String filePatternOrSpec, long minBundleSize,
       long startOffset, long endOffset) {
     super(filePatternOrSpec, minBundleSize, startOffset, endOffset);
-    checkArgument(
-        startOffset == 0,
-        "CompressedSources must start reading at offset 0. Requested offset: " + startOffset);
     this.sourceDelegate = sourceDelegate;
     this.channelFactory = channelFactory;
+    checkArgument(
+        isSplittable() || startOffset == 0,
+        "CompressedSources must start reading at offset 0. Requested offset: " + startOffset);
   }
 
   /**
@@ -339,7 +339,7 @@ public class CompressedSource<T> extends FileBasedSource<T> {
   @Override
   protected FileBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end) {
     return new CompressedSource<>(sourceDelegate.createForSubrangeOfFile(fileName, start, end),
-        channelFactory, fileName, Long.MAX_VALUE, start, end);
+        channelFactory, fileName, sourceDelegate.getMinBundleSize(), start, end);
   }
 
   /**
@@ -348,7 +348,7 @@ public class CompressedSource<T> extends FileBasedSource<T> {
    * from the requested file name that the file is not compressed.
    */
   @Override
-  protected final boolean isSplittable() throws Exception {
+  protected final boolean isSplittable() {
     if (channelFactory instanceof FileNameBasedDecompressingChannelFactory) {
       FileNameBasedDecompressingChannelFactory fileNameBasedChannelFactory =
           (FileNameBasedDecompressingChannelFactory) channelFactory;
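
In short, the constructor now assigns channelFactory before checking the start offset so that isSplittable() can consult it, and the check is relaxed to permit nonzero offsets for splittable (not-known-compressed) files; createForSubrangeOfFile also propagates the delegate's minimum bundle size instead of Long.MAX_VALUE. A rough sketch of what the relaxed check permits, written as test-style code in the org.apache.beam.sdk.io package as the new tests are (the path and bundle size are assumptions):

    // Assumed to live in org.apache.beam.sdk.io, like TextIOTest, so the
    // package-visible TextSource can be referenced directly.
    PipelineOptions options = PipelineOptionsFactory.create();
    FileBasedSource<String> source =
        CompressedSource.from(
            new TextSource<String>("/tmp/large.txt", StringUtf8Coder.of()));

    // For an uncompressed file, file-name-based (AUTO) decompression is
    // splittable, so splitting yields subrange sources with start offsets
    // greater than zero; before this commit their constructors would have
    // failed the "must start reading at offset 0" checkArgument.
    List<? extends FileBasedSource<String>> splits =
        source.splitIntoBundles(1000 /* desiredBundleSizeBytes */, options);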

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/370d5924/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 79967d1..62d3ae8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.sdk.io.TextIO.CompressionType.UNCOMPRESSED;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
@@ -286,40 +287,35 @@ public class TextIO {
           }
         }
 
-        // Create a source specific to the requested compression type.
-        final Bounded<T> read;
-        switch(compressionType) {
+        final Bounded<T> read = org.apache.beam.sdk.io.Read.from(getSource());
+        PCollection<T> pcol = input.getPipeline().apply("Read", read);
+        // Honor the default output coder that would have been used by this PTransform.
+        pcol.setCoder(getDefaultOutputCoder());
+        return pcol;
+      }
+
+      // Helper to create a source specific to the requested compression type.
+      protected FileBasedSource<T> getSource() {
+        switch (compressionType) {
           case UNCOMPRESSED:
-            read = org.apache.beam.sdk.io.Read.from(
-                new TextSource<T>(filepattern, coder));
-            break;
+            return new TextSource<T>(filepattern, coder);
           case AUTO:
-            read = org.apache.beam.sdk.io.Read.from(
-                CompressedSource.from(new TextSource<T>(filepattern, coder)));
-            break;
+            return CompressedSource.from(new TextSource<T>(filepattern, coder));
           case BZIP2:
-            read = org.apache.beam.sdk.io.Read.from(
+            return
                 CompressedSource.from(new TextSource<T>(filepattern, coder))
-                                .withDecompression(CompressedSource.CompressionMode.BZIP2));
-            break;
+                    .withDecompression(CompressedSource.CompressionMode.BZIP2);
           case GZIP:
-            read = org.apache.beam.sdk.io.Read.from(
+            return
                 CompressedSource.from(new TextSource<T>(filepattern, coder))
-                                .withDecompression(CompressedSource.CompressionMode.GZIP));
-            break;
+                    .withDecompression(CompressedSource.CompressionMode.GZIP);
           case ZIP:
-            read = org.apache.beam.sdk.io.Read.from(
+            return
                 CompressedSource.from(new TextSource<T>(filepattern, coder))
-                                .withDecompression(CompressedSource.CompressionMode.ZIP));
-            break;
+                    .withDecompression(CompressedSource.CompressionMode.ZIP);
           default:
-            throw new IllegalArgumentException("Unknown compression mode: " + compressionType);
+            throw new IllegalArgumentException("Unknown compression type: " + compressionType);
         }
-
-        PCollection<T> pcol = input.getPipeline().apply("Read", read);
-        // Honor the default output coder that would have been used by this PTransform.
-        pcol.setCoder(getDefaultOutputCoder());
-        return pcol;
       }
 
       @Override
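
The TextIO change itself is a refactoring: the per-compression-type switch moves out of apply() into a getSource() helper, so expansion is always Read.from(getSource()) and tests can inspect the underlying FileBasedSource directly. A hedged sketch of that hook, again as test-style code in the same package (path and bundle size are illustrative); the new tests in TextIOTest.java below do essentially this:

    // getSource() is protected, so this sketch assumes it runs inside
    // org.apache.beam.sdk.io (as TextIOTest does).
    FileBasedSource<String> source =
        TextIO.Read.from("/tmp/data.txt")
            .withCompressionType(TextIO.CompressionType.GZIP)
            .getSource();

    // GZIP mode is never splittable, regardless of the file extension, so a
    // split request comes back as a single bundle.
    List<? extends FileBasedSource<String>> splits =
        source.splitIntoBundles(1000, PipelineOptionsFactory.create());
    assertThat(splits, hasSize(equalTo(1)));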

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/370d5924/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 859602a..49f5b16 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -24,8 +24,10 @@ import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.startsWith;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -1118,10 +1120,89 @@ public class TextIOTest {
   private TextSource<String> prepareSource(byte[] data) throws IOException {
     File file = tmpFolder.newFile();
     Files.write(file.toPath(), data);
+    return new TextSource<>(file.toPath().toString(), StringUtf8Coder.of());
+  }
+
+  @Test
+  public void testInitialSplitIntoBundlesAutoModeTxt() throws Exception {
+    String[] lines = makeLines(5000);
+    File file = tmpFolder.newFile("to_be_split_auto.txt");
+    writeToStreamAndClose(lines, new FileOutputStream(file));
+    PipelineOptions options = TestPipeline.testingPipelineOptions();
+    long desiredBundleSize = 1000;
+
+    // Sanity check: file is at least 2 bundles long.
+    assertThat(file.length(), greaterThan(2 * desiredBundleSize));
+
+    FileBasedSource<String> source = TextIO.Read.from(file.getPath()).getSource();
+    List<? extends FileBasedSource<String>> splits =
+        source.splitIntoBundles(desiredBundleSize, options);
+
+    // At least 2 splits and they are equal to reading the whole file.
+    assertThat(splits, hasSize(greaterThan(1)));
+    SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
+  }
+
+  @Test
+  public void testInitialSplitIntoBundlesAutoModeGz() throws Exception {
+    String[] lines = makeLines(5000);
+    File file = tmpFolder.newFile("to_be_split_auto.gz");
+    writeToStreamAndClose(lines, new GZIPOutputStream(new FileOutputStream(file)));
+    PipelineOptions options = TestPipeline.testingPipelineOptions();
+    long desiredBundleSize = 1000;
+
+    // Sanity check: file is at least 2 bundles long.
+    assertThat(file.length(), greaterThan(2 * desiredBundleSize));
 
-    TextSource<String> source = new TextSource<>(file.toPath().toString(), StringUtf8Coder.of());
+    FileBasedSource<String> source = TextIO.Read.from(file.getPath()).getSource();
+    List<? extends FileBasedSource<String>> splits =
+        source.splitIntoBundles(desiredBundleSize, options);
 
-    return source;
+    // Exactly 1 split, even in AUTO mode, since it is a gzip file.
+    assertThat(splits, hasSize(equalTo(1)));
+    SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
+  }
+
+  @Test
+  public void testInitialSplitIntoBundlesGzipModeTxt() throws Exception {
+    String[] lines = makeLines(5000);
+    File file = tmpFolder.newFile("to_be_split_gzip.txt");
+    writeToStreamAndClose(lines, new FileOutputStream(file));
+    PipelineOptions options = TestPipeline.testingPipelineOptions();
+    long desiredBundleSize = 1000;
+
+    // Sanity check: file is at least 2 bundles long.
+    assertThat(file.length(), greaterThan(2 * desiredBundleSize));
+
+    FileBasedSource<String> source =
+        TextIO.Read.from(file.getPath()).withCompressionType(CompressionType.GZIP).getSource();
+    List<? extends FileBasedSource<String>> splits =
+        source.splitIntoBundles(desiredBundleSize, options);
+
+    // Exactly 1 split, despite the .txt extension, since GZIP mode is requested.
+    assertThat(splits, hasSize(equalTo(1)));
+    SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
+  }
+
+  @Test
+  public void testInitialSplitIntoBundlesGzipModeGz() throws Exception {
+    String[] lines = makeLines(5000);
+    File file = tmpFolder.newFile("to_be_split_gzip.gz");
+    writeToStreamAndClose(lines, new GZIPOutputStream(new FileOutputStream(file)));
+    PipelineOptions options = TestPipeline.testingPipelineOptions();
+    long desiredBundleSize = 1000;
+
+    // Sanity check: file is at least 2 bundles long.
+    assertThat(file.length(), greaterThan(2 * desiredBundleSize));
+
+    FileBasedSource<String> source =
+        TextIO.Read.from(file.getPath()).withCompressionType(CompressionType.GZIP).getSource();
+    List<? extends FileBasedSource<String>> splits =
+        source.splitIntoBundles(desiredBundleSize, options);
+
+    // Exactly 1 split, since the file has a .gz extension and GZIP mode is requested.
+    assertThat(splits, hasSize(equalTo(1)));
+    SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
   }
 
   ///////////////////////////////////////////////////////////////////////////////////////////////