You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/05/20 23:57:38 UTC
[1/2] incubator-beam git commit: BoundedReader: add
getSplitPoints{Consumed, Remaining}
Repository: incubator-beam
Updated Branches:
refs/heads/master efd1f95b6 -> 4755c5a78
BoundedReader: add getSplitPoints{Consumed,Remaining}
And implement and test it for common sources
OffsetBasedReader: test limited parallelism signals
AvroSource: rewrite to support remaining parallelism
*) Make the start of a block match Avro's definition: the first byte after the previous sync marker.
This enables detecting the last block in the file.
*) This change enables us to unify currentOffset and currentBlockOffset, as all records are emitted
at the start of the block that contains them.
*) Simplify block header reading to have fewer object allocations and buffers using a direct
reader and a (allocated once only) CountingInputStream to measure the size of that header.
*) Add tests for consumed and remaining parallelism
*) Let BlockBasedSource detect the end of the file in remaining parallelism.
*) Verify in more places that the correct number of bytes is read from
the input Avro file.
CompressedSource: add tests of parallelism and progress
*) empty file
*) non-empty compressed file
*) non-empty not-compressed file
TextIO: implement and test parallelism
*) empty file
*) non-empty file
CountingSource: test limited parallelism
CompressedSource: implement currentOffset based on bytes decompressed
*) This is not a very good offset because it is an upper bound, but it is
likely better than not reporting any progress at all.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/32a6cde4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/32a6cde4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/32a6cde4
Branch: refs/heads/master
Commit: 32a6cde4e43726849713a7183c66aa28f43b0868
Parents: efd1f95
Author: Dan Halperin <dh...@google.com>
Authored: Tue May 3 17:53:48 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri May 20 16:42:00 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroSource.java | 166 ++++++++++++-------
.../apache/beam/sdk/io/BlockBasedSource.java | 26 +--
.../org/apache/beam/sdk/io/BoundedSource.java | 145 +++++++++++++++-
.../apache/beam/sdk/io/CompressedSource.java | 132 +++++++++++++--
.../org/apache/beam/sdk/io/CountingSource.java | 5 +
.../org/apache/beam/sdk/io/DatastoreIO.java | 13 ++
.../org/apache/beam/sdk/io/FileBasedSource.java | 2 +-
.../apache/beam/sdk/io/OffsetBasedSource.java | 49 +++++-
.../java/org/apache/beam/sdk/io/TextIO.java | 20 ++-
.../beam/sdk/io/range/OffsetRangeTracker.java | 109 ++++++++++--
.../org/apache/beam/sdk/io/AvroSourceTest.java | 86 +++++++++-
.../beam/sdk/io/CompressedSourceTest.java | 107 +++++++++++-
.../apache/beam/sdk/io/CountingSourceTest.java | 30 ++++
.../apache/beam/sdk/io/FileBasedSourceTest.java | 2 +-
.../beam/sdk/io/OffsetBasedSourceTest.java | 71 +++++++-
.../java/org/apache/beam/sdk/io/TextIOTest.java | 114 ++++++++++++-
.../sdk/io/range/OffsetRangeTrackerTest.java | 1 -
.../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 12 ++
18 files changed, 969 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index ef8e427..255199f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io;
+import static com.google.common.base.Preconditions.checkState;
+
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -40,18 +42,24 @@ import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.snappy.SnappyCompressorInputStream;
import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;
+import org.apache.commons.compress.utils.CountingInputStream;
import java.io.ByteArrayInputStream;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.util.Arrays;
import java.util.Collection;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;
+import javax.annotation.concurrent.GuardedBy;
+
// CHECKSTYLE.OFF: JavadocStyle
/**
* A {@link FileBasedSource} for reading Avro files.
@@ -439,10 +447,6 @@ public class AvroSource<T> extends BlockBasedSource<T> {
* the total number of records in the block and the block's size in bytes, followed by the
* block's (optionally-encoded) records. Each block is terminated by a 16-bit sync marker.
*
- * <p>Here, we consider the sync marker that precedes a block to be its offset, as this allows
- * a reader that begins reading at that offset to detect the sync marker and the beginning of
- * the block.
- *
* @param <T> The type of records contained in the block.
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
@@ -450,24 +454,25 @@ public class AvroSource<T> extends BlockBasedSource<T> {
// The current block.
private AvroBlock<T> currentBlock;
- // Offset of the block.
+ // A lock used to synchronize block offsets for getSplitPointsRemaining
+ private final Object progressLock = new Object();
+
+ // Offset of the current block.
+ @GuardedBy("progressLock")
private long currentBlockOffset = 0;
// Size of the current block.
+ @GuardedBy("progressLock")
private long currentBlockSizeBytes = 0;
- // Current offset within the stream.
- private long currentOffset = 0;
-
// Stream used to read from the underlying file.
- // A pushback stream is used to restore bytes buffered during seeking/decoding.
+ // A pushback stream is used to restore bytes buffered during seeking.
private PushbackInputStream stream;
+ // Counts the number of bytes read. Used only to tell how many bytes are taken up in
+ // a block's variable-length header.
+ private CountingInputStream countStream;
- // Small buffer for reading encoded values from the stream.
- // The maximum size of an encoded long is 10 bytes, and this buffer will be used to read two.
- private final byte[] readBuffer = new byte[20];
-
- // Decoder to decode binary-encoded values from the buffer.
+ // Caches the Avro DirectBinaryDecoder used to decode binary-encoded values from the buffer.
private BinaryDecoder decoder;
/**
@@ -482,51 +487,67 @@ public class AvroSource<T> extends BlockBasedSource<T> {
return (AvroSource<T>) super.getCurrentSource();
}
+ // Precondition: the stream is positioned after the sync marker in the current (about to be
+ // previous) block. currentBlockSizeBytes equals the size of the current block, or zero if this
+ // reader was just started.
+ //
+ // Postcondition: same as above, but for the new current (formerly next) block.
@Override
public boolean readNextBlock() throws IOException {
- // The next block in the file is after the first sync marker that can be read starting from
- // the current offset. First, we seek past the next sync marker, if it exists. After a sync
- // marker is the start of a block. A block begins with the number of records contained in
- // the block, encoded as a long, followed by the size of the block in bytes, encoded as a
- // long. The currentOffset after this method should be last byte after this block, and the
- // currentBlockOffset should be the start of the sync marker before this block.
-
- // Seek to the next sync marker, if one exists.
- currentOffset += advancePastNextSyncMarker(stream, getCurrentSource().getSyncMarker());
-
- // The offset of the current block includes its preceding sync marker.
- currentBlockOffset = currentOffset - getCurrentSource().getSyncMarker().length;
-
- // Read a small buffer to parse the block header.
- // We cannot use a BinaryDecoder to do this directly from the stream because a BinaryDecoder
- // internally buffers data and we only want to read as many bytes from the stream as the size
- // of the header. Though BinaryDecoder#InputStream returns an input stream that is aware of
- // its internal buffering, we would have to re-wrap this input stream to seek for the next
- // block in the file.
- int read = stream.read(readBuffer);
- // We reached the last sync marker in the file.
- if (read <= 0) {
+ long startOfNextBlock = currentBlockOffset + currentBlockSizeBytes;
+
+ // Before reading the variable-sized block header, record the current number of bytes read.
+ long preHeaderCount = countStream.getBytesRead();
+ decoder = DecoderFactory.get().directBinaryDecoder(countStream, decoder);
+ long numRecords;
+ try {
+ numRecords = decoder.readLong();
+ } catch (EOFException e) {
+ // Expected for the last block, at which the start position is the EOF. The way to detect
+ // stream ending is to try reading from it.
return false;
}
- decoder = DecoderFactory.get().binaryDecoder(readBuffer, decoder);
- long numRecords = decoder.readLong();
long blockSize = decoder.readLong();
- // The decoder buffers data internally, but since we know the size of the stream the
- // decoder has constructed from the readBuffer, the number of bytes available in the
- // input stream is equal to the number of unconsumed bytes.
- int headerSize = readBuffer.length - decoder.inputStream().available();
- stream.unread(readBuffer, headerSize, read - headerSize);
+ // Mark header size as the change in the number of bytes read.
+ long headerSize = countStream.getBytesRead() - preHeaderCount;
// Create the current block by reading blockSize bytes. Block sizes permitted by the Avro
// specification are [32, 2^30], so this narrowing is ok.
byte[] data = new byte[(int) blockSize];
- stream.read(data);
+ int read = stream.read(data);
+ checkState(blockSize == read, "Only %s/%s bytes in the block were read", read, blockSize);
currentBlock = new AvroBlock<>(data, numRecords, getCurrentSource());
- currentBlockSizeBytes = blockSize;
- // Update current offset with the number of bytes we read to get the next block.
- currentOffset += headerSize + blockSize;
+ // Read the end of this block, which MUST be a sync marker for correctness.
+ byte[] syncMarker = getCurrentSource().getSyncMarker();
+ byte[] readSyncMarker = new byte[syncMarker.length];
+ long syncMarkerOffset = startOfNextBlock + headerSize + blockSize;
+ long bytesRead = stream.read(readSyncMarker);
+ checkState(
+ bytesRead == syncMarker.length,
+ "When trying to read a sync marker at position %s, only able to read %s/%s bytes",
+ syncMarkerOffset,
+ bytesRead,
+ syncMarker.length);
+ if (!Arrays.equals(syncMarker, readSyncMarker)) {
+ throw new IllegalStateException(
+ String.format(
+ "Expected the bytes [%d,%d) in file %s to be a sync marker, but found %s",
+ syncMarkerOffset,
+ syncMarkerOffset + syncMarker.length,
+ getCurrentSource().getFileOrPatternSpec(),
+ Arrays.toString(readSyncMarker)
+ ));
+ }
+
+ // Atomically update both the offset and size of the new block.
+ synchronized (progressLock) {
+ currentBlockOffset = startOfNextBlock;
+ // Total block size includes the header, block content, and trailing sync marker.
+ currentBlockSizeBytes = headerSize + blockSize + syncMarker.length;
+ }
+
return true;
}
@@ -537,32 +558,65 @@ public class AvroSource<T> extends BlockBasedSource<T> {
@Override
public long getCurrentBlockOffset() {
- return currentBlockOffset;
+ synchronized (progressLock) {
+ return currentBlockOffset;
+ }
}
@Override
public long getCurrentBlockSize() {
- return currentBlockSizeBytes;
+ synchronized (progressLock) {
+ return currentBlockSizeBytes;
+ }
+ }
+
+ @Override
+ public long getSplitPointsRemaining() {
+ if (isDone()) {
+ return 0;
+ }
+ synchronized (progressLock) {
+ if (currentBlockOffset + currentBlockSizeBytes >= getCurrentSource().getEndOffset()) {
+ // This block is known to be the last block in the range.
+ return 1;
+ }
+ }
+ return super.getSplitPointsRemaining();
}
/**
* Creates a {@link PushbackInputStream} that has a large enough pushback buffer to be able
- * to push back the syncBuffer and the readBuffer.
+ * to push back the syncBuffer.
*/
private PushbackInputStream createStream(ReadableByteChannel channel) {
return new PushbackInputStream(
Channels.newInputStream(channel),
- getCurrentSource().getSyncMarker().length + readBuffer.length);
+ getCurrentSource().getSyncMarker().length);
}
- /**
- * Starts reading from the provided channel. Assumes that the channel is already seeked to
- * the source's start offset.
- */
+ // Postcondition: the stream is positioned at the beginning of the first block after the start
+ // of the current source, and currentBlockOffset is that position. Additionally,
+ // currentBlockSizeBytes will be set to 0 indicating that the previous block was empty.
@Override
protected void startReading(ReadableByteChannel channel) throws IOException {
+ long startOffset = getCurrentSource().getStartOffset();
+ byte[] syncMarker = getCurrentSource().getSyncMarker();
+ long syncMarkerLength = syncMarker.length;
+
+ if (startOffset != 0) {
+ // Rewind in order to find the sync marker ending the previous block.
+ long position = Math.max(0, startOffset - syncMarkerLength);
+ ((SeekableByteChannel) channel).position(position);
+ startOffset = position;
+ }
+
+ // Satisfy the post condition.
stream = createStream(channel);
- currentOffset = getCurrentSource().getStartOffset();
+ countStream = new CountingInputStream(stream);
+ synchronized (progressLock) {
+ currentBlockOffset = startOffset + advancePastNextSyncMarker(stream, syncMarker);
+ currentBlockSizeBytes = 0;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
index 31ef055..997c77a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
@@ -206,28 +206,32 @@ public abstract class BlockBasedSource<T> extends FileBasedSource<T> {
}
@Override
+ @Nullable
public Double getFractionConsumed() {
- if (getCurrentSource().getEndOffset() == Long.MAX_VALUE) {
- return null;
- }
- Block<T> currentBlock = getCurrentBlock();
- if (currentBlock == null) {
- // There is no current block (i.e., the read has not yet begun).
+ if (!isStarted()) {
return 0.0;
}
+ if (isDone()) {
+ return 1.0;
+ }
+ FileBasedSource<T> source = getCurrentSource();
+ if (source.getEndOffset() == Long.MAX_VALUE) {
+ // Unknown end offset, so we cannot tell.
+ return null;
+ }
+
long currentBlockOffset = getCurrentBlockOffset();
- long startOffset = getCurrentSource().getStartOffset();
- long endOffset = getCurrentSource().getEndOffset();
+ long startOffset = source.getStartOffset();
+ long endOffset = source.getEndOffset();
double fractionAtBlockStart =
((double) (currentBlockOffset - startOffset)) / (endOffset - startOffset);
double fractionAtBlockEnd =
((double) (currentBlockOffset + getCurrentBlockSize() - startOffset)
/ (endOffset - startOffset));
+ double blockFraction = getCurrentBlock().getFractionOfBlockConsumed();
return Math.min(
1.0,
- fractionAtBlockStart
- + currentBlock.getFractionOfBlockConsumed()
- * (fractionAtBlockEnd - fractionAtBlockStart));
+ fractionAtBlockStart + blockFraction * (fractionAtBlockEnd - fractionAtBlockStart));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java
index 8f7d3fd..394afa4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java
@@ -18,6 +18,8 @@
package org.apache.beam.sdk.io;
import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.range.OffsetRangeTracker;
+import org.apache.beam.sdk.io.range.RangeTracker;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -27,6 +29,8 @@ import java.io.IOException;
import java.util.List;
import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+
/**
* A {@link Source} that reads a finite amount of input and, because of that, supports
* some additional operations.
@@ -37,9 +41,16 @@ import java.util.NoSuchElementException;
* <li>Size estimation: {@link #getEstimatedSizeBytes};
* <li>Telling whether or not this source produces key/value pairs in sorted order:
* {@link #producesSortedKeys};
- * <li>The reader ({@link BoundedReader}) supports progress estimation
- * ({@link BoundedReader#getFractionConsumed}) and dynamic splitting
- * ({@link BoundedReader#splitAtFraction}).
+ * <li>The accompanying {@link BoundedReader reader} has additional functionality to enable runners
+ * to dynamically adapt based on runtime conditions.
+ * <ul>
+ * <li>Progress estimation ({@link BoundedReader#getFractionConsumed})
+ * <li>Tracking of parallelism, to determine whether the current source can be split
+ * ({@link BoundedReader#getSplitPointsConsumed()} and
+ * {@link BoundedReader#getSplitPointsRemaining()}).
+ * <li>Dynamic splitting of the current source ({@link BoundedReader#splitAtFraction}).
+ * </ul>
+ * </li>
* </ul>
*
* <p>To use this class for supporting your custom input type, derive your class
@@ -82,14 +93,14 @@ public abstract class BoundedSource<T> extends Source<T> {
*
* <h3>Thread safety</h3>
* All methods will be run from the same thread except {@link #splitAtFraction},
- * {@link #getFractionConsumed} and {@link #getCurrentSource}, which can be called concurrently
+ * {@link #getFractionConsumed}, {@link #getCurrentSource}, {@link #getSplitPointsConsumed()},
+ * and {@link #getSplitPointsRemaining()}, all of which can be called concurrently
* from a different thread. There will not be multiple concurrent calls to
- * {@link #splitAtFraction} but there can be for {@link #getFractionConsumed} if
- * {@link #splitAtFraction} is implemented.
+ * {@link #splitAtFraction}.
*
- * <p>If the source does not implement {@link #splitAtFraction}, you do not need to worry about
- * thread safety. If implemented, it must be safe to call {@link #splitAtFraction} and
- * {@link #getFractionConsumed} concurrently with other methods.
+ * <p>It must be safe to call {@link #splitAtFraction}, {@link #getFractionConsumed},
+ * {@link #getCurrentSource}, {@link #getSplitPointsConsumed()}, and
+ * {@link #getSplitPointsRemaining()} concurrently with other methods.
*
* <p>Additionally, a successful {@link #splitAtFraction} call must, by definition, cause
* {@link #getCurrentSource} to start returning a different value.
@@ -129,11 +140,126 @@ public abstract class BoundedSource<T> extends Source<T> {
* methods (including itself), and it is therefore critical for it to be implemented
* in a thread-safe way.
*/
+ @Nullable
public Double getFractionConsumed() {
return null;
}
/**
+ * A constant to use as the return value for {@link #getSplitPointsConsumed()} or
+ * {@link #getSplitPointsRemaining()} when the exact value is unknown.
+ */
+ public static final long SPLIT_POINTS_UNKNOWN = -1;
+
+ /**
+ * Returns the total amount of parallelism in the consumed (returned and processed) range of
+ * this reader's current {@link BoundedSource} (as would be returned by
+ * {@link #getCurrentSource}). This corresponds to all split point records (see
+ * {@link RangeTracker}) returned by this reader, <em>excluding</em> the last split point
+ * returned if the reader is not finished.
+ *
+ * <p>Consider the following examples: (1) An input that can be read in parallel down to the
+ * individual records, such as {@link CountingSource#upTo}, is called "perfectly splittable".
+ * (2) a "block-compressed" file format such as {@link AvroIO}, in which a block of records has
+ * to be read as a whole, but different blocks can be read in parallel. (3) An "unsplittable"
+ * input such as a cursor in a database.
+ *
+ * <ul>
+ * <li>Any {@link BoundedReader reader} that is unstarted (aka, has never had a call to
+ * {@link #start}) has a consumed parallelism of 0. This condition holds independent of whether
+ * the input is splittable.
+ * <li>Any {@link BoundedReader reader} that has only returned its first element (aka,
+ * has never had a call to {@link #advance}) has a consumed parallelism of 0: the first element
+ * is the current element and is still being processed. This condition holds independent of
+ * whether the input is splittable.
+ * <li>For an empty reader (in which the call to {@link #start} returned false), the
+ * consumed parallelism is 0. This condition holds independent of whether the input is
+ * splittable.
+ * <li>For a non-empty, finished reader (in which the call to {@link #start} returned true and
+ * a call to {@link #advance} has returned false), the value returned must be at least 1
+ * and should equal the total parallelism in the source.
+ * <li>For example (1): After returning record #30 (starting at 1) out of 50 in a perfectly
+ * splittable 50-record input, this value should be 29. When finished, the consumed parallelism
+ * should be 50.
+ * <li>For example (2): In a block-compressed value consisting of 5 blocks, the value should
+ * stay at 0 until the first record of the second block is returned; stay at 1 until the first
+ * record of the third block is returned, etc. Only once the end-of-file is reached has the
+ * fifth block been consumed; from then on the value should stay at 5.
+ * <li>For example (3): For any non-empty unsplittable input, the consumed parallelism is 0
+ * until the reader is finished (because the last call to {@link #advance} returned false), at
+ * which point it becomes 1.
+ * </ul>
+ *
+ * <p>A reader that is implemented using a {@link RangeTracker} is encouraged to use the
+ * range tracker's ability to count split points to implement this method. See
+ * {@link OffsetBasedSource.OffsetBasedReader} and {@link OffsetRangeTracker} for an example.
+ *
+ * <p>Defaults to {@link #SPLIT_POINTS_UNKNOWN}. Any value less than 0 will be interpreted
+ * as unknown.
+ *
+ * <h3>Thread safety</h3>
+ * See the javadoc on {@link BoundedReader} for information about thread safety.
+ *
+ * @see #getSplitPointsRemaining()
+ */
+ public long getSplitPointsConsumed() {
+ return SPLIT_POINTS_UNKNOWN;
+ }
+
+ /**
+ * Returns the total amount of parallelism in the unprocessed part of this reader's current
+ * {@link BoundedSource} (as would be returned by {@link #getCurrentSource}). This corresponds
+ * to all unprocessed split point records (see {@link RangeTracker}), including the last
+ * split point returned, in the remainder part of the source.
+ *
+ * <p>This function should be implemented only <strong>in addition to
+ * {@link #getSplitPointsConsumed()}</strong> and only if <em>an exact value can be
+ * returned</em>.
+ *
+ * <p>Consider the following examples: (1) An input that can be read in parallel down to the
+ * individual records, such as {@link CountingSource#upTo}, is called "perfectly splittable".
+ * (2) a "block-compressed" file format such as {@link AvroIO}, in which a block of records has
+ * to be read as a whole, but different blocks can be read in parallel. (3) An "unsplittable"
+ * input such as a cursor in a database.
+ *
+ * <p>Assume for examples (1) and (2) that the number of records or blocks remaining is known:
+ *
+ * <ul>
+ * <li>Any {@link BoundedReader reader} for which the last call to {@link #start} or
+ * {@link #advance} has returned true should not return 0, because this reader itself
+ * represents parallelism at least 1. This condition holds independent of whether the input is
+ * splittable.
+ * <li>A finished reader (for which {@link #start} or {@link #advance} has returned false)
+ * should return a value of 0. This condition holds independent of whether the input is
+ * splittable.
+ * <li>For example 1: After returning record #30 (starting at 1) out of 50 in a perfectly
+ * splittable 50-record input, this value should be 21 (20 remaining + 1 current) if the total
+ * number of records is known.
+ * <li>For example 2: After returning a record in block 3 in a block-compressed file
+ * consisting of 5 blocks, this value should be 3 (since blocks 4 and 5 can be processed in
+ * parallel by new readers produced via dynamic work rebalancing, while the current reader
+ * continues processing block 3) if the total number of blocks is known.
+ * <li>For example (3): a reader for any non-empty unsplittable input, should return 1 until
+ * it is finished, at which point it should return 0.
+ * <li>For any reader: After returning the last split point in a file (e.g., the last record
+ * in example (1), the first record in the last block for example (2), or the first record in
+ * the file for example (3)), this value should be 1: apart from the current task, no additional
+ * remainder can be split off.
+ * </ul>
+ *
+ * <p>Defaults to {@link #SPLIT_POINTS_UNKNOWN}. Any value less than 0 will be interpreted as
+ * unknown.
+ *
+ * <h3>Thread safety</h3>
+ * See the javadoc on {@link BoundedReader} for information about thread safety.
+ *
+ * @see #getSplitPointsConsumed()
+ */
+ public long getSplitPointsRemaining() {
+ return SPLIT_POINTS_UNKNOWN;
+ }
+
+ /**
* Returns a {@code Source} describing the same input that this {@code Reader} currently reads
* (including items already read).
*
@@ -263,6 +389,7 @@ public abstract class BoundedSource<T> extends Source<T> {
*
* <p>By default, returns null to indicate that splitting is not possible.
*/
+ @Nullable
public BoundedSource<T> splitAtFraction(double fraction) {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index 5cb0684..8bccf5f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -32,11 +32,14 @@ import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import java.io.IOException;
import java.io.PushbackInputStream;
import java.io.Serializable;
+import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.NoSuchElementException;
import java.util.zip.GZIPInputStream;
+import javax.annotation.concurrent.GuardedBy;
+
/**
* A Source that reads from compressed files. A {@code CompressedSource} wraps a delegate
* {@link FileBasedSource} that is able to read the decompressed file format.
@@ -361,7 +364,12 @@ public class CompressedSource<T> extends FileBasedSource<T> {
private final FileBasedReader<T> readerDelegate;
private final CompressedSource<T> source;
+ private final boolean splittable;
+ private final Object progressLock = new Object();
+ @GuardedBy("progressLock")
private int numRecordsRead;
+ @GuardedBy("progressLock")
+ private CountingChannel channel;
/**
* Create a {@code CompressedReader} from a {@code CompressedSource} and delegate reader.
@@ -369,6 +377,13 @@ public class CompressedSource<T> extends FileBasedSource<T> {
public CompressedReader(CompressedSource<T> source, FileBasedReader<T> readerDelegate) {
super(source);
this.source = source;
+ boolean splittable;
+ try {
+ splittable = source.isSplittable();
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to tell whether source " + source + " is splittable", e);
+ }
+ this.splittable = splittable;
this.readerDelegate = readerDelegate;
}
@@ -380,18 +395,78 @@ public class CompressedSource<T> extends FileBasedSource<T> {
return readerDelegate.getCurrent();
}
+ @Override
+ public final long getSplitPointsConsumed() {
+ if (splittable) {
+ return readerDelegate.getSplitPointsConsumed();
+ } else {
+ synchronized (progressLock) {
+ return (isDone() && numRecordsRead > 0) ? 1 : 0;
+ }
+ }
+ }
+
+ @Override
+ public final long getSplitPointsRemaining() {
+ if (splittable) {
+ return readerDelegate.getSplitPointsRemaining();
+ } else {
+ return isDone() ? 0 : 1;
+ }
+ }
+
/**
* Returns true only for the first record; compressed sources cannot be split.
*/
@Override
protected final boolean isAtSplitPoint() {
- // We have to return true for the first record, but not for the state before reading it,
- // and not for the state after reading any other record. Hence == rather than >= or <=.
- // This is required because FileBasedReader is intended for readers that can read a range
- // of offsets in a file and where the range can be split in parts. CompressedReader,
- // however, is a degenerate case because it cannot be split, but it has to satisfy the
- // semantics of offsets and split points anyway.
- return numRecordsRead == 1;
+ if (splittable) {
+ return readerDelegate.isAtSplitPoint();
+ } else {
+ // We have to return true for the first record, but not for the state before reading it,
+ // and not for the state after reading any other record. Hence == rather than >= or <=.
+ // This is required because FileBasedReader is intended for readers that can read a range
+ // of offsets in a file and where the range can be split in parts. CompressedReader,
+ // however, is a degenerate case because it cannot be split, but it has to satisfy the
+ // semantics of offsets and split points anyway.
+ synchronized (progressLock) {
+ return numRecordsRead == 1;
+ }
+ }
+ }
+
+ private static class CountingChannel implements ReadableByteChannel {
+ long count;
+ private final ReadableByteChannel inner;
+
+ public CountingChannel(ReadableByteChannel inner, long count) {
+ this.inner = inner;
+ this.count = count;
+ }
+
+ public long getCount() {
+ return count;
+ }
+
+ @Override
+ public int read(ByteBuffer dst) throws IOException {
+ int bytes = inner.read(dst);
+ if (bytes > 0) {
+ // Avoid the -1 from EOF.
+ count += bytes;
+ }
+ return bytes;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return inner.isOpen();
+ }
+
+ @Override
+ public void close() throws IOException {
+ inner.close();
+ }
}
/**
@@ -400,6 +475,16 @@ public class CompressedSource<T> extends FileBasedSource<T> {
*/
@Override
protected final void startReading(ReadableByteChannel channel) throws IOException {
+ if (splittable) {
+ // No-op. We will always delegate to the inner reader, so this.channel and this.progressLock
+ // will never be used.
+ } else {
+ synchronized (progressLock) {
+ this.channel = new CountingChannel(channel, getCurrentSource().getStartOffset());
+ channel = this.channel;
+ }
+ }
+
if (source.getChannelFactory() instanceof FileNameBasedDecompressingChannelFactory) {
FileNameBasedDecompressingChannelFactory channelFactory =
(FileNameBasedDecompressingChannelFactory) source.getChannelFactory();
@@ -420,16 +505,37 @@ public class CompressedSource<T> extends FileBasedSource<T> {
if (!readerDelegate.readNextRecord()) {
return false;
}
- ++numRecordsRead;
+ synchronized (progressLock) {
+ ++numRecordsRead;
+ }
return true;
}
- /**
- * Returns the delegate reader's current offset in the decompressed input.
- */
+ // Splittable: simply delegates to the inner reader.
+ //
+ // Unsplittable: returns the offset in the input stream that has been read so far.
+ // These positions are likely to be coarse-grained (in the event of buffering) and
+ // over-estimates (because they reflect the number of bytes read to produce an element, not its
+ // start), but they still provide better data than, e.g., reporting the start of the file.
@Override
- protected final long getCurrentOffset() {
- return readerDelegate.getCurrentOffset();
+ protected final long getCurrentOffset() throws NoSuchElementException {
+ if (splittable) {
+ return readerDelegate.getCurrentOffset();
+ } else {
+ synchronized (progressLock) {
+ if (numRecordsRead <= 1) {
+ // Since the first record is at a split point, it should start at the beginning of the
+ // file. This avoids the bad case where the decompressor read the entire file, which
+ // would cause the file to be treated as empty when returning channel.getCount() as it
+ // is outside the valid range.
+ return 0;
+ }
+ if (channel == null) {
+ throw new NoSuchElementException();
+ }
+ return channel.getCount();
+ }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
index b28e866..403d22e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
@@ -210,6 +210,11 @@ public class CountingSource {
}
@Override
+ public synchronized long getSplitPointsRemaining() {
+ return Math.max(0, getCurrentSource().getEndOffset() - current);
+ }
+
+ @Override
public synchronized BoundedCountingSource getCurrentSource() {
return (BoundedCountingSource) super.getCurrentSource();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java
index cc8e923..137c6cd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java
@@ -865,6 +865,8 @@ public class DatastoreIO {
*/
private int userLimit;
+ private volatile boolean done = false;
+
private Entity currentEntity;
/**
@@ -885,6 +887,16 @@ public class DatastoreIO {
}
@Override
+ public final long getSplitPointsConsumed() {
+ return done ? 1 : 0;
+ }
+
+ @Override
+ public final long getSplitPointsRemaining() {
+ return done ? 0 : 1;
+ }
+
+ @Override
public boolean start() throws IOException {
return advance();
}
@@ -901,6 +913,7 @@ public class DatastoreIO {
if (entities == null || !entities.hasNext()) {
currentEntity = null;
+ done = true;
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index 96aeda5..f000f6a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -489,7 +489,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
}
@Override
- public FileBasedSource<T> getCurrentSource() {
+ public synchronized FileBasedSource<T> getCurrentSource() {
return (FileBasedSource<T>) super.getCurrentSource();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
index 9ee89a2..2f62acd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
@@ -180,7 +180,7 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
*
* <p>As an example in which {@link OffsetBasedSource} is used to implement a file source, suppose
* that this source was constructed with an {@code endOffset} of {@link Long#MAX_VALUE} to
- * indicate that a file should be read to the end. Then {@link #getMaxEndOffset} should determine
+ * indicate that a file should be read to the end. Then this function should determine
* the actual, exact size of the file in bytes and return it.
*/
public abstract long getMaxEndOffset(PipelineOptions options) throws Exception;
@@ -230,9 +230,22 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
*/
public abstract static class OffsetBasedReader<T> extends BoundedReader<T> {
private static final Logger LOG = LoggerFactory.getLogger(OffsetBasedReader.class);
-
private OffsetBasedSource<T> source;
+ /**
+ * Returns true if the last call to {@link #start} or {@link #advance} returned false.
+ */
+ public final boolean isDone() {
+ return rangeTracker.isDone();
+ }
+
+ /**
+ * Returns true if there has been a call to {@link #start}.
+ */
+ public final boolean isStarted() {
+ return rangeTracker.isStarted();
+ }
+
/** The {@link OffsetRangeTracker} managing the range and current position of the source. */
private final OffsetRangeTracker rangeTracker;
@@ -266,12 +279,14 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
@Override
public final boolean start() throws IOException {
- return startImpl() && rangeTracker.tryReturnRecordAt(isAtSplitPoint(), getCurrentOffset());
+ return startImpl() && rangeTracker.tryReturnRecordAt(isAtSplitPoint(), getCurrentOffset())
+ || rangeTracker.markDone();
}
@Override
public final boolean advance() throws IOException {
- return advanceImpl() && rangeTracker.tryReturnRecordAt(isAtSplitPoint(), getCurrentOffset());
+ return advanceImpl() && rangeTracker.tryReturnRecordAt(isAtSplitPoint(), getCurrentOffset())
+ || rangeTracker.markDone();
}
/**
@@ -315,6 +330,32 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
}
@Override
+ public long getSplitPointsConsumed() {
+ return rangeTracker.getSplitPointsProcessed();
+ }
+
+ @Override
+ public long getSplitPointsRemaining() {
+ if (isDone()) {
+ return 0;
+ } else if (!isStarted()) {
+ // Note that even if the current source does not allow splitting, we don't know that
+ // it's non-empty so we return UNKNOWN instead of 1.
+ return BoundedReader.SPLIT_POINTS_UNKNOWN;
+ } else if (!getCurrentSource().allowsDynamicSplitting()) {
+ // Started (so non-empty) and unsplittable, so only the current task.
+ return 1;
+ } else if (getCurrentOffset() >= rangeTracker.getStopPosition() - 1) {
+ // If this is true, the next element is outside the range. Note that even getCurrentOffset()
+ // might be larger than the stop position when the current record is not a split point.
+ return 1;
+ } else {
+ // Use the default.
+ return super.getSplitPointsRemaining();
+ }
+ }
+
+ @Override
public final synchronized OffsetBasedSource<T> splitAtFraction(double fraction) {
if (!getCurrentSource().allowsDynamicSplitting()) {
return null;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 79eeb08..13cb45e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -817,9 +817,10 @@ public class TextIO {
private ByteString buffer;
private int startOfSeparatorInBuffer;
private int endOfSeparatorInBuffer;
- private long startOfNextRecord;
- private boolean eof;
- private boolean elementIsPresent;
+ private long startOfRecord;
+ private volatile long startOfNextRecord;
+ private volatile boolean eof;
+ private volatile boolean elementIsPresent;
private T currentValue;
private ReadableByteChannel inChannel;
@@ -834,7 +835,15 @@ public class TextIO {
if (!elementIsPresent) {
throw new NoSuchElementException();
}
- return startOfNextRecord;
+ return startOfRecord;
+ }
+
+ @Override
+ public long getSplitPointsRemaining() {
+ if (isStarted() && startOfNextRecord >= getCurrentSource().getEndOffset()) {
+ return isDone() ? 0 : 1;
+ }
+ return super.getSplitPointsRemaining();
}
@Override
@@ -912,7 +921,7 @@ public class TextIO {
@Override
protected boolean readNextRecord() throws IOException {
- startOfNextRecord += endOfSeparatorInBuffer;
+ startOfRecord = startOfNextRecord;
findSeparatorBounds();
// If we have reached EOF file and consumed all of the buffer then we know
@@ -923,6 +932,7 @@ public class TextIO {
}
decodeCurrentElement();
+ startOfNextRecord = startOfRecord + endOfSeparatorInBuffer;
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
index ea1cf14..76790af 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
@@ -17,6 +17,10 @@
*/
package org.apache.beam.sdk.io.range;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
@@ -32,6 +36,8 @@ public class OffsetRangeTracker implements RangeTracker<Long> {
private long stopOffset;
private long lastRecordStart = -1L;
private long offsetOfLastSplitPoint = -1L;
+ private long splitPointsSeen = 0L;
+ private boolean done = false;
/**
* Offset corresponding to infinity. This can only be used as the upper-bound of a range, and
@@ -49,6 +55,15 @@ public class OffsetRangeTracker implements RangeTracker<Long> {
this.stopOffset = stopOffset;
}
+ public synchronized boolean isStarted() {
+ // done => started: handles the case when the reader was empty.
+ return (offsetOfLastSplitPoint != -1) || done;
+ }
+
+ public synchronized boolean isDone() {
+ return done;
+ }
+
@Override
public synchronized Long getStartPosition() {
return startOffset;
@@ -65,10 +80,18 @@ public class OffsetRangeTracker implements RangeTracker<Long> {
}
public synchronized boolean tryReturnRecordAt(boolean isAtSplitPoint, long recordStart) {
- if (lastRecordStart == -1 && !isAtSplitPoint) {
+ if (!isStarted() && !isAtSplitPoint) {
throw new IllegalStateException(
String.format("The first record [starting at %d] must be at a split point", recordStart));
}
+ if (recordStart < startOffset) {
+ throw new IllegalStateException(
+ String.format(
+ "Trying to return record [starting at %d] which is before the start offset [%d]",
+ recordStart,
+ startOffset));
+
+ }
if (recordStart < lastRecordStart) {
throw new IllegalStateException(
String.format(
@@ -77,8 +100,11 @@ public class OffsetRangeTracker implements RangeTracker<Long> {
recordStart,
lastRecordStart));
}
+
+ lastRecordStart = recordStart;
+
if (isAtSplitPoint) {
- if (offsetOfLastSplitPoint != -1L && recordStart == offsetOfLastSplitPoint) {
+ if (recordStart == offsetOfLastSplitPoint) {
throw new IllegalStateException(
String.format(
"Record at a split point has same offset as the previous split point: "
@@ -86,12 +112,13 @@ public class OffsetRangeTracker implements RangeTracker<Long> {
offsetOfLastSplitPoint, recordStart));
}
if (recordStart >= stopOffset) {
+ done = true;
return false;
}
offsetOfLastSplitPoint = recordStart;
+ ++splitPointsSeen;
}
- lastRecordStart = recordStart;
return true;
}
@@ -105,7 +132,7 @@ public class OffsetRangeTracker implements RangeTracker<Long> {
LOG.debug("Refusing to split {} at {}: stop position unspecified", this, splitOffset);
return false;
}
- if (lastRecordStart == -1) {
+ if (!isStarted()) {
LOG.debug("Refusing to split {} at {}: unstarted", this, splitOffset);
return false;
}
@@ -143,17 +170,72 @@ public class OffsetRangeTracker implements RangeTracker<Long> {
@Override
public synchronized double getFractionConsumed() {
- if (stopOffset == OFFSET_INFINITY) {
+ if (!isStarted()) {
return 0.0;
- }
- if (lastRecordStart == -1) {
+ } else if (isDone()) {
+ return 1.0;
+ } else if (stopOffset == OFFSET_INFINITY) {
return 0.0;
+ } else if (lastRecordStart >= stopOffset) {
+ return 1.0;
+ } else {
+ // E.g., when reading [3, 6) and lastRecordStart is 4, that means we consumed 3,4 of 3,4,5
+ // which is (4 - 3 + 1) / (6 - 3) = 67%.
+ // Also, clamp to at most 1.0 because the last consumed position can extend past the
+ // stop position.
+ return Math.min(1.0, 1.0 * (lastRecordStart - startOffset + 1) / (stopOffset - startOffset));
}
- // E.g., when reading [3, 6) and lastRecordStart is 4, that means we consumed 3,4 of 3,4,5
- // which is (4 - 3 + 1) / (6 - 3) = 67%.
- // Also, clamp to at most 1.0 because the last consumed position can extend past the
- // stop position.
- return Math.min(1.0, 1.0 * (lastRecordStart - startOffset + 1) / (stopOffset - startOffset));
+ }
+
+ /**
+ * Returns the total number of split points that have been processed.
+ *
+ * <p>A split point at a particular offset has been seen if there has been a corresponding call
+ * to {@link #tryReturnRecordAt(boolean, long)} with {@code isAtSplitPoint} true. It has been
+ * processed if there has been a <em>subsequent</em> call to
+ * {@link #tryReturnRecordAt(boolean, long)} with {@code isAtSplitPoint} true and at a larger
+ * offset.
+ *
+ * <p>Note that for correctness when implementing {@link BoundedReader#getSplitPointsConsumed()},
+ * if a reader finishes before {@link #tryReturnRecordAt(boolean, long)} returns false,
+ * the reader should add an additional call to {@link #markDone()}. This will indicate that
+ * processing for the last seen split point has been finished.
+ *
+ * @see org.apache.beam.sdk.io.OffsetBasedSource for a {@link BoundedReader}
+ * implemented using {@link OffsetRangeTracker}.
+ */
+ public synchronized long getSplitPointsProcessed() {
+ if (!isStarted()) {
+ return 0;
+ } else if (isDone()) {
+ return splitPointsSeen;
+ } else {
+ // There is a current split point, and it has not finished processing.
+ checkState(
+ splitPointsSeen > 0,
+ "A started rangeTracker should have seen > 0 split points (is %s)",
+ splitPointsSeen);
+ return splitPointsSeen - 1;
+ }
+ }
+
+
+ /**
+ * Marks this range tracker as being done. Specifically, this will mark the current split point,
+ * if one exists, as being finished.
+ *
+ * <p>Always returns false, so that it can be used in an implementation of
+ * {@link BoundedReader#start()} or {@link BoundedReader#advance()} as follows:
+ *
+ * <pre> {@code
+ * public boolean start() {
+ * return startImpl() && rangeTracker.tryReturnRecordAt(isAtSplitPoint, position)
+ * || rangeTracker.markDone();
+ * }} </pre>
+ */
+ public synchronized boolean markDone() {
+ done = true;
+ return false;
}
@Override
@@ -177,7 +259,10 @@ public class OffsetRangeTracker implements RangeTracker<Long> {
@VisibleForTesting
OffsetRangeTracker copy() {
OffsetRangeTracker res = new OffsetRangeTracker(startOffset, stopOffset);
+ res.offsetOfLastSplitPoint = this.offsetOfLastSplitPoint;
res.lastRecordStart = this.lastRecordStart;
+ res.done = this.done;
+ res.splitPointsSeen = this.splitPointsSeen;
return res;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
index 20c21bc..13f8e7f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
@@ -18,9 +18,9 @@
package org.apache.beam.sdk.io;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -28,6 +28,8 @@ import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.AvroSource.AvroReader;
import org.apache.beam.sdk.io.AvroSource.AvroReader.Seeker;
+import org.apache.beam.sdk.io.BlockBasedSource.BlockBasedReader;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.SourceTestUtils;
@@ -44,6 +46,7 @@ import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.AvroDefault;
import org.apache.avro.reflect.Nullable;
import org.apache.avro.reflect.ReflectData;
+import org.hamcrest.Matchers;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -57,6 +60,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PushbackInputStream;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
@@ -198,6 +202,86 @@ public class AvroSourceTest {
}
@Test
+ public void testProgress() throws Exception {
+ // 5 records, 2 per block.
+ List<FixedRecord> records = createFixedRecords(5);
+ String filename = generateTestFile("tmp.avro", records, SyncBehavior.SYNC_REGULAR, 2,
+ AvroCoder.of(FixedRecord.class), DataFileConstants.NULL_CODEC);
+
+ AvroSource<FixedRecord> source = AvroSource.from(filename).withSchema(FixedRecord.class);
+ try (BoundedSource.BoundedReader<FixedRecord> readerOrig = source.createReader(null)) {
+ assertThat(readerOrig, Matchers.instanceOf(BlockBasedReader.class));
+ BlockBasedReader<FixedRecord> reader = (BlockBasedReader<FixedRecord>) readerOrig;
+
+ // Before starting
+ assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+ assertEquals(0, reader.getSplitPointsConsumed());
+ assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+ // First 2 records are in the same block.
+ assertTrue(reader.start());
+ assertTrue(reader.isAtSplitPoint());
+ assertEquals(0, reader.getSplitPointsConsumed());
+ assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+ // continued
+ assertTrue(reader.advance());
+ assertFalse(reader.isAtSplitPoint());
+ assertEquals(0, reader.getSplitPointsConsumed());
+ assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+ // Second block -> parallelism consumed becomes 1.
+ assertTrue(reader.advance());
+ assertTrue(reader.isAtSplitPoint());
+ assertEquals(1, reader.getSplitPointsConsumed());
+ assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+ // continued
+ assertTrue(reader.advance());
+ assertFalse(reader.isAtSplitPoint());
+ assertEquals(1, reader.getSplitPointsConsumed());
+ assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+ // Third and final block -> parallelism consumed becomes 2, remaining becomes 1.
+ assertTrue(reader.advance());
+ assertTrue(reader.isAtSplitPoint());
+ assertEquals(2, reader.getSplitPointsConsumed());
+ assertEquals(1, reader.getSplitPointsRemaining());
+
+ // Done
+ assertFalse(reader.advance());
+ assertEquals(3, reader.getSplitPointsConsumed());
+ assertEquals(0, reader.getSplitPointsRemaining());
+ assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+ }
+ }
+
+ @Test
+ public void testProgressEmptySource() throws Exception {
+ // 0 records, 2 per block.
+ List<FixedRecord> records = Collections.emptyList();
+ String filename = generateTestFile("tmp.avro", records, SyncBehavior.SYNC_REGULAR, 2,
+ AvroCoder.of(FixedRecord.class), DataFileConstants.NULL_CODEC);
+
+ AvroSource<FixedRecord> source = AvroSource.from(filename).withSchema(FixedRecord.class);
+ try (BoundedSource.BoundedReader<FixedRecord> readerOrig = source.createReader(null)) {
+ assertThat(readerOrig, Matchers.instanceOf(BlockBasedReader.class));
+ BlockBasedReader<FixedRecord> reader = (BlockBasedReader<FixedRecord>) readerOrig;
+
+ // before starting
+ assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+ assertEquals(0, reader.getSplitPointsConsumed());
+ assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+ // confirm empty
+ assertFalse(reader.start());
+
+ // after reading empty source
+ assertEquals(0, reader.getSplitPointsConsumed());
+ assertEquals(0, reader.getSplitPointsRemaining());
+ assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+ }
+ }
+
+ @Test
public void testGetCurrentFromUnstartedReader() throws Exception {
List<FixedRecord> records = createFixedRecords(DEFAULT_RECORD_COUNT);
String filename = generateTestFile("tmp.avro", records, SyncBehavior.SYNC_DEFAULT, 1000,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
index 542e734..7161c1d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
@@ -19,9 +19,10 @@ package org.apache.beam.sdk.io;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
-
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -29,8 +30,11 @@ import static org.junit.Assert.assertTrue;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.CompressedSource.CompressedReader;
import org.apache.beam.sdk.io.CompressedSource.CompressionMode;
import org.apache.beam.sdk.io.CompressedSource.DecompressingChannelFactory;
+import org.apache.beam.sdk.io.FileBasedSource.FileBasedReader;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
@@ -462,11 +466,12 @@ public class CompressedSourceTest {
private static class ByteReader extends FileBasedReader<Byte> {
ByteBuffer buff = ByteBuffer.allocate(1);
Byte current;
- long offset = -1;
+ long offset;
ReadableByteChannel channel;
public ByteReader(ByteSource source) {
super(source);
+ offset = source.getStartOffset() - 1;
}
@Override
@@ -501,4 +506,102 @@ public class CompressedSourceTest {
}
}
}
+
+ @Test
+ public void testEmptyGzipProgress() throws IOException {
+ File tmpFile = tmpFolder.newFile("empty.gz");
+ String filename = tmpFile.toPath().toString();
+ writeFile(tmpFile, new byte[0], CompressionMode.GZIP);
+
+ PipelineOptions options = PipelineOptionsFactory.create();
+ CompressedSource<Byte> source = CompressedSource.from(new ByteSource(filename, 1));
+ try (BoundedReader<Byte> readerOrig = source.createReader(options)) {
+ assertThat(readerOrig, instanceOf(CompressedReader.class));
+ CompressedReader<Byte> reader = (CompressedReader<Byte>) readerOrig;
+ // before starting
+ assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+ assertEquals(0, reader.getSplitPointsConsumed());
+ assertEquals(1, reader.getSplitPointsRemaining());
+
+ // confirm empty
+ assertFalse(reader.start());
+
+ // after reading empty source
+ assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+ assertEquals(0, reader.getSplitPointsConsumed());
+ assertEquals(0, reader.getSplitPointsRemaining());
+ }
+ }
+
+ @Test
+ public void testGzipProgress() throws IOException {
+ int numRecords = 3;
+ File tmpFile = tmpFolder.newFile("nonempty.gz");
+ String filename = tmpFile.toPath().toString();
+ writeFile(tmpFile, new byte[numRecords], CompressionMode.GZIP);
+
+ PipelineOptions options = PipelineOptionsFactory.create();
+ CompressedSource<Byte> source = CompressedSource.from(new ByteSource(filename, 1));
+ try (BoundedReader<Byte> readerOrig = source.createReader(options)) {
+ assertThat(readerOrig, instanceOf(CompressedReader.class));
+ CompressedReader<Byte> reader = (CompressedReader<Byte>) readerOrig;
+ // before starting
+ assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+ assertEquals(0, reader.getSplitPointsConsumed());
+ assertEquals(1, reader.getSplitPointsRemaining());
+
+ // confirm has three records
+ for (int i = 0; i < numRecords; ++i) {
+ if (i == 0) {
+ assertTrue(reader.start());
+ } else {
+ assertTrue(reader.advance());
+ }
+ assertEquals(0, reader.getSplitPointsConsumed());
+ assertEquals(1, reader.getSplitPointsRemaining());
+ }
+ assertFalse(reader.advance());
+
+ // after reading the entire source
+ assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+ assertEquals(1, reader.getSplitPointsConsumed());
+ assertEquals(0, reader.getSplitPointsRemaining());
+ }
+ }
+
+ @Test
+ public void testSplittableProgress() throws IOException {
+ File tmpFile = tmpFolder.newFile("nonempty.txt");
+ String filename = tmpFile.toPath().toString();
+ Files.write(new byte[2], tmpFile);
+
+ PipelineOptions options = PipelineOptionsFactory.create();
+ CompressedSource<Byte> source = CompressedSource.from(new ByteSource(filename, 1));
+ try (BoundedReader<Byte> readerOrig = source.createReader(options)) {
+ assertThat(readerOrig, not(instanceOf(CompressedReader.class)));
+ assertThat(readerOrig, instanceOf(FileBasedReader.class));
+ FileBasedReader<Byte> reader = (FileBasedReader<Byte>) readerOrig;
+
+ // Check preconditions before starting
+ assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+ assertEquals(0, reader.getSplitPointsConsumed());
+ assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+ // First record: none consumed, unknown remaining.
+ assertTrue(reader.start());
+ assertEquals(0, reader.getSplitPointsConsumed());
+ assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+ // Second record: 1 consumed, know that we're on the last record.
+ assertTrue(reader.advance());
+ assertEquals(1, reader.getSplitPointsConsumed());
+ assertEquals(1, reader.getSplitPointsRemaining());
+
+ // Confirm empty and check post-conditions
+ assertFalse(reader.advance());
+ assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+ assertEquals(2, reader.getSplitPointsConsumed());
+ assertEquals(0, reader.getSplitPointsRemaining());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
index a261fb2..bf68d41 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
@@ -24,9 +24,11 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
import org.apache.beam.sdk.io.CountingSource.CounterMark;
import org.apache.beam.sdk.io.CountingSource.UnboundedCountingSource;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -49,6 +51,7 @@ import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import java.io.IOException;
import java.util.List;
/**
@@ -116,6 +119,33 @@ public class CountingSourceTest {
}
@Test
+ public void testProgress() throws IOException {
+ final int numRecords = 5;
+ @SuppressWarnings("deprecation") // testing CountingSource
+ BoundedSource<Long> source = CountingSource.upTo(numRecords);
+ try (BoundedReader<Long> reader = source.createReader(PipelineOptionsFactory.create())) {
+ // Check preconditions before starting. Note that CountingReader can always give an accurate
+ // remaining parallelism.
+ assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+ assertEquals(0, reader.getSplitPointsConsumed());
+ assertEquals(numRecords, reader.getSplitPointsRemaining());
+
+ assertTrue(reader.start());
+ int i = 0;
+ do {
+ assertEquals(i, reader.getSplitPointsConsumed());
+ assertEquals(numRecords - i, reader.getSplitPointsRemaining());
+ ++i;
+ } while (reader.advance());
+
+ assertEquals(numRecords, i); // exactly numRecords calls to advance()
+ assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+ assertEquals(numRecords, reader.getSplitPointsConsumed());
+ assertEquals(0, reader.getSplitPointsRemaining());
+ }
+ }
+
+ @Test
@Category(RunnableOnService.class)
public void testUnboundedSource() {
Pipeline p = TestPipeline.create();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
index bedbc99..1f16d39 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
@@ -446,7 +446,7 @@ public class FileBasedSourceTest {
assertTrue(fractionConsumed > lastFractionConsumed);
lastFractionConsumed = fractionConsumed;
}
- assertTrue(reader.getFractionConsumed() < 1.0);
+ assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
index e9b61aa..66abd33 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.io;
import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive;
import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -28,6 +27,8 @@ import static org.junit.Assert.assertTrue;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -86,13 +87,12 @@ public class OffsetBasedSourceTest {
}
@Override
- public BoundedReader<Integer> createReader(PipelineOptions options) throws IOException {
+ public OffsetBasedReader<Integer> createReader(PipelineOptions options) throws IOException {
return new CoarseRangeReader(this);
}
}
- private static class CoarseRangeReader
- extends OffsetBasedSource.OffsetBasedReader<Integer> {
+ private static class CoarseRangeReader extends OffsetBasedReader<Integer> {
private long current = -1;
private long granularity;
@@ -239,6 +239,69 @@ public class OffsetBasedSourceTest {
}
@Test
+ public void testProgress() throws IOException {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ CoarseRangeSource source = new CoarseRangeSource(13, 17, 1, 2);
+ try (OffsetBasedReader<Integer> reader = source.createReader(options)) {
+ // Unstarted reader
+ assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+ assertEquals(0, reader.getSplitPointsConsumed());
+ assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+ // Start and produce the element 14 since granularity is 2.
+ assertTrue(reader.start());
+ assertTrue(reader.isAtSplitPoint());
+ assertEquals(14, reader.getCurrent().intValue());
+ assertEquals(0, reader.getSplitPointsConsumed());
+ assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+ // Advance and produce the element 15, not a split point.
+ assertTrue(reader.advance());
+ assertEquals(15, reader.getCurrent().intValue());
+ assertEquals(0, reader.getSplitPointsConsumed());
+ assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+ // Advance and produce the element 16, which is a split point. Since the next offset (17)
+ // is outside the range [13, 17), remaining parallelism should become 1 from UNKNOWN.
+ assertTrue(reader.advance());
+ assertTrue(reader.isAtSplitPoint());
+ assertEquals(16, reader.getCurrent().intValue());
+ assertEquals(1, reader.getSplitPointsConsumed());
+ assertEquals(1, reader.getSplitPointsRemaining()); // The next offset is outside the range.
+ // Advance and produce the element 17, not a split point.
+ assertTrue(reader.advance());
+ assertEquals(17, reader.getCurrent().intValue());
+ assertEquals(1, reader.getSplitPointsConsumed());
+ assertEquals(1, reader.getSplitPointsRemaining());
+
+ // Advance and reach the end of the reader.
+ assertFalse(reader.advance());
+ assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+ assertEquals(2, reader.getSplitPointsConsumed());
+ assertEquals(0, reader.getSplitPointsRemaining());
+ }
+ }
+
+ @Test
+ public void testProgressEmptySource() throws IOException {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ CoarseRangeSource source = new CoarseRangeSource(13, 17, 1, 100);
+ try (OffsetBasedReader<Integer> reader = source.createReader(options)) {
+ // before starting
+ assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+ assertEquals(0, reader.getSplitPointsConsumed());
+ assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+ // confirm empty
+ assertFalse(reader.start());
+
+ // after reading empty source
+ assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+ assertEquals(0, reader.getSplitPointsConsumed());
+ assertEquals(0, reader.getSplitPointsRemaining());
+ }
+ }
+
+ @Test
public void testSplitAtFraction() throws IOException {
PipelineOptions options = PipelineOptionsFactory.create();
CoarseRangeSource source = new CoarseRangeSource(13, 35, 1, 10);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 4d6d8dd..53a2a89 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -22,10 +22,10 @@ import static org.apache.beam.sdk.TestUtils.LINES_ARRAY;
import static org.apache.beam.sdk.TestUtils.NO_INTS_ARRAY;
import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.TextualIntegerCoder;
import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
import org.apache.beam.sdk.io.TextIO.CompressionType;
import org.apache.beam.sdk.io.TextIO.TextSource;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -423,6 +424,117 @@ public class TextIOTest {
}
@Test
+ public void testProgressEmptyFile() throws IOException {
+ try (BoundedReader<String> reader =
+ prepareSource(new byte[0]).createReader(PipelineOptionsFactory.create())) {
+ // Check preconditions before starting.
+ assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+ assertEquals(0, reader.getSplitPointsConsumed());
+ assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+ // Assert empty
+ assertFalse(reader.start());
+
+ // Check postconditions after finishing
+ assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+ assertEquals(0, reader.getSplitPointsConsumed());
+ assertEquals(0, reader.getSplitPointsRemaining());
+ }
+ }
+
+ @Test
+ public void testProgressTextFile() throws IOException {
+ String file = "line1\nline2\nline3";
+ try (BoundedReader<String> reader =
+ prepareSource(file.getBytes()).createReader(PipelineOptionsFactory.create())) {
+ // Check preconditions before starting
+ assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+ assertEquals(0, reader.getSplitPointsConsumed());
+ assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+ // Line 1
+ assertTrue(reader.start());
+ assertEquals(0, reader.getSplitPointsConsumed());
+ assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+ // Line 2
+ assertTrue(reader.advance());
+ assertEquals(1, reader.getSplitPointsConsumed());
+ assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+ // Line 3
+ assertTrue(reader.advance());
+ assertEquals(2, reader.getSplitPointsConsumed());
+ assertEquals(1, reader.getSplitPointsRemaining());
+
+ // Check postconditions after finishing
+ assertFalse(reader.advance());
+ assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+ assertEquals(3, reader.getSplitPointsConsumed());
+ assertEquals(0, reader.getSplitPointsRemaining());
+ }
+ }
+
+ @Test
+ public void testProgressAfterSplitting() throws IOException {
+ String file = "line1\nline2\nline3";
+ BoundedSource source = prepareSource(file.getBytes());
+ BoundedSource remainder;
+
+ // Create the remainder, verifying properties pre- and post-splitting.
+ try (BoundedReader<String> readerOrig = source.createReader(PipelineOptionsFactory.create())) {
+ // Preconditions.
+ assertEquals(0.0, readerOrig.getFractionConsumed(), 1e-6);
+ assertEquals(0, readerOrig.getSplitPointsConsumed());
+ assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, readerOrig.getSplitPointsRemaining());
+
+ // First record, before splitting.
+ assertTrue(readerOrig.start());
+ assertEquals(0, readerOrig.getSplitPointsConsumed());
+ assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, readerOrig.getSplitPointsRemaining());
+
+ // Split. 0.1 is in line1, so the reader should now be able to detect the last record.
+ remainder = readerOrig.splitAtFraction(0.1);
+ System.err.println(readerOrig.getCurrentSource());
+ assertNotNull(remainder);
+
+ // First record, after splitting.
+ assertEquals(0, readerOrig.getSplitPointsConsumed());
+ assertEquals(1, readerOrig.getSplitPointsRemaining());
+
+ // Finish and postconditions.
+ assertFalse(readerOrig.advance());
+ assertEquals(1.0, readerOrig.getFractionConsumed(), 1e-6);
+ assertEquals(1, readerOrig.getSplitPointsConsumed());
+ assertEquals(0, readerOrig.getSplitPointsRemaining());
+ }
+
+ // Check the properties of the remainder.
+ try (BoundedReader<String> reader = remainder.createReader(PipelineOptionsFactory.create())) {
+ // Preconditions.
+ assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+ assertEquals(0, reader.getSplitPointsConsumed());
+ assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+ // First record should be line 2.
+ assertTrue(reader.start());
+ assertEquals(0, reader.getSplitPointsConsumed());
+ assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+ // Second record is line 3
+ assertTrue(reader.advance());
+ assertEquals(1, reader.getSplitPointsConsumed());
+ assertEquals(1, reader.getSplitPointsRemaining());
+
+ // Check postconditions after finishing
+ assertFalse(reader.advance());
+ assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+ assertEquals(2, reader.getSplitPointsConsumed());
+ assertEquals(0, reader.getSplitPointsRemaining());
+ }
+ }
+
+ @Test
public void testReadEmptyLines() throws Exception {
runTestReadWithData("\n\n\n".getBytes(StandardCharsets.UTF_8),
ImmutableList.of("", "", ""));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java
index 3de04f7..edd4c4f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java
@@ -104,7 +104,6 @@ public class OffsetRangeTrackerTest {
assertFalse(tracker.tryReturnRecordAt(true, 150));
assertFalse(tracker.tryReturnRecordAt(true, 151));
// Should accept non-splitpoint records starting after stop offset.
- assertTrue(tracker.tryReturnRecordAt(false, 135));
assertTrue(tracker.tryReturnRecordAt(false, 152));
assertTrue(tracker.tryReturnRecordAt(false, 160));
assertFalse(tracker.tryReturnRecordAt(true, 171));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
index ab537eb..41a271c 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
@@ -282,6 +282,7 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
private Configuration conf;
private RecordReader<K, V> currentReader;
private KV<K, V> currentPair;
+ private volatile boolean done = false;
/**
* Create a {@code HDFSFileReader} based on a file or a file pattern specification.
@@ -356,6 +357,7 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
}
// either no next split or all readers were empty
currentPair = null;
+ done = true;
return false;
}
} catch (InterruptedException e) {
@@ -433,6 +435,16 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
}
@Override
+ public final long getSplitPointsRemaining() {
+ if (done) {
+ return 0;
+ }
+ // This source does not currently support dynamic work rebalancing, so remaining
+ // parallelism is always 1.
+ return 1;
+ }
+
+ @Override
public BoundedSource<KV<K, V>> splitAtFraction(double fraction) {
// Not yet supported. To implement this, the sizes of the splits should be used to
// calculate the remaining splits that constitute the given fraction, then a
[2/2] incubator-beam git commit: This closes #353
Posted by dh...@apache.org.
This closes #353
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4755c5a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4755c5a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4755c5a7
Branch: refs/heads/master
Commit: 4755c5a784d628bdbfc67e4870d231252381ae6e
Parents: efd1f95 32a6cde
Author: Dan Halperin <dh...@google.com>
Authored: Fri May 20 16:55:57 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri May 20 16:55:57 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroSource.java | 166 ++++++++++++-------
.../apache/beam/sdk/io/BlockBasedSource.java | 26 +--
.../org/apache/beam/sdk/io/BoundedSource.java | 145 +++++++++++++++-
.../apache/beam/sdk/io/CompressedSource.java | 132 +++++++++++++--
.../org/apache/beam/sdk/io/CountingSource.java | 5 +
.../org/apache/beam/sdk/io/DatastoreIO.java | 13 ++
.../org/apache/beam/sdk/io/FileBasedSource.java | 2 +-
.../apache/beam/sdk/io/OffsetBasedSource.java | 49 +++++-
.../java/org/apache/beam/sdk/io/TextIO.java | 20 ++-
.../beam/sdk/io/range/OffsetRangeTracker.java | 109 ++++++++++--
.../org/apache/beam/sdk/io/AvroSourceTest.java | 86 +++++++++-
.../beam/sdk/io/CompressedSourceTest.java | 107 +++++++++++-
.../apache/beam/sdk/io/CountingSourceTest.java | 30 ++++
.../apache/beam/sdk/io/FileBasedSourceTest.java | 2 +-
.../beam/sdk/io/OffsetBasedSourceTest.java | 71 +++++++-
.../java/org/apache/beam/sdk/io/TextIOTest.java | 114 ++++++++++++-
.../sdk/io/range/OffsetRangeTrackerTest.java | 1 -
.../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 12 ++
18 files changed, 969 insertions(+), 121 deletions(-)
----------------------------------------------------------------------