Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/03 21:56:59 UTC

[GitHub] [beam] lukecwik commented on a change in pull request #12430: [BEAM-10303] Scale progress with respect to windows observation.

lukecwik commented on a change in pull request #12430:
URL: https://github.com/apache/beam/pull/12430#discussion_r464673014



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -515,6 +515,9 @@
               && Iterables.get(mainOutputConsumers, 0) instanceof HandlesSplits) {
             mainInputConsumer =
                 new SplittableFnDataReceiver() {
+                  private final HandlesSplits splitDelegate =
+                      (HandlesSplits) Iterables.get(mainOutputConsumers, 0);

Review comment:
       nit: use `Iterables.getOnlyElement` here and below around line 558
   ```suggestion
                         (HandlesSplits) Iterables.getOnlyElement(mainOutputConsumers);
   ```
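
A minimal sketch of why the stricter accessor may be preferable here. To keep the snippet self-contained it uses only the standard library: `first` and `onlyElement` are hypothetical stand-ins written for this illustration, not Guava's implementations. `onlyElement` is meant to mirror the documented contract of `Iterables.getOnlyElement` (fail when the iterable is empty or holds more than one element), whereas `first` returns the first element and ignores any extras.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

final class OnlyElementSketch {
  // Hypothetical stand-in for a "take the first element" accessor: extras are ignored.
  static <T> T first(Iterable<T> iterable) {
    return iterable.iterator().next();
  }

  // Hypothetical stand-in for a "exactly one element" accessor.
  static <T> T onlyElement(Iterable<T> iterable) {
    Iterator<T> it = iterable.iterator();
    T value = it.next(); // throws NoSuchElementException when the iterable is empty
    if (it.hasNext()) {
      throw new IllegalArgumentException("expected one element but found more");
    }
    return value;
  }

  public static void main(String[] args) {
    List<String> one = Collections.singletonList("consumer");
    List<String> two = Arrays.asList("consumer", "unexpected");

    System.out.println(first(two)); // the second element goes unnoticed
    System.out.println(onlyElement(one));
    try {
      onlyElement(two);
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

With the strict accessor, a violated single-consumer assumption fails at the call site instead of being silently masked.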

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1029,7 +1040,27 @@ public double getProgress() {
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress) {
-        return ((HasProgress) currentTracker).getProgress();
+        Progress progress = ((HasProgress) currentTracker).getProgress();
+        double totalWork = progress.getWorkCompleted() + progress.getWorkRemaining();
+        double completed =
+            totalWork * currentWindowIterator.previousIndex() + progress.getWorkCompleted();
+        double remaining =
+            totalWork * (currentElement.getWindows().size() - currentWindowIterator.nextIndex())
+                + progress.getWorkRemaining();
+        return Progress.from(completed, remaining);
+      }
+    }
+    return null;
+  }
+
+  private Progress getProgressFromWindowObservingTruncate(double elementCompleted) {
+    synchronized (splitLock) {
+      if (currentWindowIterator != null) {

Review comment:
       Should we register this with `addProgressRequestCallback` so that we generate monitoring infos?
   
   I'm not sure whether truncate should use the downstream progress in its calculation when reporting progress as a monitoring info. I know this differs from how we calculate the progress/split point for the SplittableFnDataReceiver, where the single fraction needs to account for the downstream progress accurately.
   
   I had always envisioned that work completed/work remaining in the monitoring infos would represent only local knowledge of work, without any downstream/upstream knowledge. We can avoid this issue if we merge this logic into the `getProgress` method around line 533.
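
For reference, a minimal sketch of the window scaling used in the `getProgress` hunk above, with plain numbers in place of the Beam types. The class `WindowScaledProgress` and its `scale` method are hypothetical names introduced for this illustration. As in the diff, the sketch assumes every window of the element carries the same total work as the window currently being processed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.ListIterator;

final class WindowScaledProgress {
  /**
   * Scales the current window's progress across all windows of the element.
   * Returns a two-element array: {completed, remaining}.
   */
  static double[] scale(
      double workCompleted,
      double workRemaining,
      int previousIndex,
      int nextIndex,
      int windowCount) {
    // Assumption taken from the diff: each window has the same total work.
    double totalWork = workCompleted + workRemaining;
    // Windows before the current one count as fully completed.
    double completed = totalWork * previousIndex + workCompleted;
    // Windows after the current one count as fully remaining.
    double remaining = totalWork * (windowCount - nextIndex) + workRemaining;
    return new double[] {completed, remaining};
  }

  public static void main(String[] args) {
    List<String> windows = Arrays.asList("w0", "w1", "w2");
    ListIterator<String> it = windows.listIterator();
    it.next();
    it.next(); // now processing w1: previousIndex() == 1, nextIndex() == 2

    double[] scaled = scale(3.0, 7.0, it.previousIndex(), it.nextIndex(), windows.size());
    System.out.println(scaled[0] + " completed, " + scaled[1] + " remaining");
  }
}
```

In this example the current window reports 3 completed and 7 remaining, so the scaled values are 13 completed and 17 remaining, which sum to three windows of 10 units each.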



