Posted to commits@beam.apache.org by pa...@apache.org on 2023/02/15 16:48:58 UTC

[beam] branch bigtable-cdc-feature-branch updated: Not including support for end time in Bigtable Change Stream connector (#25474)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch bigtable-cdc-feature-branch
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/bigtable-cdc-feature-branch by this push:
     new 331b40117ea Not including support for end time in Bigtable Change Stream connector (#25474)
331b40117ea is described below

commit 331b40117eac3981ea319265bad18739e1dce297
Author: Tony Tang <nf...@gmail.com>
AuthorDate: Wed Feb 15 11:48:49 2023 -0500

    Not including support for end time in Bigtable Change Stream connector (#25474)
---
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java       | 20 ++---------------
 .../changestreams/action/ActionFactory.java        | 10 +++------
 .../action/DetectNewPartitionsAction.java          | 16 -------------
 .../action/GenerateInitialPartitionsAction.java    |  9 ++------
 .../action/ReadChangeStreamPartitionAction.java    | 26 ++++++----------------
 .../changestreams/dao/ChangeStreamDao.java         |  8 +------
 .../dofn/DetectNewPartitionsDoFn.java              | 12 +++-------
 .../changestreams/model/PartitionRecord.java       | 25 ++-------------------
 .../action/DetectNewPartitionsActionTest.java      |  5 +----
 .../GenerateInitialPartitionsActionTest.java       |  4 +---
 10 files changed, 22 insertions(+), 113 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index d2eff9cbbfa..00048b73d08 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -222,8 +222,7 @@ import org.slf4j.LoggerFactory;
  *            .withInstanceId(instanceId)
  *            .withTableId(tableId)
  *            .withAppProfileId(appProfileId)
- *            .withStartTime(startTime)
- *            .withEndTime(endTime));
+ *            .withStartTime(startTime));
  * }</pre>
  *
  * <h3>Permissions</h3>
@@ -281,7 +280,6 @@ public class BigtableIO {
    *
    * <ul>
    *   <li>{@link BigtableIO.ReadChangeStream#withStartTime} which defaults to now.
-   *   <li>{@link BigtableIO.ReadChangeStream#withEndTime} which defaults to empty.
   *   <li>{@link BigtableIO.ReadChangeStream#withHeartbeatDuration} which defaults to 1 second.
    *   <li>{@link BigtableIO.ReadChangeStream#withMetadataTableProjectId} which defaults to value
    *       from {@link BigtableIO.ReadChangeStream#withProjectId}
@@ -1544,8 +1542,6 @@ public class BigtableIO {
 
     abstract @Nullable Timestamp getStartTime();
 
-    abstract @Nullable Timestamp getEndTime();
-
     abstract @Nullable Duration getHeartbeatDuration();
 
     abstract @Nullable String getChangeStreamName();
@@ -1657,16 +1653,6 @@ public class BigtableIO {
       return toBuilder().setStartTime(startTime).build();
     }
 
-    /**
-     * Returns a new {@link BigtableIO.ReadChangeStream} that will stop streaming at the specified
-     * end time.
-     *
-     * <p>Does not modify this object.
-     */
-    public ReadChangeStream withEndTime(Timestamp endTime) {
-      return toBuilder().setEndTime(endTime).build();
-    }
-
     /**
      * Returns a new {@link BigtableIO.ReadChangeStream} that will send heartbeat messages at
      * specified interval.
@@ -1800,7 +1786,7 @@ public class BigtableIO {
       InitializeDoFn initializeDoFn =
           new InitializeDoFn(daoFactory, metadataTableConfig.getAppProfileId().get(), startTime);
       DetectNewPartitionsDoFn detectNewPartitionsDoFn =
-          new DetectNewPartitionsDoFn(getEndTime(), actionFactory, daoFactory, metrics);
+          new DetectNewPartitionsDoFn(actionFactory, daoFactory, metrics);
       ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
           new ReadChangeStreamPartitionDoFn(heartbeatDuration, daoFactory, actionFactory, metrics);
 
@@ -1821,8 +1807,6 @@ public class BigtableIO {
 
       abstract ReadChangeStream.Builder setStartTime(Timestamp startTime);
 
-      abstract ReadChangeStream.Builder setEndTime(Timestamp endTime);
-
       abstract ReadChangeStream.Builder setHeartbeatDuration(Duration interval);
 
       abstract ReadChangeStream.Builder setChangeStreamName(String changeStreamName);
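
For reference, the updated javadoc example above configures the change stream read with a
start time only. A minimal pipeline sketch along those lines is shown below; it assumes the
BigtableIO.readChangeStream() entry point documented in this class, and the project,
instance, table, app profile and start time values are placeholders rather than anything
taken from this commit.

    import com.google.cloud.Timestamp;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class ReadChangeStreamSketch {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        // Stream changes starting from now; with this commit there is no withEndTime(...)
        // to bound the stream, so the read continues until the pipeline is cancelled.
        Timestamp startTime = Timestamp.now();
        pipeline.apply(
            "ReadChangeStream",
            BigtableIO.readChangeStream()
                .withProjectId("my-project")          // placeholder
                .withInstanceId("my-instance")        // placeholder
                .withTableId("my-table")              // placeholder
                .withAppProfileId("my-app-profile")   // placeholder
                .withStartTime(startTime));
        pipeline.run().waitUntilFinish();
      }
    }
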
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java
index 18fbc5fe404..cfc5b5fd3b2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java
@@ -17,9 +17,7 @@
  */
 package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
 
-import com.google.cloud.Timestamp;
 import java.io.Serializable;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
@@ -66,12 +64,10 @@ public class ActionFactory implements Serializable {
   public synchronized DetectNewPartitionsAction detectNewPartitionsAction(
       ChangeStreamMetrics metrics,
       MetadataTableDao metadataTableDao,
-      @Nullable Timestamp endTime,
       GenerateInitialPartitionsAction generateInitialPartitionsAction) {
     if (detectNewPartitionsAction == null) {
       detectNewPartitionsAction =
-          new DetectNewPartitionsAction(
-              metrics, metadataTableDao, endTime, generateInitialPartitionsAction);
+          new DetectNewPartitionsAction(metrics, metadataTableDao, generateInitialPartitionsAction);
     }
     return detectNewPartitionsAction;
   }
@@ -85,10 +81,10 @@ public class ActionFactory implements Serializable {
    * @return singleton instance of the {@link GenerateInitialPartitionsAction}
    */
   public synchronized GenerateInitialPartitionsAction generateInitialPartitionsAction(
-      ChangeStreamMetrics metrics, ChangeStreamDao changeStreamDao, @Nullable Timestamp endTime) {
+      ChangeStreamMetrics metrics, ChangeStreamDao changeStreamDao) {
     if (generateInitialPartitionsAction == null) {
       generateInitialPartitionsAction =
-          new GenerateInitialPartitionsAction(metrics, changeStreamDao, endTime);
+          new GenerateInitialPartitionsAction(metrics, changeStreamDao);
     }
     return generateInitialPartitionsAction;
   }
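
The two factory methods above now build their actions without an endTime parameter. They
keep the same lazy, synchronized memoization: the first call constructs the action, and
later calls return that same instance. A simplified standalone sketch of the pattern
(illustrative names, not the actual Beam classes):

    import java.io.Serializable;

    class FactorySketch implements Serializable {
      private ExampleAction exampleAction;

      // First call constructs the action; subsequent calls reuse the cached instance.
      synchronized ExampleAction exampleAction(String dependency) {
        if (exampleAction == null) {
          exampleAction = new ExampleAction(dependency); // no endTime argument any more
        }
        return exampleAction;
      }

      static class ExampleAction implements Serializable {
        private final String dependency;

        ExampleAction(String dependency) {
          this.dependency = dependency;
        }
      }
    }
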
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java
index 932796d4ba8..4e286fe6397 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java
@@ -19,9 +19,7 @@ package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
 
 import com.google.cloud.Timestamp;
 import com.google.protobuf.InvalidProtocolBufferException;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
 import org.apache.beam.sdk.io.range.OffsetRange;
@@ -55,17 +53,14 @@ public class DetectNewPartitionsAction {
 
   private final ChangeStreamMetrics metrics;
   private final MetadataTableDao metadataTableDao;
-  @Nullable private final com.google.cloud.Timestamp endTime;
   private final GenerateInitialPartitionsAction generateInitialPartitionsAction;
 
   public DetectNewPartitionsAction(
       ChangeStreamMetrics metrics,
       MetadataTableDao metadataTableDao,
-      @Nullable Timestamp endTime,
       GenerateInitialPartitionsAction generateInitialPartitionsAction) {
     this.metrics = metrics;
     this.metadataTableDao = metadataTableDao;
-    this.endTime = endTime;
     this.generateInitialPartitionsAction = generateInitialPartitionsAction;
   }
 
@@ -77,7 +72,6 @@ public class DetectNewPartitionsAction {
    *   <li>Look up the initial list of partitions to stream if it's the very first run.
    *   <li>On rest of the runs, try advancing watermark if needed.
    *   <li>Update the metadata table with info about this DoFn.
-   *   <li>Check if this pipeline has reached the end time. Terminate if it has.
    *   <li>Process new partitions and output them.
    *   <li>Register callback to clean up processed partitions after bundle has been finalized.
    * </ol>
@@ -103,16 +97,6 @@ public class DetectNewPartitionsAction {
       return generateInitialPartitionsAction.run(receiver, tracker, watermarkEstimator, startTime);
     }
 
-    // Terminate if endTime <= watermark that means all partitions have read up to or beyond
-    // watermark. We no longer need to manage splits and merges, we can terminate.
-    if (endTime != null
-        && endTime.compareTo(
-                TimestampConverter.toCloudTimestamp(watermarkEstimator.currentWatermark()))
-            <= 0) {
-      tracker.tryClaim(tracker.currentRestriction().getTo());
-      return ProcessContinuation.stop();
-    }
-
     if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) {
       return ProcessContinuation.stop();
     }
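
The block removed above is what used to terminate DetectNewPartitionsAction once the
aggregated watermark reached the configured end time. Purely as a standalone illustration of
the comparison it performed (this logic is gone from the connector after this commit):

    import com.google.cloud.Timestamp;

    public class EndTimeCheckSketch {
      // endTime <= watermark meant every partition had streamed up to or past endTime,
      // at which point the removed code claimed the rest of the restriction and stopped.
      static boolean reachedEndTime(Timestamp endTime, Timestamp watermark) {
        return endTime.compareTo(watermark) <= 0;
      }

      public static void main(String[] args) {
        Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(100, 0);
        Timestamp watermark = Timestamp.ofTimeSecondsAndNanos(101, 0);
        System.out.println(reachedEndTime(endTime, watermark)); // true -> would have stopped
      }
    }
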
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java
index 164a679a26f..938464f500a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java
@@ -17,10 +17,8 @@
  */
 package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
 
-import com.google.cloud.Timestamp;
 import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
 import java.util.List;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator;
@@ -44,13 +42,11 @@ public class GenerateInitialPartitionsAction {
 
   private final ChangeStreamMetrics metrics;
   private final ChangeStreamDao changeStreamDao;
-  @Nullable private final Timestamp endTime;
 
   public GenerateInitialPartitionsAction(
-      ChangeStreamMetrics metrics, ChangeStreamDao changeStreamDao, @Nullable Timestamp endTime) {
+      ChangeStreamMetrics metrics, ChangeStreamDao changeStreamDao) {
     this.metrics = metrics;
     this.changeStreamDao = changeStreamDao;
-    this.endTime = endTime;
   }
 
   /**
@@ -78,8 +74,7 @@ public class GenerateInitialPartitionsAction {
     for (ByteStringRange partition : streamPartitions) {
       metrics.incListPartitionsCount();
       String uid = UniqueIdGenerator.getNextId();
-      PartitionRecord partitionRecord =
-          new PartitionRecord(partition, startTime, uid, startTime, endTime);
+      PartitionRecord partitionRecord = new PartitionRecord(partition, startTime, uid, startTime);
       // We are outputting elements with timestamp of 0 to prevent reliance on event time. This
       // limits the ability to window on commit time of any data changes. It is still possible to
       // window on processing time.
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
index 321a3c1a1ca..a36b57ff42f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
@@ -85,20 +85,12 @@ public class ReadChangeStreamPartitionAction {
    *   <li>Process CloseStream if it exists. In order to solve a possible inconsistent state
    *       problem, we do not process CloseStream after receiving it. We claim the CloseStream in
    *       the RestrictionTracker so it persists after a checkpoint. We checkpoint to flush all the
-   *       DataChanges. Then on resume, we process the CloseStream. There are only 2 expected Status
-   *       for CloseStream: OK and Out of Range.
-   *       <ol>
-   *         <li>OK status is returned when the predetermined endTime has been reached. In this
-   *             case, we update the watermark and update the metadata table. {@link
-   *             DetectNewPartitionsDoFn} aggregates the watermark from all the streams to ensure
-   *             all the streams have reached beyond endTime so it can also terminate and end the
-   *             beam job.
-   *         <li>Out of Range is returned when the partition has either been split into more
-   *             partitions or merged into a larger partition. In this case, we write to the
-   *             metadata table the new partitions' information so that {@link
-   *             DetectNewPartitionsDoFn} can read and output those new partitions to be streamed.
-   *             We also need to ensure we clean up this partition's metadata to release the lock.
-   *       </ol>
+   *       DataChanges. Then on resume, we process the CloseStream. There is only 1 expected Status
+   *       for CloseStream: Out of Range. Out of Range is returned when the partition has either
+   *       been split into more partitions or merged into a larger partition. In this case, we write
+   *       to the metadata table the new partitions' information so that {@link
+   *       DetectNewPartitionsDoFn} can read and output those new partitions to be streamed. We also
+   *       need to ensure we clean up this partition's metadata to release the lock.
    *   <li>Update the metadata table with the watermark and additional debugging info.
    *   <li>Stream the partition.
    * </ol>
@@ -145,11 +137,7 @@ public class ReadChangeStreamPartitionAction {
     try {
       stream =
           changeStreamDao.readChangeStreamPartition(
-              partitionRecord,
-              tracker.currentRestriction(),
-              partitionRecord.getEndTime(),
-              heartbeatDurationSeconds,
-              shouldDebug);
+              partitionRecord, tracker.currentRestriction(), heartbeatDurationSeconds, shouldDebug);
       for (ChangeStreamRecord record : stream) {
         Optional<ProcessContinuation> result =
             changeStreamAction.run(
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
index 98902866cb1..3cbd0fb7902 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
@@ -31,7 +31,6 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress;
-import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,8 +61,7 @@ public class ChangeStreamDao {
    * Streams a partition.
    *
    * @param partition the partition to stream
-   * @param streamProgress may contain a continuation token for the stream request
-   * @param endTime time to end the stream, may be null
+   * @param streamProgress may contain a continuation token for the stream request
    * @param heartbeatDurationSeconds period between heartbeat messages
    * @return stream of ReadChangeStreamResponse
    * @throws IOException if the stream could not be started
@@ -71,7 +69,6 @@ public class ChangeStreamDao {
   public ServerStream<ChangeStreamRecord> readChangeStreamPartition(
       PartitionRecord partition,
       StreamProgress streamProgress,
-      @Nullable Timestamp endTime,
       Duration heartbeatDurationSeconds,
       boolean shouldDebug)
       throws IOException {
@@ -92,9 +89,6 @@ public class ChangeStreamDao {
     } else {
       throw new IOException("Something went wrong");
     }
-    if (endTime != null) {
-      query.endTime(endTime.toProto());
-    }
     query.heartbeatDuration(heartbeatDurationSeconds.getStandardSeconds());
     if (shouldDebug) {
       LOG.info(
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/DetectNewPartitionsDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/DetectNewPartitionsDoFn.java
index 6e8f21cd00b..12f95ac160d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/DetectNewPartitionsDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/DetectNewPartitionsDoFn.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn;
 
 import java.io.IOException;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory;
@@ -42,7 +41,6 @@ import org.joda.time.Instant;
 @UnboundedPerElement
 public class DetectNewPartitionsDoFn extends DoFn<com.google.cloud.Timestamp, PartitionRecord> {
   private static final long serialVersionUID = 8052524268978107367L;
-  @Nullable private final com.google.cloud.Timestamp endTime;
 
   private final DaoFactory daoFactory;
   private final ChangeStreamMetrics metrics;
@@ -50,13 +48,9 @@ public class DetectNewPartitionsDoFn extends DoFn<com.google.cloud.Timestamp, Pa
   private DetectNewPartitionsAction detectNewPartitionsAction;
 
   public DetectNewPartitionsDoFn(
-      @Nullable com.google.cloud.Timestamp endTime,
-      ActionFactory actionFactory,
-      DaoFactory daoFactory,
-      ChangeStreamMetrics metrics) {
+      ActionFactory actionFactory, DaoFactory daoFactory, ChangeStreamMetrics metrics) {
     this.actionFactory = actionFactory;
     this.daoFactory = daoFactory;
-    this.endTime = endTime;
     this.metrics = metrics;
   }
 
@@ -92,10 +86,10 @@ public class DetectNewPartitionsDoFn extends DoFn<com.google.cloud.Timestamp, Pa
     final MetadataTableDao metadataTableDao = daoFactory.getMetadataTableDao();
     final ChangeStreamDao changeStreamDao = daoFactory.getChangeStreamDao();
     GenerateInitialPartitionsAction generateInitialPartitionsAction =
-        actionFactory.generateInitialPartitionsAction(metrics, changeStreamDao, endTime);
+        actionFactory.generateInitialPartitionsAction(metrics, changeStreamDao);
     detectNewPartitionsAction =
         actionFactory.detectNewPartitionsAction(
-            metrics, metadataTableDao, endTime, generateInitialPartitionsAction);
+            metrics, metadataTableDao, generateInitialPartitionsAction);
   }
 
   @ProcessElement
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/model/PartitionRecord.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/model/PartitionRecord.java
index 15ebf4c8fd6..c94011c8f7a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/model/PartitionRecord.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/model/PartitionRecord.java
@@ -40,43 +40,26 @@ public class PartitionRecord implements Serializable {
   private ByteStringRange partition;
   @Nullable private Timestamp startTime;
   @Nullable private List<ChangeStreamContinuationToken> changeStreamContinuationTokens;
-  @Nullable private Timestamp endTime;
   private String uuid;
   private Timestamp parentLowWatermark;
 
   public PartitionRecord(
-      ByteStringRange partition,
-      Timestamp startTime,
-      String uuid,
-      Timestamp parentLowWatermark,
-      @Nullable Timestamp endTime) {
+      ByteStringRange partition, Timestamp startTime, String uuid, Timestamp parentLowWatermark) {
     this.partition = partition;
     this.startTime = startTime;
     this.uuid = uuid;
     this.parentLowWatermark = parentLowWatermark;
-    this.endTime = endTime;
   }
 
   public PartitionRecord(
       ByteStringRange partition,
       List<ChangeStreamContinuationToken> changeStreamContinuationTokens,
       String uuid,
-      Timestamp parentLowWatermark,
-      @Nullable Timestamp endTime) {
+      Timestamp parentLowWatermark) {
     this.partition = partition;
     this.changeStreamContinuationTokens = changeStreamContinuationTokens;
     this.uuid = uuid;
     this.parentLowWatermark = parentLowWatermark;
-    this.endTime = endTime;
-  }
-
-  @Nullable
-  public Timestamp getEndTime() {
-    return endTime;
-  }
-
-  public void setEndTime(@Nullable Timestamp endTime) {
-    this.endTime = endTime;
   }
 
   @Nullable
@@ -135,7 +118,6 @@ public class PartitionRecord implements Serializable {
         && Objects.equals(getStartTime(), that.getStartTime())
         && Objects.equals(
             getChangeStreamContinuationTokens(), that.getChangeStreamContinuationTokens())
-        && Objects.equals(getEndTime(), that.getEndTime())
         && getUuid().equals(that.getUuid())
         && Objects.equals(getParentLowWatermark(), that.getParentLowWatermark());
   }
@@ -146,7 +128,6 @@ public class PartitionRecord implements Serializable {
         getPartition(),
         getStartTime(),
         getChangeStreamContinuationTokens(),
-        getEndTime(),
         getUuid(),
         getParentLowWatermark());
   }
@@ -160,8 +141,6 @@ public class PartitionRecord implements Serializable {
         + startTime
         + ", changeStreamContinuationTokens="
         + changeStreamContinuationTokens
-        + ", endTime="
-        + endTime
         + ", uuid='"
         + uuid
         + '\''
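
After this change PartitionRecord keeps two public constructors, neither of which takes an
end time: one for a new partition that streams from a start time, and one for a continued
partition that streams from saved continuation tokens. A hedged construction sketch (all
values are placeholders; the ByteStringRange.create overload taking row-key strings is an
assumption about the Bigtable client API):

    import com.google.cloud.Timestamp;
    import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
    import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
    import java.util.Collections;
    import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;

    public class PartitionRecordSketch {
      public static void main(String[] args) {
        ByteStringRange partition = ByteStringRange.create("", ""); // whole keyspace
        Timestamp start = Timestamp.now();

        // New partition: stream from a start time.
        PartitionRecord fromTime = new PartitionRecord(partition, start, "uid-1", start);

        // Continued partition: stream from previously recorded continuation tokens.
        PartitionRecord fromTokens =
            new PartitionRecord(
                partition,
                Collections.<ChangeStreamContinuationToken>emptyList(),
                "uid-2",
                start);

        System.out.println(fromTime);
        System.out.println(fromTokens);
      }
    }
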
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsActionTest.java
index b3f86c9be10..c8d88f2af75 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsActionTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsActionTest.java
@@ -68,7 +68,6 @@ public class DetectNewPartitionsActionTest {
   private MetadataTableDao metadataTableDao;
   private ManualWatermarkEstimator<Instant> watermarkEstimator;
   private Timestamp startTime;
-  private Timestamp endTime;
   private static BigtableDataClient dataClient;
   private static BigtableTableAdminClient adminClient;
 
@@ -104,10 +103,8 @@ public class DetectNewPartitionsActionTest {
             metadataTableAdminDao.getChangeStreamNamePrefix());
 
     startTime = Timestamp.now();
-    endTime = Timestamp.ofTimeSecondsAndNanos(startTime.getSeconds() + 10, startTime.getNanos());
     action =
-        new DetectNewPartitionsAction(
-            metrics, metadataTableDao, endTime, generateInitialPartitionsAction);
+        new DetectNewPartitionsAction(metrics, metadataTableDao, generateInitialPartitionsAction);
     watermarkEstimator = new WatermarkEstimators.Manual(TimestampConverter.toInstant(startTime));
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsActionTest.java
index c407ec70e63..eddc68a3c3f 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsActionTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsActionTest.java
@@ -66,7 +66,6 @@ public class GenerateInitialPartitionsActionTest {
 
   private ManualWatermarkEstimator<Instant> watermarkEstimator;
   private Timestamp startTime;
-  private Timestamp endTime;
   private static BigtableTableAdminClient adminClient;
 
   @Captor ArgumentCaptor<PartitionRecord> partitionRecordArgumentCaptor;
@@ -89,7 +88,6 @@ public class GenerateInitialPartitionsActionTest {
             adminClient, null, changeStreamId, MetadataTableAdminDao.DEFAULT_METADATA_TABLE_NAME);
     metadataTableAdminDao.createMetadataTable();
     startTime = Timestamp.now();
-    endTime = Timestamp.ofTimeSecondsAndNanos(startTime.getSeconds() + 10, startTime.getNanos());
     watermarkEstimator = new WatermarkEstimators.Manual(TimestampConverter.toInstant(startTime));
   }
 
@@ -103,7 +101,7 @@ public class GenerateInitialPartitionsActionTest {
     when(changeStreamDao.generateInitialChangeStreamPartitions()).thenReturn(partitionRecordList);
 
     GenerateInitialPartitionsAction generateInitialPartitionsAction =
-        new GenerateInitialPartitionsAction(metrics, changeStreamDao, endTime);
+        new GenerateInitialPartitionsAction(metrics, changeStreamDao);
     assertEquals(
         ProcessContinuation.resume(),
         generateInitialPartitionsAction.run(receiver, tracker, watermarkEstimator, startTime));