You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/06/08 01:35:32 UTC

[43/50] beam git commit: [BEAM-2408] Fix watermark emission in Flink UnboundedSourceWrapper

[BEAM-2408] Fix watermark emission in Flink UnboundedSourceWrapper

Previously, setNextWatermarkTimer() was not called when the source had
multiple Readers, which broke watermark emission in that case.

This also adds a test for watermark emission to
UnboundedSourceWrapperTest.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8f4fa439
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8f4fa439
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8f4fa439

Branch: refs/heads/DSL_SQL
Commit: 8f4fa4394609504348ada988948f0a1386d54c0e
Parents: c1dc8f5
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Jun 5 15:46:26 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Jun 7 19:43:11 2017 +0200

----------------------------------------------------------------------
 .../streaming/io/UnboundedSourceWrapper.java    |   2 +
 .../streaming/UnboundedSourceWrapperTest.java   | 111 ++++++++++++++++++-
 2 files changed, 112 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8f4fa439/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index ec21699..6055a43 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -283,6 +283,8 @@ public class UnboundedSourceWrapper<
         }
       }
 
+      setNextWatermarkTimer(this.runtimeContext);
+
       // a flag telling us whether any of the localReaders had data
       // if no reader had data, sleep for bit
       boolean hadData = false;

http://git-wip-us.apache.org/repos/asf/beam/blob/8f4fa439/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index 716e71d..bb2be60 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -21,11 +21,14 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import com.google.common.base.Joiner;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -45,6 +48,7 @@ import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.OutputTag;
+import org.joda.time.Instant;
 import org.junit.Test;
 import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
@@ -88,7 +92,7 @@ public class UnboundedSourceWrapperTest {
      * If numSplits > numTasks the source has one source will manage multiple readers.
      */
     @Test
-    public void testReaders() throws Exception {
+    public void testValueEmission() throws Exception {
       final int numElements = 20;
       final Object checkpointLock = new Object();
       PipelineOptions options = PipelineOptionsFactory.create();
@@ -164,6 +168,111 @@ public class UnboundedSourceWrapperTest {
     }
 
     /**
+     * Creates a {@link UnboundedSourceWrapper} that has one or multiple readers per source.
+     * If numSplits > numTasks, one source will manage multiple readers.
+     *
+     * <p>Verifies that watermarks are forwarded; passes once one reaches numElements / 2.
+     */
+    @Test(timeout = 30_000)
+    public void testWatermarkEmission() throws Exception {
+      final int numElements = 500;
+      final Object checkpointLock = new Object();
+      PipelineOptions options = PipelineOptionsFactory.create();
+
+      // this source will emit exactly numElements across all parallel readers,
+      // afterwards it will stall. Emitted elements are discarded; we only check
+      // that the forwarded watermark eventually reaches numElements / 2.
+      TestCountingSource source = new TestCountingSource(numElements);
+      UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
+          new UnboundedSourceWrapper<>("stepName", options, source, numSplits);
+
+      assertEquals(numSplits, flinkWrapper.getSplitSources().size());
+
+      final StreamSource<WindowedValue<
+          ValueWithRecordId<KV<Integer, Integer>>>,
+          UnboundedSourceWrapper<
+              KV<Integer, Integer>,
+              TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
+
+      final AbstractStreamOperatorTestHarness<
+          WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>> testHarness =
+          new AbstractStreamOperatorTestHarness<>(
+              sourceOperator,
+              numTasks /* max parallelism */,
+              numTasks /* parallelism */,
+              0 /* subtask index */);
+
+      testHarness.setProcessingTime(Instant.now().getMillis());
+      testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
+      // exceptions from the source thread are collected here and reported via fail() below
+      final ConcurrentLinkedQueue<Object> caughtExceptions = new ConcurrentLinkedQueue<>();
+
+      // use the AtomicBoolean just for the set()/get() functionality for communicating
+      // with the outer Thread
+      final AtomicBoolean seenWatermark = new AtomicBoolean(false);
+
+      Thread sourceThread = new Thread() {
+        @Override
+        public void run() {
+          try {
+            testHarness.open();
+            sourceOperator.run(checkpointLock,
+                new TestStreamStatusMaintainer(),
+                new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
+                  // only watermarks are inspected; all other outputs are discarded
+                  @Override
+                  public void emitWatermark(Watermark watermark) {
+                    if (watermark.getTimestamp() >= numElements / 2) {
+                      seenWatermark.set(true);
+                    }
+                  }
+
+                  @Override
+                  public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
+                  }
+
+                  @Override
+                  public void emitLatencyMarker(LatencyMarker latencyMarker) {
+                  }
+
+                  @Override
+                  public void collect(StreamRecord<WindowedValue<
+                      ValueWithRecordId<KV<Integer, Integer>>>> windowedValueStreamRecord) {
+                  }
+
+                  @Override
+                  public void close() {
+                    // intentionally empty
+                  }
+                });
+          } catch (Exception e) {
+            System.out.println("Caught exception: " + e);
+            caughtExceptions.add(e);
+          }
+        }
+      };
+
+      sourceThread.start();
+      // poll until the watermark is seen; the @Test timeout bounds this loop
+      while (true) {
+        if (!caughtExceptions.isEmpty()) {
+          fail("Caught exception(s): " + Joiner.on(",").join(caughtExceptions));
+        }
+        if (seenWatermark.get()) {
+          break;
+        }
+        Thread.sleep(10);
+
+        // need to advance this so that the watermark timers in the source wrapper fire
+        testHarness.setProcessingTime(Instant.now().getMillis());
+      }
+
+      sourceOperator.cancel();
+      sourceThread.join();
+    }
+
+
+    /**
      * Verify that snapshot/restore work as expected. We bring up a source and cancel
      * after seeing a certain number of elements. Then we snapshot that source,
      * bring up a completely new source that we restore from the snapshot and verify