You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2023/02/16 18:47:46 UTC

[beam] branch bigtable-cdc-feature-branch updated: Handle ChangeStreamMutation (#25459)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch bigtable-cdc-feature-branch
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/bigtable-cdc-feature-branch by this push:
     new d3c46939f21 Handle ChangeStreamMutation (#25459)
d3c46939f21 is described below

commit d3c46939f219956ea5fc458b6939a853e296228d
Author: Tony Tang <nf...@gmail.com>
AuthorDate: Thu Feb 16 13:47:37 2023 -0500

    Handle ChangeStreamMutation (#25459)
---
 .../changestreams/ChangeStreamMetrics.java         | 55 ++++++++++++++++++++
 .../changestreams/action/ChangeStreamAction.java   | 42 +++++++++++++++
 .../action/ChangeStreamActionTest.java             | 60 ++++++++++++++++++++++
 3 files changed, 157 insertions(+)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java
index 2c534020d39..ed14eb50d1c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.bigtable.changestreams;
 
 import java.io.Serializable;
 import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.Metrics;
 
 /** Class to aggregate metrics related functionality. */
@@ -46,6 +47,30 @@ public class ChangeStreamMetrics implements Serializable {
           org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics.class,
           "heartbeat_count");
 
+  /**
+   * Counter of the ChangeStreamMutations of type {@code USER} (initiated by a user, not by
+   * garbage collection) seen by the connector.
+   */
+  public static final Counter CHANGE_STREAM_MUTATION_USER_COUNT =
+      Metrics.counter(
+          org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics.class,
+          "change_stream_mutation_user_count");
+
+  /**
+   * Counter of the ChangeStreamMutations of type {@code GARBAGE_COLLECTION} (initiated by garbage
+   * collection, not by a user) seen by the connector.
+   */
+  public static final Counter CHANGE_STREAM_MUTATION_GC_COUNT =
+      Metrics.counter(
+          org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics.class,
+          "change_stream_mutation_gc_count");
+
+  /** Distribution of delays (ms) from a mutation's commit timestamp to its processing. */
+  public static final Distribution PROCESSING_DELAY_FROM_COMMIT_TIMESTAMP =
+      Metrics.distribution(
+          org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics.class,
+          "processing_delay_from_commit_timestamp");
+
   /**
    * Increments the {@link
    * org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics#LIST_PARTITIONS_COUNT} by
@@ -64,7 +89,37 @@ public class ChangeStreamMetrics implements Serializable {
     inc(HEARTBEAT_COUNT);
   }
 
+  /**
+   * Increments the {@link
+   * org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics#CHANGE_STREAM_MUTATION_USER_COUNT}
+   * by 1.
+   */
+  public void incChangeStreamMutationUserCounter() {
+    inc(CHANGE_STREAM_MUTATION_USER_COUNT);
+  }
+
+  /**
+   * Increments the {@link
+   * org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics#CHANGE_STREAM_MUTATION_GC_COUNT}
+   * by 1.
+   */
+  public void incChangeStreamMutationGcCounter() {
+    inc(CHANGE_STREAM_MUTATION_GC_COUNT);
+  }
+
+  /**
+   * Records {@code durationInMilli}, a delay in milliseconds, as one sample of the {@link
+   * org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics#PROCESSING_DELAY_FROM_COMMIT_TIMESTAMP}.
+   */
+  public void updateProcessingDelayFromCommitTimestamp(long durationInMilli) {
+    update(PROCESSING_DELAY_FROM_COMMIT_TIMESTAMP, durationInMilli);
+  }
+
   private void inc(Counter counter) {
     counter.inc();
   }
+
+  private void update(Distribution target, long sample) {
+    target.update(sample);
+  }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java
index 8a2cbd6ed84..e64cd6fb876 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java
@@ -19,9 +19,11 @@ package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
 
 import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.ByteStringRangeHelper.formatByteStringRange;
 
+import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
 import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
 import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
 import com.google.cloud.bigtable.data.v2.models.Heartbeat;
+import com.google.cloud.bigtable.data.v2.models.Range;
 import com.google.protobuf.ByteString;
 import java.util.Optional;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
@@ -133,6 +135,46 @@ public class ChangeStreamAction {
         return Optional.of(DoFn.ProcessContinuation.stop());
       }
       metrics.incHeartbeatCount();
+    } else if (record instanceof ChangeStreamMutation) {
+      ChangeStreamMutation changeStreamMutation = (ChangeStreamMutation) record;
+      final Instant watermark =
+          TimestampConverter.toInstant(changeStreamMutation.getLowWatermark());
+      watermarkEstimator.setWatermark(watermark);
+      // Build a new StreamProgress with the continuation token to be claimed.
+      ChangeStreamContinuationToken changeStreamContinuationToken =
+          new ChangeStreamContinuationToken(
+              Range.ByteStringRange.create(
+                  partitionRecord.getPartition().getStart(),
+                  partitionRecord.getPartition().getEnd()),
+              changeStreamMutation.getToken());
+      StreamProgress streamProgress =
+          new StreamProgress(changeStreamContinuationToken, changeStreamMutation.getLowWatermark());
+      // If the tracker fails to claim the streamProgress, the runner most likely initiated a
+      // checkpoint. See ReadChangeStreamPartitionProgressTracker for more information regarding
+      // runner-initiated checkpoints.
+      if (!tracker.tryClaim(streamProgress)) {
+        if (shouldDebug) {
+          LOG.info(
+              "RCSP {}: Failed to claim data change tracker",
+              formatByteStringRange(partitionRecord.getPartition()));
+        }
+        return Optional.of(DoFn.ProcessContinuation.stop());
+      }
+      if (changeStreamMutation.getType() == ChangeStreamMutation.MutationType.GARBAGE_COLLECTION) {
+        metrics.incChangeStreamMutationGcCounter();
+      } else if (changeStreamMutation.getType() == ChangeStreamMutation.MutationType.USER) {
+        metrics.incChangeStreamMutationUserCounter();
+      }
+      Instant delay = TimestampConverter.toInstant(changeStreamMutation.getCommitTimestamp());
+      metrics.updateProcessingDelayFromCommitTimestamp(
+          Instant.now().getMillis() - delay.getMillis());
+
+      KV<ByteString, ChangeStreamMutation> outputRecord =
+          KV.of(changeStreamMutation.getRowKey(), changeStreamMutation);
+      // We are outputting elements with timestamp of 0 to prevent reliance on event time. This
+      // limits the ability to window on commit time of any data changes. It is still possible to
+      // window on processing time.
+      receiver.outputWithTimestamp(outputRecord, Instant.EPOCH);
     } else {
       LOG.warn(
           "RCSP {}: Invalid response type", formatByteStringRange(partitionRecord.getPartition()));
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamActionTest.java
index a6f9f934b7c..b2c85481e2e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamActionTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamActionTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertFalse;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -86,4 +87,63 @@ public class ChangeStreamActionTest {
     StreamProgress streamProgress = new StreamProgress(changeStreamContinuationToken, lowWatermark);
     verify(tracker).tryClaim(eq(streamProgress));
   }
+
+  @Test
+  public void testChangeStreamMutationUser() {
+    ByteStringRange partition = ByteStringRange.create("", "");
+    when(partitionRecord.getPartition()).thenReturn(partition);
+    final Timestamp commitTimestamp = Timestamp.newBuilder().setSeconds(1000).build();
+    final Timestamp lowWatermark = Timestamp.newBuilder().setSeconds(500).build();
+    // The action rebuilds the continuation token from the partition and the mutation's token.
+    ChangeStreamContinuationToken token = new ChangeStreamContinuationToken(partition, "1234");
+    ChangeStreamMutation mutation = mock(ChangeStreamMutation.class);
+    when(mutation.getCommitTimestamp()).thenReturn(commitTimestamp);
+    when(mutation.getToken()).thenReturn("1234");
+    when(mutation.getLowWatermark()).thenReturn(lowWatermark);
+    when(mutation.getType()).thenReturn(ChangeStreamMutation.MutationType.USER);
+    // The expected output is keyed by the mutation's row key.
+    KV<ByteString, ChangeStreamMutation> record = KV.of(mutation.getRowKey(), mutation);
+
+    final Optional<DoFn.ProcessContinuation> result =
+        action.run(partitionRecord, mutation, tracker, receiver, watermarkEstimator, false);
+
+    assertFalse(result.isPresent());
+    verify(metrics).incChangeStreamMutationUserCounter();
+    verify(metrics, never()).incChangeStreamMutationGcCounter();
+    verify(metrics).updateProcessingDelayFromCommitTimestamp(any(Long.class));
+    StreamProgress streamProgress = new StreamProgress(token, lowWatermark);
+    verify(tracker).tryClaim(eq(streamProgress));
+    verify(receiver).outputWithTimestamp(eq(record), eq(Instant.EPOCH));
+    verify(watermarkEstimator).setWatermark(eq(TimestampConverter.toInstant(lowWatermark)));
+  }
+
+  @Test
+  public void testChangeStreamMutationGc() {
+    ByteStringRange partition = ByteStringRange.create("", "");
+    when(partitionRecord.getPartition()).thenReturn(partition);
+    final Timestamp commitTimestamp = Timestamp.newBuilder().setSeconds(1000).build();
+    final Timestamp lowWatermark = Timestamp.newBuilder().setSeconds(500).build();
+    // The action rebuilds the continuation token from the partition and the mutation's token.
+    ChangeStreamContinuationToken token = new ChangeStreamContinuationToken(partition, "1234");
+    ChangeStreamMutation mutation = mock(ChangeStreamMutation.class);
+    when(mutation.getCommitTimestamp()).thenReturn(commitTimestamp);
+    when(mutation.getToken()).thenReturn("1234");
+    when(mutation.getLowWatermark()).thenReturn(lowWatermark);
+    when(mutation.getType()).thenReturn(ChangeStreamMutation.MutationType.GARBAGE_COLLECTION);
+    // The expected output is keyed by the mutation's row key.
+    KV<ByteString, ChangeStreamMutation> record = KV.of(mutation.getRowKey(), mutation);
+
+    final Optional<DoFn.ProcessContinuation> result =
+        action.run(partitionRecord, mutation, tracker, receiver, watermarkEstimator, false);
+
+    assertFalse(result.isPresent());
+    // Garbage-collection mutations must bump only the GC counter, never the user counter.
+    verify(metrics).incChangeStreamMutationGcCounter();
+    verify(metrics, never()).incChangeStreamMutationUserCounter();
+    verify(metrics).updateProcessingDelayFromCommitTimestamp(any(Long.class));
+    StreamProgress streamProgress = new StreamProgress(token, lowWatermark);
+    verify(tracker).tryClaim(eq(streamProgress));
+    verify(receiver).outputWithTimestamp(eq(record), eq(Instant.EPOCH));
+    verify(watermarkEstimator).setWatermark(eq(TimestampConverter.toInstant(lowWatermark)));
+  }
 }