You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/03/23 16:41:02 UTC
[beam] branch master updated: [BEAM-9565] Fix threading issue with
WatermarkEstimatorsTest
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a28d800 [BEAM-9565] Fix threading issue with WatermarkEstimatorsTest
new 7310ec2 Merge pull request #11190 from lukecwik/beam9565
a28d800 is described below
commit a28d80058a655ebf048082f8ac23b3cd1919d127
Author: Luke Cwik <lc...@google.com>
AuthorDate: Sun Mar 22 08:41:02 2020 -0700
[BEAM-9565] Fix threading issue with WatermarkEstimatorsTest
The issue was that the thread was calling an update method that wasn't going through the wrapper object which was providing synchronization.
---
.../fn/splittabledofn/WatermarkEstimatorsTest.java | 36 ++++++++--------------
1 file changed, 13 insertions(+), 23 deletions(-)
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/splittabledofn/WatermarkEstimatorsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/splittabledofn/WatermarkEstimatorsTest.java
index 3e12c7e..6e615f1 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/splittabledofn/WatermarkEstimatorsTest.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/splittabledofn/WatermarkEstimatorsTest.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertFalse;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
-import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.TimestampObservingWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -35,46 +34,40 @@ import org.junit.runners.JUnit4;
/** Tests for {@link WatermarkEstimators}. */
@RunWith(JUnit4.class)
public class WatermarkEstimatorsTest {
-
@Test
public void testThreadSafeWatermarkEstimator() throws Exception {
Instant[] reference = new Instant[] {GlobalWindow.TIMESTAMP_MIN_VALUE};
WatermarkEstimator<Instant> watermarkEstimator =
new WatermarkEstimator<Instant>() {
+ private Instant currentWatermark = GlobalWindow.TIMESTAMP_MIN_VALUE;
@Override
public Instant currentWatermark() {
- return reference[0];
+ currentWatermark = reference[0];
+ return currentWatermark;
}
@Override
public Instant getState() {
- return reference[0];
+ return currentWatermark;
}
};
testWatermarkEstimatorSnapshotsStateWithCompetingThread(
- watermarkEstimator, (instant) -> reference[0] = instant);
- }
-
- @Test
- public void testThreadSafeManualWatermarkEstimator() throws Exception {
- ManualWatermarkEstimator<Instant> watermarkEstimator =
- new org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual(
- GlobalWindow.TIMESTAMP_MIN_VALUE);
- testWatermarkEstimatorSnapshotsStateWithCompetingThread(
- watermarkEstimator, watermarkEstimator::setWatermark);
+ WatermarkEstimators.threadSafe(watermarkEstimator), (instant) -> reference[0] = instant);
}
@Test
public void testThreadSafeTimestampObservingWatermarkEstimator() throws Exception {
- TimestampObservingWatermarkEstimator<Instant> watermarkEstimator =
- new org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators
- .MonotonicallyIncreasing(GlobalWindow.TIMESTAMP_MIN_VALUE);
+ WatermarkEstimators.WatermarkAndStateObserver<Instant> threadsafeWatermarkEstimator =
+ WatermarkEstimators.threadSafe(
+ new org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators
+ .MonotonicallyIncreasing(GlobalWindow.TIMESTAMP_MIN_VALUE));
testWatermarkEstimatorSnapshotsStateWithCompetingThread(
- watermarkEstimator, watermarkEstimator::observeTimestamp);
+ threadsafeWatermarkEstimator,
+ ((TimestampObservingWatermarkEstimator) threadsafeWatermarkEstimator)::observeTimestamp);
}
- public <WatermarkEstimatorT extends WatermarkEstimator<Instant>>
+ public <WatermarkEstimatorT extends WatermarkEstimators.WatermarkAndStateObserver<Instant>>
void testWatermarkEstimatorSnapshotsStateWithCompetingThread(
WatermarkEstimatorT watermarkEstimator, Consumer<Instant> watermarkUpdater)
throws Exception {
@@ -89,14 +82,11 @@ public class WatermarkEstimatorsTest {
});
t.start();
- WatermarkEstimators.WatermarkAndStateObserver<Instant> threadsafeWatermarkEstimator =
- WatermarkEstimators.threadSafe(watermarkEstimator);
-
// Ensure the thread has started before we start fetching values.
countDownLatch.await();
Instant currentMinimum = GlobalWindow.TIMESTAMP_MIN_VALUE;
for (int i = 0; i < 100; ++i) {
- KV<Instant, Instant> value = threadsafeWatermarkEstimator.getWatermarkAndState();
+ KV<Instant, Instant> value = watermarkEstimator.getWatermarkAndState();
// The watermark estimators that we use ensure that state == current watermark so test that
// they are equal here
assertEquals(value.getKey(), value.getValue());