Posted to github@beam.apache.org by "tonytanger (via GitHub)" <gi...@apache.org> on 2023/02/13 22:23:44 UTC

[GitHub] [beam] tonytanger opened a new pull request, #25459: Cloud Bigtable stream changes and handle Mutation responses

tonytanger opened a new pull request, #25459:
URL: https://github.com/apache/beam/pull/25459

   Cloud Bigtable stream changes and handle Mutation responses
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] pabloem commented on a diff in pull request #25459: Cloud Bigtable stream changes and handle Mutation responses

Posted by "pabloem (via GitHub)" <gi...@apache.org>.
pabloem commented on code in PR #25459:
URL: https://github.com/apache/beam/pull/25459#discussion_r1108879915


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java:
##########
@@ -133,6 +135,46 @@ public Optional<DoFn.ProcessContinuation> run(
         return Optional.of(DoFn.ProcessContinuation.stop());
       }
       metrics.incHeartbeatCount();
+    } else if (record instanceof ChangeStreamMutation) {
+      ChangeStreamMutation changeStreamMutation = (ChangeStreamMutation) record;
+      final Instant watermark =
+          TimestampConverter.toInstant(changeStreamMutation.getLowWatermark());
+      watermarkEstimator.setWatermark(watermark);
+      // Build a new StreamProgress with the continuation token to be claimed.
+      ChangeStreamContinuationToken changeStreamContinuationToken =
+          new ChangeStreamContinuationToken(
+              Range.ByteStringRange.create(
+                  partitionRecord.getPartition().getStart(),
+                  partitionRecord.getPartition().getEnd()),
+              changeStreamMutation.getToken());
+      StreamProgress streamProgress =
+          new StreamProgress(changeStreamContinuationToken, changeStreamMutation.getLowWatermark());
+      // If the tracker fail to claim the streamProgress, it most likely means the runner initiated
+      // a checkpoint. See ReadChangeStreamPartitionProgressTracker for more information regarding
+      // runner initiated checkpoints.
+      if (!tracker.tryClaim(streamProgress)) {
+        if (shouldDebug) {
+          LOG.info(
+              "RCSP {}: Failed to claim data change tracker",
+              formatByteStringRange(partitionRecord.getPartition()));
+        }
+        return Optional.of(DoFn.ProcessContinuation.stop());
+      }
+      if (changeStreamMutation.getType() == ChangeStreamMutation.MutationType.GARBAGE_COLLECTION) {
+        metrics.incChangeStreamMutationGcCounter();
+      } else if (changeStreamMutation.getType() == ChangeStreamMutation.MutationType.USER) {
+        metrics.incChangeStreamMutationUserCounter();
+      }
+      Instant delay = TimestampConverter.toInstant(changeStreamMutation.getCommitTimestamp());
+      metrics.updateProcessingDelayFromCommitTimestamp(
+          Instant.now().getMillis() - delay.getMillis());
+
+      KV<ByteString, ChangeStreamMutation> outputRecord =
+          KV.of(changeStreamMutation.getRowKey(), changeStreamMutation);
+      // We are outputting elements with timestamp of 0 to prevent reliance on event time. This
+      // limits the ability to window on commit time of any data changes. It is still possible to
+      // window on processing time.
+      receiver.outputWithTimestamp(outputRecord, Instant.EPOCH);
     } else {
       LOG.warn(
           "RCSP {}: Invalid response type", formatByteStringRange(partitionRecord.getPartition()));

Review Comment:
   Perhaps log this at error level instead of as a warning?
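
A minimal sketch of the suggestion, assuming nothing beyond the branch structure quoted above: dispatch on the record type and report the fall-through case at error level. It uses `java.util.logging` (where `Level.SEVERE` plays the role of an error level) rather than the logger used in the PR, and `Heartbeat`, `ChangeStreamMutation`, `Outcome`, and `handle` are stand-ins defined here for illustration, not the connector's classes.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Illustrative stand-in for the dispatch in ChangeStreamAction.run; not the real class.
final class RecordDispatchSketch {
  private static final Logger LOG = Logger.getLogger(RecordDispatchSketch.class.getName());

  // Placeholder record types; the real ones come from the Bigtable client library.
  static final class Heartbeat {}

  static final class ChangeStreamMutation {}

  // Hypothetical result type so the branch taken can be observed.
  enum Outcome {
    HEARTBEAT,
    MUTATION,
    INVALID
  }

  /** Classifies a record and logs unexpected types at SEVERE (error) level. */
  static Outcome handle(Object record, String partition) {
    if (record instanceof Heartbeat) {
      return Outcome.HEARTBEAT;
    } else if (record instanceof ChangeStreamMutation) {
      return Outcome.MUTATION;
    } else {
      // An unknown response type is unexpected, so report it as an error, not a warning.
      LOG.log(Level.SEVERE, "RCSP {0}: Invalid response type", partition);
      return Outcome.INVALID;
    }
  }

  public static void main(String[] args) {
    System.out.println(handle(new Heartbeat(), "[a, b)"));
    System.out.println(handle("unexpected", "[a, b)"));
  }
}
```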





[GitHub] [beam] tonytanger commented on pull request #25459: Cloud Bigtable stream changes and handle Mutation responses

Posted by "tonytanger (via GitHub)" <gi...@apache.org>.
tonytanger commented on PR #25459:
URL: https://github.com/apache/beam/pull/25459#issuecomment-1430023817

   R: @pabloem 




[GitHub] [beam] tonytanger commented on a diff in pull request #25459: Cloud Bigtable stream changes and handle Mutation responses

Posted by "tonytanger (via GitHub)" <gi...@apache.org>.
tonytanger commented on code in PR #25459:
URL: https://github.com/apache/beam/pull/25459#discussion_r1106058882


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java:
##########
@@ -100,6 +106,79 @@ public Optional<DoFn.ProcessContinuation> run(
       DoFn.OutputReceiver<KV<ByteString, ChangeStreamMutation>> receiver,
       ManualWatermarkEstimator<Instant> watermarkEstimator,
       boolean shouldDebug) {
+    if (record instanceof Heartbeat) {
+      Heartbeat heartbeat = (Heartbeat) record;
+      StreamProgress streamProgress =
+          new StreamProgress(
+              heartbeat.getChangeStreamContinuationToken(), heartbeat.getLowWatermark());
+      final Instant watermark = TimestampConverter.toInstant(heartbeat.getLowWatermark());
+      watermarkEstimator.setWatermark(watermark);
+
+      if (shouldDebug) {
+        LOG.info(
+            "RCSP {}: Heartbeat partition: {} token: {} watermark: {}",
+            formatByteStringRange(partitionRecord.getPartition()),
+            formatByteStringRange(heartbeat.getChangeStreamContinuationToken().getPartition()),
+            heartbeat.getChangeStreamContinuationToken().getToken(),
+            heartbeat.getLowWatermark());
+      }
+      // If the tracker fail to claim the streamProgress, it most likely means the runner initiated
+      // a checkpoint. See {@link
+      // org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.ReadChangeStreamPartitionProgressTracker}
+      // for more information regarding runner initiated checkpoints.
+      if (!tracker.tryClaim(streamProgress)) {
+        if (shouldDebug) {
+          LOG.info(
+              "RCSP {}: Failed to claim heart beat tracker",
+              formatByteStringRange(partitionRecord.getPartition()));
+        }
+        return Optional.of(DoFn.ProcessContinuation.stop());
+      }
+      metrics.incHeartbeatCount();
+    } else if (record instanceof ChangeStreamMutation) {
+      ChangeStreamMutation changeStreamMutation = (ChangeStreamMutation) record;
+      final Instant watermark =
+          TimestampConverter.toInstant(changeStreamMutation.getLowWatermark());
+      watermarkEstimator.setWatermark(watermark);
+      // Build a new StreamProgress with the continuation token to be claimed.
+      ChangeStreamContinuationToken changeStreamContinuationToken =
+          new ChangeStreamContinuationToken(
+              Range.ByteStringRange.create(
+                  partitionRecord.getPartition().getStart(),
+                  partitionRecord.getPartition().getEnd()),
+              changeStreamMutation.getToken());
+      StreamProgress streamProgress =
+          new StreamProgress(changeStreamContinuationToken, changeStreamMutation.getLowWatermark());
+      // If the tracker fail to claim the streamProgress, it most likely means the runner initiated
+      // a checkpoint. See ReadChangeStreamPartitionProgressTracker for more information regarding
+      // runner initiated checkpoints.
+      if (!tracker.tryClaim(streamProgress)) {
+        if (shouldDebug) {
+          LOG.info(
+              "RCSP {}: Failed to claim data change tracker",
+              formatByteStringRange(partitionRecord.getPartition()));
+        }
+        return Optional.of(DoFn.ProcessContinuation.stop());
+      }
+      if (changeStreamMutation.getType() == ChangeStreamMutation.MutationType.GARBAGE_COLLECTION) {
+        metrics.incChangeStreamMutationGcCounter();
+      } else if (changeStreamMutation.getType() == ChangeStreamMutation.MutationType.USER) {
+        metrics.incChangeStreamMutationUserCounter();
+      }
+      Instant delay = TimestampConverter.toInstant(changeStreamMutation.getCommitTimestamp());
+      metrics.updateProcessingDelayFromCommitTimestamp(
+          Instant.now().getMillis() - delay.getMillis());
+
+      KV<ByteString, ChangeStreamMutation> outputRecord =
+          KV.of(changeStreamMutation.getRowKey(), changeStreamMutation);
+      // We are outputting elements with timestamp of 0 to prevent reliance on event time. This
+      // limits the ability to window on commit time of any data changes. It is still possible to
+      // window on processing time.
+      receiver.outputWithTimestamp(outputRecord, Instant.EPOCH);

Review Comment:
   @pabloem 





[GitHub] [beam] pabloem commented on pull request #25459: Cloud Bigtable stream changes and handle Mutation responses

Posted by "pabloem (via GitHub)" <gi...@apache.org>.
pabloem commented on PR #25459:
URL: https://github.com/apache/beam/pull/25459#issuecomment-1433560416

   I'll merge for now, but I'd like to have https://github.com/apache/beam/pull/25459#discussion_r1106058882 addressed at some point.




[GitHub] [beam] pabloem commented on a diff in pull request #25459: Cloud Bigtable stream changes and handle Mutation responses

Posted by "pabloem (via GitHub)" <gi...@apache.org>.
pabloem commented on code in PR #25459:
URL: https://github.com/apache/beam/pull/25459#discussion_r1108893903


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java:
##########
@@ -100,6 +106,79 @@ public Optional<DoFn.ProcessContinuation> run(
       DoFn.OutputReceiver<KV<ByteString, ChangeStreamMutation>> receiver,
       ManualWatermarkEstimator<Instant> watermarkEstimator,
       boolean shouldDebug) {
      [...]
+      KV<ByteString, ChangeStreamMutation> outputRecord =
+          KV.of(changeStreamMutation.getRowKey(), changeStreamMutation);
+      // We are outputting elements with timestamp of 0 to prevent reliance on event time. This
+      // limits the ability to window on commit time of any data changes. It is still possible to
+      // window on processing time.
+      receiver.outputWithTimestamp(outputRecord, Instant.EPOCH);

Review Comment:
   Wouldn't the `getLowWatermark` value be a better measure of the timestamp for the record? Currently the watermark is advancing, so users may still add windowing.
   
   Rather than setting the timestamp to EPOCH, I think it would be best to keep the watermark stuck at EPOCH and give the records proper timestamps.
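
To make the windowing concern concrete, here is a small sketch of fixed event-time windows that has no Beam dependency. It uses `java.time.Instant` instead of the Joda `Instant` in the diff, and `windowStart` and `countPerWindow` are hypothetical helpers written for this illustration. It shows that elements stamped with the epoch all collapse into a single window, while elements carrying real timestamps spread across windows.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative model of fixed event-time windows; not Beam's windowing implementation.
final class EventTimeWindowSketch {
  /** Returns the start of the fixed window of the given size that contains the timestamp. */
  static Instant windowStart(Instant timestamp, Duration size) {
    long sizeMillis = size.toMillis();
    long millis = timestamp.toEpochMilli();
    return Instant.ofEpochMilli(millis - Math.floorMod(millis, sizeMillis));
  }

  /** Counts how many element timestamps fall into each fixed window. */
  static Map<Instant, Integer> countPerWindow(List<Instant> timestamps, Duration size) {
    Map<Instant, Integer> counts = new TreeMap<>();
    for (Instant timestamp : timestamps) {
      counts.merge(windowStart(timestamp, size), 1, Integer::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    Duration oneMinute = Duration.ofMinutes(1);
    // Made-up commit times, used only to drive the example.
    List<Instant> commitTimes =
        List.of(
            Instant.parse("2023-02-13T22:23:10Z"),
            Instant.parse("2023-02-13T22:23:50Z"),
            Instant.parse("2023-02-13T22:24:05Z"));
    // The same three records, each stamped with the epoch as in the PR.
    List<Instant> epochStamped = List.of(Instant.EPOCH, Instant.EPOCH, Instant.EPOCH);

    System.out.println("epoch-stamped:  " + countPerWindow(epochStamped, oneMinute));
    System.out.println("commit-stamped: " + countPerWindow(commitTimes, oneMinute));
  }
}
```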





[GitHub] [beam] pabloem merged pull request #25459: Cloud Bigtable stream changes and handle Mutation responses

Posted by "pabloem (via GitHub)" <gi...@apache.org>.
pabloem merged PR #25459:
URL: https://github.com/apache/beam/pull/25459




[GitHub] [beam] tonytanger commented on a diff in pull request #25459: Cloud Bigtable stream changes and handle Mutation responses

Posted by "tonytanger (via GitHub)" <gi...@apache.org>.
tonytanger commented on code in PR #25459:
URL: https://github.com/apache/beam/pull/25459#discussion_r1110203693


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java:
##########
@@ -100,6 +106,79 @@ public Optional<DoFn.ProcessContinuation> run(
       DoFn.OutputReceiver<KV<ByteString, ChangeStreamMutation>> receiver,
       ManualWatermarkEstimator<Instant> watermarkEstimator,
       boolean shouldDebug) {
      [...]
+      KV<ByteString, ChangeStreamMutation> outputRecord =
+          KV.of(changeStreamMutation.getRowKey(), changeStreamMutation);
+      // We are outputting elements with timestamp of 0 to prevent reliance on event time. This
+      // limits the ability to window on commit time of any data changes. It is still possible to
+      // window on processing time.
+      receiver.outputWithTimestamp(outputRecord, Instant.EPOCH);

Review Comment:
   We considered not advancing the watermark. I think not advancing the watermark gives the confusing signal that the pipeline is not progressing.
   
   `getCommitTimestamp` is the best measure of the timestamp of the record.
   
   In the future, we are considering adding an option so the user can choose to output the records with the `commitTimestamp`.
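
One way such an option could look, as a rough sketch only: the thread does not describe the eventual design, so `OutputTimestampMode`, `outputTimestamp`, and `processingDelayMillis` are hypothetical names, and `java.time.Instant` stands in for the Joda `Instant` used in the diff. The default keeps the behaviour from this PR (emit at the epoch), and the opt-in mode emits at the commit timestamp.

```java
import java.time.Instant;

// Hypothetical sketch; none of these names exist in the Beam Bigtable connector.
final class OutputTimestampSketch {
  /** Hypothetical user-facing choice of the timestamp attached to emitted records. */
  enum OutputTimestampMode {
    EPOCH,
    COMMIT_TIMESTAMP
  }

  /** Picks the timestamp a record would be emitted with under the given mode. */
  static Instant outputTimestamp(OutputTimestampMode mode, Instant commitTimestamp) {
    switch (mode) {
      case COMMIT_TIMESTAMP:
        return commitTimestamp;
      case EPOCH:
      default:
        // Behaviour in this PR: every record is emitted at timestamp 0.
        return Instant.EPOCH;
    }
  }

  /** Processing delay in milliseconds, mirroring the metric computed in the diff. */
  static long processingDelayMillis(Instant now, Instant commitTimestamp) {
    return now.toEpochMilli() - commitTimestamp.toEpochMilli();
  }

  public static void main(String[] args) {
    // Made-up commit time, used only to drive the example.
    Instant commit = Instant.parse("2023-02-13T22:23:44Z");
    System.out.println(outputTimestamp(OutputTimestampMode.EPOCH, commit));
    System.out.println(outputTimestamp(OutputTimestampMode.COMMIT_TIMESTAMP, commit));
    System.out.println(processingDelayMillis(commit.plusMillis(250), commit));
  }
}
```

Either mode leaves the processing-delay metric unchanged, because that metric is derived from the commit timestamp rather than from the element timestamp.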


