You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/06/08 01:35:32 UTC
[43/50] beam git commit: [BEAM-2408] Fix watermark emission in Flink
UnboundedSourceWrapper
[BEAM-2408] Fix watermark emission in Flink UnboundedSourceWrapper
Previously, setNextWatermarkTimer() was not called when the source
had multiple readers, so watermarks were not emitted in that case.
This also adds a test for watermark emission to
UnboundedSourceWrapperTest.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8f4fa439
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8f4fa439
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8f4fa439
Branch: refs/heads/DSL_SQL
Commit: 8f4fa4394609504348ada988948f0a1386d54c0e
Parents: c1dc8f5
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Jun 5 15:46:26 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Jun 7 19:43:11 2017 +0200
----------------------------------------------------------------------
.../streaming/io/UnboundedSourceWrapper.java | 2 +
.../streaming/UnboundedSourceWrapperTest.java | 111 ++++++++++++++++++-
2 files changed, 112 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8f4fa439/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index ec21699..6055a43 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -283,6 +283,8 @@ public class UnboundedSourceWrapper<
}
}
+ setNextWatermarkTimer(this.runtimeContext);
+
// a flag telling us whether any of the localReaders had data
// if no reader had data, sleep for bit
boolean hadData = false;
http://git-wip-us.apache.org/repos/asf/beam/blob/8f4fa439/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index 716e71d..bb2be60 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -21,11 +21,14 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import com.google.common.base.Joiner;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -45,6 +48,7 @@ import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.OutputTag;
+import org.joda.time.Instant;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
@@ -88,7 +92,7 @@ public class UnboundedSourceWrapperTest {
* If numSplits > numTasks the source has one source will manage multiple readers.
*/
@Test
- public void testReaders() throws Exception {
+ public void testValueEmission() throws Exception {
final int numElements = 20;
final Object checkpointLock = new Object();
PipelineOptions options = PipelineOptionsFactory.create();
@@ -164,6 +168,111 @@ public class UnboundedSourceWrapperTest {
}
/**
+ * Creates a {@link UnboundedSourceWrapper} that has one or multiple readers per source.
+ * If numSplits > numTasks, one source will manage multiple readers.
+ *
+ * <p>This test verifies that watermarks are correctly forwarded.
+ */
+ @Test(timeout = 30_000)
+ public void testWatermarkEmission() throws Exception {
+ final int numElements = 500;
+ final Object checkpointLock = new Object();
+ PipelineOptions options = PipelineOptionsFactory.create();
+
+ // this source will emit exactly NUM_ELEMENTS across all parallel readers,
+ // afterwards it will stall. We check whether we also receive NUM_ELEMENTS
+ // elements later.
+ TestCountingSource source = new TestCountingSource(numElements);
+ UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
+ new UnboundedSourceWrapper<>("stepName", options, source, numSplits);
+
+ assertEquals(numSplits, flinkWrapper.getSplitSources().size());
+
+ final StreamSource<WindowedValue<
+ ValueWithRecordId<KV<Integer, Integer>>>,
+ UnboundedSourceWrapper<
+ KV<Integer, Integer>,
+ TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
+
+ final AbstractStreamOperatorTestHarness<
+ WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>> testHarness =
+ new AbstractStreamOperatorTestHarness<>(
+ sourceOperator,
+ numTasks /* max parallelism */,
+ numTasks /* parallelism */,
+ 0 /* subtask index */);
+
+ testHarness.setProcessingTime(Instant.now().getMillis());
+ testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ final ConcurrentLinkedQueue<Object> caughtExceptions = new ConcurrentLinkedQueue<>();
+
+ // use the AtomicBoolean just for the set()/get() functionality for communicating
+ // with the outer Thread
+ final AtomicBoolean seenWatermark = new AtomicBoolean(false);
+
+ Thread sourceThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ testHarness.open();
+ sourceOperator.run(checkpointLock,
+ new TestStreamStatusMaintainer(),
+ new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
+
+ @Override
+ public void emitWatermark(Watermark watermark) {
+ if (watermark.getTimestamp() >= numElements / 2) {
+ seenWatermark.set(true);
+ }
+ }
+
+ @Override
+ public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
+ }
+
+ @Override
+ public void emitLatencyMarker(LatencyMarker latencyMarker) {
+ }
+
+ @Override
+ public void collect(StreamRecord<WindowedValue<
+ ValueWithRecordId<KV<Integer, Integer>>>> windowedValueStreamRecord) {
+ }
+
+ @Override
+ public void close() {
+
+ }
+ });
+ } catch (Exception e) {
+ System.out.println("Caught exception: " + e);
+ caughtExceptions.add(e);
+ }
+ }
+ };
+
+ sourceThread.start();
+
+ while (true) {
+ if (!caughtExceptions.isEmpty()) {
+ fail("Caught exception(s): " + Joiner.on(",").join(caughtExceptions));
+ }
+ if (seenWatermark.get()) {
+ break;
+ }
+ Thread.sleep(10);
+
+ // need to advance this so that the watermark timers in the source wrapper fire
+ testHarness.setProcessingTime(Instant.now().getMillis());
+ }
+
+ sourceOperator.cancel();
+ sourceThread.join();
+ }
+
+
+ /**
* Verify that snapshot/restore work as expected. We bring up a source and cancel
* after seeing a certain number of elements. Then we snapshot that source,
* bring up a completely new source that we restore from the snapshot and verify