You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jo...@apache.org on 2023/05/31 19:55:07 UTC

[beam] branch master updated: Return unknown backlog on Kinesis if reader not started (#26953)

This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 205cbcea9b0 Return unknown backlog on Kinesis if reader not started (#26953)
205cbcea9b0 is described below

commit 205cbcea9b030ebd5c2dc6bd6479c5c381248aab
Author: Zachary Houfek <83...@users.noreply.github.com>
AuthorDate: Wed May 31 15:54:59 2023 -0400

    Return unknown backlog on Kinesis if reader not started (#26953)
---
 CHANGES.md                                                          | 1 +
 .../java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReader.java     | 6 ++++++
 .../java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReaderTest.java | 6 ++++++
 3 files changed, 13 insertions(+)

diff --git a/CHANGES.md b/CHANGES.md
index 79fdcaf7f19..af9855057d6 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -75,6 +75,7 @@
 ## Bugfixes
 
 * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* Fixed KinesisIO `NullPointerException` when a progress check is made before the reader is started (IO) ([#23868](https://github.com/apache/beam/issues/23868))
 
 ## Known Issues
 
diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReader.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReader.java
index 77184ba5ab8..1ca37d23cd8 100644
--- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReader.java
+++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReader.java
@@ -22,6 +22,7 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
 import java.io.IOException;
 import java.util.NoSuchElementException;
 import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO.Read;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.joda.time.Duration;
@@ -159,6 +160,11 @@ class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
    */
   @Override
   public long getSplitBacklogBytes() {
+    // Safety check in case a progress check is made for the start method is called.
+    if (shardReadersPool == null) {
+      return UnboundedReader.BACKLOG_UNKNOWN;
+    }
+
     Instant latestRecordTimestamp = shardReadersPool.getLatestRecordTimestamp();
 
     if (latestRecordTimestamp.equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReaderTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReaderTest.java
index 7674b66f329..c063e817244 100644
--- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReaderTest.java
+++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReaderTest.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.util.NoSuchElementException;
 import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -150,6 +151,11 @@ public class KinesisReaderTest {
     assertThat(reader.getSplitBacklogBytes()).isEqualTo(20);
   }
 
+  @Test
+  public void getSplitBacklogBytesShouldReturnUnknownIfNotStarted() {
+    assertThat(reader.getSplitBacklogBytes()).isEqualTo(UnboundedReader.BACKLOG_UNKNOWN);
+  }
+
   @Test
   public void getSplitBacklogBytesShouldReturnLastSeenValueWhenCalledFrequently()
       throws TransientKinesisException, IOException {