Posted to commits@beam.apache.org by av...@apache.org on 2017/04/09 19:47:35 UTC

[1/2] beam git commit: [BEAM-1294] Long running UnboundedSource Readers

Repository: beam
Updated Branches:
  refs/heads/master a0cfccda4 -> efd785f88


[BEAM-1294] Long running UnboundedSource Readers


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d958796b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d958796b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d958796b

Branch: refs/heads/master
Commit: d958796b525861764318f0c022e4987aa64ac300
Parents: a0cfccd
Author: Aviem Zur <av...@gmail.com>
Authored: Fri Feb 17 12:35:49 2017 +0200
Committer: Aviem Zur <av...@gmail.com>
Committed: Sun Apr 9 22:42:57 2017 +0300

----------------------------------------------------------------------
 .../beam/runners/spark/io/MicrobatchSource.java | 113 ++++++++++++++++---
 .../beam/runners/spark/io/SourceDStream.java    |  11 +-
 .../spark/stateful/StateSpecFunctions.java      |   6 +-
 .../ResumeFromCheckpointStreamingTest.java      |  14 ++-
 4 files changed, 118 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d958796b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index ff818a1..002eb34 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -19,11 +19,18 @@
 package org.apache.beam.runners.spark.io;
 
 import com.google.api.client.util.BackOff;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -49,29 +56,34 @@ import org.slf4j.LoggerFactory;
 public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
     extends BoundedSource<T> {
   private static final Logger LOG = LoggerFactory.getLogger(MicrobatchSource.class);
+  private static volatile Cache<MicrobatchSource<?, ?>, BoundedReader<?>> readerCache;
 
   private final UnboundedSource<T, CheckpointMarkT> source;
   private final Duration maxReadTime;
   private final int numInitialSplits;
   private final long maxNumRecords;
   private final int sourceId;
+  private final double readerCacheInterval;
 
   // each split of the underlying UnboundedSource is associated with a (consistent) id
  // to match its corresponding CheckpointMark state.
   private final int splitId;
 
-  MicrobatchSource(UnboundedSource<T, CheckpointMarkT> source,
-                   Duration maxReadTime,
-                   int numInitialSplits,
-                   long maxNumRecords,
-                   int splitId,
-                   int sourceId) {
+  MicrobatchSource(
+      UnboundedSource<T, CheckpointMarkT> source,
+      Duration maxReadTime,
+      int numInitialSplits,
+      long maxNumRecords,
+      int splitId,
+      int sourceId,
+      double readerCacheInterval) {
     this.source = source;
     this.maxReadTime = maxReadTime;
     this.numInitialSplits = numInitialSplits;
     this.maxNumRecords = maxNumRecords;
     this.splitId = splitId;
     this.sourceId = sourceId;
+    this.readerCacheInterval = readerCacheInterval;
   }
 
   /**
@@ -101,7 +113,8 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
     for (int i = 0; i < numSplits; i++) {
       // splits must be stable, and cannot change during consecutive executions
       // for example: Kafka should not add partitions if more than one topic is read.
-      result.add(new MicrobatchSource<>(splits.get(i), maxReadTime, 1, numRecords[i], i, sourceId));
+      result.add(new MicrobatchSource<>(splits.get(i), maxReadTime, 1, numRecords[i], i, sourceId,
+          readerCacheInterval));
     }
     return result;
   }
@@ -113,12 +126,30 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
 
   @Override
   public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
-    return createReader(options, null);
+    return getOrCreateReader(options, null);
   }
 
-  public BoundedReader<T> createReader(PipelineOptions options, CheckpointMarkT checkpointMark)
-      throws IOException {
-    return new Reader(source.createReader(options, checkpointMark));
+  @SuppressWarnings("unchecked")
+  public BoundedReader<T> getOrCreateReader(
+      PipelineOptions options,
+      CheckpointMarkT checkpointMark) throws IOException {
+    try {
+      initReaderCache((long) readerCacheInterval);
+      return (BoundedReader<T>) readerCache.get(this, new ReaderLoader(options, checkpointMark));
+    } catch (ExecutionException e) {
+      throw new RuntimeException("Failed to get or create reader", e);
+    }
+  }
+
+  private synchronized void initReaderCache(long readerCacheInterval) {
+    if (readerCache == null) {
+      LOG.info("Creating reader cache. Cache interval = " + readerCacheInterval + " ms.");
+      readerCache =
+          CacheBuilder.newBuilder()
+              .expireAfterAccess(readerCacheInterval, TimeUnit.MILLISECONDS)
+              .removalListener(new ReaderCacheRemovalListener())
+              .build();
+    }
   }
 
   @Override
@@ -171,12 +202,12 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
    */
   public class Reader extends BoundedSource.BoundedReader<T> {
     private long recordsRead = 0L;
-    private final Instant endTime;
+    private Instant endTime;
     private final FluentBackoff backoffFactory;
     private final UnboundedSource.UnboundedReader<T> reader;
+    private boolean started;
 
     private Reader(UnboundedSource.UnboundedReader<T> reader) {
-      endTime = Instant.now().plus(maxReadTime);
       this.reader = reader;
       backoffFactory =
           FluentBackoff.DEFAULT
@@ -190,12 +221,16 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
       LOG.debug("MicrobatchReader-{}: Starting a microbatch read from an unbounded source with a "
           + "max read time of {} msec, and max number of records {}.", splitId, maxReadTime,
               maxNumRecords);
-      if (reader.start()) {
-        recordsRead++;
-        return true;
-      } else {
-        return advanceWithBackoff();
+      endTime = Instant.now().plus(maxReadTime);
+      // The reader is reused across microbatches, so only start it if it was not already started.
+      if (!started) {
+        started = true;
+        if (reader.start()) {
+          recordsRead++;
+          return true;
+        }
       }
+      return advanceWithBackoff();
     }
 
     @Override
@@ -262,4 +297,46 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
       return reader.getWatermark();
     }
   }
+
+  /**
+   * {@link Callable} which creates a {@link Reader}.
+   */
+  private class ReaderLoader implements Callable<BoundedReader<T>> {
+    private final PipelineOptions options;
+    private final CheckpointMarkT checkpointMark;
+
+    ReaderLoader(PipelineOptions options, CheckpointMarkT checkpointMark) {
+      this.options = options;
+      this.checkpointMark = checkpointMark;
+    }
+
+    @Override
+    public BoundedReader<T> call() throws Exception {
+      LOG.info("No cached reader found for split: [" + source
+          + "]. Creating new reader at checkpoint mark " + checkpointMark);
+      return new Reader(source.createReader(options, checkpointMark));
+    }
+  }
+
+  /**
+   * Listener to be called when a reader is removed from {@link MicrobatchSource#readerCache}.
+   */
+  private static class ReaderCacheRemovalListener
+      implements RemovalListener<MicrobatchSource<?, ?>, BoundedReader<?>> {
+    @Override public void onRemoval(
+        RemovalNotification<MicrobatchSource<?, ?>, BoundedReader<?>> notification) {
+      try {
+        notification.getValue().close();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public static void clearCache() {
+    synchronized (MicrobatchSource.class) {
+      readerCache.invalidateAll();
+    }
+  }
 }
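
A note on the caching mechanism introduced above: it is plain Guava, a Cache keyed by source whose
entries expire after a period of no access, plus a RemovalListener that closes each evicted reader.
Below is a minimal, self-contained sketch of that pattern; the class and key names used here
(ReaderCacheSketch, "split-0") are illustrative, not part of Beam.

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public class ReaderCacheSketch {
  public static void main(String[] args) throws Exception {
    Cache<String, Closeable> cache =
        CacheBuilder.newBuilder()
            // Expire entries that have not been accessed for 100 ms.
            .expireAfterAccess(100, TimeUnit.MILLISECONDS)
            // Close evicted readers, as ReaderCacheRemovalListener does above.
            .removalListener(new RemovalListener<String, Closeable>() {
              @Override
              public void onRemoval(RemovalNotification<String, Closeable> notification) {
                try {
                  notification.getValue().close();
                } catch (IOException e) {
                  throw new RuntimeException(e);
                }
              }
            })
            .build();

    // Get-or-create: the Callable runs only on a cache miss, mirroring
    // MicrobatchSource#getOrCreateReader and its ReaderLoader.
    Closeable reader = cache.get("split-0", new Callable<Closeable>() {
      @Override
      public Closeable call() {
        System.out.println("cache miss: creating reader for split-0");
        return new Closeable() {
          @Override
          public void close() {
            System.out.println("evicted: closing reader for split-0");
          }
        };
      }
    });

    Thread.sleep(200); // Let the access timeout elapse.
    cache.cleanUp();   // Guava evicts lazily; run maintenance so onRemoval fires.
  }
}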

http://git-wip-us.apache.org/repos/asf/beam/blob/d958796b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
index b7bfeed..fb6da97 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
@@ -60,6 +60,12 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
   private final UnboundedSource<T, CheckpointMarkT> unboundedSource;
   private final SparkRuntimeContext runtimeContext;
   private final Duration boundReadDuration;
+  // Reader cache expiration interval, used to expire readers that were not accessed during the
+  // last microbatch. We expire readers because, upon executor death/addition, source split
+  // ownership can be reshuffled between executors. When this happens we want to close and expire
+  // an executor's unused readers, so that if it regains ownership of the split later it does not
+  // resume from an earlier (stale) checkpoint.
+  private final double readerCacheInterval;
   // Number of partitions for the DStream is final and remains the same throughout the entire
   // lifetime of the pipeline, including when resuming from checkpoint.
   private final int numPartitions;
@@ -84,6 +90,9 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
     SparkPipelineOptions options = runtimeContext.getPipelineOptions().as(
         SparkPipelineOptions.class);
 
+    // Reader cache expiration interval: 50% of the batch interval is added to absorb latency.
+    this.readerCacheInterval = 1.5 * options.getBatchIntervalMillis();
+
     this.boundReadDuration = boundReadDuration(options.getReadTimePercentage(),
         options.getMinReadTimeMillis());
     // set initial parallelism once.
@@ -116,7 +125,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
 
   private MicrobatchSource<T, CheckpointMarkT> createMicrobatchSource() {
     return new MicrobatchSource<>(unboundedSource, boundReadDuration, initialParallelism,
-        boundMaxRecords, -1, id());
+        boundMaxRecords, -1, id(), readerCacheInterval);
   }
 
   @Override
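
The 1.5 factor above encodes "one batch interval plus a 50% margin for latency": a reader accessed
once per microbatch is always touched again before it expires, while a reader orphaned by a split
reshuffle misses an access and gets evicted. A tiny sketch with hypothetical numbers (the 1000 ms
value is illustrative; in the code it comes from SparkPipelineOptions#getBatchIntervalMillis()):

public class ReaderCacheIntervalSketch {
  public static void main(String[] args) {
    long batchIntervalMillis = 1000L; // hypothetical batch interval

    // One batch interval plus 50%, as computed in SourceDStream above.
    double readerCacheInterval = 1.5 * batchIntervalMillis;
    System.out.println(readerCacheInterval + " ms"); // 1500.0 ms

    // A reader accessed every ~1000 ms never idles for 1500 ms, so it stays cached;
    // a reader whose split moved to another executor misses its next access and is
    // evicted (and closed) about half a batch interval after its last use.
  }
}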

http://git-wip-us.apache.org/repos/asf/beam/blob/d958796b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
index 6d1b7c0..c9de7fa 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
@@ -151,8 +151,8 @@ public class StateSpecFunctions {
         long readDurationMillis = 0;
 
         try {
-          reader =
-              microbatchSource.createReader(runtimeContext.getPipelineOptions(), checkpointMark);
+          reader = microbatchSource.getOrCreateReader(runtimeContext.getPipelineOptions(),
+              checkpointMark);
         } catch (IOException e) {
           throw new RuntimeException(e);
         }
@@ -177,8 +177,6 @@ public class StateSpecFunctions {
           Instant sourceWatermark = ((MicrobatchSource.Reader) reader).getWatermark();
           highWatermark = sourceWatermark.isAfter(lowWatermark) ? sourceWatermark : lowWatermark;
 
-          // close and checkpoint reader.
-          reader.close();
           readDurationMillis = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
 
           LOG.info(
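
Taken together, the two hunks above change the reader lifecycle: the state function now fetches a
possibly cached reader via getOrCreateReader, and it no longer closes the reader at the end of the
microbatch, since closing is now the cache removal listener's job. The contract this relies on is
the start-once behavior added to MicrobatchSource.Reader: every microbatch calls start(), but only
the first call starts the underlying unbounded reader; later calls resume advancing from where the
previous microbatch stopped. A toy illustration (StartOnceSketch is hypothetical, not Beam code):

import java.util.Arrays;
import java.util.Iterator;

public class StartOnceSketch {
  private final Iterator<Integer> underlying = Arrays.asList(1, 2, 3).iterator();
  private boolean started;
  private Integer current;

  // Called at the top of every microbatch, like MicrobatchSource.Reader#start().
  boolean start() {
    if (!started) {
      started = true;
      System.out.println("starting underlying reader (first microbatch only)");
    }
    return advance(); // later microbatches just resume where the last one stopped
  }

  boolean advance() {
    if (underlying.hasNext()) {
      current = underlying.next();
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    StartOnceSketch reader = new StartOnceSketch();
    reader.start();   System.out.println(reader.current); // microbatch 1 reads 1
    reader.advance(); System.out.println(reader.current); // ... then 2
    reader.start();   System.out.println(reader.current); // microbatch 2 resumes at 3
  }
}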

http://git-wip-us.apache.org/repos/asf/beam/blob/d958796b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 5c1963d..6cbf83a 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -41,6 +41,7 @@ import org.apache.beam.runners.spark.TestSparkPipelineOptions;
 import org.apache.beam.runners.spark.UsesCheckpointRecovery;
 import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.io.MicrobatchSource;
 import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
 import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
 import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
@@ -79,6 +80,7 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -173,9 +175,7 @@ public class ResumeFromCheckpointStreamingTest {
     //--- between executions:
 
     //- clear state.
-    AggregatorsAccumulator.clear();
-    MetricsAccumulator.clear();
-    GlobalWatermarkHolder.clear();
+    clean();
 
     //- write a bit more.
     produce(ImmutableMap.of(
@@ -272,6 +272,14 @@ public class ResumeFromCheckpointStreamingTest {
     return (SparkPipelineResult) p.run();
   }
 
+  @After
+  public void clean() {
+    AggregatorsAccumulator.clear();
+    MetricsAccumulator.clear();
+    GlobalWatermarkHolder.clear();
+    MicrobatchSource.clearCache();
+  }
+
   @AfterClass
   public static void tearDown() {
     EMBEDDED_KAFKA_CLUSTER.shutdown();


[2/2] beam git commit: This closes #2033

Posted by av...@apache.org.
This closes #2033


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/efd785f8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/efd785f8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/efd785f8

Branch: refs/heads/master
Commit: efd785f881d2c231f202c5031d6aeeb042177850
Parents: a0cfccd d958796
Author: Aviem Zur <av...@gmail.com>
Authored: Sun Apr 9 22:47:03 2017 +0300
Committer: Aviem Zur <av...@gmail.com>
Committed: Sun Apr 9 22:47:03 2017 +0300

----------------------------------------------------------------------
 .../beam/runners/spark/io/MicrobatchSource.java | 113 ++++++++++++++++---
 .../beam/runners/spark/io/SourceDStream.java    |  11 +-
 .../spark/stateful/StateSpecFunctions.java      |   6 +-
 .../ResumeFromCheckpointStreamingTest.java      |  14 ++-
 4 files changed, 118 insertions(+), 26 deletions(-)
----------------------------------------------------------------------