You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by st...@apache.org on 2017/05/01 11:07:58 UTC
[1/2] beam git commit: [BEAM-2072] Fixed MicrobatchSource.reader
stops reading after reaching maxNumRecords for the first time.
Repository: beam
Updated Branches:
refs/heads/master b414f8de9 -> 254470e62
[BEAM-2072] Fixed MicrobatchSource.reader stops reading after reaching maxNumRecords for the first time.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3b6f4f65
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3b6f4f65
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3b6f4f65
Branch: refs/heads/master
Commit: 3b6f4f65ea451f910769311f26e98e94f1ffe871
Parents: b414f8d
Author: Stas Levin <st...@apache.org>
Authored: Wed Apr 26 09:28:08 2017 +0300
Committer: Stas Levin <st...@apache.org>
Committed: Mon May 1 14:07:07 2017 +0300
----------------------------------------------------------------------
.../beam/runners/spark/io/MicrobatchSource.java | 188 +++++++++----------
.../beam/runners/spark/io/SourceDStream.java | 12 +-
.../apache/beam/runners/spark/io/SourceRDD.java | 11 +-
.../spark/stateful/StateSpecFunctions.java | 30 +--
4 files changed, 125 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3b6f4f65/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index fde5f9a..53d1ba7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -33,7 +33,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.FluentBackoff;
@@ -44,19 +44,14 @@ import org.slf4j.LoggerFactory;
/**
- * Mostly based on {@link org.apache.beam.sdk.io.BoundedReadFromUnboundedSource},
- * with some adjustments for this specific use-case.
- *
- * <p>A {@link BoundedSource} wrapping an {@link UnboundedSource} to complement Spark's micro-batch
- * nature.
- *
- * <p>By design, Spark's micro-batches are bounded by their duration. Spark also provides a
- * back-pressure mechanism that may signal a bound by max records.
+ * A {@link Source} that accommodates Spark's micro-batch oriented nature and wraps an
+ * {@link UnboundedSource}.
*/
public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
- extends BoundedSource<T> {
+ extends Source<T> {
+
private static final Logger LOG = LoggerFactory.getLogger(MicrobatchSource.class);
- private static volatile Cache<MicrobatchSource<?, ?>, BoundedReader<?>> readerCache;
+ private static volatile Cache<MicrobatchSource<?, ?>, Source.Reader<?>> readerCache;
private final UnboundedSource<T, CheckpointMarkT> source;
private final Duration maxReadTime;
@@ -70,13 +65,13 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
private final int splitId;
MicrobatchSource(
- UnboundedSource<T, CheckpointMarkT> source,
- Duration maxReadTime,
- int numInitialSplits,
- long maxNumRecords,
- int splitId,
- int sourceId,
- double readerCacheInterval) {
+ final UnboundedSource<T, CheckpointMarkT> source,
+ final Duration maxReadTime,
+ final int numInitialSplits,
+ final long maxNumRecords,
+ final int splitId,
+ final int sourceId,
+ final double readerCacheInterval) {
this.source = source;
this.maxReadTime = maxReadTime;
this.numInitialSplits = numInitialSplits;
@@ -86,12 +81,23 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
this.readerCacheInterval = readerCacheInterval;
}
+ private static synchronized void initReaderCache(final long readerCacheInterval) {
+ if (readerCache == null) {
+ LOG.info("Creating reader cache. Cache interval = {} ms.", readerCacheInterval);
+ readerCache =
+ CacheBuilder.newBuilder()
+ .expireAfterAccess(readerCacheInterval, TimeUnit.MILLISECONDS)
+ .removalListener(new ReaderCacheRemovalListener())
+ .build();
+ }
+ }
+
/**
* Divide the given number of records into {@code numSplits} approximately
* equal parts that sum to {@code numRecords}.
*/
- private static long[] splitNumRecords(long numRecords, int numSplits) {
- long[] splitNumRecords = new long[numSplits];
+ private static long[] splitNumRecords(final long numRecords, final int numSplits) {
+ final long[] splitNumRecords = new long[numSplits];
for (int i = 0; i < numSplits; i++) {
splitNumRecords[i] = numRecords / numSplits;
}
@@ -101,56 +107,33 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
return splitNumRecords;
}
- @Override
- public List<? extends BoundedSource<T>> split(
- long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
- List<MicrobatchSource<T, CheckpointMarkT>> result = new ArrayList<>();
- List<? extends UnboundedSource<T, CheckpointMarkT>> splits =
+ List<? extends Source<T>> split(final PipelineOptions options) throws Exception {
+ final List<MicrobatchSource<T, CheckpointMarkT>> result = new ArrayList<>();
+ final List<? extends UnboundedSource<T, CheckpointMarkT>> splits =
source.split(numInitialSplits, options);
- int numSplits = splits.size();
- long[] numRecords = splitNumRecords(maxNumRecords, numSplits);
+ final int numSplits = splits.size();
+ final long[] numRecords = splitNumRecords(maxNumRecords, numSplits);
for (int i = 0; i < numSplits; i++) {
// splits must be stable, and cannot change during consecutive executions
// for example: Kafka should not add partitions if more then one topic is read.
- result.add(new MicrobatchSource<>(splits.get(i), maxReadTime, 1, numRecords[i], i, sourceId,
- readerCacheInterval));
+ result.add(
+ new MicrobatchSource<>(
+ splits.get(i), maxReadTime, 1, numRecords[i], i, sourceId, readerCacheInterval));
}
return result;
}
- @Override
- public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
- return 0;
- }
-
- @Override
- public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
- return getOrCreateReader(options, null);
- }
-
@SuppressWarnings("unchecked")
- public BoundedReader<T> getOrCreateReader(
- PipelineOptions options,
- CheckpointMarkT checkpointMark) throws IOException {
+ public Source.Reader<T> getOrCreateReader(
+ final PipelineOptions options, final CheckpointMarkT checkpointMark) throws IOException {
try {
initReaderCache((long) readerCacheInterval);
- return (BoundedReader<T>) readerCache.get(this, new ReaderLoader(options, checkpointMark));
- } catch (ExecutionException e) {
+ return (Source.Reader<T>) readerCache.get(this, new ReaderLoader(options, checkpointMark));
+ } catch (final ExecutionException e) {
throw new RuntimeException("Failed to get or create reader", e);
}
}
- private static synchronized void initReaderCache(long readerCacheInterval) {
- if (readerCache == null) {
- LOG.info("Creating reader cache. Cache interval = " + readerCacheInterval + " ms.");
- readerCache =
- CacheBuilder.newBuilder()
- .expireAfterAccess(readerCacheInterval, TimeUnit.MILLISECONDS)
- .removalListener(new ReaderCacheRemovalListener())
- .build();
- }
- }
-
@Override
public void validate() {
source.validate();
@@ -193,21 +176,26 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
}
/**
- * A {@link BoundedSource.BoundedReader}
- * wrapping an {@link UnboundedSource.UnboundedReader}.
+ * Mostly based on
+ * {@link org.apache.beam.sdk.io.BoundedReadFromUnboundedSource}'s
+ * <code>UnboundedToBoundedSourceAdapter</code>,
+ * with some adjustments for Spark specifics.
*
- * <p>This Reader will read until it reached the bound of duration, or max records,
- * whichever comes first.
+ * <p>This Reader reads until one of the following thresholds has been reached:
+ * <ol>
+ * <li>max records (per batch)</li>
+ * <li>max read duration (per batch)</li>
+ * </ol>
*/
- public class Reader extends BoundedSource.BoundedReader<T> {
+ public class Reader extends Source.Reader<T> {
private long recordsRead = 0L;
- private Instant endTime;
+ private Instant readEndTime;
private final FluentBackoff backoffFactory;
- private final UnboundedSource.UnboundedReader<T> reader;
+ private final UnboundedSource.UnboundedReader<T> unboundedReader;
private boolean started;
- private Reader(UnboundedSource.UnboundedReader<T> reader) {
- this.reader = reader;
+ private Reader(final UnboundedSource.UnboundedReader<T> unboundedReader) {
+ this.unboundedReader = unboundedReader;
backoffFactory =
FluentBackoff.DEFAULT
.withInitialBackoff(Duration.millis(10))
@@ -215,21 +203,28 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
.withMaxCumulativeBackoff(maxReadTime.minus(1));
}
+ private boolean startIfNeeded() throws IOException {
+ return !started && ((started = true) && unboundedReader.start());
+ }
+
+ private void prepareForNewBatchReading() {
+ readEndTime = Instant.now().plus(maxReadTime);
+ recordsRead = 0L;
+ }
+
@Override
public boolean start() throws IOException {
- LOG.debug("MicrobatchReader-{}: Starting a microbatch read from an unbounded source with a "
- + "max read time of {} msec, and max number of records {}.", splitId, maxReadTime,
- maxNumRecords);
- endTime = Instant.now().plus(maxReadTime);
- // Since reader is reused in microbatches only start it if it has not already been started.
- if (!started) {
- started = true;
- if (reader.start()) {
- recordsRead++;
- return true;
- }
- }
- return advanceWithBackoff();
+ LOG.debug(
+ "MicrobatchReader-{}: Starting a microbatch read from an unbounded source with a max "
+ + "read time of {} millis, and max number of records {}.",
+ splitId,
+ maxReadTime,
+ maxNumRecords);
+
+ prepareForNewBatchReading();
+
+ // either start a new read, or continue an existing one
+ return startIfNeeded() || advanceWithBackoff();
}
@Override
@@ -237,20 +232,21 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
if (recordsRead >= maxNumRecords) {
finalizeCheckpoint();
return false;
+ } else {
+ return advanceWithBackoff();
}
- return advanceWithBackoff();
}
private boolean advanceWithBackoff() throws IOException {
// Try reading from the source with exponential backoff
- BackOff backoff = backoffFactory.backoff();
+ final BackOff backoff = backoffFactory.backoff();
long nextSleep = backoff.nextBackOffMillis();
while (nextSleep != BackOff.STOP) {
- if (endTime != null && Instant.now().isAfter(endTime)) {
+ if (readEndTime != null && Instant.now().isAfter(readEndTime)) {
finalizeCheckpoint();
return false;
}
- if (reader.advance()) {
+ if (unboundedReader.advance()) {
recordsRead++;
return true;
}
@@ -262,55 +258,55 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
}
private void finalizeCheckpoint() throws IOException {
- reader.getCheckpointMark().finalizeCheckpoint();
+ unboundedReader.getCheckpointMark().finalizeCheckpoint();
LOG.debug("MicrobatchReader-{}: finalized CheckpointMark successfully after "
+ "reading {} records.", splitId, recordsRead);
}
@Override
public T getCurrent() throws NoSuchElementException {
- return reader.getCurrent();
+ return unboundedReader.getCurrent();
}
@Override
public Instant getCurrentTimestamp() throws NoSuchElementException {
- return reader.getCurrentTimestamp();
+ return unboundedReader.getCurrentTimestamp();
}
@Override
public void close() throws IOException {
- reader.close();
+ unboundedReader.close();
}
@Override
- public BoundedSource<T> getCurrentSource() {
+ public Source<T> getCurrentSource() {
return MicrobatchSource.this;
}
@SuppressWarnings("unchecked")
public CheckpointMarkT getCheckpointMark() {
- return (CheckpointMarkT) reader.getCheckpointMark();
+ return (CheckpointMarkT) unboundedReader.getCheckpointMark();
}
public Instant getWatermark() {
- return reader.getWatermark();
+ return unboundedReader.getWatermark();
}
}
/**
* {@link Callable} which creates a {@link Reader}.
*/
- private class ReaderLoader implements Callable<BoundedReader<T>> {
+ private class ReaderLoader implements Callable<Source.Reader<T>> {
private final PipelineOptions options;
private final CheckpointMarkT checkpointMark;
- ReaderLoader(PipelineOptions options, CheckpointMarkT checkpointMark) {
+ ReaderLoader(final PipelineOptions options, final CheckpointMarkT checkpointMark) {
this.options = options;
this.checkpointMark = checkpointMark;
}
@Override
- public BoundedReader<T> call() throws Exception {
+ public Reader call() throws Exception {
LOG.info("No cached reader found for split: [" + source
+ "]. Creating new reader at checkpoint mark " + checkpointMark);
return new Reader(source.createReader(options, checkpointMark));
@@ -321,12 +317,14 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
* Listener to be called when a reader is removed from {@link MicrobatchSource#readerCache}.
*/
private static class ReaderCacheRemovalListener
- implements RemovalListener<MicrobatchSource<?, ?>, BoundedReader<?>> {
- @Override public void onRemoval(
- RemovalNotification<MicrobatchSource<?, ?>, BoundedReader<?>> notification) {
+ implements RemovalListener<MicrobatchSource<?, ?>, Source.Reader<?>> {
+
+ @Override
+ public void onRemoval(
+ final RemovalNotification<MicrobatchSource<?, ?>, Source.Reader<?>> notification) {
try {
notification.getValue().close();
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new RuntimeException(e);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3b6f4f65/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
index d33529c..d8f414a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
@@ -104,7 +104,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
try {
this.numPartitions =
createMicrobatchSource()
- .split(initialParallelism, options)
+ .split(options)
.size();
} catch (Exception e) {
throw new RuntimeException(e);
@@ -124,8 +124,14 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
private MicrobatchSource<T, CheckpointMarkT> createMicrobatchSource() {
- return new MicrobatchSource<>(unboundedSource, boundReadDuration, initialParallelism,
- boundMaxRecords, -1, id(), readerCacheInterval);
+ return new MicrobatchSource<>(
+ unboundedSource,
+ boundReadDuration,
+ initialParallelism,
+ boundMaxRecords,
+ -1,
+ id(),
+ readerCacheInterval);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/3b6f4f65/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
index b99ae10..e294359 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
@@ -258,12 +258,13 @@ public class SourceRDD {
@Override
public Partition[] getPartitions() {
try {
- List<? extends Source<T>> partitionedSources = microbatchSource.split(
- -1 /* ignored */, runtimeContext.getPipelineOptions());
- Partition[] partitions = new CheckpointableSourcePartition[partitionedSources.size()];
+ final List<? extends Source<T>> partitionedSources =
+ microbatchSource.split(runtimeContext.getPipelineOptions());
+ final Partition[] partitions = new CheckpointableSourcePartition[partitionedSources.size()];
for (int i = 0; i < partitionedSources.size(); i++) {
- partitions[i] = new CheckpointableSourcePartition<>(id(), i, partitionedSources.get(i),
- EmptyCheckpointMark.get());
+ partitions[i] =
+ new CheckpointableSourcePartition<>(
+ id(), i, partitionedSources.get(i), EmptyCheckpointMark.get());
}
return partitions;
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/beam/blob/3b6f4f65/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
index c9de7fa..30ee639 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
@@ -34,7 +34,6 @@ import org.apache.beam.runners.spark.io.SparkUnboundedSource.Metadata;
import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.metrics.MetricsContainer;
@@ -124,7 +123,7 @@ public class StateSpecFunctions {
// Initial high/low watermarks.
Instant lowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
- Instant highWatermark;
+ final Instant highWatermark;
// if state exists, use it, otherwise it's first time so use the startCheckpointMark.
// startCheckpointMark may be EmptyCheckpointMark (the Spark Java API tries to apply
@@ -146,13 +145,15 @@ public class StateSpecFunctions {
}
// create reader.
- BoundedSource.BoundedReader<T> reader;
- Stopwatch stopwatch = Stopwatch.createStarted();
+ final MicrobatchSource.Reader/*<T>*/ microbatchReader;
+ final Stopwatch stopwatch = Stopwatch.createStarted();
long readDurationMillis = 0;
try {
- reader = microbatchSource.getOrCreateReader(runtimeContext.getPipelineOptions(),
- checkpointMark);
+ microbatchReader =
+ (MicrobatchSource.Reader)
+ microbatchSource.getOrCreateReader(runtimeContext.getPipelineOptions(),
+ checkpointMark);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -165,16 +166,19 @@ public class StateSpecFunctions {
GlobalWindow.Coder.INSTANCE);
try {
// measure how long a read takes per-partition.
- boolean finished = !reader.start();
+ boolean finished = !microbatchReader.start();
while (!finished) {
- WindowedValue<T> wv = WindowedValue.of(reader.getCurrent(),
- reader.getCurrentTimestamp(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+ final WindowedValue<T> wv =
+ WindowedValue.of((T) microbatchReader.getCurrent(),
+ microbatchReader.getCurrentTimestamp(),
+ GlobalWindow.INSTANCE,
+ PaneInfo.NO_FIRING);
readValues.add(CoderHelpers.toByteArray(wv, coder));
- finished = !reader.advance();
+ finished = !microbatchReader.advance();
}
// end-of-read watermark is the high watermark, but don't allow decrease.
- Instant sourceWatermark = ((MicrobatchSource.Reader) reader).getWatermark();
+ final Instant sourceWatermark = microbatchReader.getWatermark();
highWatermark = sourceWatermark.isAfter(lowWatermark) ? sourceWatermark : lowWatermark;
readDurationMillis = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
@@ -186,8 +190,8 @@ public class StateSpecFunctions {
// if the Source does not supply a CheckpointMark skip updating the state.
@SuppressWarnings("unchecked")
- CheckpointMarkT finishedReadCheckpointMark =
- (CheckpointMarkT) ((MicrobatchSource.Reader) reader).getCheckpointMark();
+ final CheckpointMarkT finishedReadCheckpointMark =
+ (CheckpointMarkT) microbatchReader.getCheckpointMark();
byte[] codedCheckpoint = new byte[0];
if (finishedReadCheckpointMark != null) {
codedCheckpoint = CoderHelpers.toByteArray(finishedReadCheckpointMark, checkpointCoder);
[2/2] beam git commit: This closes #2698
Posted by st...@apache.org.
This closes #2698
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/254470e6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/254470e6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/254470e6
Branch: refs/heads/master
Commit: 254470e626edb6a013ba5cb2d3312dc3bfbdcb51
Parents: b414f8d 3b6f4f6
Author: Stas Levin <st...@apache.org>
Authored: Mon May 1 14:07:20 2017 +0300
Committer: Stas Levin <st...@apache.org>
Committed: Mon May 1 14:07:20 2017 +0300
----------------------------------------------------------------------
.../beam/runners/spark/io/MicrobatchSource.java | 188 +++++++++----------
.../beam/runners/spark/io/SourceDStream.java | 12 +-
.../apache/beam/runners/spark/io/SourceRDD.java | 11 +-
.../spark/stateful/StateSpecFunctions.java | 30 +--
4 files changed, 125 insertions(+), 116 deletions(-)
----------------------------------------------------------------------