You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2023/02/16 18:47:46 UTC
[beam] branch bigtable-cdc-feature-branch updated: Handle ChangeStreamMutation (#25459)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch bigtable-cdc-feature-branch
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/bigtable-cdc-feature-branch by this push:
new d3c46939f21 Handle ChangeStreamMutation (#25459)
d3c46939f21 is described below
commit d3c46939f219956ea5fc458b6939a853e296228d
Author: Tony Tang <nf...@gmail.com>
AuthorDate: Thu Feb 16 13:47:37 2023 -0500
Handle ChangeStreamMutation (#25459)
---
.../changestreams/ChangeStreamMetrics.java | 55 ++++++++++++++++++++
.../changestreams/action/ChangeStreamAction.java | 42 +++++++++++++++
.../action/ChangeStreamActionTest.java | 60 ++++++++++++++++++++++
3 files changed, 157 insertions(+)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java
index 2c534020d39..ed14eb50d1c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.bigtable.changestreams;
import java.io.Serializable;
import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
/** Class to aggregate metrics related functionality. */
@@ -46,6 +47,30 @@ public class ChangeStreamMetrics implements Serializable {
org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics.class,
"heartbeat_count");
+ /**
+ * Counter for the total number of ChangeStreamMutations that are initiated by users (not garbage
+ * collection) identified during the execution of the Connector.
+ */
+ public static final Counter CHANGE_STREAM_MUTATION_USER_COUNT =
+ Metrics.counter(
+ org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics.class,
+ "change_stream_mutation_user_count");
+
+ /**
+ * Counter for the total number of ChangeStreamMutations that are initiated by garbage collection
+ * (not user initiated) identified during the execution of the Connector.
+ */
+ public static final Counter CHANGE_STREAM_MUTATION_GC_COUNT =
+ Metrics.counter(
+ org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics.class,
+ "change_stream_mutation_gc_count");
+
+ /** Distribution for measuring processing delay from commit timestamp. */
+ public static final Distribution PROCESSING_DELAY_FROM_COMMIT_TIMESTAMP =
+ Metrics.distribution(
+ org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics.class,
+ "processing_delay_from_commit_timestamp");
+
/**
* Increments the {@link
* org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics#LIST_PARTITIONS_COUNT} by
@@ -64,7 +89,37 @@ public class ChangeStreamMetrics implements Serializable {
inc(HEARTBEAT_COUNT);
}
+ /**
+ * Increments the {@link
+ * org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics#CHANGE_STREAM_MUTATION_USER_COUNT}
+ * by 1 if the metric is enabled.
+ */
+ public void incChangeStreamMutationUserCounter() {
+ inc(CHANGE_STREAM_MUTATION_USER_COUNT);
+ }
+
+ /**
+ * Increments the {@link
+ * org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics#CHANGE_STREAM_MUTATION_GC_COUNT}
+ * by 1 if the metric is enabled.
+ */
+ public void incChangeStreamMutationGcCounter() {
+ inc(CHANGE_STREAM_MUTATION_GC_COUNT);
+ }
+
+ /**
+ * Records one measurement, in milliseconds, in the {@link
+ * org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics#PROCESSING_DELAY_FROM_COMMIT_TIMESTAMP}.
+ */
+ public void updateProcessingDelayFromCommitTimestamp(long durationInMilli) {
+ update(PROCESSING_DELAY_FROM_COMMIT_TIMESTAMP, durationInMilli);
+ }
+
private void inc(Counter counter) {
counter.inc();
}
+
+ private void update(Distribution distribution, long value) {
+ distribution.update(value);
+ }
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java
index 8a2cbd6ed84..e64cd6fb876 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java
@@ -19,9 +19,11 @@ package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.ByteStringRangeHelper.formatByteStringRange;
+import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.cloud.bigtable.data.v2.models.Heartbeat;
+import com.google.cloud.bigtable.data.v2.models.Range;
import com.google.protobuf.ByteString;
import java.util.Optional;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
@@ -133,6 +135,46 @@ public class ChangeStreamAction {
return Optional.of(DoFn.ProcessContinuation.stop());
}
metrics.incHeartbeatCount();
+ } else if (record instanceof ChangeStreamMutation) {
+ ChangeStreamMutation changeStreamMutation = (ChangeStreamMutation) record;
+ final Instant watermark =
+ TimestampConverter.toInstant(changeStreamMutation.getLowWatermark());
+ watermarkEstimator.setWatermark(watermark);
+ // Build a new StreamProgress with the continuation token to be claimed.
+ ChangeStreamContinuationToken changeStreamContinuationToken =
+ new ChangeStreamContinuationToken(
+ Range.ByteStringRange.create(
+ partitionRecord.getPartition().getStart(),
+ partitionRecord.getPartition().getEnd()),
+ changeStreamMutation.getToken());
+ StreamProgress streamProgress =
+ new StreamProgress(changeStreamContinuationToken, changeStreamMutation.getLowWatermark());
+ // If the tracker fails to claim the streamProgress, it most likely means the runner initiated
+ // a checkpoint. See ReadChangeStreamPartitionProgressTracker for more information regarding
+ // runner initiated checkpoints.
+ if (!tracker.tryClaim(streamProgress)) {
+ if (shouldDebug) {
+ LOG.info(
+ "RCSP {}: Failed to claim data change tracker",
+ formatByteStringRange(partitionRecord.getPartition()));
+ }
+ return Optional.of(DoFn.ProcessContinuation.stop());
+ }
+ if (changeStreamMutation.getType() == ChangeStreamMutation.MutationType.GARBAGE_COLLECTION) {
+ metrics.incChangeStreamMutationGcCounter();
+ } else if (changeStreamMutation.getType() == ChangeStreamMutation.MutationType.USER) {
+ metrics.incChangeStreamMutationUserCounter();
+ }
+ Instant delay = TimestampConverter.toInstant(changeStreamMutation.getCommitTimestamp());
+ metrics.updateProcessingDelayFromCommitTimestamp(
+ Instant.now().getMillis() - delay.getMillis());
+
+ KV<ByteString, ChangeStreamMutation> outputRecord =
+ KV.of(changeStreamMutation.getRowKey(), changeStreamMutation);
+ // We are outputting elements with timestamp of 0 to prevent reliance on event time. This
+ // limits the ability to window on commit time of any data changes. It is still possible to
+ // window on processing time.
+ receiver.outputWithTimestamp(outputRecord, Instant.EPOCH);
} else {
LOG.warn(
"RCSP {}: Invalid response type", formatByteStringRange(partitionRecord.getPartition()));
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamActionTest.java
index a6f9f934b7c..b2c85481e2e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamActionTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamActionTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -86,4 +87,63 @@ public class ChangeStreamActionTest {
StreamProgress streamProgress = new StreamProgress(changeStreamContinuationToken, lowWatermark);
verify(tracker).tryClaim(eq(streamProgress));
}
+
+ @Test
+ public void testChangeStreamMutationUser() {
+ ByteStringRange partition = ByteStringRange.create("", "");
+ when(partitionRecord.getPartition()).thenReturn(partition);
+ final Timestamp commitTimestamp = Timestamp.newBuilder().setSeconds(1000).build();
+ final Timestamp lowWatermark = Timestamp.newBuilder().setSeconds(500).build();
+ ChangeStreamContinuationToken changeStreamContinuationToken =
+ new ChangeStreamContinuationToken(ByteStringRange.create("", ""), "1234");
+ ChangeStreamMutation changeStreamMutation = Mockito.mock(ChangeStreamMutation.class);
+ Mockito.when(changeStreamMutation.getCommitTimestamp()).thenReturn(commitTimestamp);
+ Mockito.when(changeStreamMutation.getToken()).thenReturn("1234");
+ Mockito.when(changeStreamMutation.getLowWatermark()).thenReturn(lowWatermark);
+ Mockito.when(changeStreamMutation.getType()).thenReturn(ChangeStreamMutation.MutationType.USER);
+ KV<ByteString, ChangeStreamMutation> record =
+ KV.of(changeStreamMutation.getRowKey(), changeStreamMutation);
+
+ final Optional<DoFn.ProcessContinuation> result =
+ action.run(
+ partitionRecord, changeStreamMutation, tracker, receiver, watermarkEstimator, false);
+
+ assertFalse(result.isPresent());
+ verify(metrics).incChangeStreamMutationUserCounter();
+ verify(metrics, never()).incChangeStreamMutationGcCounter();
+ StreamProgress streamProgress = new StreamProgress(changeStreamContinuationToken, lowWatermark);
+ verify(tracker).tryClaim(eq(streamProgress));
+ verify(receiver).outputWithTimestamp(eq(record), eq(Instant.EPOCH));
+ verify(watermarkEstimator).setWatermark(eq(TimestampConverter.toInstant(lowWatermark)));
+ }
+
+ @Test
+ public void testChangeStreamMutationGc() {
+ ByteStringRange partition = ByteStringRange.create("", "");
+ when(partitionRecord.getPartition()).thenReturn(partition);
+ final Timestamp commitTimestamp = Timestamp.newBuilder().setSeconds(1000).build();
+ final Timestamp lowWatermark = Timestamp.newBuilder().setSeconds(500).build();
+ ChangeStreamContinuationToken changeStreamContinuationToken =
+ new ChangeStreamContinuationToken(ByteStringRange.create("", ""), "1234");
+ ChangeStreamMutation changeStreamMutation = Mockito.mock(ChangeStreamMutation.class);
+ Mockito.when(changeStreamMutation.getCommitTimestamp()).thenReturn(commitTimestamp);
+ Mockito.when(changeStreamMutation.getToken()).thenReturn("1234");
+ Mockito.when(changeStreamMutation.getLowWatermark()).thenReturn(lowWatermark);
+ Mockito.when(changeStreamMutation.getType())
+ .thenReturn(ChangeStreamMutation.MutationType.GARBAGE_COLLECTION);
+ KV<ByteString, ChangeStreamMutation> record =
+ KV.of(changeStreamMutation.getRowKey(), changeStreamMutation);
+
+ final Optional<DoFn.ProcessContinuation> result =
+ action.run(
+ partitionRecord, changeStreamMutation, tracker, receiver, watermarkEstimator, false);
+
+ assertFalse(result.isPresent());
+ verify(metrics).incChangeStreamMutationGcCounter();
+ verify(metrics, never()).incChangeStreamMutationUserCounter();
+ StreamProgress streamProgress = new StreamProgress(changeStreamContinuationToken, lowWatermark);
+ verify(tracker).tryClaim(eq(streamProgress));
+ verify(receiver).outputWithTimestamp(eq(record), eq(Instant.EPOCH));
+ verify(watermarkEstimator).setWatermark(eq(TimestampConverter.toInstant(lowWatermark)));
+ }
}