Posted to commits@beam.apache.org by dh...@apache.org on 2016/05/20 23:57:38 UTC

[1/2] incubator-beam git commit: BoundedReader: add getSplitPoints{Consumed,Remaining}

Repository: incubator-beam
Updated Branches:
  refs/heads/master efd1f95b6 -> 4755c5a78


BoundedReader: add getSplitPoints{Consumed,Remaining}

And implement and test it for common sources

OffsetBasedReader: test limited parallelism signals

AvroSource: rewrite to support remaining parallelism

*) Make the start of a block match Avro's definition: the first byte after the previous sync marker.
   This enables detecting the last block in the file.
*) This change enables us to unify currentOffset and currentBlockOffset, as all records are emitted
   at the start of the block that contains them.
*) Simplify block header reading to use fewer object allocations and buffers, using a direct
   binary decoder and a once-allocated CountingInputStream to measure the size of that header.
*) Add tests for consumed and remaining parallelism
*) Let BlockBasedSource detect the end of the file when computing remaining parallelism.
*) Verify in more places that the correct number of bytes is read from
   the input Avro file.

CompressedSource: add tests of parallelism and progress

*) empty file
*) non-empty compressed file
*) non-empty not-compressed file

TextIO: implement and test parallelism

*) empty file
*) non-empty file

CountingSource: test limited parallelism

CompressedSource: implement currentOffset based on bytes decompressed

*) This is not a very good offset because it is an upper bound, but it is
   likely better than not reporting any progress at all.
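
For illustration, a rough sketch (not part of this commit; the helper and its
name are hypothetical) of how a runner might consume the new signals when
deciding whether to attempt dynamic work rebalancing:

import org.apache.beam.sdk.io.BoundedSource;

class SplitPolicy {
  // A reader whose only remaining split point is the current task cannot
  // yield a non-empty residual, so calling splitAtFraction would be wasted.
  static <T> boolean worthSplitting(BoundedSource.BoundedReader<T> reader) {
    long remaining = reader.getSplitPointsRemaining();
    return remaining == BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN
        || remaining > 1;
  }
}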


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/32a6cde4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/32a6cde4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/32a6cde4

Branch: refs/heads/master
Commit: 32a6cde4e43726849713a7183c66aa28f43b0868
Parents: efd1f95
Author: Dan Halperin <dh...@google.com>
Authored: Tue May 3 17:53:48 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri May 20 16:42:00 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroSource.java | 166 ++++++++++++-------
 .../apache/beam/sdk/io/BlockBasedSource.java    |  26 +--
 .../org/apache/beam/sdk/io/BoundedSource.java   | 145 +++++++++++++++-
 .../apache/beam/sdk/io/CompressedSource.java    | 132 +++++++++++++--
 .../org/apache/beam/sdk/io/CountingSource.java  |   5 +
 .../org/apache/beam/sdk/io/DatastoreIO.java     |  13 ++
 .../org/apache/beam/sdk/io/FileBasedSource.java |   2 +-
 .../apache/beam/sdk/io/OffsetBasedSource.java   |  49 +++++-
 .../java/org/apache/beam/sdk/io/TextIO.java     |  20 ++-
 .../beam/sdk/io/range/OffsetRangeTracker.java   | 109 ++++++++++--
 .../org/apache/beam/sdk/io/AvroSourceTest.java  |  86 +++++++++-
 .../beam/sdk/io/CompressedSourceTest.java       | 107 +++++++++++-
 .../apache/beam/sdk/io/CountingSourceTest.java  |  30 ++++
 .../apache/beam/sdk/io/FileBasedSourceTest.java |   2 +-
 .../beam/sdk/io/OffsetBasedSourceTest.java      |  71 +++++++-
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 114 ++++++++++++-
 .../sdk/io/range/OffsetRangeTrackerTest.java    |   1 -
 .../apache/beam/sdk/io/hdfs/HDFSFileSource.java |  12 ++
 18 files changed, 969 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index ef8e427..255199f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.io;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -40,18 +42,24 @@ import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.commons.compress.compressors.snappy.SnappyCompressorInputStream;
 import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;
+import org.apache.commons.compress.utils.CountingInputStream;
 
 import java.io.ByteArrayInputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PushbackInputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.zip.Inflater;
 import java.util.zip.InflaterInputStream;
 
+import javax.annotation.concurrent.GuardedBy;
+
 // CHECKSTYLE.OFF: JavadocStyle
 /**
  * A {@link FileBasedSource} for reading Avro files.
@@ -439,10 +447,6 @@ public class AvroSource<T> extends BlockBasedSource<T> {
    * the total number of records in the block and the block's size in bytes, followed by the
 * block's (optionally-encoded) records. Each block is terminated by a 16-byte sync marker.
    *
-   * <p>Here, we consider the sync marker that precedes a block to be its offset, as this allows
-   * a reader that begins reading at that offset to detect the sync marker and the beginning of
-   * the block.
-   *
    * @param <T> The type of records contained in the block.
    */
   @Experimental(Experimental.Kind.SOURCE_SINK)
@@ -450,24 +454,25 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     // The current block.
     private AvroBlock<T> currentBlock;
 
-    // Offset of the block.
+    // A lock used to synchronize block offsets for getSplitPointsRemaining.
+    private final Object progressLock = new Object();
+
+    // Offset of the current block.
+    @GuardedBy("progressLock")
     private long currentBlockOffset = 0;
 
     // Size of the current block.
+    @GuardedBy("progressLock")
     private long currentBlockSizeBytes = 0;
 
-    // Current offset within the stream.
-    private long currentOffset = 0;
-
     // Stream used to read from the underlying file.
-    // A pushback stream is used to restore bytes buffered during seeking/decoding.
+    // A pushback stream is used to restore bytes buffered during seeking.
     private PushbackInputStream stream;
+    // Counts the number of bytes read. Used only to tell how many bytes are taken up in
+    // a block's variable-length header.
+    private CountingInputStream countStream;
 
-    // Small buffer for reading encoded values from the stream.
-    // The maximum size of an encoded long is 10 bytes, and this buffer will be used to read two.
-    private final byte[] readBuffer = new byte[20];
-
-    // Decoder to decode binary-encoded values from the buffer.
+    // Caches the Avro DirectBinaryDecoder used to decode binary-encoded values from the stream.
     private BinaryDecoder decoder;
 
     /**
@@ -482,51 +487,67 @@ public class AvroSource<T> extends BlockBasedSource<T> {
       return (AvroSource<T>) super.getCurrentSource();
     }
 
+    // Precondition: the stream is positioned after the sync marker in the current (about to be
+    // previous) block. currentBlockSizeBytes equals the size of the current block, or zero if
+    // this reader was just started.
+    //
+    // Postcondition: same as above, but for the new current (formerly next) block.
     @Override
     public boolean readNextBlock() throws IOException {
-      // The next block in the file is after the first sync marker that can be read starting from
-      // the current offset. First, we seek past the next sync marker, if it exists. After a sync
-      // marker is the start of a block. A block begins with the number of records contained in
-      // the block, encoded as a long, followed by the size of the block in bytes, encoded as a
-      // long. The currentOffset after this method should be last byte after this block, and the
-      // currentBlockOffset should be the start of the sync marker before this block.
-
-      // Seek to the next sync marker, if one exists.
-      currentOffset += advancePastNextSyncMarker(stream, getCurrentSource().getSyncMarker());
-
-      // The offset of the current block includes its preceding sync marker.
-      currentBlockOffset = currentOffset - getCurrentSource().getSyncMarker().length;
-
-      // Read a small buffer to parse the block header.
-      // We cannot use a BinaryDecoder to do this directly from the stream because a BinaryDecoder
-      // internally buffers data and we only want to read as many bytes from the stream as the size
-      // of the header. Though BinaryDecoder#InputStream returns an input stream that is aware of
-      // its internal buffering, we would have to re-wrap this input stream to seek for the next
-      // block in the file.
-      int read = stream.read(readBuffer);
-      // We reached the last sync marker in the file.
-      if (read <= 0) {
+      long startOfNextBlock = currentBlockOffset + currentBlockSizeBytes;
+
+      // Before reading the variable-sized block header, record the current number of bytes read.
+      long preHeaderCount = countStream.getBytesRead();
+      decoder = DecoderFactory.get().directBinaryDecoder(countStream, decoder);
+      long numRecords;
+      try {
+        numRecords = decoder.readLong();
+      } catch (EOFException e) {
+        // Expected when the start of the next block is the end of the file: the only way to
+        // detect that the stream has ended is to try reading from it.
         return false;
       }
-      decoder = DecoderFactory.get().binaryDecoder(readBuffer, decoder);
-      long numRecords = decoder.readLong();
       long blockSize = decoder.readLong();
 
-      // The decoder buffers data internally, but since we know the size of the stream the
-      // decoder has constructed from the readBuffer, the number of bytes available in the
-      // input stream is equal to the number of unconsumed bytes.
-      int headerSize = readBuffer.length - decoder.inputStream().available();
-      stream.unread(readBuffer, headerSize, read - headerSize);
+      // Mark header size as the change in the number of bytes read.
+      long headerSize = countStream.getBytesRead() - preHeaderCount;
 
       // Create the current block by reading blockSize bytes. Block sizes permitted by the Avro
       // specification are [32, 2^30], so this narrowing is ok.
       byte[] data = new byte[(int) blockSize];
-      stream.read(data);
+      int read = stream.read(data);
+      checkState(blockSize == read, "Only %s/%s bytes in the block were read", read, blockSize);
       currentBlock = new AvroBlock<>(data, numRecords, getCurrentSource());
-      currentBlockSizeBytes = blockSize;
 
-      // Update current offset with the number of bytes we read to get the next block.
-      currentOffset += headerSize + blockSize;
+      // Read the end of this block, which MUST be a sync marker for correctness.
+      byte[] syncMarker = getCurrentSource().getSyncMarker();
+      byte[] readSyncMarker = new byte[syncMarker.length];
+      long syncMarkerOffset = startOfNextBlock + headerSize + blockSize;
+      long bytesRead = stream.read(readSyncMarker);
+      checkState(
+          bytesRead == syncMarker.length,
+          "When trying to read a sync marker at position %s, only able to read %s/%s bytes",
+          syncMarkerOffset,
+          bytesRead,
+          syncMarker.length);
+      if (!Arrays.equals(syncMarker, readSyncMarker)) {
+        throw new IllegalStateException(
+            String.format(
+                "Expected the bytes [%d,%d) in file %s to be a sync marker, but found %s",
+                syncMarkerOffset,
+                syncMarkerOffset + syncMarker.length,
+                getCurrentSource().getFileOrPatternSpec(),
+                Arrays.toString(readSyncMarker)
+            ));
+      }
+
+      // Atomically update both the position and offset of the new block.
+      synchronized (progressLock) {
+        currentBlockOffset = startOfNextBlock;
+        // Total block size includes the header, block content, and trailing sync marker.
+        currentBlockSizeBytes = headerSize + blockSize + syncMarker.length;
+      }
+
       return true;
     }
 
@@ -537,32 +558,65 @@ public class AvroSource<T> extends BlockBasedSource<T> {
 
     @Override
     public long getCurrentBlockOffset() {
-      return currentBlockOffset;
+      synchronized (progressLock) {
+        return currentBlockOffset;
+      }
     }
 
     @Override
     public long getCurrentBlockSize() {
-      return currentBlockSizeBytes;
+      synchronized (progressLock) {
+        return currentBlockSizeBytes;
+      }
+    }
+
+    @Override
+    public long getSplitPointsRemaining() {
+      if (isDone()) {
+        return 0;
+      }
+      synchronized (progressLock) {
+        if (currentBlockOffset + currentBlockSizeBytes >= getCurrentSource().getEndOffset()) {
+          // This block is known to be the last block in the range.
+          return 1;
+        }
+      }
+      return super.getSplitPointsRemaining();
     }
 
     /**
      * Creates a {@link PushbackInputStream} that has a large enough pushback buffer to be able
-     * to push back the syncBuffer and the readBuffer.
+     * to push back the syncBuffer.
      */
     private PushbackInputStream createStream(ReadableByteChannel channel) {
       return new PushbackInputStream(
           Channels.newInputStream(channel),
-          getCurrentSource().getSyncMarker().length + readBuffer.length);
+          getCurrentSource().getSyncMarker().length);
     }
 
-    /**
-     * Starts reading from the provided channel. Assumes that the channel is already seeked to
-     * the source's start offset.
-     */
+    // Postcondition: the stream is positioned at the beginning of the first block after the start
+    // of the current source, and currentBlockOffset is that position. Additionally,
+    // currentBlockSizeBytes will be set to 0 indicating that the previous block was empty.
     @Override
     protected void startReading(ReadableByteChannel channel) throws IOException {
+      long startOffset = getCurrentSource().getStartOffset();
+      byte[] syncMarker = getCurrentSource().getSyncMarker();
+      long syncMarkerLength = syncMarker.length;
+
+      if (startOffset != 0) {
+        // Rewind in order to find the sync marker ending the previous block.
+        long position = Math.max(0, startOffset - syncMarkerLength);
+        ((SeekableByteChannel) channel).position(position);
+        startOffset = position;
+      }
+
+      // Satisfy the postcondition.
       stream = createStream(channel);
-      currentOffset = getCurrentSource().getStartOffset();
+      countStream = new CountingInputStream(stream);
+      synchronized (progressLock) {
+        currentBlockOffset = startOffset + advancePastNextSyncMarker(stream, syncMarker);
+        currentBlockSizeBytes = 0;
+      }
     }
 
     /**
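
The header-reading pattern above can be distilled into a standalone sketch
(hypothetical names; assumes the stream is already positioned at a block
header): a direct, unbuffered binary decoder over a CountingInputStream reads
exactly the header bytes, and the difference in byte counts gives the header
size.

import java.io.IOException;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.commons.compress.utils.CountingInputStream;

class BlockHeader {
  final long numRecords;
  final long blockSizeBytes;
  final long headerSizeBytes;

  BlockHeader(long numRecords, long blockSizeBytes, long headerSizeBytes) {
    this.numRecords = numRecords;
    this.blockSizeBytes = blockSizeBytes;
    this.headerSizeBytes = headerSizeBytes;
  }

  // An Avro block header is two zig-zag varint longs: the record count and the
  // block's size in bytes. The direct decoder does no internal buffering, so
  // the CountingInputStream sees exactly the bytes the header occupies.
  static BlockHeader read(CountingInputStream in) throws IOException {
    long before = in.getBytesRead();
    BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
    long numRecords = decoder.readLong();
    long blockSize = decoder.readLong();
    return new BlockHeader(numRecords, blockSize, in.getBytesRead() - before);
  }
}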

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
index 31ef055..997c77a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
@@ -206,28 +206,32 @@ public abstract class BlockBasedSource<T> extends FileBasedSource<T> {
     }
 
     @Override
+    @Nullable
     public Double getFractionConsumed() {
-      if (getCurrentSource().getEndOffset() == Long.MAX_VALUE) {
-        return null;
-      }
-      Block<T> currentBlock = getCurrentBlock();
-      if (currentBlock == null) {
-        // There is no current block (i.e., the read has not yet begun).
+      if (!isStarted()) {
         return 0.0;
       }
+      if (isDone()) {
+        return 1.0;
+      }
+      FileBasedSource<T> source = getCurrentSource();
+      if (source.getEndOffset() == Long.MAX_VALUE) {
+        // Unknown end offset, so we cannot tell.
+        return null;
+      }
+
       long currentBlockOffset = getCurrentBlockOffset();
-      long startOffset = getCurrentSource().getStartOffset();
-      long endOffset = getCurrentSource().getEndOffset();
+      long startOffset = source.getStartOffset();
+      long endOffset = source.getEndOffset();
       double fractionAtBlockStart =
           ((double) (currentBlockOffset - startOffset)) / (endOffset - startOffset);
       double fractionAtBlockEnd =
           ((double) (currentBlockOffset + getCurrentBlockSize() - startOffset)
               / (endOffset - startOffset));
+      double blockFraction = getCurrentBlock().getFractionOfBlockConsumed();
       return Math.min(
           1.0,
-          fractionAtBlockStart
-          + currentBlock.getFractionOfBlockConsumed()
-            * (fractionAtBlockEnd - fractionAtBlockStart));
+          fractionAtBlockStart + blockFraction * (fractionAtBlockEnd - fractionAtBlockStart));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java
index 8f7d3fd..394afa4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java
@@ -18,6 +18,8 @@
 package org.apache.beam.sdk.io;
 
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.range.OffsetRangeTracker;
+import org.apache.beam.sdk.io.range.RangeTracker;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 
@@ -27,6 +29,8 @@ import java.io.IOException;
 import java.util.List;
 import java.util.NoSuchElementException;
 
+import javax.annotation.Nullable;
+
 /**
  * A {@link Source} that reads a finite amount of input and, because of that, supports
  * some additional operations.
@@ -37,9 +41,16 @@ import java.util.NoSuchElementException;
  * <li>Size estimation: {@link #getEstimatedSizeBytes};
  * <li>Telling whether or not this source produces key/value pairs in sorted order:
  * {@link #producesSortedKeys};
- * <li>The reader ({@link BoundedReader}) supports progress estimation
- * ({@link BoundedReader#getFractionConsumed}) and dynamic splitting
- * ({@link BoundedReader#splitAtFraction}).
+ * <li>The accompanying {@link BoundedReader reader} has additional functionality to enable runners
+ * to dynamically adapt based on runtime conditions.
+ *     <ul>
+ *       <li>Progress estimation ({@link BoundedReader#getFractionConsumed})
+ *       <li>Tracking of parallelism, to determine whether the current source can be split
+ *        ({@link BoundedReader#getSplitPointsConsumed()} and
+ *        {@link BoundedReader#getSplitPointsRemaining()}).
+ *       <li>Dynamic splitting of the current source ({@link BoundedReader#splitAtFraction}).
+ *     </ul>
+ *     </li>
  * </ul>
  *
  * <p>To use this class for supporting your custom input type, derive your class
@@ -82,14 +93,14 @@ public abstract class BoundedSource<T> extends Source<T> {
    *
    * <h3>Thread safety</h3>
    * All methods will be run from the same thread except {@link #splitAtFraction},
-   * {@link #getFractionConsumed} and {@link #getCurrentSource}, which can be called concurrently
+   * {@link #getFractionConsumed}, {@link #getCurrentSource}, {@link #getSplitPointsConsumed()},
+   * and {@link #getSplitPointsRemaining()}, all of which can be called concurrently
    * from a different thread. There will not be multiple concurrent calls to
-   * {@link #splitAtFraction} but there can be for {@link #getFractionConsumed} if
-   * {@link #splitAtFraction} is implemented.
+   * {@link #splitAtFraction}.
    *
-   * <p>If the source does not implement {@link #splitAtFraction}, you do not need to worry about
-   * thread safety. If implemented, it must be safe to call {@link #splitAtFraction} and
-   * {@link #getFractionConsumed} concurrently with other methods.
+   * <p>It must be safe to call {@link #splitAtFraction}, {@link #getFractionConsumed},
+   * {@link #getCurrentSource}, {@link #getSplitPointsConsumed()}, and
+   * {@link #getSplitPointsRemaining()} concurrently with other methods.
    *
    * <p>Additionally, a successful {@link #splitAtFraction} call must, by definition, cause
    * {@link #getCurrentSource} to start returning a different value.
@@ -129,11 +140,126 @@ public abstract class BoundedSource<T> extends Source<T> {
      * methods (including itself), and it is therefore critical for it to be implemented
      * in a thread-safe way.
      */
+    @Nullable
     public Double getFractionConsumed() {
       return null;
     }
 
     /**
+     * A constant to use as the return value for {@link #getSplitPointsConsumed()} or
+     * {@link #getSplitPointsRemaining()} when the exact value is unknown.
+     */
+    public static final long SPLIT_POINTS_UNKNOWN = -1;
+
+    /**
+     * Returns the total amount of parallelism in the consumed (returned and processed) range of
+     * this reader's current {@link BoundedSource} (as would be returned by
+     * {@link #getCurrentSource}). This corresponds to all split point records (see
+     * {@link RangeTracker}) returned by this reader, <em>excluding</em> the last split point
+     * returned if the reader is not finished.
+     *
+     * <p>Consider the following examples: (1) An input that can be read in parallel down to the
+     * individual records, such as {@link CountingSource#upTo}, is called "perfectly splittable".
+     * (2) A "block-compressed" file format such as {@link AvroIO}, in which a block of records has
+     * to be read as a whole, but different blocks can be read in parallel. (3) An "unsplittable"
+     * input such as a cursor in a database.
+     *
+     * <ul>
+     * <li>Any {@link BoundedReader reader} that is unstarted (i.e., has never had a call to
+     * {@link #start}) has a consumed parallelism of 0. This condition holds independent of whether
+     * the input is splittable.
+     * <li>Any {@link BoundedReader reader} that has only returned its first element (i.e.,
+     * has never had a call to {@link #advance}) has a consumed parallelism of 0: the first element
+     * is the current element and is still being processed. This condition holds independent of
+     * whether the input is splittable.
+     * <li>For an empty reader (in which the call to {@link #start} returned false), the
+     * consumed parallelism is 0. This condition holds independent of whether the input is
+     * splittable.
+     * <li>For a non-empty, finished reader (in which the call to {@link #start} returned true and
+     * a call to {@link #advance} has returned false), the value returned must be at least 1
+     * and should equal the total parallelism in the source.
+     * <li>For example (1): After returning record #30 (starting at 1) out of 50 in a perfectly
+     * splittable 50-record input, this value should be 29. When finished, the consumed parallelism
+     * should be 50.
+     * <li>For example (2): In a block-compressed file consisting of 5 blocks, the value should
+     * stay at 0 until the first record of the second block is returned; stay at 1 until the first
+     * record of the third block is returned, etc. Only once the end of the file is reached has
+     * the fifth block been consumed, and the value should then be 5.
+     * <li>For example (3): For any non-empty unsplittable input, the consumed parallelism is 0
+     * until the reader is finished (the last call to {@link #advance} returned false), at which
+     * point it becomes 1.
+     * </ul>
+     *
+     * <p>A reader that is implemented using a {@link RangeTracker} is encouraged to use the
+     * range tracker's ability to count split points to implement this method. See
+     * {@link OffsetBasedSource.OffsetBasedReader} and {@link OffsetRangeTracker} for an example.
+     *
+     * <p>Defaults to {@link #SPLIT_POINTS_UNKNOWN}. Any value less than 0 will be interpreted
+     * as unknown.
+     *
+     * <h3>Thread safety</h3>
+     * See the javadoc on {@link BoundedReader} for information about thread safety.
+     *
+     * @see #getSplitPointsRemaining()
+     */
+    public long getSplitPointsConsumed() {
+      return SPLIT_POINTS_UNKNOWN;
+    }
+
+    /**
+     * Returns the total amount of parallelism in the unprocessed part of this reader's current
+     * {@link BoundedSource} (as would be returned by {@link #getCurrentSource}). This corresponds
+     * to all unprocessed split point records (see {@link RangeTracker}), including the last
+     * split point returned, in the remainder part of the source.
+     *
+     * <p>This function should be implemented only <strong>in addition to
+     * {@link #getSplitPointsConsumed()}</strong> and only if <em>an exact value can be
+     * returned</em>.
+     *
+     * <p>Consider the following examples: (1) An input that can be read in parallel down to the
+     * individual records, such as {@link CountingSource#upTo}, is called "perfectly splittable".
+     * (2) A "block-compressed" file format such as {@link AvroIO}, in which a block of records has
+     * to be read as a whole, but different blocks can be read in parallel. (3) An "unsplittable"
+     * input such as a cursor in a database.
+     *
+     * <p>Assume for examples (1) and (2) that the number of records or blocks remaining is known:
+     *
+     * <ul>
+     * <li>Any {@link BoundedReader reader} for which the last call to {@link #start} or
+     * {@link #advance} has returned true should not return 0, because this reader itself
+     * represents parallelism at least 1. This condition holds independent of whether the input is
+     * splittable.
+     * <li>A finished reader (for which {@link #start} or {@link #advance} has returned false)
+     * should return a value of 0. This condition holds independent of whether the input is
+     * splittable.
+     * <li>For example (1): After returning record #30 (starting at 1) out of 50 in a perfectly
+     * splittable 50-record input, this value should be 21 (20 remaining + 1 current) if the total
+     * number of records is known.
+     * <li>For example (2): After returning a record in block 3 in a block-compressed file
+     * consisting of 5 blocks, this value should be 3 (since blocks 4 and 5 can be processed in
+     * parallel by new readers produced via dynamic work rebalancing, while the current reader
+     * continues processing block 3) if the total number of blocks is known.
+     * <li>For example (3): a reader for any non-empty unsplittable input, should return 1 until
+     * it is finished, at which point it should return 0.
+     * <li>For any reader: After returning the last split point in a file (e.g., the last record
+     * in example (1), the first record in the last block for example (2), or the first record in
+     * the file for example (3)), this value should be 1: apart from the current task, no
+     * additional remainder can be split off.
+     * </ul>
+     *
+     * <p>Defaults to {@link #SPLIT_POINTS_UNKNOWN}. Any value less than 0 will be interpreted as
+     * unknown.
+     *
+     * <h3>Thread safety</h3>
+     * See the javadoc on {@link BoundedReader} for information about thread safety.
+     *
+     * @see #getSplitPointsConsumed()
+     */
+    public long getSplitPointsRemaining() {
+      return SPLIT_POINTS_UNKNOWN;
+    }
+
+    /**
      * Returns a {@code Source} describing the same input that this {@code Reader} currently reads
      * (including items already read).
      *
@@ -263,6 +389,7 @@ public abstract class BoundedSource<T> extends Source<T> {
      *
      * <p>By default, returns null to indicate that splitting is not possible.
      */
+    @Nullable
     public BoundedSource<T> splitAtFraction(double fraction) {
       return null;
     }
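
To make the documented contract concrete, here is a minimal sketch (not in
this commit; the class and its names are invented) of a perfectly splittable
in-memory reader, where every element is a split point:

import java.util.List;
import org.apache.beam.sdk.io.BoundedSource;

final class TinyListReader extends BoundedSource.BoundedReader<Integer> {
  private final BoundedSource<Integer> source;
  private final List<Integer> data;
  private int index = -1;        // index of the current element, -1 if unstarted
  private boolean done = false;

  TinyListReader(BoundedSource<Integer> source, List<Integer> data) {
    this.source = source;
    this.data = data;
  }

  @Override public boolean start() { return advance(); }

  @Override public boolean advance() {
    if (++index >= data.size()) {
      done = true;
      return false;
    }
    return true;
  }

  @Override public Integer getCurrent() { return data.get(index); }

  @Override public BoundedSource<Integer> getCurrentSource() { return source; }

  @Override public long getSplitPointsConsumed() {
    // The current element is still being processed, so it is not yet consumed.
    return done ? data.size() : Math.max(0, index);
  }

  @Override public long getSplitPointsRemaining() {
    if (done) {
      return 0;
    }
    // Unstarted: everything remains. Started: the current element counts too.
    return index < 0 ? data.size() : data.size() - index;
  }

  @Override public void close() {}
}

With three elements, consumed parallelism reads 0, 1, 2 as the elements are
returned and 3 once advance() returns false, matching example (1) above.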

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index 5cb0684..8bccf5f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -32,11 +32,14 @@ import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
 import java.io.IOException;
 import java.io.PushbackInputStream;
 import java.io.Serializable;
+import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.util.NoSuchElementException;
 import java.util.zip.GZIPInputStream;
 
+import javax.annotation.concurrent.GuardedBy;
+
 /**
  * A Source that reads from compressed files. A {@code CompressedSources} wraps a delegate
  * {@link FileBasedSource} that is able to read the decompressed file format.
@@ -361,7 +364,12 @@ public class CompressedSource<T> extends FileBasedSource<T> {
 
     private final FileBasedReader<T> readerDelegate;
     private final CompressedSource<T> source;
+    private final boolean splittable;
+    private final Object progressLock = new Object();
+    @GuardedBy("progressLock")
     private int numRecordsRead;
+    @GuardedBy("progressLock")
+    private CountingChannel channel;
 
     /**
      * Create a {@code CompressedReader} from a {@code CompressedSource} and delegate reader.
@@ -369,6 +377,13 @@ public class CompressedSource<T> extends FileBasedSource<T> {
     public CompressedReader(CompressedSource<T> source, FileBasedReader<T> readerDelegate) {
       super(source);
       this.source = source;
+      boolean splittable;
+      try {
+        splittable = source.isSplittable();
+      } catch (Exception e) {
+        throw new RuntimeException("Unable to tell whether source " + source + " is splittable", e);
+      }
+      this.splittable = splittable;
       this.readerDelegate = readerDelegate;
     }
 
@@ -380,18 +395,78 @@ public class CompressedSource<T> extends FileBasedSource<T> {
       return readerDelegate.getCurrent();
     }
 
+    @Override
+    public final long getSplitPointsConsumed() {
+      if (splittable) {
+        return readerDelegate.getSplitPointsConsumed();
+      } else {
+        synchronized (progressLock) {
+          return (isDone() && numRecordsRead > 0) ? 1 : 0;
+        }
+      }
+    }
+
+    @Override
+    public final long getSplitPointsRemaining() {
+      if (splittable) {
+        return readerDelegate.getSplitPointsRemaining();
+      } else {
+        return isDone() ? 0 : 1;
+      }
+    }
+
     /**
      * Returns true only for the first record; compressed sources cannot be split.
      */
     @Override
     protected final boolean isAtSplitPoint() {
-      // We have to return true for the first record, but not for the state before reading it,
-      // and not for the state after reading any other record. Hence == rather than >= or <=.
-      // This is required because FileBasedReader is intended for readers that can read a range
-      // of offsets in a file and where the range can be split in parts. CompressedReader,
-      // however, is a degenerate case because it cannot be split, but it has to satisfy the
-      // semantics of offsets and split points anyway.
-      return numRecordsRead == 1;
+      if (splittable) {
+        return readerDelegate.isAtSplitPoint();
+      } else {
+        // We have to return true for the first record, but not for the state before reading it,
+        // and not for the state after reading any other record. Hence == rather than >= or <=.
+        // This is required because FileBasedReader is intended for readers that can read a range
+        // of offsets in a file and where the range can be split in parts. CompressedReader,
+        // however, is a degenerate case because it cannot be split, but it has to satisfy the
+        // semantics of offsets and split points anyway.
+        synchronized (progressLock) {
+          return numRecordsRead == 1;
+        }
+      }
+    }
+
+    private static class CountingChannel implements ReadableByteChannel {
+      long count;
+      private final ReadableByteChannel inner;
+
+      public CountingChannel(ReadableByteChannel inner, long count) {
+        this.inner = inner;
+        this.count = count;
+      }
+
+      public long getCount() {
+        return count;
+      }
+
+      @Override
+      public int read(ByteBuffer dst) throws IOException {
+        int bytes = inner.read(dst);
+        if (bytes > 0) {
+          // Avoid the -1 from EOF.
+          count += bytes;
+        }
+        return bytes;
+      }
+
+      @Override
+      public boolean isOpen() {
+        return inner.isOpen();
+      }
+
+      @Override
+      public void close() throws IOException {
+        inner.close();
+      }
     }
 
     /**
@@ -400,6 +475,16 @@ public class CompressedSource<T> extends FileBasedSource<T> {
      */
     @Override
     protected final void startReading(ReadableByteChannel channel) throws IOException {
+      if (splittable) {
+        // No-op. We will always delegate to the inner reader, so this.channel and this.progressLock
+        // will never be used.
+      } else {
+        synchronized (progressLock) {
+          this.channel = new CountingChannel(channel, getCurrentSource().getStartOffset());
+          channel = this.channel;
+        }
+      }
+
       if (source.getChannelFactory() instanceof FileNameBasedDecompressingChannelFactory) {
         FileNameBasedDecompressingChannelFactory channelFactory =
             (FileNameBasedDecompressingChannelFactory) source.getChannelFactory();
@@ -420,16 +505,37 @@ public class CompressedSource<T> extends FileBasedSource<T> {
       if (!readerDelegate.readNextRecord()) {
         return false;
       }
-      ++numRecordsRead;
+      synchronized (progressLock) {
+        ++numRecordsRead;
+      }
       return true;
     }
 
-    /**
-     * Returns the delegate reader's current offset in the decompressed input.
-     */
+    // Splittable: simply delegates to the inner reader.
+    //
+    // Unsplittable: returns the number of bytes consumed so far from the underlying input stream.
+    // These positions are likely to be coarse-grained (in the event of buffering) and
+    // over-estimates (because they reflect the number of bytes read to produce an element, not
+    // its start), but both provide better data than, e.g., reporting the start of the file.
     @Override
-    protected final long getCurrentOffset() {
-      return readerDelegate.getCurrentOffset();
+    protected final long getCurrentOffset() throws NoSuchElementException {
+      if (splittable) {
+        return readerDelegate.getCurrentOffset();
+      } else {
+        synchronized (progressLock) {
+          if (numRecordsRead <= 1) {
+            // Since the first record is at a split point, it should start at the beginning of the
+            // file. This avoids the bad case where the decompressor read the entire file, which
+            // would cause the file to be treated as empty when returning channel.getCount() as it
+            // is outside the valid range.
+            return 0;
+          }
+          if (channel == null) {
+            throw new NoSuchElementException();
+          }
+          return channel.getCount();
+        }
+      }
     }
   }
 }
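
Why the reported offset is only an upper bound can be demonstrated outside
Beam entirely. Below is a small standalone sketch (hypothetical class name)
using the same commons-compress CountingInputStream: producing even one
decompressed byte forces the decompressor to consume a large buffered chunk of
compressed input.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.compress.utils.CountingInputStream;

class OverEstimateDemo {
  public static void main(String[] args) throws Exception {
    // Compress some highly repetitive text.
    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    try (GZIPOutputStream gzip = new GZIPOutputStream(compressed)) {
      for (int i = 0; i < 10000; i++) {
        gzip.write("line of text\n".getBytes(StandardCharsets.UTF_8));
      }
    }
    // Count compressed bytes as the decompressor pulls them.
    CountingInputStream counting =
        new CountingInputStream(new ByteArrayInputStream(compressed.toByteArray()));
    GZIPInputStream decompressed = new GZIPInputStream(counting);
    decompressed.read(); // produce a single decompressed byte...
    // ...yet the counter is already far past that byte's true start offset:
    System.out.println("compressed bytes consumed: " + counting.getBytesRead());
  }
}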

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
index b28e866..403d22e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
@@ -210,6 +210,11 @@ public class CountingSource {
     }
 
     @Override
+    public synchronized long getSplitPointsRemaining() {
+      return Math.max(0, getCurrentSource().getEndOffset() - current);
+    }
+
+    @Override
     public synchronized BoundedCountingSource getCurrentSource()  {
       return (BoundedCountingSource) super.getCurrentSource();
     }
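
A quick worked check of the formula above (values assumed): for a reader over
[0, 10) whose current element is 7, the remaining split points are {7, 8, 9},
that is, the current element plus the two unread ones:

class RemainingDemo {
  public static void main(String[] args) {
    long endOffset = 10;
    long current = 7;  // last element returned by the reader
    System.out.println(Math.max(0, endOffset - current)); // prints 3
  }
}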

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java
index cc8e923..137c6cd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java
@@ -865,6 +865,8 @@ public class DatastoreIO {
      */
     private int userLimit;
 
+    private volatile boolean done = false;
+
     private Entity currentEntity;
 
     /**
@@ -885,6 +887,16 @@ public class DatastoreIO {
     }
 
     @Override
+    public final long getSplitPointsConsumed() {
+      return done ? 1 : 0;
+    }
+
+    @Override
+    public final long getSplitPointsRemaining() {
+      return done ? 0 : 1;
+    }
+
+    @Override
     public boolean start() throws IOException {
       return advance();
     }
@@ -901,6 +913,7 @@ public class DatastoreIO {
 
       if (entities == null || !entities.hasNext()) {
         currentEntity = null;
+        done = true;
         return false;
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index 96aeda5..f000f6a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -489,7 +489,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     }
 
     @Override
-    public FileBasedSource<T> getCurrentSource() {
+    public synchronized FileBasedSource<T> getCurrentSource() {
       return (FileBasedSource<T>) super.getCurrentSource();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
index 9ee89a2..2f62acd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
@@ -180,7 +180,7 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
    *
    * <p>As an example in which {@link OffsetBasedSource} is used to implement a file source, suppose
    * that this source was constructed with an {@code endOffset} of {@link Long#MAX_VALUE} to
-   * indicate that a file should be read to the end. Then {@link #getMaxEndOffset} should determine
+   * indicate that a file should be read to the end. Then this function should determine
    * the actual, exact size of the file in bytes and return it.
    */
   public abstract long getMaxEndOffset(PipelineOptions options) throws Exception;
@@ -230,9 +230,22 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
    */
   public abstract static class OffsetBasedReader<T> extends BoundedReader<T> {
     private static final Logger LOG = LoggerFactory.getLogger(OffsetBasedReader.class);
-
     private OffsetBasedSource<T> source;
 
+    /**
+     * Returns true if the last call to {@link #start} or {@link #advance} returned false.
+     */
+    public final boolean isDone() {
+      return rangeTracker.isDone();
+    }
+
+    /**
+     * Returns true if there has been a call to {@link #start}.
+     */
+    public final boolean isStarted() {
+      return rangeTracker.isStarted();
+    }
+
     /** The {@link OffsetRangeTracker} managing the range and current position of the source. */
     private final OffsetRangeTracker rangeTracker;
 
@@ -266,12 +279,14 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
 
     @Override
     public final boolean start() throws IOException {
-      return startImpl() && rangeTracker.tryReturnRecordAt(isAtSplitPoint(), getCurrentOffset());
+      return startImpl() && rangeTracker.tryReturnRecordAt(isAtSplitPoint(), getCurrentOffset())
+          || rangeTracker.markDone();
     }
 
     @Override
     public final boolean advance() throws IOException {
-      return advanceImpl() && rangeTracker.tryReturnRecordAt(isAtSplitPoint(), getCurrentOffset());
+      return advanceImpl() && rangeTracker.tryReturnRecordAt(isAtSplitPoint(), getCurrentOffset())
+          || rangeTracker.markDone();
     }
 
     /**
@@ -315,6 +330,32 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
     }
 
     @Override
+    public long getSplitPointsConsumed() {
+      return rangeTracker.getSplitPointsProcessed();
+    }
+
+    @Override
+    public long getSplitPointsRemaining() {
+      if (isDone()) {
+        return 0;
+      } else if (!isStarted()) {
+        // Note that even if the current source does not allow splitting, we don't know that
+        // it's non-empty so we return UNKNOWN instead of 1.
+        return BoundedReader.SPLIT_POINTS_UNKNOWN;
+      } else if (!getCurrentSource().allowsDynamicSplitting()) {
+        // Started (so non-empty) and unsplittable, so only the current task.
+        return 1;
+      } else if (getCurrentOffset() >= rangeTracker.getStopPosition() - 1) {
+        // If this is true, the next element is outside the range. Note that getCurrentOffset()
+        // might even be larger than the stop position when the current record is not a split point.
+        return 1;
+      } else {
+        // Use the default.
+        return super.getSplitPointsRemaining();
+      }
+    }
+
+    @Override
     public final synchronized OffsetBasedSource<T> splitAtFraction(double fraction) {
       if (!getCurrentSource().allowsDynamicSplitting()) {
         return null;
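
For source authors, a minimal sketch (hypothetical class; not part of this
commit) of a reader built on OffsetBasedReader: the subclass only reports
offsets and split points, while the final start() and advance() above drive
the range tracker and the markDone() bookkeeping.

import java.io.IOException;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.io.OffsetBasedSource;

final class EveryOffsetReader extends OffsetBasedSource.OffsetBasedReader<Long> {
  private long current;
  private final long end;

  EveryOffsetReader(OffsetBasedSource<Long> source) {
    super(source);
    this.current = source.getStartOffset() - 1;  // before the first record
    this.end = source.getEndOffset();
  }

  // Every offset holds exactly one record, and every record is a split point,
  // so the base class's range tracker yields exact consumed/remaining counts.
  @Override protected boolean isAtSplitPoint() { return true; }

  @Override protected long getCurrentOffset() { return current; }

  @Override protected boolean startImpl() throws IOException { return advanceImpl(); }

  @Override protected boolean advanceImpl() throws IOException {
    return ++current < end;
  }

  @Override public Long getCurrent() throws NoSuchElementException { return current; }

  @Override public void close() throws IOException {}
}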

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 79eeb08..13cb45e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -817,9 +817,10 @@ public class TextIO {
       private ByteString buffer;
       private int startOfSeparatorInBuffer;
       private int endOfSeparatorInBuffer;
-      private long startOfNextRecord;
-      private boolean eof;
-      private boolean elementIsPresent;
+      private long startOfRecord;
+      private volatile long startOfNextRecord;
+      private volatile boolean eof;
+      private volatile boolean elementIsPresent;
       private T currentValue;
       private ReadableByteChannel inChannel;
 
@@ -834,7 +835,15 @@ public class TextIO {
         if (!elementIsPresent) {
           throw new NoSuchElementException();
         }
-        return startOfNextRecord;
+        return startOfRecord;
+      }
+
+      @Override
+      public long getSplitPointsRemaining() {
+        if (isStarted() && startOfNextRecord >= getCurrentSource().getEndOffset()) {
+          return isDone() ? 0 : 1;
+        }
+        return super.getSplitPointsRemaining();
       }
 
       @Override
@@ -912,7 +921,7 @@ public class TextIO {
 
       @Override
       protected boolean readNextRecord() throws IOException {
-        startOfNextRecord += endOfSeparatorInBuffer;
+        startOfRecord = startOfNextRecord;
         findSeparatorBounds();
 
         // If we have reached EOF file and consumed all of the buffer then we know
@@ -923,6 +932,7 @@ public class TextIO {
         }
 
         decodeCurrentElement();
+        startOfNextRecord = startOfRecord + endOfSeparatorInBuffer;
         return true;
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
index ea1cf14..76790af 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
@@ -17,6 +17,10 @@
  */
 package org.apache.beam.sdk.io.range;
 
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+
 import com.google.common.annotations.VisibleForTesting;
 
 import org.slf4j.Logger;
@@ -32,6 +36,8 @@ public class OffsetRangeTracker implements RangeTracker<Long> {
   private long stopOffset;
   private long lastRecordStart = -1L;
   private long offsetOfLastSplitPoint = -1L;
+  private long splitPointsSeen = 0L;
+  private boolean done = false;
 
   /**
    * Offset corresponding to infinity. This can only be used as the upper-bound of a range, and
@@ -49,6 +55,15 @@ public class OffsetRangeTracker implements RangeTracker<Long> {
     this.stopOffset = stopOffset;
   }
 
+  public synchronized boolean isStarted() {
+    // done => started: handles the case when the reader was empty.
+    return (offsetOfLastSplitPoint != -1) || done;
+  }
+
+  public synchronized boolean isDone() {
+    return done;
+  }
+
   @Override
   public synchronized Long getStartPosition() {
     return startOffset;
@@ -65,10 +80,18 @@ public class OffsetRangeTracker implements RangeTracker<Long> {
   }
 
   public synchronized boolean tryReturnRecordAt(boolean isAtSplitPoint, long recordStart) {
-    if (lastRecordStart == -1 && !isAtSplitPoint) {
+    if (!isStarted() && !isAtSplitPoint) {
       throw new IllegalStateException(
           String.format("The first record [starting at %d] must be at a split point", recordStart));
     }
+    if (recordStart < startOffset) {
+      throw new IllegalStateException(
+          String.format(
+              "Trying to return record [starting at %d] which is before the start offset [%d]",
+              recordStart,
+              startOffset));
+
+    }
     if (recordStart < lastRecordStart) {
       throw new IllegalStateException(
           String.format(
@@ -77,8 +100,11 @@ public class OffsetRangeTracker implements RangeTracker<Long> {
               recordStart,
               lastRecordStart));
     }
+
+    lastRecordStart = recordStart;
+
     if (isAtSplitPoint) {
-      if (offsetOfLastSplitPoint != -1L && recordStart == offsetOfLastSplitPoint) {
+      if (recordStart == offsetOfLastSplitPoint) {
         throw new IllegalStateException(
             String.format(
                 "Record at a split point has same offset as the previous split point: "
@@ -86,12 +112,13 @@ public class OffsetRangeTracker implements RangeTracker<Long> {
                 offsetOfLastSplitPoint, recordStart));
       }
       if (recordStart >= stopOffset) {
+        done = true;
         return false;
       }
       offsetOfLastSplitPoint = recordStart;
+      ++splitPointsSeen;
     }
 
-    lastRecordStart = recordStart;
     return true;
   }
 
@@ -105,7 +132,7 @@ public class OffsetRangeTracker implements RangeTracker<Long> {
       LOG.debug("Refusing to split {} at {}: stop position unspecified", this, splitOffset);
       return false;
     }
-    if (lastRecordStart == -1) {
+    if (!isStarted()) {
       LOG.debug("Refusing to split {} at {}: unstarted", this, splitOffset);
       return false;
     }
@@ -143,17 +170,72 @@ public class OffsetRangeTracker implements RangeTracker<Long> {
 
   @Override
   public synchronized double getFractionConsumed() {
-    if (stopOffset == OFFSET_INFINITY) {
+    if (!isStarted()) {
       return 0.0;
-    }
-    if (lastRecordStart == -1) {
+    } else if (isDone()) {
+      return 1.0;
+    } else if (stopOffset == OFFSET_INFINITY) {
       return 0.0;
+    } else if (lastRecordStart >= stopOffset) {
+      return 1.0;
+    } else {
+      // E.g., when reading [3, 6) and lastRecordStart is 4, that means we consumed 3,4 of 3,4,5
+      // which is (4 - 3 + 1) / (6 - 3) = 67%.
+      // Also, clamp to at most 1.0 because the last consumed position can extend past the
+      // stop position.
+      return Math.min(1.0, 1.0 * (lastRecordStart - startOffset + 1) / (stopOffset - startOffset));
     }
-    // E.g., when reading [3, 6) and lastRecordStart is 4, that means we consumed 3,4 of 3,4,5
-    // which is (4 - 3 + 1) / (6 - 3) = 67%.
-    // Also, clamp to at most 1.0 because the last consumed position can extend past the
-    // stop position.
-    return Math.min(1.0, 1.0 * (lastRecordStart - startOffset + 1) / (stopOffset - startOffset));
+  }
+
+  /**
+   * Returns the total number of split points that have been processed.
+   *
+   * <p>A split point at a particular offset has been seen if there has been a corresponding call
+   * to {@link #tryReturnRecordAt(boolean, long)} with {@code isAtSplitPoint} true. It has been
+   * processed if there has been a <em>subsequent</em> call to
+   * {@link #tryReturnRecordAt(boolean, long)} with {@code isAtSplitPoint} true and at a larger
+   * offset.
+   *
+   * <p>Note that for correctness when implementing {@link BoundedReader#getSplitPointsConsumed()},
+   * if a reader finishes before {@link #tryReturnRecordAt(boolean, long)} returns false,
+   * the reader should add an additional call to {@link #markDone()}. This will indicate that
+   * processing for the last seen split point has been finished.
+   *
+   * @see org.apache.beam.sdk.io.OffsetBasedSource for a {@link BoundedReader}
+   * implemented using {@link OffsetRangeTracker}.
+   */
+  public synchronized long getSplitPointsProcessed() {
+    if (!isStarted()) {
+      return 0;
+    } else if (isDone()) {
+      return splitPointsSeen;
+    } else {
+      // There is a current split point, and it has not finished processing.
+      checkState(
+          splitPointsSeen > 0,
+          "A started rangeTracker should have seen > 0 split points (is %s)",
+          splitPointsSeen);
+      return splitPointsSeen - 1;
+    }
+  }
+
+
+  /**
+   * Marks this range tracker as being done. Specifically, this will mark the current split point,
+   * if one exists, as being finished.
+   *
+   * <p>Always returns false, so that it can be used in an implementation of
+   * {@link BoundedReader#start()} or {@link BoundedReader#advance()} as follows:
+   *
+   * <pre> {@code
+   * public boolean start() {
+   *   return startImpl() && rangeTracker.tryReturnRecordAt(isAtSplitPoint, position)
+   *       || rangeTracker.markDone();
+   * }} </pre>
+   */
+  public synchronized boolean markDone() {
+    done = true;
+    return false;
   }
 
   @Override
@@ -177,7 +259,10 @@ public class OffsetRangeTracker implements RangeTracker<Long> {
   @VisibleForTesting
   OffsetRangeTracker copy() {
     OffsetRangeTracker res = new OffsetRangeTracker(startOffset, stopOffset);
+    res.offsetOfLastSplitPoint = this.offsetOfLastSplitPoint;
     res.lastRecordStart = this.lastRecordStart;
+    res.done = this.done;
+    res.splitPointsSeen = this.splitPointsSeen;
     return res;
   }
 }
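
A worked example (hypothetical class name, offsets assumed) of the accounting
described above: the latest split point stays in flight until a later split
point is returned or markDone() is called.

import org.apache.beam.sdk.io.range.OffsetRangeTracker;

class TrackerDemo {
  public static void main(String[] args) {
    OffsetRangeTracker tracker = new OffsetRangeTracker(0, 100);
    tracker.tryReturnRecordAt(true, 10);  // 1st split point seen; 0 processed
    tracker.tryReturnRecordAt(false, 15); // not a split point; still 0
    tracker.tryReturnRecordAt(true, 20);  // 2nd split point; 1st now processed
    System.out.println(tracker.getSplitPointsProcessed()); // prints 1
    tracker.markDone();                   // reader finished mid-range
    System.out.println(tracker.getSplitPointsProcessed()); // prints 2
  }
}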

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
index 20c21bc..13f8e7f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
@@ -18,9 +18,9 @@
 package org.apache.beam.sdk.io;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
@@ -28,6 +28,8 @@ import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.io.AvroSource.AvroReader;
 import org.apache.beam.sdk.io.AvroSource.AvroReader.Seeker;
+import org.apache.beam.sdk.io.BlockBasedSource.BlockBasedReader;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.SourceTestUtils;
@@ -44,6 +46,7 @@ import org.apache.avro.io.DatumWriter;
 import org.apache.avro.reflect.AvroDefault;
 import org.apache.avro.reflect.Nullable;
 import org.apache.avro.reflect.ReflectData;
+import org.hamcrest.Matchers;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -57,6 +60,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PushbackInputStream;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Objects;
@@ -198,6 +202,86 @@ public class AvroSourceTest {
   }
 
   @Test
+  public void testProgress() throws Exception {
+    // 5 records, 2 per block.
+    List<FixedRecord> records = createFixedRecords(5);
+    String filename = generateTestFile("tmp.avro", records, SyncBehavior.SYNC_REGULAR, 2,
+        AvroCoder.of(FixedRecord.class), DataFileConstants.NULL_CODEC);
+
+    AvroSource<FixedRecord> source = AvroSource.from(filename).withSchema(FixedRecord.class);
+    try (BoundedSource.BoundedReader<FixedRecord> readerOrig = source.createReader(null)) {
+      assertThat(readerOrig, Matchers.instanceOf(BlockBasedReader.class));
+      BlockBasedReader<FixedRecord> reader = (BlockBasedReader<FixedRecord>) readerOrig;
+
+      // Before starting
+      assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+      // First 2 records are in the same block.
+      assertTrue(reader.start());
+      assertTrue(reader.isAtSplitPoint());
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+      // continued
+      assertTrue(reader.advance());
+      assertFalse(reader.isAtSplitPoint());
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+      // Second block -> parallelism consumed becomes 1.
+      assertTrue(reader.advance());
+      assertTrue(reader.isAtSplitPoint());
+      assertEquals(1, reader.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+      // continued
+      assertTrue(reader.advance());
+      assertFalse(reader.isAtSplitPoint());
+      assertEquals(1, reader.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+      // Third and final block -> parallelism consumed becomes 2, remaining becomes 1.
+      assertTrue(reader.advance());
+      assertTrue(reader.isAtSplitPoint());
+      assertEquals(2, reader.getSplitPointsConsumed());
+      assertEquals(1, reader.getSplitPointsRemaining());
+
+      // Done
+      assertFalse(reader.advance());
+      assertEquals(3, reader.getSplitPointsConsumed());
+      assertEquals(0, reader.getSplitPointsRemaining());
+      assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+    }
+  }
+
+  @Test
+  public void testProgressEmptySource() throws Exception {
+    // 0 records, 2 per block.
+    List<FixedRecord> records = Collections.emptyList();
+    String filename = generateTestFile("tmp.avro", records, SyncBehavior.SYNC_REGULAR, 2,
+        AvroCoder.of(FixedRecord.class), DataFileConstants.NULL_CODEC);
+
+    AvroSource<FixedRecord> source = AvroSource.from(filename).withSchema(FixedRecord.class);
+    try (BoundedSource.BoundedReader<FixedRecord> readerOrig = source.createReader(null)) {
+      assertThat(readerOrig, Matchers.instanceOf(BlockBasedReader.class));
+      BlockBasedReader<FixedRecord> reader = (BlockBasedReader<FixedRecord>) readerOrig;
+
+      // before starting
+      assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+      // confirm empty
+      assertFalse(reader.start());
+
+      // after reading empty source
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(0, reader.getSplitPointsRemaining());
+      assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+    }
+  }
+
+  @Test
   public void testGetCurrentFromUnstartedReader() throws Exception {
     List<FixedRecord> records = createFixedRecords(DEFAULT_RECORD_COUNT);
     String filename = generateTestFile("tmp.avro", records, SyncBehavior.SYNC_DEFAULT, 1000,

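The assertions above pin down the general contract of the two new signals. As a hedged illustration (not part of this commit), a runner-side helper that interprets them might look like the following; the helper name is hypothetical, and SPLIT_POINTS_UNKNOWN is the sentinel the tests reference:

    import org.apache.beam.sdk.io.BoundedSource.BoundedReader;

    /** Hypothetical helper: can dynamic splitting still yield a non-empty residual? */
    static boolean splittingCanHelp(BoundedReader<?> reader) {
      long remaining = reader.getSplitPointsRemaining();
      // SPLIT_POINTS_UNKNOWN: the reader cannot yet bound the remainder, so it is
      // worth trying. A value of exactly 1 means only the current split point is
      // left, so splitAtFraction() has nothing to peel off.
      return remaining == BoundedReader.SPLIT_POINTS_UNKNOWN || remaining > 1;
    }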
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
index 542e734..7161c1d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
@@ -19,9 +19,10 @@ package org.apache.beam.sdk.io;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
-
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -29,8 +30,11 @@ import static org.junit.Assert.assertTrue;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.CompressedSource.CompressedReader;
 import org.apache.beam.sdk.io.CompressedSource.CompressionMode;
 import org.apache.beam.sdk.io.CompressedSource.DecompressingChannelFactory;
+import org.apache.beam.sdk.io.FileBasedSource.FileBasedReader;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
@@ -462,11 +466,12 @@ public class CompressedSourceTest {
     private static class ByteReader extends FileBasedReader<Byte> {
       ByteBuffer buff = ByteBuffer.allocate(1);
       Byte current;
-      long offset = -1;
+      long offset;
       ReadableByteChannel channel;
 
       public ByteReader(ByteSource source) {
         super(source);
+        offset = source.getStartOffset() - 1;
       }
 
       @Override
@@ -501,4 +506,102 @@ public class CompressedSourceTest {
       }
     }
   }
+
+  @Test
+  public void testEmptyGzipProgress() throws IOException {
+    File tmpFile = tmpFolder.newFile("empty.gz");
+    String filename = tmpFile.toPath().toString();
+    writeFile(tmpFile, new byte[0], CompressionMode.GZIP);
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    CompressedSource<Byte> source = CompressedSource.from(new ByteSource(filename, 1));
+    try (BoundedReader<Byte> readerOrig = source.createReader(options)) {
+      assertThat(readerOrig, instanceOf(CompressedReader.class));
+      CompressedReader<Byte> reader = (CompressedReader<Byte>) readerOrig;
+      // before starting
+      assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(1, reader.getSplitPointsRemaining());
+
+      // confirm empty
+      assertFalse(reader.start());
+
+      // after reading empty source
+      assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(0, reader.getSplitPointsRemaining());
+    }
+  }
+
+  @Test
+  public void testGzipProgress() throws IOException {
+    int numRecords = 3;
+    File tmpFile = tmpFolder.newFile("nonempty.gz");
+    String filename = tmpFile.toPath().toString();
+    writeFile(tmpFile, new byte[numRecords], CompressionMode.GZIP);
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    CompressedSource<Byte> source = CompressedSource.from(new ByteSource(filename, 1));
+    try (BoundedReader<Byte> readerOrig = source.createReader(options)) {
+      assertThat(readerOrig, instanceOf(CompressedReader.class));
+      CompressedReader<Byte> reader = (CompressedReader<Byte>) readerOrig;
+      // before starting
+      assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(1, reader.getSplitPointsRemaining());
+
+      // confirm the file has three records
+      for (int i = 0; i < numRecords; ++i) {
+        if (i == 0) {
+          assertTrue(reader.start());
+        } else {
+          assertTrue(reader.advance());
+        }
+        assertEquals(0, reader.getSplitPointsConsumed());
+        assertEquals(1, reader.getSplitPointsRemaining());
+      }
+      assertFalse(reader.advance());
+
+      // after reading all records
+      assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(1, reader.getSplitPointsConsumed());
+      assertEquals(0, reader.getSplitPointsRemaining());
+    }
+  }
+
+  @Test
+  public void testSplittableProgress() throws IOException {
+    File tmpFile = tmpFolder.newFile("nonempty.txt");
+    String filename = tmpFile.toPath().toString();
+    Files.write(new byte[2], tmpFile);
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    CompressedSource<Byte> source = CompressedSource.from(new ByteSource(filename, 1));
+    try (BoundedReader<Byte> readerOrig = source.createReader(options)) {
+      assertThat(readerOrig, not(instanceOf(CompressedReader.class)));
+      assertThat(readerOrig, instanceOf(FileBasedReader.class));
+      FileBasedReader<Byte> reader = (FileBasedReader<Byte>) readerOrig;
+
+      // Check preconditions before starting
+      assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+      // First record: none consumed, unknown remaining.
+      assertTrue(reader.start());
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+      // Second record: 1 consumed; we now know we're on the last record.
+      assertTrue(reader.advance());
+      assertEquals(1, reader.getSplitPointsConsumed());
+      assertEquals(1, reader.getSplitPointsRemaining());
+
+      // Confirm exhausted and check postconditions
+      assertFalse(reader.advance());
+      assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(2, reader.getSplitPointsConsumed());
+      assertEquals(0, reader.getSplitPointsRemaining());
+    }
+  }
 }

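The gzip cases above exercise the single-split-point pattern: a compressed file cannot be subdivided, so the whole file is one split point. A minimal sketch of that reporting, mirroring the done-flag approach in the HDFSFileSource change later in this diff (field names assumed; not the actual CompressedReader code):

    private volatile boolean done = false;           // set once the reader is exhausted
    private volatile boolean anyRecordRead = false;  // set by the first successful read

    @Override
    public long getSplitPointsConsumed() {
      // The single split point counts as consumed only if a record was actually
      // produced and the reader has moved past it; an empty file consumes none.
      return (done && anyRecordRead) ? 1 : 0;
    }

    @Override
    public long getSplitPointsRemaining() {
      return done ? 0 : 1;
    }

This reproduces the assertions above: 0 consumed / 1 remaining while reading, 1 / 0 after a non-empty file, and 0 / 0 after an empty one.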
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
index a261fb2..bf68d41 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
@@ -24,9 +24,11 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.CountingSource.CounterMark;
 import org.apache.beam.sdk.io.CountingSource.UnboundedCountingSource;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -49,6 +51,7 @@ import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
+import java.io.IOException;
 import java.util.List;
 
 /**
@@ -116,6 +119,33 @@ public class CountingSourceTest {
   }
 
   @Test
+  public void testProgress() throws IOException {
+    final int numRecords = 5;
+    @SuppressWarnings("deprecation")  // testing CountingSource
+    BoundedSource<Long> source = CountingSource.upTo(numRecords);
+    try (BoundedReader<Long> reader = source.createReader(PipelineOptionsFactory.create())) {
+      // Check preconditions before starting. Note that CountingReader can always report the
+      // exact remaining parallelism.
+      assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(numRecords, reader.getSplitPointsRemaining());
+
+      assertTrue(reader.start());
+      int i = 0;
+      do {
+        assertEquals(i, reader.getSplitPointsConsumed());
+        assertEquals(numRecords - i, reader.getSplitPointsRemaining());
+        ++i;
+      } while (reader.advance());
+
+      assertEquals(numRecords, i); // exactly numRecords calls to advance()
+      assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(numRecords, reader.getSplitPointsConsumed());
+      assertEquals(0, reader.getSplitPointsRemaining());
+    }
+  }
+
+  @Test
   @Category(RunnableOnService.class)
   public void testUnboundedSource() {
     Pipeline p = TestPipeline.create();

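As the comment in testProgress notes, CountingSource can report both values exactly, because every element is its own split point. A hedged sketch of that arithmetic for a reader over [rangeStart, rangeEnd) (field names assumed; current is assumed to start at rangeStart, and done is set when advance() returns false):

    @Override
    public long getSplitPointsConsumed() {
      // Every element strictly before the current one has been consumed.
      return done ? (rangeEnd - rangeStart) : (current - rangeStart);
    }

    @Override
    public long getSplitPointsRemaining() {
      // The current element and everything after it remain.
      return done ? 0 : (rangeEnd - current);
    }

With rangeStart = 0 and rangeEnd = 5 this matches the loop above: i consumed and numRecords - i remaining after producing element i.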
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
index bedbc99..1f16d39 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
@@ -446,7 +446,7 @@ public class FileBasedSourceTest {
         assertTrue(fractionConsumed > lastFractionConsumed);
         lastFractionConsumed = fractionConsumed;
       }
-      assertTrue(reader.getFractionConsumed() < 1.0);
+      assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
index e9b61aa..66abd33 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.io;
 
 import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive;
 import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -28,6 +27,8 @@ import static org.junit.Assert.assertTrue;
 
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 
@@ -86,13 +87,12 @@ public class OffsetBasedSourceTest {
     }
 
     @Override
-    public BoundedReader<Integer> createReader(PipelineOptions options) throws IOException {
+    public OffsetBasedReader<Integer> createReader(PipelineOptions options) throws IOException {
       return new CoarseRangeReader(this);
     }
   }
 
-  private static class CoarseRangeReader
-      extends OffsetBasedSource.OffsetBasedReader<Integer> {
+  private static class CoarseRangeReader extends OffsetBasedReader<Integer> {
     private long current = -1;
     private long granularity;
 
@@ -239,6 +239,69 @@ public class OffsetBasedSourceTest {
   }
 
   @Test
+  public void testProgress() throws IOException {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    CoarseRangeSource source = new CoarseRangeSource(13, 17, 1, 2);
+    try (OffsetBasedReader<Integer> reader = source.createReader(options)) {
+      // Unstarted reader
+      assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+      // Start and produce the element 14, since the granularity of 2 rounds 13 up to 14.
+      assertTrue(reader.start());
+      assertTrue(reader.isAtSplitPoint());
+      assertEquals(14, reader.getCurrent().intValue());
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+      // Advance and produce the element 15, not a split point.
+      assertTrue(reader.advance());
+      assertEquals(15, reader.getCurrent().intValue());
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+      // Advance and produce the element 16, which is a split point. Since the next offset (17)
+      // is outside the range [13, 17), remaining parallelism should drop from UNKNOWN to 1.
+      assertTrue(reader.advance());
+      assertTrue(reader.isAtSplitPoint());
+      assertEquals(16, reader.getCurrent().intValue());
+      assertEquals(1, reader.getSplitPointsConsumed());
+      assertEquals(1, reader.getSplitPointsRemaining()); // The next offset is outside the range.
+      // Advance and produce the element 17, not a split point.
+      assertTrue(reader.advance());
+      assertEquals(17, reader.getCurrent().intValue());
+      assertEquals(1, reader.getSplitPointsConsumed());
+      assertEquals(1, reader.getSplitPointsRemaining());
+
+      // Advance and reach the end of the reader.
+      assertFalse(reader.advance());
+      assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(2, reader.getSplitPointsConsumed());
+      assertEquals(0, reader.getSplitPointsRemaining());
+    }
+  }
+
+  @Test
+  public void testProgressEmptySource() throws IOException {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    CoarseRangeSource source = new CoarseRangeSource(13, 17, 1, 100);
+    try (OffsetBasedReader<Integer> reader = source.createReader(options)) {
+      // before starting
+      assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+      // confirm empty
+      assertFalse(reader.start());
+
+      // after reading empty source
+      assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(0, reader.getSplitPointsRemaining());
+    }
+  }
+
+  @Test
   public void testSplitAtFraction() throws IOException {
     PipelineOptions options = PipelineOptionsFactory.create();
     CoarseRangeSource source = new CoarseRangeSource(13, 35, 1, 10);

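The interesting transition in testProgress is the remaining count firming up from UNKNOWN to 1 once the reader sees where the next split point would start. A standalone sketch of that decision (hypothetical method; not the SDK's actual OffsetBasedReader code):

    import org.apache.beam.sdk.io.BoundedSource.BoundedReader;

    /** Hypothetical: remaining parallelism once the next split point's offset is known. */
    static long splitPointsRemaining(boolean done, long nextSplitPointOffset, long endOffset) {
      if (done) {
        return 0;
      }
      // If the next split point would start at or beyond the end of the range,
      // only the current split point remains; otherwise we cannot bound it.
      return (nextSplitPointOffset >= endOffset) ? 1 : BoundedReader.SPLIT_POINTS_UNKNOWN;
    }

In the test above, after producing 16 the next offset (17) falls outside [13, 17), so the value drops from SPLIT_POINTS_UNKNOWN to 1.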
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 4d6d8dd..53a2a89 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -22,10 +22,10 @@ import static org.apache.beam.sdk.TestUtils.LINES_ARRAY;
 import static org.apache.beam.sdk.TestUtils.NO_INTS_ARRAY;
 import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.TextualIntegerCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.TextIO.CompressionType;
 import org.apache.beam.sdk.io.TextIO.TextSource;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -423,6 +424,117 @@ public class TextIOTest {
   }
 
   @Test
+  public void testProgressEmptyFile() throws IOException {
+    try (BoundedReader<String> reader =
+        prepareSource(new byte[0]).createReader(PipelineOptionsFactory.create())) {
+      // Check preconditions before starting.
+      assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+      // Assert empty
+      assertFalse(reader.start());
+
+      // Check postconditions after finishing
+      assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(0, reader.getSplitPointsRemaining());
+    }
+  }
+
+  @Test
+  public void testProgressTextFile() throws IOException {
+    String file = "line1\nline2\nline3";
+    try (BoundedReader<String> reader =
+        prepareSource(file.getBytes()).createReader(PipelineOptionsFactory.create())) {
+      // Check preconditions before starting
+      assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+      // Line 1
+      assertTrue(reader.start());
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+      // Line 2
+      assertTrue(reader.advance());
+      assertEquals(1, reader.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+      // Line 3
+      assertTrue(reader.advance());
+      assertEquals(2, reader.getSplitPointsConsumed());
+      assertEquals(1, reader.getSplitPointsRemaining());
+
+      // Check postconditions after finishing
+      assertFalse(reader.advance());
+      assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(3, reader.getSplitPointsConsumed());
+      assertEquals(0, reader.getSplitPointsRemaining());
+    }
+  }
+
+  @Test
+  public void testProgressAfterSplitting() throws IOException {
+    String file = "line1\nline2\nline3";
+    BoundedSource source = prepareSource(file.getBytes());
+    BoundedSource remainder;
+
+    // Create the remainder, verifying properties pre- and post-splitting.
+    try (BoundedReader<String> readerOrig = source.createReader(PipelineOptionsFactory.create())) {
+      // Preconditions.
+      assertEquals(0.0, readerOrig.getFractionConsumed(), 1e-6);
+      assertEquals(0, readerOrig.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, readerOrig.getSplitPointsRemaining());
+
+      // First record, before splitting.
+      assertTrue(readerOrig.start());
+      assertEquals(0, readerOrig.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, readerOrig.getSplitPointsRemaining());
+
+      // Split at 0.1, which falls within line1; the reader should now detect the last record.
+      remainder = readerOrig.splitAtFraction(0.1);
+      assertNotNull(remainder);
+
+      // First record, after splitting.
+      assertEquals(0, readerOrig.getSplitPointsConsumed());
+      assertEquals(1, readerOrig.getSplitPointsRemaining());
+
+      // Finish and postconditions.
+      assertFalse(readerOrig.advance());
+      assertEquals(1.0, readerOrig.getFractionConsumed(), 1e-6);
+      assertEquals(1, readerOrig.getSplitPointsConsumed());
+      assertEquals(0, readerOrig.getSplitPointsRemaining());
+    }
+
+    // Check the properties of the remainder.
+    try (BoundedReader<String> reader = remainder.createReader(PipelineOptionsFactory.create())) {
+      // Preconditions.
+      assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+      // First record should be line 2.
+      assertTrue(reader.start());
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+      // Second record is line 3
+      assertTrue(reader.advance());
+      assertEquals(1, reader.getSplitPointsConsumed());
+      assertEquals(1, reader.getSplitPointsRemaining());
+
+      // Check postconditions after finishing
+      assertFalse(reader.advance());
+      assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(2, reader.getSplitPointsConsumed());
+      assertEquals(0, reader.getSplitPointsRemaining());
+    }
+  }
+
+  @Test
   public void testReadEmptyLines() throws Exception {
     runTestReadWithData("\n\n\n".getBytes(StandardCharsets.UTF_8),
         ImmutableList.of("", "", ""));

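testProgressAfterSplitting shows how the new signals interact with dynamic splitting: shrinking the primary's range is what lets the reader prove it is on its last record. A hedged usage sketch of that interaction (hypothetical helper; the reader is supplied by the caller):

    /** Hypothetical: peel off ~90% of the remaining work for another worker. */
    static BoundedSource<String> splitOffResidual(
        BoundedSource.BoundedReader<String> reader) {
      BoundedSource<String> residual = reader.splitAtFraction(0.1);
      // If non-null, the reader now covers only the primary sub-range, so its
      // remaining parallelism can firm up from SPLIT_POINTS_UNKNOWN to an exact
      // count; the residual can be read independently via its createReader().
      return residual;
    }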
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java
index 3de04f7..edd4c4f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java
@@ -104,7 +104,6 @@ public class OffsetRangeTrackerTest {
     assertFalse(tracker.tryReturnRecordAt(true, 150));
     assertFalse(tracker.tryReturnRecordAt(true, 151));
     // Should accept non-splitpoint records starting after stop offset.
-    assertTrue(tracker.tryReturnRecordAt(false, 135));
     assertTrue(tracker.tryReturnRecordAt(false, 152));
     assertTrue(tracker.tryReturnRecordAt(false, 160));
     assertFalse(tracker.tryReturnRecordAt(true, 171));

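For context on these assertions: a reader proposes each record's starting offset to the tracker before emitting it, and the tracker rejects split points at or past the (possibly dynamically moved) stop offset while still accepting trailing non-split-point records. A hedged sketch of that read-loop protocol (the seek/decode helpers are hypothetical; tracker is an OffsetRangeTracker field):

    // Sketch only: drive an OffsetRangeTracker from a reader's advance() loop.
    boolean advanceImpl() throws IOException {
      long recordStart = seekNextRecordStart();    // hypothetical helper
      boolean isAtSplitPoint = startsNewBlock();   // hypothetical helper
      if (!tracker.tryReturnRecordAt(isAtSplitPoint, recordStart)) {
        return false;  // a rejected split point means we've reached the stop offset
      }
      decodeCurrentRecord();                       // hypothetical helper
      return true;
    }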
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
index ab537eb..41a271c 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
@@ -282,6 +282,7 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
     private Configuration conf;
     private RecordReader<K, V> currentReader;
     private KV<K, V> currentPair;
+    private volatile boolean done = false;
 
     /**
      * Create a {@code HDFSFileReader} based on a file or a file pattern specification.
@@ -356,6 +357,7 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
           }
           // either no next split or all readers were empty
           currentPair = null;
+          done = true;
           return false;
         }
       } catch (InterruptedException e) {
@@ -433,6 +435,16 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
     }
 
     @Override
+    public final long getSplitPointsRemaining() {
+      if (done) {
+        return 0;
+      }
+      // This source does not currently support dynamic work rebalancing, so remaining
+      // parallelism is always 1.
+      return 1;
+    }
+
+    @Override
     public BoundedSource<KV<K, V>> splitAtFraction(double fraction) {
       // Not yet supported. To implement this, the sizes of the splits should be used to
       // calculate the remaining splits that constitute the given fraction, then a


[2/2] incubator-beam git commit: This closes #353

Posted by dh...@apache.org.
This closes #353


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4755c5a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4755c5a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4755c5a7

Branch: refs/heads/master
Commit: 4755c5a784d628bdbfc67e4870d231252381ae6e
Parents: efd1f95 32a6cde
Author: Dan Halperin <dh...@google.com>
Authored: Fri May 20 16:55:57 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri May 20 16:55:57 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroSource.java | 166 ++++++++++++-------
 .../apache/beam/sdk/io/BlockBasedSource.java    |  26 +--
 .../org/apache/beam/sdk/io/BoundedSource.java   | 145 +++++++++++++++-
 .../apache/beam/sdk/io/CompressedSource.java    | 132 +++++++++++++--
 .../org/apache/beam/sdk/io/CountingSource.java  |   5 +
 .../org/apache/beam/sdk/io/DatastoreIO.java     |  13 ++
 .../org/apache/beam/sdk/io/FileBasedSource.java |   2 +-
 .../apache/beam/sdk/io/OffsetBasedSource.java   |  49 +++++-
 .../java/org/apache/beam/sdk/io/TextIO.java     |  20 ++-
 .../beam/sdk/io/range/OffsetRangeTracker.java   | 109 ++++++++++--
 .../org/apache/beam/sdk/io/AvroSourceTest.java  |  86 +++++++++-
 .../beam/sdk/io/CompressedSourceTest.java       | 107 +++++++++++-
 .../apache/beam/sdk/io/CountingSourceTest.java  |  30 ++++
 .../apache/beam/sdk/io/FileBasedSourceTest.java |   2 +-
 .../beam/sdk/io/OffsetBasedSourceTest.java      |  71 +++++++-
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 114 ++++++++++++-
 .../sdk/io/range/OffsetRangeTrackerTest.java    |   1 -
 .../apache/beam/sdk/io/hdfs/HDFSFileSource.java |  12 ++
 18 files changed, 969 insertions(+), 121 deletions(-)
----------------------------------------------------------------------