You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2018/03/27 04:50:27 UTC

[beam] 01/03: [BEAM-3087] Make reader state update and element emission atomic

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit fef8f54be417be88730ebf2d0e8db8c55fbd013f
Author: Grzegorz Kołakowski <gr...@getindata.com>
AuthorDate: Wed Feb 21 10:31:53 2018 +0100

    [BEAM-3087] Make reader state update and element emission atomic
    
    Reader advancement should also be considered a reader state update.
    Therefore, the reader's advancement and the element emission now happen
    in the same synchronized section.
---
 .../streaming/io/UnboundedSourceWrapper.java       | 67 ++++++++++++----------
 1 file changed, 37 insertions(+), 30 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index fc23c01..3f04b6c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -186,7 +186,7 @@ public class UnboundedSourceWrapper<
 
     if (isRestored) {
       // restore the splitSources from the checkpoint to ensure consistent ordering
-      for (KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> restored:
+      for (KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> restored :
           stateForCheckpoint.get()) {
         localSplitSources.add(restored.getKey());
         localReaders.add(restored.getKey().createReader(
@@ -229,19 +229,25 @@ public class UnboundedSourceWrapper<
       // the easy case, we just read from one reader
       UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(0);
 
-      boolean dataAvailable = readerInvoker.invokeStart(reader);
-      if (dataAvailable) {
-        emitElement(ctx, reader);
+      synchronized (ctx.getCheckpointLock()) {
+        boolean dataAvailable = readerInvoker.invokeStart(reader);
+        if (dataAvailable) {
+          emitElement(ctx, reader);
+        }
       }
 
       setNextWatermarkTimer(this.runtimeContext);
 
       while (isRunning) {
-        dataAvailable = readerInvoker.invokeAdvance(reader);
+        boolean dataAvailable;
+        synchronized (ctx.getCheckpointLock()) {
+          dataAvailable = readerInvoker.invokeAdvance(reader);
 
-        if (dataAvailable)  {
-          emitElement(ctx, reader);
-        } else {
+          if (dataAvailable) {
+            emitElement(ctx, reader);
+          }
+        }
+        if (!dataAvailable) {
           Thread.sleep(50);
         }
       }
@@ -254,9 +260,11 @@ public class UnboundedSourceWrapper<
 
       // start each reader and emit data if immediately available
       for (UnboundedSource.UnboundedReader<OutputT> reader : localReaders) {
-        boolean dataAvailable = readerInvoker.invokeStart(reader);
-        if (dataAvailable) {
-          emitElement(ctx, reader);
+        synchronized (ctx.getCheckpointLock()) {
+          boolean dataAvailable = readerInvoker.invokeStart(reader);
+          if (dataAvailable) {
+            emitElement(ctx, reader);
+          }
         }
       }
 
@@ -267,11 +275,13 @@ public class UnboundedSourceWrapper<
       boolean hadData = false;
       while (isRunning) {
         UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(currentReader);
-        boolean dataAvailable = readerInvoker.invokeAdvance(reader);
 
-        if (dataAvailable) {
-          emitElement(ctx, reader);
-          hadData = true;
+        synchronized (ctx.getCheckpointLock()) {
+          boolean dataAvailable = readerInvoker.invokeAdvance(reader);
+          if (dataAvailable) {
+            emitElement(ctx, reader);
+            hadData = true;
+          }
         }
 
         currentReader = (currentReader + 1) % numReaders;
@@ -321,24 +331,21 @@ public class UnboundedSourceWrapper<
       UnboundedSource.UnboundedReader<OutputT> reader) {
     // make sure that reader state update and element emission are atomic
     // with respect to snapshots
-    synchronized (ctx.getCheckpointLock()) {
-
-      OutputT item = reader.getCurrent();
-      byte[] recordId = reader.getCurrentRecordId();
-      Instant timestamp = reader.getCurrentTimestamp();
-
-      WindowedValue<ValueWithRecordId<OutputT>> windowedValue =
-          WindowedValue.of(new ValueWithRecordId<>(item, recordId), timestamp,
-              GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
-      ctx.collectWithTimestamp(windowedValue, timestamp.getMillis());
-    }
+    OutputT item = reader.getCurrent();
+    byte[] recordId = reader.getCurrentRecordId();
+    Instant timestamp = reader.getCurrentTimestamp();
+
+    WindowedValue<ValueWithRecordId<OutputT>> windowedValue =
+        WindowedValue.of(new ValueWithRecordId<>(item, recordId), timestamp,
+            GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+    ctx.collectWithTimestamp(windowedValue, timestamp.getMillis());
   }
 
   @Override
   public void close() throws Exception {
     super.close();
     if (localReaders != null) {
-      for (UnboundedSource.UnboundedReader<OutputT> reader: localReaders) {
+      for (UnboundedSource.UnboundedReader<OutputT> reader : localReaders) {
         reader.close();
       }
     }
@@ -394,8 +401,8 @@ public class UnboundedSourceWrapper<
       int diff = pendingCheckpoints.size() - MAX_NUMBER_PENDING_CHECKPOINTS;
       if (diff >= 0) {
         for (Iterator<Long> iterator = pendingCheckpoints.keySet().iterator();
-             diff >= 0;
-             diff--) {
+            diff >= 0;
+            diff--) {
           iterator.next();
           iterator.remove();
         }
@@ -434,7 +441,7 @@ public class UnboundedSourceWrapper<
       synchronized (context.getCheckpointLock()) {
         // find minimum watermark over all localReaders
         long watermarkMillis = Long.MAX_VALUE;
-        for (UnboundedSource.UnboundedReader<OutputT> reader: localReaders) {
+        for (UnboundedSource.UnboundedReader<OutputT> reader : localReaders) {
           Instant watermark = reader.getWatermark();
           if (watermark != null) {
             watermarkMillis = Math.min(watermark.getMillis(), watermarkMillis);

-- 
To stop receiving notification emails like this one, please contact
aljoscha@apache.org.