You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mm...@apache.org on 2022/04/27 12:26:36 UTC
[beam] branch master updated: [BEAM-14363] Fixes WatermarkParameters builder for Kinesis
This is an automated email from the ASF dual-hosted git repository.
mmack pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 86eeb03536e [BEAM-14363] Fixes WatermarkParameters builder for Kinesis
new 64dc9c62dce Merge pull request #17458 from nickcaballero/fix-kinesis-watermark-parameters
86eeb03536e is described below
commit 86eeb03536e4dc3a2855064103da09e2a2e79a63
Author: Nick Caballero <ni...@offerup.com>
AuthorDate: Mon Apr 25 12:17:13 2022 -0400
[BEAM-14363] Fixes WatermarkParameters builder for Kinesis
---
.../beam/sdk/io/aws2/kinesis/WatermarkParameters.java | 4 ++--
.../beam/sdk/io/aws2/kinesis/WatermarkPolicyTest.java | 14 ++++++++++++++
.../apache/beam/sdk/io/kinesis/WatermarkParameters.java | 4 ++--
.../apache/beam/sdk/io/kinesis/WatermarkPolicyTest.java | 14 ++++++++++++++
4 files changed, 32 insertions(+), 4 deletions(-)
diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/WatermarkParameters.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/WatermarkParameters.java
index 520fd374703..55c36d1128b 100644
--- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/WatermarkParameters.java
+++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/WatermarkParameters.java
@@ -84,7 +84,7 @@ public abstract class WatermarkParameters implements Serializable {
public WatermarkParameters withTimestampFn(
SerializableFunction<KinesisRecord, Instant> timestampFn) {
checkArgument(timestampFn != null, "timestampFn function is null");
- return builder().setTimestampFn(timestampFn).build();
+ return toBuilder().setTimestampFn(timestampFn).build();
}
/**
@@ -93,6 +93,6 @@ public abstract class WatermarkParameters implements Serializable {
*/
public WatermarkParameters withWatermarkIdleDurationThreshold(Duration idleDurationThreshold) {
checkArgument(idleDurationThreshold != null, "watermark idle duration threshold is null");
- return builder().setWatermarkIdleDurationThreshold(idleDurationThreshold).build();
+ return toBuilder().setWatermarkIdleDurationThreshold(idleDurationThreshold).build();
}
}
diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/WatermarkPolicyTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/WatermarkPolicyTest.java
index 896e18e31ae..a1c8adffb3d 100644
--- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/WatermarkPolicyTest.java
+++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/WatermarkPolicyTest.java
@@ -147,4 +147,18 @@ public class WatermarkPolicyTest {
policy.update(b);
assertThat(policy.getWatermark()).isEqualTo(time2.plus(Duration.standardMinutes(1)));
}
+
+ @Test
+ public void shouldUpdateWatermarkParameters() {
+ SerializableFunction<KinesisRecord, Instant> fn = input -> Instant.now();
+ Duration idleDurationThreshold = Duration.standardSeconds(30);
+
+ WatermarkParameters parameters =
+ WatermarkParameters.create()
+ .withTimestampFn(fn)
+ .withWatermarkIdleDurationThreshold(idleDurationThreshold);
+
+ assertThat(parameters.getTimestampFn()).isEqualTo(fn);
+ assertThat(parameters.getWatermarkIdleDurationThreshold()).isEqualTo(idleDurationThreshold);
+ }
}
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkParameters.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkParameters.java
index 704e1ff0ef6..1e9ca2174a7 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkParameters.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/WatermarkParameters.java
@@ -84,7 +84,7 @@ public abstract class WatermarkParameters implements Serializable {
public WatermarkParameters withTimestampFn(
SerializableFunction<KinesisRecord, Instant> timestampFn) {
checkArgument(timestampFn != null, "timestampFn function is null");
- return builder().setTimestampFn(timestampFn).build();
+ return toBuilder().setTimestampFn(timestampFn).build();
}
/**
@@ -93,6 +93,6 @@ public abstract class WatermarkParameters implements Serializable {
*/
public WatermarkParameters withWatermarkIdleDurationThreshold(Duration idleDurationThreshold) {
checkArgument(idleDurationThreshold != null, "watermark idle duration threshold is null");
- return builder().setWatermarkIdleDurationThreshold(idleDurationThreshold).build();
+ return toBuilder().setWatermarkIdleDurationThreshold(idleDurationThreshold).build();
}
}
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyTest.java
index 11dd07dc9a7..ce5c555a4df 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/WatermarkPolicyTest.java
@@ -149,4 +149,18 @@ public class WatermarkPolicyTest {
policy.update(b);
assertThat(policy.getWatermark()).isEqualTo(time2.plus(Duration.standardMinutes(1)));
}
+
+ @Test
+ public void shouldUpdateWatermarkParameters() {
+ SerializableFunction<KinesisRecord, Instant> fn = input -> Instant.now();
+ Duration idleDurationThreshold = Duration.standardSeconds(30);
+
+ WatermarkParameters parameters =
+ WatermarkParameters.create()
+ .withTimestampFn(fn)
+ .withWatermarkIdleDurationThreshold(idleDurationThreshold);
+
+ assertThat(parameters.getTimestampFn()).isEqualTo(fn);
+ assertThat(parameters.getWatermarkIdleDurationThreshold()).isEqualTo(idleDurationThreshold);
+ }
}