Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/11/08 13:06:26 UTC

[GitHub] [beam] scwhittle commented on a diff in pull request #23997: Add GetSize implementation for DetectNewPartitions SDF

scwhittle commented on code in PR #23997:
URL: https://github.com/apache/beam/pull/23997#discussion_r1016607758


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/DetectNewPartitionsDoFn.java:
##########
@@ -109,9 +118,15 @@ public TimestampRange initialRestriction(@Element PartitionMetadata partition) {
         TimestampUtils.previous(createdAt), com.google.cloud.Timestamp.MAX_VALUE);
   }
 
+  @GetSize
+  public double getSize() {
+    double throughput = throughputEstimator.get();
+    LOG.debug("Reported getSize() - throughput: " + throughput);

Review Comment:
   This log message is confusing to me: the " - " in "getSize() - throughput" reads like a subtraction.
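
One way to avoid the ambiguity is a message that uses a key=value form instead of a dash. The sketch below is only an illustration: the class `GetSizeLogMessage` and its `format` method are hypothetical names, not part of the PR.

```java
// Hypothetical helper, not part of the PR: builds a log message
// that cannot be misread as "getSize() minus throughput".
final class GetSizeLogMessage {
  private GetSizeLogMessage() {}

  // Uses key=value formatting rather than a " - " separator.
  static String format(double throughput) {
    return "getSize() reported throughput=" + throughput;
  }
}
```

The returned string could then be passed to `LOG.debug(...)` as in the diff above.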



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/DetectNewPartitionsDoFn.java:
##########
@@ -109,9 +118,15 @@ public TimestampRange initialRestriction(@Element PartitionMetadata partition) {
         TimestampUtils.previous(createdAt), com.google.cloud.Timestamp.MAX_VALUE);
   }
 
+  @GetSize
+  public double getSize() {
+    double throughput = throughputEstimator.get();
+    LOG.debug("Reported getSize() - throughput: " + throughput);
+    return throughput;

Review Comment:
   Reporting zero seems fine if you never want to upscale or downscale the pipeline based on this stage. Dataflow autoscaling is currently based on the maximum need across stages, with each stage considered independently.
   Would this DoFn ever need to run on more than one worker? Would it ever be resource constrained?
   
   Another idea would be to have a threshold below which you report zero instead of the calculated backlog. That would avoid triggering spurious upscaling from the backlog calculation when things are keeping up, while still giving a signal when things are falling behind.
   
   I also agree that reporting recent throughput as backlog is confusing. It seems like this will always be non-zero? Is there a way to tell whether timeGap is increasing because there is nothing to do or because the stage is actually falling behind? That seems like a prerequisite for reporting backlog correctly.
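
The threshold idea above could look roughly like the following. This is a minimal sketch under assumptions: `ThresholdedBacklog`, `reportedSize`, and both parameter names are hypothetical, and the sketch presumes a backlog estimate in bytes is already available, which the PR does not yet provide.

```java
// Hypothetical helper, not part of the PR or of Beam.
final class ThresholdedBacklog {
  private ThresholdedBacklog() {}

  /**
   * Returns 0 while the estimated backlog is below the threshold, so a
   * stage that is keeping up does not trigger upscaling; otherwise
   * returns the estimate unchanged so a real backlog is still visible.
   */
  static double reportedSize(double estimatedBacklogBytes, double thresholdBytes) {
    // Treat NaN (no estimate yet) the same as "nothing to report".
    if (Double.isNaN(estimatedBacklogBytes) || estimatedBacklogBytes < thresholdBytes) {
      return 0.0;
    }
    return estimatedBacklogBytes;
  }
}
```

A `@GetSize` method could return `ThresholdedBacklog.reportedSize(estimate, threshold)` in place of the raw value; choosing the threshold is left open here.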



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java:
##########
@@ -191,6 +196,7 @@ private void outputBatch(
 
       receiver.outputWithTimestamp(partition, new Instant(minWatermark.toSqlTimestamp()));
 
+      throughputEstimator.update(Timestamp.now(), Utf8.encodedLength(partition.toString()));

Review Comment:
   Please add a comment explaining why the encoded length is used here.
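
For context, Guava's `Utf8.encodedLength` returns the number of bytes a string occupies in UTF-8, which can differ from `String.length()` (a count of UTF-16 code units). The sketch below shows that difference using only the JDK; `EncodedSizeExample` and `utf8Bytes` are hypothetical names, and whether byte size is the right throughput unit for this DoFn is an assumption for the author to confirm.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration, not part of the PR.
final class EncodedSizeExample {
  private EncodedSizeExample() {}

  // Number of bytes the string occupies once encoded as UTF-8.
  static int utf8Bytes(String s) {
    return s.getBytes(StandardCharsets.UTF_8).length;
  }
}
```

For a string containing a non-ASCII character such as `"h\u00e9llo"`, the UTF-8 byte count is larger than the character count, so the two measures are not interchangeable when estimating bytes per second.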


