You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/07/13 03:06:58 UTC
[46/50] [abbrv] beam git commit: Adds TextIO.readAll(),
implemented rather naively
Adds TextIO.readAll(), implemented rather naively
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fcb06f3b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fcb06f3b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fcb06f3b
Branch: refs/heads/DSL_SQL
Commit: fcb06f3bf5482dc3ae63a3c070592bae0c631c6d
Parents: 2e42ae4
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Jun 23 18:02:10 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Jul 12 20:01:02 2017 -0700
----------------------------------------------------------------------
...ndedSplittableProcessElementInvokerTest.java | 2 +-
.../core/SplittableParDoProcessFnTest.java | 2 +-
.../DataflowPipelineTranslatorTest.java | 2 +-
.../apache/beam/sdk/io/CompressedSource.java | 40 ++--
.../apache/beam/sdk/io/OffsetBasedSource.java | 22 +-
.../java/org/apache/beam/sdk/io/TextIO.java | 230 +++++++++++++++++--
.../apache/beam/sdk/io/range/OffsetRange.java | 101 ++++++++
.../beam/sdk/io/range/OffsetRangeTracker.java | 3 +
.../transforms/splittabledofn/OffsetRange.java | 77 -------
.../splittabledofn/OffsetRangeTracker.java | 1 +
.../java/org/apache/beam/sdk/io/TextIOTest.java | 62 +++--
.../beam/sdk/transforms/SplittableDoFnTest.java | 2 +-
.../splittabledofn/OffsetRangeTrackerTest.java | 1 +
13 files changed, 387 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
index a2f6acc..b80a632 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
@@ -25,10 +25,10 @@ import static org.junit.Assert.assertThat;
import java.util.Collection;
import java.util.concurrent.Executors;
+import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
index 9543de8..1cd1275 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
@@ -39,11 +39,11 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
-import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 948af1c..43b2788 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -84,6 +84,7 @@ import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
@@ -98,7 +99,6 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index 6ab8dec..4baac36 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -96,12 +96,6 @@ public class CompressedSource<T> extends FileBasedSource<T> {
*/
ReadableByteChannel createDecompressingChannel(String fileName, ReadableByteChannel channel)
throws IOException;
-
- /**
- * Given a file name, returns true if the file name matches any supported compression
- * scheme.
- */
- boolean isCompressed(String fileName);
}
/**
@@ -242,6 +236,16 @@ public class CompressedSource<T> extends FileBasedSource<T> {
@Override
public abstract ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
throws IOException;
+
+ /** Returns whether the file's extension matches one of the known compression formats. */
+ public static boolean isCompressed(String filename) {
+ for (CompressionMode type : CompressionMode.values()) {
+ if (type.matches(filename)) {
+ return true;
+ }
+ }
+ return false;
+ }
}
/**
@@ -273,16 +277,6 @@ public class CompressedSource<T> extends FileBasedSource<T> {
ReadableByteChannel.class.getSimpleName(),
ReadableByteChannel.class.getSimpleName()));
}
-
- @Override
- public boolean isCompressed(String fileName) {
- for (CompressionMode type : CompressionMode.values()) {
- if (type.matches(fileName)) {
- return true;
- }
- }
- return false;
- }
}
private final FileBasedSource<T> sourceDelegate;
@@ -366,13 +360,9 @@ public class CompressedSource<T> extends FileBasedSource<T> {
*/
@Override
protected final boolean isSplittable() throws Exception {
- if (channelFactory instanceof FileNameBasedDecompressingChannelFactory) {
- FileNameBasedDecompressingChannelFactory fileNameBasedChannelFactory =
- (FileNameBasedDecompressingChannelFactory) channelFactory;
- return !fileNameBasedChannelFactory.isCompressed(getFileOrPatternSpec())
- && sourceDelegate.isSplittable();
- }
- return false;
+ return channelFactory instanceof FileNameBasedDecompressingChannelFactory
+ && !CompressionMode.isCompressed(getFileOrPatternSpec())
+ && sourceDelegate.isSplittable();
}
/**
@@ -386,9 +376,7 @@ public class CompressedSource<T> extends FileBasedSource<T> {
@Override
protected final FileBasedReader<T> createSingleFileReader(PipelineOptions options) {
if (channelFactory instanceof FileNameBasedDecompressingChannelFactory) {
- FileNameBasedDecompressingChannelFactory fileNameBasedChannelFactory =
- (FileNameBasedDecompressingChannelFactory) channelFactory;
- if (!fileNameBasedChannelFactory.isCompressed(getFileOrPatternSpec())) {
+ if (!CompressionMode.isCompressed(getFileOrPatternSpec())) {
return sourceDelegate.createSingleFileReader(options);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
index 05f0d97..c3687a9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
+import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.io.range.OffsetRangeTracker;
import org.apache.beam.sdk.io.range.RangeTracker;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -110,8 +111,7 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
@Override
public List<? extends OffsetBasedSource<T>> split(
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
- // Split the range into bundles based on the desiredBundleSizeBytes. Final bundle is adjusted to
- // make sure that we do not end up with a too small bundle at the end. If the desired bundle
+ // Split the range into bundles based on the desiredBundleSizeBytes. If the desired bundle
// size is smaller than the minBundleSize of the source then minBundleSize will be used instead.
long desiredBundleSizeOffsetUnits = Math.max(
@@ -119,20 +119,10 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
minBundleSize);
List<OffsetBasedSource<T>> subSources = new ArrayList<>();
- long start = startOffset;
- long maxEnd = Math.min(endOffset, getMaxEndOffset(options));
-
- while (start < maxEnd) {
- long end = start + desiredBundleSizeOffsetUnits;
- end = Math.min(end, maxEnd);
- // Avoid having a too small bundle at the end and ensure that we respect minBundleSize.
- long remaining = maxEnd - end;
- if ((remaining < desiredBundleSizeOffsetUnits / 4) || (remaining < minBundleSize)) {
- end = maxEnd;
- }
- subSources.add(createSourceForSubrange(start, end));
-
- start = end;
+ for (OffsetRange range :
+ new OffsetRange(startOffset, Math.min(endOffset, getMaxEndOffset(options)))
+ .split(desiredBundleSizeOffsetUnits, minBundleSize)) {
+ subSources.add(createSourceForSubrange(range.getFrom(), range.getTo()));
}
return subSources;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 5241589..78340f3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -23,25 +23,37 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.CompressedSource.CompressionMode;
import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
import org.apache.beam.sdk.io.Read.Bounded;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.MatchResult.Status;
import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
@@ -51,13 +63,14 @@ import org.apache.beam.sdk.values.PDone;
*
* <p>To read a {@link PCollection} from one or more text files, use {@code TextIO.read()} to
* instantiate a transform and use {@link TextIO.Read#from(String)} to specify the path of the
- * file(s) to be read.
+ * file(s) to be read. Alternatively, if the filenames to be read are themselves in a
+ * {@link PCollection}, apply {@link TextIO#readAll()}.
*
* <p>{@link TextIO.Read} returns a {@link PCollection} of {@link String Strings}, each
* corresponding to one line of an input UTF-8 text file (split into lines delimited by '\n', '\r',
* or '\r\n').
*
- * <p>Example:
+ * <p>Example 1: reading a file or filepattern.
*
* <pre>{@code
* Pipeline p = ...;
@@ -66,6 +79,19 @@ import org.apache.beam.sdk.values.PDone;
* PCollection<String> lines = p.apply(TextIO.read().from("/local/path/to/file.txt"));
* }</pre>
*
+ * <p>Example 2: reading a PCollection of filenames.
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * // E.g. the filenames might be computed from other data in the pipeline, or
+ * // read from a data source.
+ * PCollection<String> filenames = ...;
+ *
+ * // Read all files in the collection.
+ * PCollection<String> lines = filenames.apply(TextIO.readAll());
+ * }</pre>
+ *
* <p>To write a {@link PCollection} to one or more text files, use {@code TextIO.write()}, using
* {@link TextIO.Write#to(String)} to specify the output prefix of the files to write.
*
@@ -132,6 +158,26 @@ public class TextIO {
}
/**
+ * A {@link PTransform} that works like {@link #read}, but reads each file in a {@link
+ * PCollection} of filepatterns.
+ *
+ * <p>Can be applied to both bounded and unbounded {@link PCollection PCollections}, so this is
+ * suitable for reading a {@link PCollection} of filepatterns arriving as a stream. However, every
+ * filepattern is expanded once at the moment it is processed, rather than watched for new files
+ * matching the filepattern to appear. Likewise, every file is read once, rather than watched for
+ * new entries.
+ */
+ public static ReadAll readAll() {
+ return new AutoValue_TextIO_ReadAll.Builder()
+ .setCompressionType(CompressionType.AUTO)
+ // 64MB is a reasonable value that amortizes the cost of opening files,
+ // but is not so large as to exhaust a typical runner's maximum amount of output per
+ // ProcessElement call.
+ .setDesiredBundleSizeBytes(64 * 1024 * 1024L)
+ .build();
+ }
+
+ /**
* A {@link PTransform} that writes a {@link PCollection} to a text file (or multiple text files
* matching a sharding pattern), with each element of the input collection encoded into its own
* line.
@@ -228,29 +274,34 @@ public class TextIO {
// Helper to create a source specific to the requested compression type.
protected FileBasedSource<String> getSource() {
- switch (getCompressionType()) {
+ return wrapWithCompression(new TextSource(getFilepattern()), getCompressionType());
+ }
+
+ private static FileBasedSource<String> wrapWithCompression(
+ FileBasedSource<String> source, CompressionType compressionType) {
+ switch (compressionType) {
case UNCOMPRESSED:
- return new TextSource(getFilepattern());
+ return source;
case AUTO:
- return CompressedSource.from(new TextSource(getFilepattern()));
+ return CompressedSource.from(source);
case BZIP2:
return
- CompressedSource.from(new TextSource(getFilepattern()))
- .withDecompression(CompressedSource.CompressionMode.BZIP2);
+ CompressedSource.from(source)
+ .withDecompression(CompressionMode.BZIP2);
case GZIP:
return
- CompressedSource.from(new TextSource(getFilepattern()))
- .withDecompression(CompressedSource.CompressionMode.GZIP);
+ CompressedSource.from(source)
+ .withDecompression(CompressionMode.GZIP);
case ZIP:
return
- CompressedSource.from(new TextSource(getFilepattern()))
- .withDecompression(CompressedSource.CompressionMode.ZIP);
+ CompressedSource.from(source)
+ .withDecompression(CompressionMode.ZIP);
case DEFLATE:
return
- CompressedSource.from(new TextSource(getFilepattern()))
- .withDecompression(CompressedSource.CompressionMode.DEFLATE);
+ CompressedSource.from(source)
+ .withDecompression(CompressionMode.DEFLATE);
default:
- throw new IllegalArgumentException("Unknown compression type: " + getFilepattern());
+ throw new IllegalArgumentException("Unknown compression type: " + compressionType);
}
}
@@ -273,7 +324,156 @@ public class TextIO {
}
}
- // ///////////////////////////////////////////////////////////////////////////
+ /////////////////////////////////////////////////////////////////////////////
+
+ /** Implementation of {@link #readAll}. */
+ @AutoValue
+ public abstract static class ReadAll
+ extends PTransform<PCollection<String>, PCollection<String>> {
+ abstract CompressionType getCompressionType();
+ abstract long getDesiredBundleSizeBytes();
+
+ abstract Builder toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setCompressionType(CompressionType compressionType);
+ abstract Builder setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
+
+ abstract ReadAll build();
+ }
+
+ /** Same as {@link Read#withCompressionType(CompressionType)}. */
+ public ReadAll withCompressionType(CompressionType compressionType) {
+ return toBuilder().setCompressionType(compressionType).build();
+ }
+
+ @VisibleForTesting
+ ReadAll withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
+ return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
+ }
+
+ @Override
+ public PCollection<String> expand(PCollection<String> input) {
+ return input
+ .apply("Expand glob", ParDo.of(new ExpandGlobFn()))
+ .apply(
+ "Split into ranges",
+ ParDo.of(new SplitIntoRangesFn(getCompressionType(), getDesiredBundleSizeBytes())))
+ .apply("Reshuffle", new ReshuffleWithUniqueKey<KV<Metadata, OffsetRange>>())
+ .apply("Read", ParDo.of(new ReadTextFn(this)));
+ }
+
+ private static class ReshuffleWithUniqueKey<T>
+ extends PTransform<PCollection<T>, PCollection<T>> {
+ @Override
+ public PCollection<T> expand(PCollection<T> input) {
+ return input
+ .apply("Unique key", ParDo.of(new AssignUniqueKeyFn<T>()))
+ .apply("Reshuffle", Reshuffle.<Integer, T>of())
+ .apply("Values", Values.<T>create());
+ }
+ }
+
+ private static class AssignUniqueKeyFn<T> extends DoFn<T, KV<Integer, T>> {
+ private int index;
+
+ @Setup
+ public void setup() {
+ this.index = ThreadLocalRandom.current().nextInt();
+ }
+
+ @ProcessElement
+ public void process(ProcessContext c) {
+ c.output(KV.of(++index, c.element()));
+ }
+ }
+
+ private static class ExpandGlobFn extends DoFn<String, Metadata> {
+ @ProcessElement
+ public void process(ProcessContext c) throws Exception {
+ MatchResult match = FileSystems.match(c.element());
+ checkArgument(
+ match.status().equals(Status.OK),
+ "Failed to match filepattern %s: %s",
+ c.element(),
+ match.status());
+ for (Metadata metadata : match.metadata()) {
+ c.output(metadata);
+ }
+ }
+ }
+
+ private static class SplitIntoRangesFn extends DoFn<Metadata, KV<Metadata, OffsetRange>> {
+ private final CompressionType compressionType;
+ private final long desiredBundleSize;
+
+ private SplitIntoRangesFn(CompressionType compressionType, long desiredBundleSize) {
+ this.compressionType = compressionType;
+ this.desiredBundleSize = desiredBundleSize;
+ }
+
+ @ProcessElement
+ public void process(ProcessContext c) {
+ Metadata metadata = c.element();
+ final boolean isSplittable = isSplittable(metadata, compressionType);
+ if (!isSplittable) {
+ c.output(KV.of(metadata, new OffsetRange(0, metadata.sizeBytes())));
+ return;
+ }
+ for (OffsetRange range :
+ new OffsetRange(0, metadata.sizeBytes()).split(desiredBundleSize, 0)) {
+ c.output(KV.of(metadata, range));
+ }
+ }
+
+ static boolean isSplittable(Metadata metadata, CompressionType compressionType) {
+ if (!metadata.isReadSeekEfficient()) {
+ return false;
+ }
+ switch (compressionType) {
+ case AUTO:
+ return !CompressionMode.isCompressed(metadata.resourceId().toString());
+ case UNCOMPRESSED:
+ return true;
+ case GZIP:
+ case BZIP2:
+ case ZIP:
+ case DEFLATE:
+ return false;
+ default:
+ throw new UnsupportedOperationException("Unknown compression type: " + compressionType);
+ }
+ }
+ }
+
+ private static class ReadTextFn extends DoFn<KV<Metadata, OffsetRange>, String> {
+ private final TextIO.ReadAll spec;
+
+ private ReadTextFn(ReadAll spec) {
+ this.spec = spec;
+ }
+
+ @ProcessElement
+ public void process(ProcessContext c) throws IOException {
+ Metadata metadata = c.element().getKey();
+ OffsetRange range = c.element().getValue();
+ FileBasedSource<String> source =
+ TextIO.Read.wrapWithCompression(
+ new TextSource(StaticValueProvider.of(metadata.toString())),
+ spec.getCompressionType());
+ BoundedSource.BoundedReader<String> reader =
+ source
+ .createForSubrangeOfFile(metadata, range.getFrom(), range.getTo())
+ .createReader(c.getPipelineOptions());
+ for (boolean more = reader.start(); more; more = reader.advance()) {
+ c.output(reader.getCurrent());
+ }
+ }
+ }
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
/** Implementation of {@link #write}. */
@AutoValue
http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRange.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRange.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRange.java
new file mode 100644
index 0000000..d3bff37
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRange.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.range;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
+
+/** A restriction represented by a range of integers [from, to). */
+public class OffsetRange
+ implements Serializable,
+ HasDefaultTracker<
+ OffsetRange, org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker> {
+ private final long from;
+ private final long to;
+
+ public OffsetRange(long from, long to) {
+ checkArgument(from <= to, "Malformed range [%s, %s)", from, to);
+ this.from = from;
+ this.to = to;
+ }
+
+ public long getFrom() {
+ return from;
+ }
+
+ public long getTo() {
+ return to;
+ }
+
+ @Override
+ public org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker newTracker() {
+ return new org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker(this);
+ }
+
+ @Override
+ public String toString() {
+ return "[" + from + ", " + to + ')';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ OffsetRange that = (OffsetRange) o;
+
+ if (from != that.from) {
+ return false;
+ }
+ return to == that.to;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = (int) (from ^ (from >>> 32));
+ result = 31 * result + (int) (to ^ (to >>> 32));
+ return result;
+ }
+
+ public List<OffsetRange> split(long desiredNumOffsetsPerSplit, long minNumOffsetPerSplit) {
+ List<OffsetRange> res = new ArrayList<>();
+ long start = getFrom();
+ long maxEnd = getTo();
+
+ while (start < maxEnd) {
+ long end = start + desiredNumOffsetsPerSplit;
+ end = Math.min(end, maxEnd);
+ // Avoid leaving a too-small range at the end, and ensure that we respect minNumOffsetPerSplit.
+ long remaining = maxEnd - end;
+ if ((remaining < desiredNumOffsetsPerSplit / 4) || (remaining < minNumOffsetPerSplit)) {
+ end = maxEnd;
+ }
+ res.add(new OffsetRange(start, end));
+ start = end;
+ }
+ return res;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
index 51e2b1a..8f0083e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
@@ -26,6 +26,9 @@ import org.slf4j.LoggerFactory;
/**
* A {@link RangeTracker} for non-negative positions of type {@code long}.
+ *
+ * <p>Not to be confused with {@link
+ * org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker}.
*/
public class OffsetRangeTracker implements RangeTracker<Long> {
private static final Logger LOG = LoggerFactory.getLogger(OffsetRangeTracker.class);
http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java
deleted file mode 100644
index 104f5f2..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.splittabledofn;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.Serializable;
-
-/** A restriction represented by a range of integers [from, to). */
-public class OffsetRange
- implements Serializable, HasDefaultTracker<OffsetRange, OffsetRangeTracker> {
- private final long from;
- private final long to;
-
- public OffsetRange(long from, long to) {
- checkArgument(from <= to, "Malformed range [%s, %s)", from, to);
- this.from = from;
- this.to = to;
- }
-
- public long getFrom() {
- return from;
- }
-
- public long getTo() {
- return to;
- }
-
- @Override
- public OffsetRangeTracker newTracker() {
- return new OffsetRangeTracker(this);
- }
-
- @Override
- public String toString() {
- return "[" + from + ", " + to + ')';
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- OffsetRange that = (OffsetRange) o;
-
- if (from != that.from) {
- return false;
- }
- return to == that.to;
- }
-
- @Override
- public int hashCode() {
- int result = (int) (from ^ (from >>> 32));
- result = 31 * result + (int) (to ^ (to >>> 32));
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
index 0271a0d..62c10a7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 8797ff7..a6be4fb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -120,10 +120,10 @@ import org.junit.runners.JUnit4;
public class TextIOTest {
private static final String MY_HEADER = "myHeader";
private static final String MY_FOOTER = "myFooter";
- private static final String[] EMPTY = new String[] {};
- private static final String[] TINY =
- new String[] {"Irritable eagle", "Optimistic jay", "Fanciful hawk"};
- private static final String[] LARGE = makeLines(1000);
+ private static final List<String> EMPTY = Collections.emptyList();
+ private static final List<String> TINY =
+ Arrays.asList("Irritable eagle", "Optimistic jay", "Fanciful hawk");
+ private static final List<String> LARGE = makeLines(1000);
private static Path tempFolder;
private static File emptyTxt;
@@ -148,7 +148,7 @@ public class TextIOTest {
@Rule
public ExpectedException expectedException = ExpectedException.none();
- private static File writeToFile(String[] lines, String filename, CompressionType compression)
+ private static File writeToFile(List<String> lines, String filename, CompressionType compression)
throws IOException {
File file = tempFolder.resolve(filename).toFile();
OutputStream output = new FileOutputStream(file);
@@ -791,7 +791,7 @@ public class TextIOTest {
* Helper that writes the given lines (adding a newline in between) to a stream, then closes the
* stream.
*/
- private static void writeToStreamAndClose(String[] lines, OutputStream outputStream) {
+ private static void writeToStreamAndClose(List<String> lines, OutputStream outputStream) {
try (PrintStream writer = new PrintStream(outputStream)) {
for (String line : lines) {
writer.println(line);
@@ -800,27 +800,33 @@ public class TextIOTest {
}
/**
- * Helper method that runs TextIO.read().from(filename).withCompressionType(compressionType)
+ * Helper method that runs TextIO.read().from(filename).withCompressionType(compressionType) and
+ * TextIO.readAll().withCompressionType(compressionType) applied to the single filename,
* and asserts that the results match the given expected output.
*/
private void assertReadingCompressedFileMatchesExpected(
- File file, CompressionType compressionType, String[] expected) {
-
- TextIO.Read read =
- TextIO.read().from(file.getPath()).withCompressionType(compressionType);
- PCollection<String> output = p.apply("Read_" + file + "_" + compressionType.toString(), read);
-
- PAssert.that(output).containsInAnyOrder(expected);
+ File file, CompressionType compressionType, List<String> expected) {
+
+ TextIO.Read read = TextIO.read().from(file.getPath()).withCompressionType(compressionType);
+ PAssert.that(p.apply("Read_" + file + "_" + compressionType.toString(), read))
+ .containsInAnyOrder(expected);
+
+ TextIO.ReadAll readAll =
+ TextIO.readAll().withCompressionType(compressionType).withDesiredBundleSizeBytes(10);
+ PAssert.that(
+ p.apply("Create_" + file, Create.of(file.getPath()))
+ .apply("Read_" + compressionType.toString(), readAll))
+ .containsInAnyOrder(expected);
p.run();
}
/**
* Helper to make an array of compressible strings. Returns ["word"i] for i in range(0,n).
*/
- private static String[] makeLines(int n) {
- String[] ret = new String[n];
+ private static List<String> makeLines(int n) {
+ List<String> ret = new ArrayList<>();
for (int i = 0; i < n; ++i) {
- ret[i] = "word" + i;
+ ret.add("word" + i);
}
return ret;
}
@@ -1004,7 +1010,7 @@ public class TextIOTest {
String filename = createZipFile(expected, "multiple entries", entry0, entry1, entry2);
assertReadingCompressedFileMatchesExpected(
- new File(filename), CompressionType.ZIP, expected.toArray(new String[]{}));
+ new File(filename), CompressionType.ZIP, expected);
}
/**
@@ -1023,7 +1029,7 @@ public class TextIOTest {
new String[]{"dog"});
assertReadingCompressedFileMatchesExpected(
- new File(filename), CompressionType.ZIP, new String[] {"cat", "dog"});
+ new File(filename), CompressionType.ZIP, Arrays.asList("cat", "dog"));
}
@Test
@@ -1340,5 +1346,21 @@ public class TextIOTest {
SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
}
-}
+ @Test
+ @Category(NeedsRunner.class)
+ public void testReadAll() throws IOException {
+ writeToFile(TINY, "readAllTiny1.zip", ZIP);
+ writeToFile(TINY, "readAllTiny2.zip", ZIP);
+ writeToFile(LARGE, "readAllLarge1.zip", ZIP);
+ writeToFile(LARGE, "readAllLarge2.zip", ZIP);
+ PCollection<String> lines =
+ p.apply(
+ Create.of(
+ tempFolder.resolve("readAllTiny*").toString(),
+ tempFolder.resolve("readAllLarge*").toString()))
+ .apply(TextIO.readAll().withCompressionType(AUTO));
+ PAssert.that(lines).containsInAnyOrder(Iterables.concat(TINY, TINY, LARGE, LARGE));
+ p.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index 0c2bd1c..cb60f9a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.testing.PAssert;
@@ -44,7 +45,6 @@ import org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
-import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
index 831894c..8aed6b9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import org.apache.beam.sdk.io.range.OffsetRange;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;