You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/07/13 03:06:58 UTC

[46/50] [abbrv] beam git commit: Adds TextIO.readAll(), implemented rather naively

Adds TextIO.readAll(), implemented rather naively


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fcb06f3b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fcb06f3b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fcb06f3b

Branch: refs/heads/DSL_SQL
Commit: fcb06f3bf5482dc3ae63a3c070592bae0c631c6d
Parents: 2e42ae4
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Jun 23 18:02:10 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Jul 12 20:01:02 2017 -0700

----------------------------------------------------------------------
 ...ndedSplittableProcessElementInvokerTest.java |   2 +-
 .../core/SplittableParDoProcessFnTest.java      |   2 +-
 .../DataflowPipelineTranslatorTest.java         |   2 +-
 .../apache/beam/sdk/io/CompressedSource.java    |  40 ++--
 .../apache/beam/sdk/io/OffsetBasedSource.java   |  22 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     | 230 +++++++++++++++++--
 .../apache/beam/sdk/io/range/OffsetRange.java   | 101 ++++++++
 .../beam/sdk/io/range/OffsetRangeTracker.java   |   3 +
 .../transforms/splittabledofn/OffsetRange.java  |  77 -------
 .../splittabledofn/OffsetRangeTracker.java      |   1 +
 .../java/org/apache/beam/sdk/io/TextIOTest.java |  62 +++--
 .../beam/sdk/transforms/SplittableDoFnTest.java |   2 +-
 .../splittabledofn/OffsetRangeTrackerTest.java  |   1 +
 13 files changed, 387 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
index a2f6acc..b80a632 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
@@ -25,10 +25,10 @@ import static org.junit.Assert.assertThat;
 
 import java.util.Collection;
 import java.util.concurrent.Executors;
+import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;

http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
index 9543de8..1cd1275 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
@@ -39,11 +39,11 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
-import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 948af1c..43b2788 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -84,6 +84,7 @@ import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
 import org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -98,7 +99,6 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;

http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index 6ab8dec..4baac36 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -96,12 +96,6 @@ public class CompressedSource<T> extends FileBasedSource<T> {
      */
     ReadableByteChannel createDecompressingChannel(String fileName, ReadableByteChannel channel)
         throws IOException;
-
-    /**
-     * Given a file name, returns true if the file name matches any supported compression
-     * scheme.
-     */
-    boolean isCompressed(String fileName);
   }
 
   /**
@@ -242,6 +236,16 @@ public class CompressedSource<T> extends FileBasedSource<T> {
     @Override
     public abstract ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
         throws IOException;
+
+    /** Returns whether the file's extension matches of one of the known compression formats. */
+    public static boolean isCompressed(String filename) {
+      for (CompressionMode type : CompressionMode.values()) {
+        if  (type.matches(filename)) {
+          return true;
+        }
+      }
+      return false;
+    }
   }
 
   /**
@@ -273,16 +277,6 @@ public class CompressedSource<T> extends FileBasedSource<T> {
               ReadableByteChannel.class.getSimpleName(),
               ReadableByteChannel.class.getSimpleName()));
     }
-
-    @Override
-    public boolean isCompressed(String fileName) {
-      for (CompressionMode type : CompressionMode.values()) {
-        if  (type.matches(fileName)) {
-          return true;
-        }
-      }
-      return false;
-    }
   }
 
   private final FileBasedSource<T> sourceDelegate;
@@ -366,13 +360,9 @@ public class CompressedSource<T> extends FileBasedSource<T> {
    */
   @Override
   protected final boolean isSplittable() throws Exception {
-    if (channelFactory instanceof FileNameBasedDecompressingChannelFactory) {
-      FileNameBasedDecompressingChannelFactory fileNameBasedChannelFactory =
-          (FileNameBasedDecompressingChannelFactory) channelFactory;
-      return !fileNameBasedChannelFactory.isCompressed(getFileOrPatternSpec())
-          && sourceDelegate.isSplittable();
-    }
-    return false;
+    return channelFactory instanceof FileNameBasedDecompressingChannelFactory
+        && !CompressionMode.isCompressed(getFileOrPatternSpec())
+        && sourceDelegate.isSplittable();
   }
 
   /**
@@ -386,9 +376,7 @@ public class CompressedSource<T> extends FileBasedSource<T> {
   @Override
   protected final FileBasedReader<T> createSingleFileReader(PipelineOptions options) {
     if (channelFactory instanceof FileNameBasedDecompressingChannelFactory) {
-      FileNameBasedDecompressingChannelFactory fileNameBasedChannelFactory =
-          (FileNameBasedDecompressingChannelFactory) channelFactory;
-      if (!fileNameBasedChannelFactory.isCompressed(getFileOrPatternSpec())) {
+      if (!CompressionMode.isCompressed(getFileOrPatternSpec())) {
         return sourceDelegate.createSingleFileReader(options);
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
index 05f0d97..c3687a9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NoSuchElementException;
+import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.io.range.OffsetRangeTracker;
 import org.apache.beam.sdk.io.range.RangeTracker;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -110,8 +111,7 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
   @Override
   public List<? extends OffsetBasedSource<T>> split(
       long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-    // Split the range into bundles based on the desiredBundleSizeBytes. Final bundle is adjusted to
-    // make sure that we do not end up with a too small bundle at the end. If the desired bundle
+    // Split the range into bundles based on the desiredBundleSizeBytes. If the desired bundle
     // size is smaller than the minBundleSize of the source then minBundleSize will be used instead.
 
     long desiredBundleSizeOffsetUnits = Math.max(
@@ -119,20 +119,10 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
         minBundleSize);
 
     List<OffsetBasedSource<T>> subSources = new ArrayList<>();
-    long start = startOffset;
-    long maxEnd = Math.min(endOffset, getMaxEndOffset(options));
-
-    while (start < maxEnd) {
-      long end = start + desiredBundleSizeOffsetUnits;
-      end = Math.min(end, maxEnd);
-      // Avoid having a too small bundle at the end and ensure that we respect minBundleSize.
-      long remaining = maxEnd - end;
-      if ((remaining < desiredBundleSizeOffsetUnits / 4) || (remaining < minBundleSize)) {
-        end = maxEnd;
-      }
-      subSources.add(createSourceForSubrange(start, end));
-
-      start = end;
+    for (OffsetRange range :
+        new OffsetRange(startOffset, Math.min(endOffset, getMaxEndOffset(options)))
+            .split(desiredBundleSizeOffsetUnits, minBundleSize)) {
+      subSources.add(createSourceForSubrange(range.getFrom(), range.getTo()));
     }
     return subSources;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 5241589..78340f3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -23,25 +23,37 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.CompressedSource.CompressionMode;
 import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
 import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.io.Read.Bounded;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.MatchResult.Status;
 import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
@@ -51,13 +63,14 @@ import org.apache.beam.sdk.values.PDone;
  *
  * <p>To read a {@link PCollection} from one or more text files, use {@code TextIO.read()} to
  * instantiate a transform and use {@link TextIO.Read#from(String)} to specify the path of the
- * file(s) to be read.
+ * file(s) to be read. Alternatively, if the filenames to be read are themselves in a
+ * {@link PCollection}, apply {@link TextIO#readAll()}.
  *
  * <p>{@link TextIO.Read} returns a {@link PCollection} of {@link String Strings}, each
  * corresponding to one line of an input UTF-8 text file (split into lines delimited by '\n', '\r',
  * or '\r\n').
  *
- * <p>Example:
+ * <p>Example 1: reading a file or filepattern.
  *
  * <pre>{@code
  * Pipeline p = ...;
@@ -66,6 +79,19 @@ import org.apache.beam.sdk.values.PDone;
  * PCollection<String> lines = p.apply(TextIO.read().from("/local/path/to/file.txt"));
  * }</pre>
  *
+ * <p>Example 2: reading a PCollection of filenames.
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * // E.g. the filenames might be computed from other data in the pipeline, or
+ * // read from a data source.
+ * PCollection<String> filenames = ...;
+ *
+ * // Read all files in the collection.
+ * PCollection<String> lines = filenames.apply(TextIO.readAll());
+ * }</pre>
+ *
  * <p>To write a {@link PCollection} to one or more text files, use {@code TextIO.write()}, using
  * {@link TextIO.Write#to(String)} to specify the output prefix of the files to write.
  *
@@ -132,6 +158,26 @@ public class TextIO {
   }
 
   /**
+   * A {@link PTransform} that works like {@link #read}, but reads each file in a {@link
+   * PCollection} of filepatterns.
+   *
+   * <p>Can be applied to both bounded and unbounded {@link PCollection PCollections}, so this is
+   * suitable for reading a {@link PCollection} of filepatterns arriving as a stream. However, every
+   * filepattern is expanded once at the moment it is processed, rather than watched for new files
+   * matching the filepattern to appear. Likewise, every file is read once, rather than watched for
+   * new entries.
+   */
+  public static ReadAll readAll() {
+    return new AutoValue_TextIO_ReadAll.Builder()
+        .setCompressionType(CompressionType.AUTO)
+        // 64MB is a reasonable value that allows to amortize the cost of opening files,
+        // but is not so large as to exhaust a typical runner's maximum amount of output per
+        // ProcessElement call.
+        .setDesiredBundleSizeBytes(64 * 1024 * 1024L)
+        .build();
+  }
+
+  /**
    * A {@link PTransform} that writes a {@link PCollection} to a text file (or multiple text files
    * matching a sharding pattern), with each element of the input collection encoded into its own
    * line.
@@ -228,29 +274,34 @@ public class TextIO {
 
     // Helper to create a source specific to the requested compression type.
     protected FileBasedSource<String> getSource() {
-      switch (getCompressionType()) {
+      return wrapWithCompression(new TextSource(getFilepattern()), getCompressionType());
+    }
+
+    private static FileBasedSource<String> wrapWithCompression(
+        FileBasedSource<String> source, CompressionType compressionType) {
+      switch (compressionType) {
         case UNCOMPRESSED:
-          return new TextSource(getFilepattern());
+          return source;
         case AUTO:
-          return CompressedSource.from(new TextSource(getFilepattern()));
+          return CompressedSource.from(source);
         case BZIP2:
           return
-              CompressedSource.from(new TextSource(getFilepattern()))
-                  .withDecompression(CompressedSource.CompressionMode.BZIP2);
+              CompressedSource.from(source)
+                  .withDecompression(CompressionMode.BZIP2);
         case GZIP:
           return
-              CompressedSource.from(new TextSource(getFilepattern()))
-                  .withDecompression(CompressedSource.CompressionMode.GZIP);
+              CompressedSource.from(source)
+                  .withDecompression(CompressionMode.GZIP);
         case ZIP:
           return
-              CompressedSource.from(new TextSource(getFilepattern()))
-                  .withDecompression(CompressedSource.CompressionMode.ZIP);
+              CompressedSource.from(source)
+                  .withDecompression(CompressionMode.ZIP);
         case DEFLATE:
           return
-              CompressedSource.from(new TextSource(getFilepattern()))
-                  .withDecompression(CompressedSource.CompressionMode.DEFLATE);
+              CompressedSource.from(source)
+                  .withDecompression(CompressionMode.DEFLATE);
         default:
-          throw new IllegalArgumentException("Unknown compression type: " + getFilepattern());
+          throw new IllegalArgumentException("Unknown compression type: " + compressionType);
       }
     }
 
@@ -273,7 +324,156 @@ public class TextIO {
     }
   }
 
-  // ///////////////////////////////////////////////////////////////////////////
+  /////////////////////////////////////////////////////////////////////////////
+
+  /** Implementation of {@link #readAll}. */
+  @AutoValue
+  public abstract static class ReadAll
+      extends PTransform<PCollection<String>, PCollection<String>> {
+    abstract CompressionType getCompressionType();
+    abstract long getDesiredBundleSizeBytes();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setCompressionType(CompressionType compressionType);
+      abstract Builder setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
+
+      abstract ReadAll build();
+    }
+
+    /** Same as {@link Read#withCompressionType(CompressionType)}. */
+    public ReadAll withCompressionType(CompressionType compressionType) {
+      return toBuilder().setCompressionType(compressionType).build();
+    }
+
+    @VisibleForTesting
+    ReadAll withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
+      return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
+    }
+
+    @Override
+    public PCollection<String> expand(PCollection<String> input) {
+      return input
+          .apply("Expand glob", ParDo.of(new ExpandGlobFn()))
+          .apply(
+              "Split into ranges",
+              ParDo.of(new SplitIntoRangesFn(getCompressionType(), getDesiredBundleSizeBytes())))
+          .apply("Reshuffle", new ReshuffleWithUniqueKey<KV<Metadata, OffsetRange>>())
+          .apply("Read", ParDo.of(new ReadTextFn(this)));
+    }
+
+    private static class ReshuffleWithUniqueKey<T>
+        extends PTransform<PCollection<T>, PCollection<T>> {
+      @Override
+      public PCollection<T> expand(PCollection<T> input) {
+        return input
+            .apply("Unique key", ParDo.of(new AssignUniqueKeyFn<T>()))
+            .apply("Reshuffle", Reshuffle.<Integer, T>of())
+            .apply("Values", Values.<T>create());
+      }
+    }
+
+    private static class AssignUniqueKeyFn<T> extends DoFn<T, KV<Integer, T>> {
+      private int index;
+
+      @Setup
+      public void setup() {
+        this.index = ThreadLocalRandom.current().nextInt();
+      }
+
+      @ProcessElement
+      public void process(ProcessContext c) {
+        c.output(KV.of(++index, c.element()));
+      }
+    }
+
+    private static class ExpandGlobFn extends DoFn<String, Metadata> {
+      @ProcessElement
+      public void process(ProcessContext c) throws Exception {
+        MatchResult match = FileSystems.match(c.element());
+        checkArgument(
+            match.status().equals(Status.OK),
+            "Failed to match filepattern %s: %s",
+            c.element(),
+            match.status());
+        for (Metadata metadata : match.metadata()) {
+          c.output(metadata);
+        }
+      }
+    }
+
+    private static class SplitIntoRangesFn extends DoFn<Metadata, KV<Metadata, OffsetRange>> {
+      private final CompressionType compressionType;
+      private final long desiredBundleSize;
+
+      private SplitIntoRangesFn(CompressionType compressionType, long desiredBundleSize) {
+        this.compressionType = compressionType;
+        this.desiredBundleSize = desiredBundleSize;
+      }
+
+      @ProcessElement
+      public void process(ProcessContext c) {
+        Metadata metadata = c.element();
+        final boolean isSplittable = isSplittable(metadata, compressionType);
+        if (!isSplittable) {
+          c.output(KV.of(metadata, new OffsetRange(0, metadata.sizeBytes())));
+          return;
+        }
+        for (OffsetRange range :
+            new OffsetRange(0, metadata.sizeBytes()).split(desiredBundleSize, 0)) {
+          c.output(KV.of(metadata, range));
+        }
+      }
+
+      static boolean isSplittable(Metadata metadata, CompressionType compressionType) {
+        if (!metadata.isReadSeekEfficient()) {
+          return false;
+        }
+        switch (compressionType) {
+          case AUTO:
+            return !CompressionMode.isCompressed(metadata.resourceId().toString());
+          case UNCOMPRESSED:
+            return true;
+          case GZIP:
+          case BZIP2:
+          case ZIP:
+          case DEFLATE:
+            return false;
+          default:
+            throw new UnsupportedOperationException("Unknown compression type: " + compressionType);
+        }
+      }
+    }
+
+    private static class ReadTextFn extends DoFn<KV<Metadata, OffsetRange>, String> {
+      private final TextIO.ReadAll spec;
+
+      private ReadTextFn(ReadAll spec) {
+        this.spec = spec;
+      }
+
+      @ProcessElement
+      public void process(ProcessContext c) throws IOException {
+        Metadata metadata = c.element().getKey();
+        OffsetRange range = c.element().getValue();
+        FileBasedSource<String> source =
+            TextIO.Read.wrapWithCompression(
+                new TextSource(StaticValueProvider.of(metadata.toString())),
+                spec.getCompressionType());
+        BoundedSource.BoundedReader<String> reader =
+            source
+                .createForSubrangeOfFile(metadata, range.getFrom(), range.getTo())
+                .createReader(c.getPipelineOptions());
+        for (boolean more = reader.start(); more; more = reader.advance()) {
+          c.output(reader.getCurrent());
+        }
+      }
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
 
   /** Implementation of {@link #write}. */
   @AutoValue

http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRange.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRange.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRange.java
new file mode 100644
index 0000000..d3bff37
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRange.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.range;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
+
+/** A restriction represented by a range of integers [from, to). */
+public class OffsetRange
+    implements Serializable,
+    HasDefaultTracker<
+                OffsetRange, org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker> {
+  private final long from;
+  private final long to;
+
+  public OffsetRange(long from, long to) {
+    checkArgument(from <= to, "Malformed range [%s, %s)", from, to);
+    this.from = from;
+    this.to = to;
+  }
+
+  public long getFrom() {
+    return from;
+  }
+
+  public long getTo() {
+    return to;
+  }
+
+  @Override
+  public org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker newTracker() {
+    return new org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker(this);
+  }
+
+  @Override
+  public String toString() {
+    return "[" + from + ", " + to + ')';
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    OffsetRange that = (OffsetRange) o;
+
+    if (from != that.from) {
+      return false;
+    }
+    return to == that.to;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = (int) (from ^ (from >>> 32));
+    result = 31 * result + (int) (to ^ (to >>> 32));
+    return result;
+  }
+
+  public List<OffsetRange> split(long desiredNumOffsetsPerSplit, long minNumOffsetPerSplit) {
+    List<OffsetRange> res = new ArrayList<>();
+    long start = getFrom();
+    long maxEnd = getTo();
+
+    while (start < maxEnd) {
+      long end = start + desiredNumOffsetsPerSplit;
+      end = Math.min(end, maxEnd);
+      // Avoid having a too small range at the end and ensure that we respect minNumOffsetPerSplit.
+      long remaining = maxEnd - end;
+      if ((remaining < desiredNumOffsetsPerSplit / 4) || (remaining < minNumOffsetPerSplit)) {
+        end = maxEnd;
+      }
+      res.add(new OffsetRange(start, end));
+      start = end;
+    }
+    return res;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
index 51e2b1a..8f0083e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
@@ -26,6 +26,9 @@ import org.slf4j.LoggerFactory;
 
 /**
  * A {@link RangeTracker} for non-negative positions of type {@code long}.
+ *
+ * <p>Not to be confused with {@link
+ * org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker}.
  */
 public class OffsetRangeTracker implements RangeTracker<Long> {
   private static final Logger LOG = LoggerFactory.getLogger(OffsetRangeTracker.class);

http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java
deleted file mode 100644
index 104f5f2..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.splittabledofn;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.Serializable;
-
-/** A restriction represented by a range of integers [from, to). */
-public class OffsetRange
-    implements Serializable, HasDefaultTracker<OffsetRange, OffsetRangeTracker> {
-  private final long from;
-  private final long to;
-
-  public OffsetRange(long from, long to) {
-    checkArgument(from <= to, "Malformed range [%s, %s)", from, to);
-    this.from = from;
-    this.to = to;
-  }
-
-  public long getFrom() {
-    return from;
-  }
-
-  public long getTo() {
-    return to;
-  }
-
-  @Override
-  public OffsetRangeTracker newTracker() {
-    return new OffsetRangeTracker(this);
-  }
-
-  @Override
-  public String toString() {
-    return "[" + from + ", " + to + ')';
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    OffsetRange that = (OffsetRange) o;
-
-    if (from != that.from) {
-      return false;
-    }
-    return to == that.to;
-  }
-
-  @Override
-  public int hashCode() {
-    int result = (int) (from ^ (from >>> 32));
-    result = 31 * result + (int) (to ^ (to >>> 32));
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
index 0271a0d..62c10a7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
+import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.transforms.DoFn;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 8797ff7..a6be4fb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -120,10 +120,10 @@ import org.junit.runners.JUnit4;
 public class TextIOTest {
   private static final String MY_HEADER = "myHeader";
   private static final String MY_FOOTER = "myFooter";
-  private static final String[] EMPTY = new String[] {};
-  private static final String[] TINY =
-      new String[] {"Irritable eagle", "Optimistic jay", "Fanciful hawk"};
-  private static final String[] LARGE = makeLines(1000);
+  private static final List<String> EMPTY = Collections.emptyList();
+  private static final List<String> TINY =
+      Arrays.asList("Irritable eagle", "Optimistic jay", "Fanciful hawk");
+  private static final List<String> LARGE = makeLines(1000);
 
   private static Path tempFolder;
   private static File emptyTxt;
@@ -148,7 +148,7 @@ public class TextIOTest {
   @Rule
   public ExpectedException expectedException = ExpectedException.none();
 
-  private static File writeToFile(String[] lines, String filename, CompressionType compression)
+  private static File writeToFile(List<String> lines, String filename, CompressionType compression)
       throws IOException {
     File file = tempFolder.resolve(filename).toFile();
     OutputStream output = new FileOutputStream(file);
@@ -791,7 +791,7 @@ public class TextIOTest {
    * Helper that writes the given lines (adding a newline in between) to a stream, then closes the
    * stream.
    */
-  private static void writeToStreamAndClose(String[] lines, OutputStream outputStream) {
+  private static void writeToStreamAndClose(List<String> lines, OutputStream outputStream) {
     try (PrintStream writer = new PrintStream(outputStream)) {
       for (String line : lines) {
         writer.println(line);
@@ -800,27 +800,33 @@ public class TextIOTest {
   }
 
   /**
-   * Helper method that runs TextIO.read().from(filename).withCompressionType(compressionType)
+   * Helper method that runs TextIO.read().from(filename).withCompressionType(compressionType) and
+   * TextIO.readAll().withCompressionType(compressionType) applied to the single filename,
    * and asserts that the results match the given expected output.
    */
   private void assertReadingCompressedFileMatchesExpected(
-      File file, CompressionType compressionType, String[] expected) {
-
-    TextIO.Read read =
-        TextIO.read().from(file.getPath()).withCompressionType(compressionType);
-    PCollection<String> output = p.apply("Read_" + file + "_" + compressionType.toString(), read);
-
-    PAssert.that(output).containsInAnyOrder(expected);
+      File file, CompressionType compressionType, List<String> expected) {
+
+    TextIO.Read read = TextIO.read().from(file.getPath()).withCompressionType(compressionType);
+    PAssert.that(p.apply("Read_" + file + "_" + compressionType.toString(), read))
+        .containsInAnyOrder(expected);
+
+    TextIO.ReadAll readAll =
+        TextIO.readAll().withCompressionType(compressionType).withDesiredBundleSizeBytes(10);
+    PAssert.that(
+            p.apply("Create_" + file, Create.of(file.getPath()))
+                .apply("Read_" + compressionType.toString(), readAll))
+        .containsInAnyOrder(expected);
     p.run();
   }
 
   /**
    * Helper to make an array of compressible strings. Returns ["word"i] for i in range(0,n).
    */
-  private static String[] makeLines(int n) {
-    String[] ret = new String[n];
+  private static List<String> makeLines(int n) {
+    List<String> ret = new ArrayList<>();
     for (int i = 0; i < n; ++i) {
-      ret[i] = "word" + i;
+      ret.add("word" + i);
     }
     return ret;
   }
@@ -1004,7 +1010,7 @@ public class TextIOTest {
 
     String filename = createZipFile(expected, "multiple entries", entry0, entry1, entry2);
     assertReadingCompressedFileMatchesExpected(
-        new File(filename), CompressionType.ZIP, expected.toArray(new String[]{}));
+        new File(filename), CompressionType.ZIP, expected);
   }
 
   /**
@@ -1023,7 +1029,7 @@ public class TextIOTest {
         new String[]{"dog"});
 
     assertReadingCompressedFileMatchesExpected(
-        new File(filename), CompressionType.ZIP, new String[] {"cat", "dog"});
+        new File(filename), CompressionType.ZIP, Arrays.asList("cat", "dog"));
   }
 
   @Test
@@ -1340,5 +1346,21 @@ public class TextIOTest {
     SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
   }
 
-}
 
+  @Test
+  @Category(NeedsRunner.class)
+  public void testReadAll() throws IOException {
+    writeToFile(TINY, "readAllTiny1.zip", ZIP);
+    writeToFile(TINY, "readAllTiny2.zip", ZIP);
+    writeToFile(LARGE, "readAllLarge1.zip", ZIP);
+    writeToFile(LARGE, "readAllLarge2.zip", ZIP);
+    PCollection<String> lines =
+        p.apply(
+                Create.of(
+                    tempFolder.resolve("readAllTiny*").toString(),
+                    tempFolder.resolve("readAllLarge*").toString()))
+            .apply(TextIO.readAll().withCompressionType(AUTO));
+    PAssert.that(lines).containsInAnyOrder(Iterables.concat(TINY, TINY, LARGE, LARGE));
+    p.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index 0c2bd1c..cb60f9a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.testing.PAssert;
@@ -44,7 +45,6 @@ import org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs;
 import org.apache.beam.sdk.testing.UsesTestStream;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
-import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;

http://git-wip-us.apache.org/repos/asf/beam/blob/fcb06f3b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
index 831894c..8aed6b9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.beam.sdk.io.range.OffsetRange;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;