You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by th...@apache.org on 2018/11/28 00:50:44 UTC
[beam] branch master updated: [BEAM-5197] Fix UnboundedSourceWrapper#testWatermarkEmission (#7138)
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 02c763b [BEAM-5197] Fix UnboundedSourceWrapper#testWatermarkEmission (#7138)
02c763b is described below
commit 02c763b9f05c8020eb67e68f52ed477303c87468
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Wed Nov 28 01:50:37 2018 +0100
[BEAM-5197] Fix UnboundedSourceWrapper#testWatermarkEmission (#7138)
* Avoid holding checkpoint lock in UnboundedSourceWrapper#testWatermarkEmission
* Test emitting multiple watermarks in UnboundedSourceWrapper#testWatermarkEmission
---
.../flink/streaming/TestCountingSource.java | 15 +++-
.../streaming/UnboundedSourceWrapperTest.java | 80 ++++++++++++++--------
2 files changed, 62 insertions(+), 33 deletions(-)
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
index e58907e..052cb04 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
@@ -55,6 +55,9 @@ public class TestCountingSource
private final boolean throwOnFirstSnapshot;
private final int fixedNumSplits;
+ /** Flag to stall processing readers' elements. */
+ private transient volatile boolean haltEmission;
+
/**
* We only allow an exception to be thrown from getCheckpointMark at most once. This must be
* static since the entire TestCountingSource instance may re-serialized when the pipeline
@@ -106,8 +109,14 @@ public class TestCountingSource
this.fixedNumSplits = fixedNumSplits;
}
- public int getShardNumber() {
- return shardNumber;
+ /** Halts emission of elements until {@code continueEmission} is invoked. */
+ void haltEmission() {
+ haltEmission = true;
+ }
+
+ /** Continues processing elements after {@code haltEmission} was invoked. */
+ void continueEmission() {
+ haltEmission = false;
}
@Override
@@ -163,7 +172,7 @@ public class TestCountingSource
@Override
public boolean advance() {
- if (current >= numMessagesPerShard - 1) {
+ if (current >= numMessagesPerShard - 1 || haltEmission) {
return false;
}
// If testing dedup, occasionally insert a duplicate value;
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index 28589d5..cd4fe51 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -31,7 +31,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.CountDownLatch;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
import org.apache.beam.sdk.coders.Coder;
@@ -57,9 +57,8 @@ import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.OutputTag;
-import org.joda.time.Instant;
-import org.junit.Ignore;
import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.junit.runners.Parameterized;
@@ -68,6 +67,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Tests for {@link UnboundedSourceWrapper}. */
+@RunWith(Enclosed.class)
public class UnboundedSourceWrapperTest {
private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapperTest.class);
@@ -216,15 +216,13 @@ public class UnboundedSourceWrapperTest {
/**
* Creates a {@link UnboundedSourceWrapper} that has one or multiple readers per source. If
- * numSplits > numTasks the source has one source will manage multiple readers.
+ * numSplits > numTasks the source will manage multiple readers.
*
- * <p>This test verifies that watermark are correctly forwarded.
+ * <p>This test verifies that watermarks are correctly forwarded.
*/
@Test(timeout = 30_000)
- @Ignore("https://issues.apache.org/jira/browse/BEAM-5197") // deadlock on some platforms
public void testWatermarkEmission() throws Exception {
final int numElements = 500;
- final Object checkpointLock = new Object();
PipelineOptions options = PipelineOptionsFactory.create();
// this source will emit exactly NUM_ELEMENTS across all parallel readers,
@@ -249,32 +247,36 @@ public class UnboundedSourceWrapperTest {
numTasks /* max parallelism */,
numTasks /* parallelism */,
0 /* subtask index */);
+ testHarness.getExecutionConfig().setLatencyTrackingInterval(0);
+ testHarness.getExecutionConfig().setAutoWatermarkInterval(1);
- testHarness.setProcessingTime(Instant.now().getMillis());
+ testHarness.setProcessingTime(Long.MIN_VALUE);
testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
final ConcurrentLinkedQueue<Object> caughtExceptions = new ConcurrentLinkedQueue<>();
- // use the AtomicBoolean just for the set()/get() functionality for communicating
- // with the outer Thread
- final AtomicBoolean seenWatermark = new AtomicBoolean(false);
+ // We test emission of two watermarks here, one intermediate, one final
+ final CountDownLatch seenWatermarks = new CountDownLatch(2);
+ final int minElementsPerReader = numElements / numSplits;
+ final CountDownLatch minElementsCountdown = new CountDownLatch(minElementsPerReader);
+
+ // first halt the source to test auto watermark emission
+ source.haltEmission();
+ testHarness.open();
Thread sourceThread =
new Thread(
() -> {
try {
- testHarness.open();
sourceOperator.run(
- checkpointLock,
+ testHarness.getCheckpointLock(),
new TestStreamStatusMaintainer(),
new Output<
StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
@Override
public void emitWatermark(Watermark watermark) {
- if (watermark.getTimestamp() >= numElements / 2) {
- seenWatermark.set(true);
- }
+ seenWatermarks.countDown();
}
@Override
@@ -287,7 +289,9 @@ public class UnboundedSourceWrapperTest {
@Override
public void collect(
StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
- windowedValueStreamRecord) {}
+ windowedValueStreamRecord) {
+ minElementsCountdown.countDown();
+ }
@Override
public void close() {}
@@ -300,21 +304,37 @@ public class UnboundedSourceWrapperTest {
sourceThread.start();
- while (true) {
- if (!caughtExceptions.isEmpty()) {
- fail("Caught exception(s): " + Joiner.on(",").join(caughtExceptions));
- }
- if (seenWatermark.get()) {
- break;
- }
+ while (flinkWrapper
+ .getLocalReaders()
+ .stream()
+ .anyMatch(reader -> reader.getWatermark().getMillis() == 0)) {
+ // readers haven't been initialized
Thread.sleep(50);
+ }
- // Need to advance this so that the watermark timers in the source wrapper fire
- // Synchronize is necessary because this can interfere with updating the PriorityQueue
- // of the ProcessingTimeService which is also accessed through UnboundedSourceWrapper.
- synchronized (checkpointLock) {
- testHarness.setProcessingTime(Instant.now().getMillis());
- }
+ // Need to advance this so that the watermark timers in the source wrapper fire
+ // Synchronize is necessary because this can interfere with updating the PriorityQueue
+ // of the ProcessingTimeService which is also accessed through UnboundedSourceWrapper.
+ synchronized (testHarness.getCheckpointLock()) {
+ testHarness.setProcessingTime(0);
+ }
+
+ // now read the elements
+ source.continueEmission();
+ // ..and await elements
+ minElementsCountdown.await();
+
+ // Need to advance this so that the watermark timers in the source wrapper fire
+ // Synchronize is necessary because this can interfere with updating the PriorityQueue
+ // of the ProcessingTimeService which is also accessed through UnboundedSourceWrapper.
+ synchronized (testHarness.getCheckpointLock()) {
+ testHarness.setProcessingTime(Long.MAX_VALUE);
+ }
+
+ seenWatermarks.await();
+
+ if (!caughtExceptions.isEmpty()) {
+ fail("Caught exception(s): " + Joiner.on(",").join(caughtExceptions));
}
sourceOperator.cancel();