You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mm...@apache.org on 2022/04/27 12:26:36 UTC

[beam] branch master updated: [BEAM-14363] Fixes WatermarkParameters builder for Kinesis

This is an automated email from the ASF dual-hosted git repository.

mmack pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 86eeb03536e [BEAM-14363] Fixes WatermarkParameters builder for Kinesis
     new 64dc9c62dce Merge pull request #17458 from nickcaballero/fix-kinesis-watermark-parameters
86eeb03536e is described below

commit 86eeb03536e4dc3a2855064103da09e2a2e79a63
Author: Nick Caballero <ni...@offerup.com>
AuthorDate: Mon Apr 25 12:17:13 2022 -0400

    [BEAM-14363] Fixes WatermarkParameters builder for Kinesis
---
 .../beam/sdk/io/aws2/kinesis/WatermarkParameters.java      |  4 ++--
 .../beam/sdk/io/aws2/kinesis/WatermarkPolicyTest.java      | 14 ++++++++++++++
 .../apache/beam/sdk/io/kinesis/WatermarkParameters.java    |  4 ++--
 .../apache/beam/sdk/io/kinesis/WatermarkPolicyTest.java    | 14 ++++++++++++++
 4 files changed, 32 insertions(+), 4 deletions(-)

diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/WatermarkParameters.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/WatermarkParameters.java
index 520fd374703..55c36d1128b 100644
--- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/WatermarkParameters.java
+++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/WatermarkParameters.java
@@ -84,7 +84,7 @@ public abstract class WatermarkParameters implements Serializable {
   public WatermarkParameters withTimestampFn(
       SerializableFunction<KinesisRecord, Instant> timestampFn) {
     checkArgument(timestampFn != null, "timestampFn function is null");
-    return builder().setTimestampFn(timestampFn).build();
+    return toBuilder().setTimestampFn(timestampFn).build();
   }
 
   /**
@@ -93,6 +93,6 @@ public abstract class WatermarkParameters implements Serializable {
    */
   public WatermarkParameters withWatermarkIdleDurationThreshold(Duration idleDurationThreshold) {
     checkArgument(idleDurationThreshold != null, "watermark idle duration threshold is null");
-    return builder().setWatermarkIdleDurationThreshold(idleDurationThreshold).build();
+    return toBuilder().setWatermarkIdleDurationThreshold(idleDurationThreshold).build();
   }
 }
diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/WatermarkPolicyTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/WatermarkPolicyTest.java
index 896e18e31ae..a1c8adffb3d 100644
--- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/WatermarkPolicyTest.java
+++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/WatermarkPolicyTest.java
@@ -147,4 +147,18 @@ public class WatermarkPolicyTest {
     policy.update(b);
     assertThat(policy.getWatermark()).isEqualTo(time2.plus(Duration.standardMinutes(1)));
   }
+
+  @Test
+  public void shouldUpdateWatermarkParameters() {
+    SerializableFunction<KinesisRecord, Instant> fn = input -> Instant.now();
+    Duration idleDurationThreshold = Duration.standardSeconds(30);
+
+    WatermarkParameters parameters =
+        WatermarkParameters.create()
+            .withTimestampFn(fn)
+            .withWatermarkIdleDurationThreshold(idleDurationThreshold);
+
+    assertThat(parameters.getTimestampFn()).isEqualTo(fn);
+    assertThat(parameters.getWatermarkIdleDurationThreshold()).isEqualTo(idleDurationThreshold);
+  }
 }
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkParameters.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkParameters.java
index 704e1ff0ef6..1e9ca2174a7 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkParameters.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkParameters.java
@@ -84,7 +84,7 @@ public abstract class WatermarkParameters implements Serializable {
   public WatermarkParameters withTimestampFn(
       SerializableFunction<KinesisRecord, Instant> timestampFn) {
     checkArgument(timestampFn != null, "timestampFn function is null");
-    return builder().setTimestampFn(timestampFn).build();
+    return toBuilder().setTimestampFn(timestampFn).build();
   }
 
   /**
@@ -93,6 +93,6 @@ public abstract class WatermarkParameters implements Serializable {
    */
   public WatermarkParameters withWatermarkIdleDurationThreshold(Duration idleDurationThreshold) {
     checkArgument(idleDurationThreshold != null, "watermark idle duration threshold is null");
-    return builder().setWatermarkIdleDurationThreshold(idleDurationThreshold).build();
+    return toBuilder().setWatermarkIdleDurationThreshold(idleDurationThreshold).build();
   }
 }
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyTest.java
index 11dd07dc9a7..ce5c555a4df 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyTest.java
@@ -149,4 +149,18 @@ public class WatermarkPolicyTest {
     policy.update(b);
     assertThat(policy.getWatermark()).isEqualTo(time2.plus(Duration.standardMinutes(1)));
   }
+
+  @Test
+  public void shouldUpdateWatermarkParameters() {
+    SerializableFunction<KinesisRecord, Instant> fn = input -> Instant.now();
+    Duration idleDurationThreshold = Duration.standardSeconds(30);
+
+    WatermarkParameters parameters =
+        WatermarkParameters.create()
+            .withTimestampFn(fn)
+            .withWatermarkIdleDurationThreshold(idleDurationThreshold);
+
+    assertThat(parameters.getTimestampFn()).isEqualTo(fn);
+    assertThat(parameters.getWatermarkIdleDurationThreshold()).isEqualTo(idleDurationThreshold);
+  }
 }