You are viewing a plain-text version of this content; the hyperlink to its canonical version did not survive the plain-text conversion.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/21 18:10:02 UTC
[3/3] incubator-beam git commit: TextIO/CompressedSource: split AUTO mode files into bundles
TextIO/CompressedSource: split AUTO mode files into bundles
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/370d5924
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/370d5924
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/370d5924
Branch: refs/heads/master
Commit: 370d5924f393346115a22c23e5487f094847a783
Parents: 1ceb12a
Author: Dan Halperin <dh...@google.com>
Authored: Mon Sep 19 22:46:26 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Sep 21 11:09:50 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/io/CompressedSource.java | 10 +--
.../java/org/apache/beam/sdk/io/TextIO.java | 44 +++++-----
.../java/org/apache/beam/sdk/io/TextIOTest.java | 85 +++++++++++++++++++-
3 files changed, 108 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/370d5924/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index 3cd097c..8a5fedd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -314,11 +314,11 @@ public class CompressedSource<T> extends FileBasedSource<T> {
DecompressingChannelFactory channelFactory, String filePatternOrSpec, long minBundleSize,
long startOffset, long endOffset) {
super(filePatternOrSpec, minBundleSize, startOffset, endOffset);
- checkArgument(
- startOffset == 0,
- "CompressedSources must start reading at offset 0. Requested offset: " + startOffset);
this.sourceDelegate = sourceDelegate;
this.channelFactory = channelFactory;
+ checkArgument(
+ isSplittable() || startOffset == 0,
+ "CompressedSources must start reading at offset 0. Requested offset: " + startOffset);
}
/**
@@ -339,7 +339,7 @@ public class CompressedSource<T> extends FileBasedSource<T> {
@Override
protected FileBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end) {
return new CompressedSource<>(sourceDelegate.createForSubrangeOfFile(fileName, start, end),
- channelFactory, fileName, Long.MAX_VALUE, start, end);
+ channelFactory, fileName, sourceDelegate.getMinBundleSize(), start, end);
}
/**
@@ -348,7 +348,7 @@ public class CompressedSource<T> extends FileBasedSource<T> {
* from the requested file name that the file is not compressed.
*/
@Override
- protected final boolean isSplittable() throws Exception {
+ protected final boolean isSplittable() {
if (channelFactory instanceof FileNameBasedDecompressingChannelFactory) {
FileNameBasedDecompressingChannelFactory fileNameBasedChannelFactory =
(FileNameBasedDecompressingChannelFactory) channelFactory;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/370d5924/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 79967d1..62d3ae8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.sdk.io.TextIO.CompressionType.UNCOMPRESSED;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
@@ -286,40 +287,35 @@ public class TextIO {
}
}
- // Create a source specific to the requested compression type.
- final Bounded<T> read;
- switch(compressionType) {
+ final Bounded<T> read = org.apache.beam.sdk.io.Read.from(getSource());
+ PCollection<T> pcol = input.getPipeline().apply("Read", read);
+ // Honor the default output coder that would have been used by this PTransform.
+ pcol.setCoder(getDefaultOutputCoder());
+ return pcol;
+ }
+
+ // Helper to create a source specific to the requested compression type.
+ protected FileBasedSource<T> getSource() {
+ switch (compressionType) {
case UNCOMPRESSED:
- read = org.apache.beam.sdk.io.Read.from(
- new TextSource<T>(filepattern, coder));
- break;
+ return new TextSource<T>(filepattern, coder);
case AUTO:
- read = org.apache.beam.sdk.io.Read.from(
- CompressedSource.from(new TextSource<T>(filepattern, coder)));
- break;
+ return CompressedSource.from(new TextSource<T>(filepattern, coder));
case BZIP2:
- read = org.apache.beam.sdk.io.Read.from(
+ return
CompressedSource.from(new TextSource<T>(filepattern, coder))
- .withDecompression(CompressedSource.CompressionMode.BZIP2));
- break;
+ .withDecompression(CompressedSource.CompressionMode.BZIP2);
case GZIP:
- read = org.apache.beam.sdk.io.Read.from(
+ return
CompressedSource.from(new TextSource<T>(filepattern, coder))
- .withDecompression(CompressedSource.CompressionMode.GZIP));
- break;
+ .withDecompression(CompressedSource.CompressionMode.GZIP);
case ZIP:
- read = org.apache.beam.sdk.io.Read.from(
+ return
CompressedSource.from(new TextSource<T>(filepattern, coder))
- .withDecompression(CompressedSource.CompressionMode.ZIP));
- break;
+ .withDecompression(CompressedSource.CompressionMode.ZIP);
default:
- throw new IllegalArgumentException("Unknown compression mode: " + compressionType);
+ throw new IllegalArgumentException("Unknown compression type: " + compressionType);
}
-
- PCollection<T> pcol = input.getPipeline().apply("Read", read);
- // Honor the default output coder that would have been used by this PTransform.
- pcol.setCoder(getDefaultOutputCoder());
- return pcol;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/370d5924/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 859602a..49f5b16 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -24,8 +24,10 @@ import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -1118,10 +1120,89 @@ public class TextIOTest {
private TextSource<String> prepareSource(byte[] data) throws IOException {
File file = tmpFolder.newFile();
Files.write(file.toPath(), data);
+ return new TextSource<>(file.toPath().toString(), StringUtf8Coder.of());
+ }
+
+ @Test
+ public void testInitialSplitIntoBundlesAutoModeTxt() throws Exception {
+ String[] lines = makeLines(5000);
+ File file = tmpFolder.newFile("to_be_split_auto.txt");
+ writeToStreamAndClose(lines, new FileOutputStream(file));
+ PipelineOptions options = TestPipeline.testingPipelineOptions();
+ long desiredBundleSize = 1000;
+
+ // Sanity check: file is at least 2 bundles long.
+ assertThat(file.length(), greaterThan(2 * desiredBundleSize));
+
+ FileBasedSource<String> source = TextIO.Read.from(file.getPath()).getSource();
+ List<? extends FileBasedSource<String>> splits =
+ source.splitIntoBundles(desiredBundleSize, options);
+
+ // At least 2 splits and they are equal to reading the whole file.
+ assertThat(splits, hasSize(greaterThan(1)));
+ SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
+ }
+
+ @Test
+ public void testInitialSplitIntoBundlesAutoModeGz() throws Exception {
+ String[] lines = makeLines(5000);
+ File file = tmpFolder.newFile("to_be_split_auto.gz");
+ writeToStreamAndClose(lines, new GZIPOutputStream(new FileOutputStream(file)));
+ PipelineOptions options = TestPipeline.testingPipelineOptions();
+ long desiredBundleSize = 1000;
+
+ // Sanity check: file is at least 2 bundles long.
+ assertThat(file.length(), greaterThan(2 * desiredBundleSize));
- TextSource<String> source = new TextSource<>(file.toPath().toString(), StringUtf8Coder.of());
+ FileBasedSource<String> source = TextIO.Read.from(file.getPath()).getSource();
+ List<? extends FileBasedSource<String>> splits =
+ source.splitIntoBundles(desiredBundleSize, options);
- return source;
+ // Exactly 1 split, even in AUTO mode, since it is a gzip file.
+ assertThat(splits, hasSize(equalTo(1)));
+ SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
+ }
+
+ @Test
+ public void testInitialSplitIntoBundlesGzipModeTxt() throws Exception {
+ String[] lines = makeLines(5000);
+ File file = tmpFolder.newFile("to_be_split_gzip.txt");
+ writeToStreamAndClose(lines, new FileOutputStream(file));
+ PipelineOptions options = TestPipeline.testingPipelineOptions();
+ long desiredBundleSize = 1000;
+
+ // Sanity check: file is at least 2 bundles long.
+ assertThat(file.length(), greaterThan(2 * desiredBundleSize));
+
+ FileBasedSource<String> source =
+ TextIO.Read.from(file.getPath()).withCompressionType(CompressionType.GZIP).getSource();
+ List<? extends FileBasedSource<String>> splits =
+ source.splitIntoBundles(desiredBundleSize, options);
+
+ // Exactly 1 split, even though .txt extension, since using GZIP mode.
+ assertThat(splits, hasSize(equalTo(1)));
+ SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
+ }
+
+ @Test
+ public void testInitialSplitIntoBundlesGzipModeGz() throws Exception {
+ String[] lines = makeLines(5000);
+ File file = tmpFolder.newFile("to_be_split_gzip.gz");
+ writeToStreamAndClose(lines, new GZIPOutputStream(new FileOutputStream(file)));
+ PipelineOptions options = TestPipeline.testingPipelineOptions();
+ long desiredBundleSize = 1000;
+
+ // Sanity check: file is at least 2 bundles long.
+ assertThat(file.length(), greaterThan(2 * desiredBundleSize));
+
+ FileBasedSource<String> source =
+ TextIO.Read.from(file.getPath()).withCompressionType(CompressionType.GZIP).getSource();
+ List<? extends FileBasedSource<String>> splits =
+ source.splitIntoBundles(desiredBundleSize, options);
+
+ // Exactly 1 split using .gz extension and using GZIP mode.
+ assertThat(splits, hasSize(equalTo(1)));
+ SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
}
///////////////////////////////////////////////////////////////////////////////////////////////