Posted to github@beam.apache.org by "pabloem (via GitHub)" <gi...@apache.org> on 2023/02/14 02:36:29 UTC

[GitHub] [beam] pabloem commented on a diff in pull request #25411: Generate initial list of partitions to stream

pabloem commented on code in PR #25411:
URL: https://github.com/apache/beam/pull/25411#discussion_r1105229153


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java:
##########
@@ -47,12 +57,34 @@ public GenerateInitialPartitionsAction(
    * The very first step of the pipeline when there are no partitions being streamed yet. We want to
    * get an initial list of partitions to stream and output them.
    *
-   * @return true if this pipeline should continue, otherwise false.
+   * @return {@link ProcessContinuation#resume()} if the stream continues, otherwise {@link
+   *     ProcessContinuation#stop()}
    */
-  public boolean run(
+  public ProcessContinuation run(
       OutputReceiver<PartitionRecord> receiver,
+      RestrictionTracker<OffsetRange, Long> tracker,
       ManualWatermarkEstimator<Instant> watermarkEstimator,
-      Timestamp startTime) {
-    return true;
+      com.google.cloud.Timestamp startTime) {
+    if (!tracker.tryClaim(0L)) {
+      LOG.error(
+          "Could not claim initial DetectNewPartition restriction. No partitions are outputted.");
+      return ProcessContinuation.stop();
+    }
+    List<ByteStringRange> streamPartitions =
+        changeStreamDao.generateInitialChangeStreamPartitions();
+
+    watermarkEstimator.setWatermark(TimestampConverter.toInstant(startTime));
+
+    for (ByteStringRange partition : streamPartitions) {
+      metrics.incListPartitionsCount();
+      String uid = UniqueIdGenerator.getNextId();
+      PartitionRecord partitionRecord =
+          new PartitionRecord(partition, startTime, uid, startTime, endTime);
+      // We are outputting elements with timestamp of 0 to prevent reliance on event time. This
+      // limits the ability to window on commit time of any data changes. It is still possible to
+      // window on processing time.
+      receiver.outputWithTimestamp(partitionRecord, Instant.EPOCH);

Review Comment:
   Have you tested this? Your comment answers my main question, but I'm curious why you'd need this if the record itself already carries the appropriate watermark information.
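   To make the question concrete, here is a Beam-free model of the hunk's control flow. Everything in it is a hypothetical stand-in, not Beam's API: `Continuation` replaces `ProcessContinuation`, `OffsetTracker` is a simplified version of the `RestrictionTracker<OffsetRange, Long>`, plain strings replace `ByteStringRange`, and `UUID` replaces `UniqueIdGenerator.getNextId()`. Metrics and `endTime` are omitted. Returning RESUME after the loop is an assumption, because the hunk ends before the method's return. The model shows the ordering in question: the watermark is set to `startTime` before anything is emitted, while every record is stamped at `Instant.EPOCH`, so each element's timestamp is earlier than the watermark the action has just reported.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Hypothetical, Beam-free model of the control flow in GenerateInitialPartitionsAction.run. */
public class InitialPartitionsModel {

  /** Stand-in for Beam's DoFn.ProcessContinuation (resume() / stop()). */
  enum Continuation { RESUME, STOP }

  /** Simplified, hypothetical stand-in for an offset-range tracker over [from, to). */
  static final class OffsetTracker {
    private final long from;
    private final long to;
    private long lastAttempted;

    OffsetTracker(long from, long to) {
      this.from = from;
      this.to = to;
      this.lastAttempted = from - 1; // nothing attempted yet
    }

    /** Claims must move forward; a claim at or past the end of the range returns false. */
    boolean tryClaim(long offset) {
      if (offset < from || offset <= lastAttempted) {
        throw new IllegalArgumentException("claims must be increasing and within range");
      }
      lastAttempted = offset;
      return offset < to;
    }
  }

  /** One emitted element together with its output timestamp. */
  static final class Output {
    final String partition;
    final String uid;
    final Instant timestamp;

    Output(String partition, String uid, Instant timestamp) {
      this.partition = partition;
      this.uid = uid;
      this.timestamp = timestamp;
    }
  }

  /** Everything one call produced: outputs, the watermark it set, and its continuation. */
  static final class Result {
    final List<Output> outputs = new ArrayList<>();
    Instant watermark = null; // stays null unless the action sets it
    Continuation continuation;
  }

  static Result run(List<String> partitions, OffsetTracker tracker, Instant startTime) {
    Result result = new Result();
    // Claim the single initial offset; if that fails, emit nothing and stop.
    if (!tracker.tryClaim(0L)) {
      result.continuation = Continuation.STOP;
      return result;
    }
    // The watermark is advanced to startTime before any element is emitted.
    result.watermark = startTime;
    for (String partition : partitions) {
      String uid = UUID.randomUUID().toString(); // stand-in for UniqueIdGenerator.getNextId()
      // Every element is stamped at EPOCH, not at startTime.
      result.outputs.add(new Output(partition, uid, Instant.EPOCH));
    }
    // Assumption: the hunk is cut off before the return, so RESUME here is a guess.
    result.continuation = Continuation.RESUME;
    return result;
  }

  public static void main(String[] args) {
    Instant start = Instant.parse("2023-02-14T00:00:00Z");
    Result r = run(List.of("[a, m)", "[m, z)"), new OffsetTracker(0, 1), start);
    for (Output o : r.outputs) {
      // Compare each output timestamp against the watermark set before emission.
      System.out.println(o.partition + " ts=" + o.timestamp
          + " behindWatermark=" + o.timestamp.isBefore(r.watermark));
    }
    System.out.println("continuation=" + r.continuation);
  }
}
```

   Because each element's timestamp trails the watermark reported before it was emitted, a downstream stage that windows on event time could treat these records as late. That is consistent with the in-code comment about windowing on processing time rather than commit time, and it is the trade-off the question above is probing.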


