You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/03/23 16:41:02 UTC

[beam] branch master updated: [BEAM-9565] Fix threading issue with WatermarkEstimatorsTest

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a28d800  [BEAM-9565] Fix threading issue with WatermarkEstimatorsTest
     new 7310ec2  Merge pull request #11190 from lukecwik/beam9565
a28d800 is described below

commit a28d80058a655ebf048082f8ac23b3cd1919d127
Author: Luke Cwik <lc...@google.com>
AuthorDate: Sun Mar 22 08:41:02 2020 -0700

    [BEAM-9565] Fix threading issue with WatermarkEstimatorsTest
    
    The issue was that the competing thread was calling an update method directly on the underlying estimator instead of going through the wrapper object that provides the synchronization.
---
 .../fn/splittabledofn/WatermarkEstimatorsTest.java | 36 ++++++++--------------
 1 file changed, 13 insertions(+), 23 deletions(-)

diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/splittabledofn/WatermarkEstimatorsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/splittabledofn/WatermarkEstimatorsTest.java
index 3e12c7e..6e615f1 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/splittabledofn/WatermarkEstimatorsTest.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/splittabledofn/WatermarkEstimatorsTest.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertFalse;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.function.Consumer;
-import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.TimestampObservingWatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -35,46 +34,40 @@ import org.junit.runners.JUnit4;
 /** Tests for {@link WatermarkEstimators}. */
 @RunWith(JUnit4.class)
 public class WatermarkEstimatorsTest {
-
   @Test
   public void testThreadSafeWatermarkEstimator() throws Exception {
     Instant[] reference = new Instant[] {GlobalWindow.TIMESTAMP_MIN_VALUE};
     WatermarkEstimator<Instant> watermarkEstimator =
         new WatermarkEstimator<Instant>() {
+          private Instant currentWatermark = GlobalWindow.TIMESTAMP_MIN_VALUE;
 
           @Override
           public Instant currentWatermark() {
-            return reference[0];
+            currentWatermark = reference[0];
+            return currentWatermark;
           }
 
           @Override
           public Instant getState() {
-            return reference[0];
+            return currentWatermark;
           }
         };
     testWatermarkEstimatorSnapshotsStateWithCompetingThread(
-        watermarkEstimator, (instant) -> reference[0] = instant);
-  }
-
-  @Test
-  public void testThreadSafeManualWatermarkEstimator() throws Exception {
-    ManualWatermarkEstimator<Instant> watermarkEstimator =
-        new org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual(
-            GlobalWindow.TIMESTAMP_MIN_VALUE);
-    testWatermarkEstimatorSnapshotsStateWithCompetingThread(
-        watermarkEstimator, watermarkEstimator::setWatermark);
+        WatermarkEstimators.threadSafe(watermarkEstimator), (instant) -> reference[0] = instant);
   }
 
   @Test
   public void testThreadSafeTimestampObservingWatermarkEstimator() throws Exception {
-    TimestampObservingWatermarkEstimator<Instant> watermarkEstimator =
-        new org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators
-            .MonotonicallyIncreasing(GlobalWindow.TIMESTAMP_MIN_VALUE);
+    WatermarkEstimators.WatermarkAndStateObserver<Instant> threadsafeWatermarkEstimator =
+        WatermarkEstimators.threadSafe(
+            new org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators
+                .MonotonicallyIncreasing(GlobalWindow.TIMESTAMP_MIN_VALUE));
     testWatermarkEstimatorSnapshotsStateWithCompetingThread(
-        watermarkEstimator, watermarkEstimator::observeTimestamp);
+        threadsafeWatermarkEstimator,
+        ((TimestampObservingWatermarkEstimator) threadsafeWatermarkEstimator)::observeTimestamp);
   }
 
-  public <WatermarkEstimatorT extends WatermarkEstimator<Instant>>
+  public <WatermarkEstimatorT extends WatermarkEstimators.WatermarkAndStateObserver<Instant>>
       void testWatermarkEstimatorSnapshotsStateWithCompetingThread(
           WatermarkEstimatorT watermarkEstimator, Consumer<Instant> watermarkUpdater)
           throws Exception {
@@ -89,14 +82,11 @@ public class WatermarkEstimatorsTest {
             });
     t.start();
 
-    WatermarkEstimators.WatermarkAndStateObserver<Instant> threadsafeWatermarkEstimator =
-        WatermarkEstimators.threadSafe(watermarkEstimator);
-
     // Ensure the thread has started before we start fetching values.
     countDownLatch.await();
     Instant currentMinimum = GlobalWindow.TIMESTAMP_MIN_VALUE;
     for (int i = 0; i < 100; ++i) {
-      KV<Instant, Instant> value = threadsafeWatermarkEstimator.getWatermarkAndState();
+      KV<Instant, Instant> value = watermarkEstimator.getWatermarkAndState();
       // The watermark estimators that we use ensure that state == current watermark so test that
       // they are equal here
       assertEquals(value.getKey(), value.getValue());