You are viewing a plain-text version of this content. The canonical link to it ("here") was a hyperlink that is not preserved in this plain-text version.
Posted to commits@beam.apache.org by th...@apache.org on 2018/11/28 00:50:44 UTC

[beam] branch master updated: [BEAM-5197] Fix UnboundedSourceWrapper#testWatermarkEmission (#7138)

This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 02c763b  [BEAM-5197] Fix UnboundedSourceWrapper#testWatermarkEmission (#7138)
02c763b is described below

commit 02c763b9f05c8020eb67e68f52ed477303c87468
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Wed Nov 28 01:50:37 2018 +0100

    [BEAM-5197] Fix UnboundedSourceWrapper#testWatermarkEmission (#7138)
    
    * Avoid holding the checkpoint lock in UnboundedSourceWrapper#testWatermarkEmission
    * Test emitting multiple watermarks in UnboundedSourceWrapper#testWatermarkEmission
---
 .../flink/streaming/TestCountingSource.java        | 15 +++-
 .../streaming/UnboundedSourceWrapperTest.java      | 80 ++++++++++++++--------
 2 files changed, 62 insertions(+), 33 deletions(-)

diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
index e58907e..052cb04 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
@@ -55,6 +55,9 @@ public class TestCountingSource
   private final boolean throwOnFirstSnapshot;
   private final int fixedNumSplits;
 
+  /** Flag to stall processing readers' elements. */
+  private transient volatile boolean haltEmission;
+
   /**
    * We only allow an exception to be thrown from getCheckpointMark at most once. This must be
    * static since the entire TestCountingSource instance may re-serialized when the pipeline
@@ -106,8 +109,14 @@ public class TestCountingSource
     this.fixedNumSplits = fixedNumSplits;
   }
 
-  public int getShardNumber() {
-    return shardNumber;
+  /** Halts emission of elements until {@code continueEmission} is invoked. */
+  void haltEmission() {
+    haltEmission = true;
+  }
+
+  /** Continues processing elements after {@code haltEmission} was invoked. */
+  void continueEmission() {
+    haltEmission = false;
   }
 
   @Override
@@ -163,7 +172,7 @@ public class TestCountingSource
 
     @Override
     public boolean advance() {
-      if (current >= numMessagesPerShard - 1) {
+      if (current >= numMessagesPerShard - 1 || haltEmission) {
         return false;
       }
       // If testing dedup, occasionally insert a duplicate value;
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index 28589d5..cd4fe51 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -31,7 +31,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.CountDownLatch;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
 import org.apache.beam.sdk.coders.Coder;
@@ -57,9 +57,8 @@ import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.OutputTag;
-import org.joda.time.Instant;
-import org.junit.Ignore;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.junit.runners.Parameterized;
@@ -68,6 +67,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** Tests for {@link UnboundedSourceWrapper}. */
+@RunWith(Enclosed.class)
 public class UnboundedSourceWrapperTest {
 
   private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapperTest.class);
@@ -216,15 +216,13 @@ public class UnboundedSourceWrapperTest {
 
     /**
      * Creates a {@link UnboundedSourceWrapper} that has one or multiple readers per source. If
-     * numSplits > numTasks the source has one source will manage multiple readers.
+     * numSplits > numTasks the source will manage multiple readers.
      *
-     * <p>This test verifies that watermark are correctly forwarded.
+     * <p>This test verifies that watermarks are correctly forwarded.
      */
     @Test(timeout = 30_000)
-    @Ignore("https://issues.apache.org/jira/browse/BEAM-5197") // deadlock on some platforms
     public void testWatermarkEmission() throws Exception {
       final int numElements = 500;
-      final Object checkpointLock = new Object();
       PipelineOptions options = PipelineOptionsFactory.create();
 
       // this source will emit exactly NUM_ELEMENTS across all parallel readers,
@@ -249,32 +247,36 @@ public class UnboundedSourceWrapperTest {
                   numTasks /* max parallelism */,
                   numTasks /* parallelism */,
                   0 /* subtask index */);
+      testHarness.getExecutionConfig().setLatencyTrackingInterval(0);
+      testHarness.getExecutionConfig().setAutoWatermarkInterval(1);
 
-      testHarness.setProcessingTime(Instant.now().getMillis());
+      testHarness.setProcessingTime(Long.MIN_VALUE);
       testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
 
       final ConcurrentLinkedQueue<Object> caughtExceptions = new ConcurrentLinkedQueue<>();
 
-      // use the AtomicBoolean just for the set()/get() functionality for communicating
-      // with the outer Thread
-      final AtomicBoolean seenWatermark = new AtomicBoolean(false);
+      // We test emission of two watermarks here, one intermediate, one final
+      final CountDownLatch seenWatermarks = new CountDownLatch(2);
+      final int minElementsPerReader = numElements / numSplits;
+      final CountDownLatch minElementsCountdown = new CountDownLatch(minElementsPerReader);
+
+      // first halt the source to test auto watermark emission
+      source.haltEmission();
+      testHarness.open();
 
       Thread sourceThread =
           new Thread(
               () -> {
                 try {
-                  testHarness.open();
                   sourceOperator.run(
-                      checkpointLock,
+                      testHarness.getCheckpointLock(),
                       new TestStreamStatusMaintainer(),
                       new Output<
                           StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
 
                         @Override
                         public void emitWatermark(Watermark watermark) {
-                          if (watermark.getTimestamp() >= numElements / 2) {
-                            seenWatermark.set(true);
-                          }
+                          seenWatermarks.countDown();
                         }
 
                         @Override
@@ -287,7 +289,9 @@ public class UnboundedSourceWrapperTest {
                         @Override
                         public void collect(
                             StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
-                                windowedValueStreamRecord) {}
+                                windowedValueStreamRecord) {
+                          minElementsCountdown.countDown();
+                        }
 
                         @Override
                         public void close() {}
@@ -300,21 +304,37 @@ public class UnboundedSourceWrapperTest {
 
       sourceThread.start();
 
-      while (true) {
-        if (!caughtExceptions.isEmpty()) {
-          fail("Caught exception(s): " + Joiner.on(",").join(caughtExceptions));
-        }
-        if (seenWatermark.get()) {
-          break;
-        }
+      while (flinkWrapper
+          .getLocalReaders()
+          .stream()
+          .anyMatch(reader -> reader.getWatermark().getMillis() == 0)) {
+        // readers haven't been initialized
         Thread.sleep(50);
+      }
 
-        // Need to advance this so that the watermark timers in the source wrapper fire
-        // Synchronize is necessary because this can interfere with updating the PriorityQueue
-        // of the ProcessingTimeService which is also accessed through UnboundedSourceWrapper.
-        synchronized (checkpointLock) {
-          testHarness.setProcessingTime(Instant.now().getMillis());
-        }
+      // Need to advance this so that the watermark timers in the source wrapper fire
+      // Synchronize is necessary because this can interfere with updating the PriorityQueue
+      // of the ProcessingTimeService which is also accessed through UnboundedSourceWrapper.
+      synchronized (testHarness.getCheckpointLock()) {
+        testHarness.setProcessingTime(0);
+      }
+
+      // now read the elements
+      source.continueEmission();
+      // ..and await elements
+      minElementsCountdown.await();
+
+      // Need to advance this so that the watermark timers in the source wrapper fire
+      // Synchronize is necessary because this can interfere with updating the PriorityQueue
+      // of the ProcessingTimeService which is also accessed through UnboundedSourceWrapper.
+      synchronized (testHarness.getCheckpointLock()) {
+        testHarness.setProcessingTime(Long.MAX_VALUE);
+      }
+
+      seenWatermarks.await();
+
+      if (!caughtExceptions.isEmpty()) {
+        fail("Caught exception(s): " + Joiner.on(",").join(caughtExceptions));
       }
 
       sourceOperator.cancel();