You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2023/02/15 16:48:58 UTC
[beam] branch bigtable-cdc-feature-branch updated: Not including support for end time in Bigtable Change Stream connector (#25474)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch bigtable-cdc-feature-branch
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/bigtable-cdc-feature-branch by this push:
new 331b40117ea Not including support for end time in Bigtable Change Stream connector (#25474)
331b40117ea is described below
commit 331b40117eac3981ea319265bad18739e1dce297
Author: Tony Tang <nf...@gmail.com>
AuthorDate: Wed Feb 15 11:48:49 2023 -0500
Not including support for end time in Bigtable Change Stream connector (#25474)
---
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 20 ++---------------
.../changestreams/action/ActionFactory.java | 10 +++------
.../action/DetectNewPartitionsAction.java | 16 -------------
.../action/GenerateInitialPartitionsAction.java | 9 ++------
.../action/ReadChangeStreamPartitionAction.java | 26 ++++++----------------
.../changestreams/dao/ChangeStreamDao.java | 8 +------
.../dofn/DetectNewPartitionsDoFn.java | 12 +++-------
.../changestreams/model/PartitionRecord.java | 25 ++-------------------
.../action/DetectNewPartitionsActionTest.java | 5 +----
.../GenerateInitialPartitionsActionTest.java | 4 +---
10 files changed, 22 insertions(+), 113 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index d2eff9cbbfa..00048b73d08 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -222,8 +222,7 @@ import org.slf4j.LoggerFactory;
* .withInstanceId(instanceId)
* .withTableId(tableId)
* .withAppProfileId(appProfileId)
- * .withStartTime(startTime)
- * .withEndTime(endTime));
+ * .withStartTime(startTime));
* }</pre>
*
* <h3>Permissions</h3>
@@ -281,7 +280,6 @@ public class BigtableIO {
*
* <ul>
* <li>{@link BigtableIO.ReadChangeStream#withStartTime} which defaults to now.
- * <li>{@link BigtableIO.ReadChangeStream#withEndTime} which defaults to empty.
* <li>{@link BigtableIO.ReadChangeStream#withHeartbeatDuration} which defaults to 1 second.
* <li>{@link BigtableIO.ReadChangeStream#withMetadataTableProjectId} which defaults to value
* from {@link BigtableIO.ReadChangeStream#withProjectId}
@@ -1544,8 +1542,6 @@ public class BigtableIO {
abstract @Nullable Timestamp getStartTime();
- abstract @Nullable Timestamp getEndTime();
-
abstract @Nullable Duration getHeartbeatDuration();
abstract @Nullable String getChangeStreamName();
@@ -1657,16 +1653,6 @@ public class BigtableIO {
return toBuilder().setStartTime(startTime).build();
}
- /**
- * Returns a new {@link BigtableIO.ReadChangeStream} that will stop streaming at the specified
- * end time.
- *
- * <p>Does not modify this object.
- */
- public ReadChangeStream withEndTime(Timestamp endTime) {
- return toBuilder().setEndTime(endTime).build();
- }
-
/**
* Returns a new {@link BigtableIO.ReadChangeStream} that will send heartbeat messages at
* specified interval.
@@ -1800,7 +1786,7 @@ public class BigtableIO {
InitializeDoFn initializeDoFn =
new InitializeDoFn(daoFactory, metadataTableConfig.getAppProfileId().get(), startTime);
DetectNewPartitionsDoFn detectNewPartitionsDoFn =
- new DetectNewPartitionsDoFn(getEndTime(), actionFactory, daoFactory, metrics);
+ new DetectNewPartitionsDoFn(actionFactory, daoFactory, metrics);
ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
new ReadChangeStreamPartitionDoFn(heartbeatDuration, daoFactory, actionFactory, metrics);
@@ -1821,8 +1807,6 @@ public class BigtableIO {
abstract ReadChangeStream.Builder setStartTime(Timestamp startTime);
- abstract ReadChangeStream.Builder setEndTime(Timestamp endTime);
-
abstract ReadChangeStream.Builder setHeartbeatDuration(Duration interval);
abstract ReadChangeStream.Builder setChangeStreamName(String changeStreamName);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java
index 18fbc5fe404..cfc5b5fd3b2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java
@@ -17,9 +17,7 @@
*/
package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
-import com.google.cloud.Timestamp;
import java.io.Serializable;
-import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
@@ -66,12 +64,10 @@ public class ActionFactory implements Serializable {
public synchronized DetectNewPartitionsAction detectNewPartitionsAction(
ChangeStreamMetrics metrics,
MetadataTableDao metadataTableDao,
- @Nullable Timestamp endTime,
GenerateInitialPartitionsAction generateInitialPartitionsAction) {
if (detectNewPartitionsAction == null) {
detectNewPartitionsAction =
- new DetectNewPartitionsAction(
- metrics, metadataTableDao, endTime, generateInitialPartitionsAction);
+ new DetectNewPartitionsAction(metrics, metadataTableDao, generateInitialPartitionsAction);
}
return detectNewPartitionsAction;
}
@@ -85,10 +81,10 @@ public class ActionFactory implements Serializable {
* @return singleton instance of the {@link GenerateInitialPartitionsAction}
*/
public synchronized GenerateInitialPartitionsAction generateInitialPartitionsAction(
- ChangeStreamMetrics metrics, ChangeStreamDao changeStreamDao, @Nullable Timestamp endTime) {
+ ChangeStreamMetrics metrics, ChangeStreamDao changeStreamDao) {
if (generateInitialPartitionsAction == null) {
generateInitialPartitionsAction =
- new GenerateInitialPartitionsAction(metrics, changeStreamDao, endTime);
+ new GenerateInitialPartitionsAction(metrics, changeStreamDao);
}
return generateInitialPartitionsAction;
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java
index 932796d4ba8..4e286fe6397 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java
@@ -19,9 +19,7 @@ package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
import com.google.cloud.Timestamp;
import com.google.protobuf.InvalidProtocolBufferException;
-import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import org.apache.beam.sdk.io.range.OffsetRange;
@@ -55,17 +53,14 @@ public class DetectNewPartitionsAction {
private final ChangeStreamMetrics metrics;
private final MetadataTableDao metadataTableDao;
- @Nullable private final com.google.cloud.Timestamp endTime;
private final GenerateInitialPartitionsAction generateInitialPartitionsAction;
public DetectNewPartitionsAction(
ChangeStreamMetrics metrics,
MetadataTableDao metadataTableDao,
- @Nullable Timestamp endTime,
GenerateInitialPartitionsAction generateInitialPartitionsAction) {
this.metrics = metrics;
this.metadataTableDao = metadataTableDao;
- this.endTime = endTime;
this.generateInitialPartitionsAction = generateInitialPartitionsAction;
}
@@ -77,7 +72,6 @@ public class DetectNewPartitionsAction {
* <li>Look up the initial list of partitions to stream if it's the very first run.
* <li>On the rest of the runs, try advancing the watermark if needed.
* <li>Update the metadata table with info about this DoFn.
- * <li>Check if this pipeline has reached the end time. Terminate if it has.
* <li>Process new partitions and output them.
* <li>Register callback to clean up processed partitions after bundle has been finalized.
* </ol>
@@ -103,16 +97,6 @@ public class DetectNewPartitionsAction {
return generateInitialPartitionsAction.run(receiver, tracker, watermarkEstimator, startTime);
}
- // Terminate if endTime <= watermark that means all partitions have read up to or beyond
- // watermark. We no longer need to manage splits and merges, we can terminate.
- if (endTime != null
- && endTime.compareTo(
- TimestampConverter.toCloudTimestamp(watermarkEstimator.currentWatermark()))
- <= 0) {
- tracker.tryClaim(tracker.currentRestriction().getTo());
- return ProcessContinuation.stop();
- }
-
if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) {
return ProcessContinuation.stop();
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java
index 164a679a26f..938464f500a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java
@@ -17,10 +17,8 @@
*/
package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
-import com.google.cloud.Timestamp;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import java.util.List;
-import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator;
@@ -44,13 +42,11 @@ public class GenerateInitialPartitionsAction {
private final ChangeStreamMetrics metrics;
private final ChangeStreamDao changeStreamDao;
- @Nullable private final Timestamp endTime;
public GenerateInitialPartitionsAction(
- ChangeStreamMetrics metrics, ChangeStreamDao changeStreamDao, @Nullable Timestamp endTime) {
+ ChangeStreamMetrics metrics, ChangeStreamDao changeStreamDao) {
this.metrics = metrics;
this.changeStreamDao = changeStreamDao;
- this.endTime = endTime;
}
/**
@@ -78,8 +74,7 @@ public class GenerateInitialPartitionsAction {
for (ByteStringRange partition : streamPartitions) {
metrics.incListPartitionsCount();
String uid = UniqueIdGenerator.getNextId();
- PartitionRecord partitionRecord =
- new PartitionRecord(partition, startTime, uid, startTime, endTime);
+ PartitionRecord partitionRecord = new PartitionRecord(partition, startTime, uid, startTime);
// We are outputting elements with timestamp of 0 to prevent reliance on event time. This
// limits the ability to window on commit time of any data changes. It is still possible to
// window on processing time.
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
index 321a3c1a1ca..a36b57ff42f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
@@ -85,20 +85,12 @@ public class ReadChangeStreamPartitionAction {
* <li>Process CloseStream if it exists. In order to solve a possible inconsistent state
* problem, we do not process CloseStream after receiving it. We claim the CloseStream in
* the RestrictionTracker so it persists after a checkpoint. We checkpoint to flush all the
- * DataChanges. Then on resume, we process the CloseStream. There are only 2 expected Status
- * for CloseStream: OK and Out of Range.
- * <ol>
- * <li>OK status is returned when the predetermined endTime has been reached. In this
- * case, we update the watermark and update the metadata table. {@link
- * DetectNewPartitionsDoFn} aggregates the watermark from all the streams to ensure
- * all the streams have reached beyond endTime so it can also terminate and end the
- * beam job.
- * <li>Out of Range is returned when the partition has either been split into more
- * partitions or merged into a larger partition. In this case, we write to the
- * metadata table the new partitions' information so that {@link
- * DetectNewPartitionsDoFn} can read and output those new partitions to be streamed.
- * We also need to ensure we clean up this partition's metadata to release the lock.
- * </ol>
+ * DataChanges. Then on resume, we process the CloseStream. There is only 1 expected Status
+ * for CloseStream: Out of Range. Out of Range is returned when the partition has either
+ * been split into more partitions or merged into a larger partition. In this case, we write
+ * to the metadata table the new partitions' information so that {@link
+ * DetectNewPartitionsDoFn} can read and output those new partitions to be streamed. We also
+ * need to ensure we clean up this partition's metadata to release the lock.
* <li>Update the metadata table with the watermark and additional debugging info.
* <li>Stream the partition.
* </ol>
@@ -145,11 +137,7 @@ public class ReadChangeStreamPartitionAction {
try {
stream =
changeStreamDao.readChangeStreamPartition(
- partitionRecord,
- tracker.currentRestriction(),
- partitionRecord.getEndTime(),
- heartbeatDurationSeconds,
- shouldDebug);
+ partitionRecord, tracker.currentRestriction(), heartbeatDurationSeconds, shouldDebug);
for (ChangeStreamRecord record : stream) {
Optional<ProcessContinuation> result =
changeStreamAction.run(
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
index 98902866cb1..3cbd0fb7902 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java
@@ -31,7 +31,6 @@ import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress;
-import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,8 +61,7 @@ public class ChangeStreamDao {
* Streams a partition.
*
* @param partition the partition to stream
- * @param streamProgress may contain a continuation token for the stream request
- * @param endTime time to end the stream, may be null
+ * @param streamProgress may contain a continuation token for the stream request
* @param heartbeatDurationSeconds period between heartbeat messages
* @return stream of ReadChangeStreamResponse
* @throws IOException if the stream could not be started
@@ -71,7 +69,6 @@ public class ChangeStreamDao {
public ServerStream<ChangeStreamRecord> readChangeStreamPartition(
PartitionRecord partition,
StreamProgress streamProgress,
- @Nullable Timestamp endTime,
Duration heartbeatDurationSeconds,
boolean shouldDebug)
throws IOException {
@@ -92,9 +89,6 @@ public class ChangeStreamDao {
} else {
throw new IOException("Something went wrong");
}
- if (endTime != null) {
- query.endTime(endTime.toProto());
- }
query.heartbeatDuration(heartbeatDurationSeconds.getStandardSeconds());
if (shouldDebug) {
LOG.info(
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/DetectNewPartitionsDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/DetectNewPartitionsDoFn.java
index 6e8f21cd00b..12f95ac160d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/DetectNewPartitionsDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/DetectNewPartitionsDoFn.java
@@ -18,7 +18,6 @@
package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn;
import java.io.IOException;
-import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory;
@@ -42,7 +41,6 @@ import org.joda.time.Instant;
@UnboundedPerElement
public class DetectNewPartitionsDoFn extends DoFn<com.google.cloud.Timestamp, PartitionRecord> {
private static final long serialVersionUID = 8052524268978107367L;
- @Nullable private final com.google.cloud.Timestamp endTime;
private final DaoFactory daoFactory;
private final ChangeStreamMetrics metrics;
@@ -50,13 +48,9 @@ public class DetectNewPartitionsDoFn extends DoFn<com.google.cloud.Timestamp, Pa
private DetectNewPartitionsAction detectNewPartitionsAction;
public DetectNewPartitionsDoFn(
- @Nullable com.google.cloud.Timestamp endTime,
- ActionFactory actionFactory,
- DaoFactory daoFactory,
- ChangeStreamMetrics metrics) {
+ ActionFactory actionFactory, DaoFactory daoFactory, ChangeStreamMetrics metrics) {
this.actionFactory = actionFactory;
this.daoFactory = daoFactory;
- this.endTime = endTime;
this.metrics = metrics;
}
@@ -92,10 +86,10 @@ public class DetectNewPartitionsDoFn extends DoFn<com.google.cloud.Timestamp, Pa
final MetadataTableDao metadataTableDao = daoFactory.getMetadataTableDao();
final ChangeStreamDao changeStreamDao = daoFactory.getChangeStreamDao();
GenerateInitialPartitionsAction generateInitialPartitionsAction =
- actionFactory.generateInitialPartitionsAction(metrics, changeStreamDao, endTime);
+ actionFactory.generateInitialPartitionsAction(metrics, changeStreamDao);
detectNewPartitionsAction =
actionFactory.detectNewPartitionsAction(
- metrics, metadataTableDao, endTime, generateInitialPartitionsAction);
+ metrics, metadataTableDao, generateInitialPartitionsAction);
}
@ProcessElement
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/model/PartitionRecord.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/model/PartitionRecord.java
index 15ebf4c8fd6..c94011c8f7a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/model/PartitionRecord.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/model/PartitionRecord.java
@@ -40,43 +40,26 @@ public class PartitionRecord implements Serializable {
private ByteStringRange partition;
@Nullable private Timestamp startTime;
@Nullable private List<ChangeStreamContinuationToken> changeStreamContinuationTokens;
- @Nullable private Timestamp endTime;
private String uuid;
private Timestamp parentLowWatermark;
public PartitionRecord(
- ByteStringRange partition,
- Timestamp startTime,
- String uuid,
- Timestamp parentLowWatermark,
- @Nullable Timestamp endTime) {
+ ByteStringRange partition, Timestamp startTime, String uuid, Timestamp parentLowWatermark) {
this.partition = partition;
this.startTime = startTime;
this.uuid = uuid;
this.parentLowWatermark = parentLowWatermark;
- this.endTime = endTime;
}
public PartitionRecord(
ByteStringRange partition,
List<ChangeStreamContinuationToken> changeStreamContinuationTokens,
String uuid,
- Timestamp parentLowWatermark,
- @Nullable Timestamp endTime) {
+ Timestamp parentLowWatermark) {
this.partition = partition;
this.changeStreamContinuationTokens = changeStreamContinuationTokens;
this.uuid = uuid;
this.parentLowWatermark = parentLowWatermark;
- this.endTime = endTime;
- }
-
- @Nullable
- public Timestamp getEndTime() {
- return endTime;
- }
-
- public void setEndTime(@Nullable Timestamp endTime) {
- this.endTime = endTime;
}
@Nullable
@@ -135,7 +118,6 @@ public class PartitionRecord implements Serializable {
&& Objects.equals(getStartTime(), that.getStartTime())
&& Objects.equals(
getChangeStreamContinuationTokens(), that.getChangeStreamContinuationTokens())
- && Objects.equals(getEndTime(), that.getEndTime())
&& getUuid().equals(that.getUuid())
&& Objects.equals(getParentLowWatermark(), that.getParentLowWatermark());
}
@@ -146,7 +128,6 @@ public class PartitionRecord implements Serializable {
getPartition(),
getStartTime(),
getChangeStreamContinuationTokens(),
- getEndTime(),
getUuid(),
getParentLowWatermark());
}
@@ -160,8 +141,6 @@ public class PartitionRecord implements Serializable {
+ startTime
+ ", changeStreamContinuationTokens="
+ changeStreamContinuationTokens
- + ", endTime="
- + endTime
+ ", uuid='"
+ uuid
+ '\''
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsActionTest.java
index b3f86c9be10..c8d88f2af75 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsActionTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsActionTest.java
@@ -68,7 +68,6 @@ public class DetectNewPartitionsActionTest {
private MetadataTableDao metadataTableDao;
private ManualWatermarkEstimator<Instant> watermarkEstimator;
private Timestamp startTime;
- private Timestamp endTime;
private static BigtableDataClient dataClient;
private static BigtableTableAdminClient adminClient;
@@ -104,10 +103,8 @@ public class DetectNewPartitionsActionTest {
metadataTableAdminDao.getChangeStreamNamePrefix());
startTime = Timestamp.now();
- endTime = Timestamp.ofTimeSecondsAndNanos(startTime.getSeconds() + 10, startTime.getNanos());
action =
- new DetectNewPartitionsAction(
- metrics, metadataTableDao, endTime, generateInitialPartitionsAction);
+ new DetectNewPartitionsAction(metrics, metadataTableDao, generateInitialPartitionsAction);
watermarkEstimator = new WatermarkEstimators.Manual(TimestampConverter.toInstant(startTime));
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsActionTest.java
index c407ec70e63..eddc68a3c3f 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsActionTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsActionTest.java
@@ -66,7 +66,6 @@ public class GenerateInitialPartitionsActionTest {
private ManualWatermarkEstimator<Instant> watermarkEstimator;
private Timestamp startTime;
- private Timestamp endTime;
private static BigtableTableAdminClient adminClient;
@Captor ArgumentCaptor<PartitionRecord> partitionRecordArgumentCaptor;
@@ -89,7 +88,6 @@ public class GenerateInitialPartitionsActionTest {
adminClient, null, changeStreamId, MetadataTableAdminDao.DEFAULT_METADATA_TABLE_NAME);
metadataTableAdminDao.createMetadataTable();
startTime = Timestamp.now();
- endTime = Timestamp.ofTimeSecondsAndNanos(startTime.getSeconds() + 10, startTime.getNanos());
watermarkEstimator = new WatermarkEstimators.Manual(TimestampConverter.toInstant(startTime));
}
@@ -103,7 +101,7 @@ public class GenerateInitialPartitionsActionTest {
when(changeStreamDao.generateInitialChangeStreamPartitions()).thenReturn(partitionRecordList);
GenerateInitialPartitionsAction generateInitialPartitionsAction =
- new GenerateInitialPartitionsAction(metrics, changeStreamDao, endTime);
+ new GenerateInitialPartitionsAction(metrics, changeStreamDao);
assertEquals(
ProcessContinuation.resume(),
generateInitialPartitionsAction.run(receiver, tracker, watermarkEstimator, startTime));