You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "pabloem (via GitHub)" <gi...@apache.org> on 2023/02/16 18:36:36 UTC

[GitHub] [beam] pabloem commented on a diff in pull request #25459: Cloud Bigtable stream changes and handle Mutation responses

pabloem commented on code in PR #25459:
URL: https://github.com/apache/beam/pull/25459#discussion_r1108879915


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java:
##########
@@ -133,6 +135,46 @@ public Optional<DoFn.ProcessContinuation> run(
         return Optional.of(DoFn.ProcessContinuation.stop());
       }
       metrics.incHeartbeatCount();
+    } else if (record instanceof ChangeStreamMutation) {
+      ChangeStreamMutation changeStreamMutation = (ChangeStreamMutation) record;
+      final Instant watermark =
+          TimestampConverter.toInstant(changeStreamMutation.getLowWatermark());
+      watermarkEstimator.setWatermark(watermark);
+      // Build a new StreamProgress with the continuation token to be claimed.
+      ChangeStreamContinuationToken changeStreamContinuationToken =
+          new ChangeStreamContinuationToken(
+              Range.ByteStringRange.create(
+                  partitionRecord.getPartition().getStart(),
+                  partitionRecord.getPartition().getEnd()),
+              changeStreamMutation.getToken());
+      StreamProgress streamProgress =
+          new StreamProgress(changeStreamContinuationToken, changeStreamMutation.getLowWatermark());
+      // If the tracker fails to claim the streamProgress, it most likely means the runner
+      // initiated a checkpoint. See ReadChangeStreamPartitionProgressTracker for more information
+      // regarding runner initiated checkpoints.
+      if (!tracker.tryClaim(streamProgress)) {
+        if (shouldDebug) {
+          LOG.info(
+              "RCSP {}: Failed to claim data change tracker",
+              formatByteStringRange(partitionRecord.getPartition()));
+        }
+        return Optional.of(DoFn.ProcessContinuation.stop());
+      }
+      if (changeStreamMutation.getType() == ChangeStreamMutation.MutationType.GARBAGE_COLLECTION) {
+        metrics.incChangeStreamMutationGcCounter();
+      } else if (changeStreamMutation.getType() == ChangeStreamMutation.MutationType.USER) {
+        metrics.incChangeStreamMutationUserCounter();
+      }
+      Instant delay = TimestampConverter.toInstant(changeStreamMutation.getCommitTimestamp());
+      metrics.updateProcessingDelayFromCommitTimestamp(
+          Instant.now().getMillis() - delay.getMillis());
+
+      KV<ByteString, ChangeStreamMutation> outputRecord =
+          KV.of(changeStreamMutation.getRowKey(), changeStreamMutation);
+      // We are outputting elements with timestamp of 0 to prevent reliance on event time. This
+      // limits the ability to window on commit time of any data changes. It is still possible to
+      // window on processing time.
+      receiver.outputWithTimestamp(outputRecord, Instant.EPOCH);
     } else {
       LOG.warn(
           "RCSP {}: Invalid response type", formatByteStringRange(partitionRecord.getPartition()));

Review Comment:
   Perhaps log this at the error level (`LOG.error`) instead of as a warning?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org