You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by av...@apache.org on 2017/04/09 19:47:35 UTC
[1/2] beam git commit: [BEAM-1294] Long running UnboundedSource Readers
Repository: beam
Updated Branches:
refs/heads/master a0cfccda4 -> efd785f88
[BEAM-1294] Long running UnboundedSource Readers
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d958796b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d958796b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d958796b
Branch: refs/heads/master
Commit: d958796b525861764318f0c022e4987aa64ac300
Parents: a0cfccd
Author: Aviem Zur <av...@gmail.com>
Authored: Fri Feb 17 12:35:49 2017 +0200
Committer: Aviem Zur <av...@gmail.com>
Committed: Sun Apr 9 22:42:57 2017 +0300
----------------------------------------------------------------------
.../beam/runners/spark/io/MicrobatchSource.java | 113 ++++++++++++++++---
.../beam/runners/spark/io/SourceDStream.java | 11 +-
.../spark/stateful/StateSpecFunctions.java | 6 +-
.../ResumeFromCheckpointStreamingTest.java | 14 ++-
4 files changed, 118 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d958796b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index ff818a1..002eb34 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -19,11 +19,18 @@
package org.apache.beam.runners.spark.io;
import com.google.api.client.util.BackOff;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
@@ -49,29 +56,34 @@ import org.slf4j.LoggerFactory;
public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
extends BoundedSource<T> {
private static final Logger LOG = LoggerFactory.getLogger(MicrobatchSource.class);
+ private static volatile Cache<MicrobatchSource<?, ?>, BoundedReader<?>> readerCache;
private final UnboundedSource<T, CheckpointMarkT> source;
private final Duration maxReadTime;
private final int numInitialSplits;
private final long maxNumRecords;
private final int sourceId;
+ private final double readerCacheInterval;
// each split of the underlying UnboundedSource is associated with a (consistent) id
// to match it's corresponding CheckpointMark state.
private final int splitId;
- MicrobatchSource(UnboundedSource<T, CheckpointMarkT> source,
- Duration maxReadTime,
- int numInitialSplits,
- long maxNumRecords,
- int splitId,
- int sourceId) {
+ MicrobatchSource(
+ UnboundedSource<T, CheckpointMarkT> source,
+ Duration maxReadTime,
+ int numInitialSplits,
+ long maxNumRecords,
+ int splitId,
+ int sourceId,
+ double readerCacheInterval) {
this.source = source;
this.maxReadTime = maxReadTime;
this.numInitialSplits = numInitialSplits;
this.maxNumRecords = maxNumRecords;
this.splitId = splitId;
this.sourceId = sourceId;
+ this.readerCacheInterval = readerCacheInterval;
}
/**
@@ -101,7 +113,8 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
for (int i = 0; i < numSplits; i++) {
// splits must be stable, and cannot change during consecutive executions
// for example: Kafka should not add partitions if more then one topic is read.
- result.add(new MicrobatchSource<>(splits.get(i), maxReadTime, 1, numRecords[i], i, sourceId));
+ result.add(new MicrobatchSource<>(splits.get(i), maxReadTime, 1, numRecords[i], i, sourceId,
+ readerCacheInterval));
}
return result;
}
@@ -113,12 +126,30 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
@Override
public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
- return createReader(options, null);
+ return getOrCreateReader(options, null);
}
- public BoundedReader<T> createReader(PipelineOptions options, CheckpointMarkT checkpointMark)
- throws IOException {
- return new Reader(source.createReader(options, checkpointMark));
+ @SuppressWarnings("unchecked")
+ public BoundedReader<T> getOrCreateReader(
+ PipelineOptions options,
+ CheckpointMarkT checkpointMark) throws IOException {
+ try {
+ initReaderCache((long) readerCacheInterval);
+ return (BoundedReader<T>) readerCache.get(this, new ReaderLoader(options, checkpointMark));
+ } catch (ExecutionException e) {
+ throw new RuntimeException("Failed to get or create reader", e);
+ }
+ }
+
+ private synchronized void initReaderCache(long readerCacheInterval) {
+ if (readerCache == null) {
+ LOG.info("Creating reader cache. Cache interval = " + readerCacheInterval + " ms.");
+ readerCache =
+ CacheBuilder.newBuilder()
+ .expireAfterAccess(readerCacheInterval, TimeUnit.MILLISECONDS)
+ .removalListener(new ReaderCacheRemovalListener())
+ .build();
+ }
}
@Override
@@ -171,12 +202,12 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
*/
public class Reader extends BoundedSource.BoundedReader<T> {
private long recordsRead = 0L;
- private final Instant endTime;
+ private Instant endTime;
private final FluentBackoff backoffFactory;
private final UnboundedSource.UnboundedReader<T> reader;
+ private boolean started;
private Reader(UnboundedSource.UnboundedReader<T> reader) {
- endTime = Instant.now().plus(maxReadTime);
this.reader = reader;
backoffFactory =
FluentBackoff.DEFAULT
@@ -190,12 +221,16 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
LOG.debug("MicrobatchReader-{}: Starting a microbatch read from an unbounded source with a "
+ "max read time of {} msec, and max number of records {}.", splitId, maxReadTime,
maxNumRecords);
- if (reader.start()) {
- recordsRead++;
- return true;
- } else {
- return advanceWithBackoff();
+ endTime = Instant.now().plus(maxReadTime);
+ // Since reader is reused in microbatches only start it if it has not already been started.
+ if (!started) {
+ started = true;
+ if (reader.start()) {
+ recordsRead++;
+ return true;
+ }
}
+ return advanceWithBackoff();
}
@Override
@@ -262,4 +297,46 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
return reader.getWatermark();
}
}
+
+ /**
+ * {@link Callable} which creates a {@link Reader}.
+ */
+ private class ReaderLoader implements Callable<BoundedReader<T>> {
+ private final PipelineOptions options;
+ private final CheckpointMarkT checkpointMark;
+
+ ReaderLoader(PipelineOptions options, CheckpointMarkT checkpointMark) {
+ this.options = options;
+ this.checkpointMark = checkpointMark;
+ }
+
+ @Override
+ public BoundedReader<T> call() throws Exception {
+ LOG.info("No cached reader found for split: [" + source
+ + "]. Creating new reader at checkpoint mark " + checkpointMark);
+ return new Reader(source.createReader(options, checkpointMark));
+ }
+ }
+
+ /**
+ * Listener to be called when a reader is removed from {@link MicrobatchSource#readerCache}.
+ */
+ private static class ReaderCacheRemovalListener
+ implements RemovalListener<MicrobatchSource<?, ?>, BoundedReader<?>> {
+ @Override public void onRemoval(
+ RemovalNotification<MicrobatchSource<?, ?>, BoundedReader<?>> notification) {
+ try {
+ notification.getValue().close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public static void clearCache() {
+ synchronized (MicrobatchSource.class) {
+ readerCache.invalidateAll();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d958796b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
index b7bfeed..fb6da97 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
@@ -60,6 +60,12 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
private final UnboundedSource<T, CheckpointMarkT> unboundedSource;
private final SparkRuntimeContext runtimeContext;
private final Duration boundReadDuration;
+ // Reader cache interval to expire readers if they haven't been accessed in the last microbatch.
+ // The reason we expire readers is that upon executor death/addition source split ownership can be
+ // reshuffled between executors. When this happens we want to close and expire unused readers
+ // in the executor in case it regains ownership of the source split in the future - to avoid
+ // resuming from an earlier checkpoint.
+ private final double readerCacheInterval;
// Number of partitions for the DStream is final and remains the same throughout the entire
// lifetime of the pipeline, including when resuming from checkpoint.
private final int numPartitions;
@@ -84,6 +90,9 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
SparkPipelineOptions options = runtimeContext.getPipelineOptions().as(
SparkPipelineOptions.class);
+ // Reader cache expiration interval. 50% of batch interval is added to accommodate latency.
+ this.readerCacheInterval = 1.5 * options.getBatchIntervalMillis();
+
this.boundReadDuration = boundReadDuration(options.getReadTimePercentage(),
options.getMinReadTimeMillis());
// set initial parallelism once.
@@ -116,7 +125,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
private MicrobatchSource<T, CheckpointMarkT> createMicrobatchSource() {
return new MicrobatchSource<>(unboundedSource, boundReadDuration, initialParallelism,
- boundMaxRecords, -1, id());
+ boundMaxRecords, -1, id(), readerCacheInterval);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/d958796b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
index 6d1b7c0..c9de7fa 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
@@ -151,8 +151,8 @@ public class StateSpecFunctions {
long readDurationMillis = 0;
try {
- reader =
- microbatchSource.createReader(runtimeContext.getPipelineOptions(), checkpointMark);
+ reader = microbatchSource.getOrCreateReader(runtimeContext.getPipelineOptions(),
+ checkpointMark);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -177,8 +177,6 @@ public class StateSpecFunctions {
Instant sourceWatermark = ((MicrobatchSource.Reader) reader).getWatermark();
highWatermark = sourceWatermark.isAfter(lowWatermark) ? sourceWatermark : lowWatermark;
- // close and checkpoint reader.
- reader.close();
readDurationMillis = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
LOG.info(
http://git-wip-us.apache.org/repos/asf/beam/blob/d958796b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 5c1963d..6cbf83a 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -41,6 +41,7 @@ import org.apache.beam.runners.spark.TestSparkPipelineOptions;
import org.apache.beam.runners.spark.UsesCheckpointRecovery;
import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.io.MicrobatchSource;
import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
@@ -79,6 +80,7 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;
import org.joda.time.Instant;
+import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -173,9 +175,7 @@ public class ResumeFromCheckpointStreamingTest {
//--- between executions:
//- clear state.
- AggregatorsAccumulator.clear();
- MetricsAccumulator.clear();
- GlobalWatermarkHolder.clear();
+ clean();
//- write a bit more.
produce(ImmutableMap.of(
@@ -272,6 +272,14 @@ public class ResumeFromCheckpointStreamingTest {
return (SparkPipelineResult) p.run();
}
+ @After
+ public void clean() {
+ AggregatorsAccumulator.clear();
+ MetricsAccumulator.clear();
+ GlobalWatermarkHolder.clear();
+ MicrobatchSource.clearCache();
+ }
+
@AfterClass
public static void tearDown() {
EMBEDDED_KAFKA_CLUSTER.shutdown();
[2/2] beam git commit: This closes #2033
Posted by av...@apache.org.
This closes #2033
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/efd785f8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/efd785f8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/efd785f8
Branch: refs/heads/master
Commit: efd785f881d2c231f202c5031d6aeeb042177850
Parents: a0cfccd d958796
Author: Aviem Zur <av...@gmail.com>
Authored: Sun Apr 9 22:47:03 2017 +0300
Committer: Aviem Zur <av...@gmail.com>
Committed: Sun Apr 9 22:47:03 2017 +0300
----------------------------------------------------------------------
.../beam/runners/spark/io/MicrobatchSource.java | 113 ++++++++++++++++---
.../beam/runners/spark/io/SourceDStream.java | 11 +-
.../spark/stateful/StateSpecFunctions.java | 6 +-
.../ResumeFromCheckpointStreamingTest.java | 14 ++-
4 files changed, 118 insertions(+), 26 deletions(-)
----------------------------------------------------------------------