Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/20 22:37:46 UTC

[GitHub] [beam] lukecwik opened a new pull request #11472: [BEAM-2939] Expose HasProgress interface for restriction trackers and use the progress value during splitting

lukecwik opened a new pull request #11472:
URL: https://github.com/apache/beam/pull/11472


   Note that, to keep the review simpler, this change excludes a necessary rename of the Sizes class to something more appropriate.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on issue #11472: [BEAM-2939] Migrate from HasSize to HasProgress interface for restriction trackers and use the progress value during sizing, splitting and reporting

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11472:
URL: https://github.com/apache/beam/pull/11472#issuecomment-618169719


   Run Java PreCommit





[GitHub] [beam] lukecwik commented on a change in pull request #11472: [BEAM-2939] Migrate from HasSize to HasProgress interface for restriction trackers and use the progress value during sizing, splitting and reporting

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11472:
URL: https://github.com/apache/beam/pull/11472#discussion_r413517363



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/Sizes.java
##########
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.splittabledofn;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-
-/** Definitions and convenience methods for reporting sizes for SplittableDoFns. */
-@Experimental(Kind.SPLITTABLE_DO_FN)
-public final class Sizes {
-  /**
-   * {@link RestrictionTracker}s which can provide a size should implement this interface.
-   * Implementations that do not implement this interface will be assumed to have an equivalent
-   * size.
-   */
-  public interface HasSize {

Review comment:
       The intent was to stop using size as a progress signal, which means that all restriction trackers need to implement the slightly more complicated HasProgress interface. Since that is the case, there is no need to keep HasSize around as a fallback: we can use the work remaining as the default size when it is not overridden by a `@GetSize` method.
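A minimal sketch of the fallback order described here (a DoFn-provided size first, then the tracker's work remaining, then the constant 1.0 from the diffed `DoFnInvoker` Javadoc). `Progress`, `HasProgress`, and `SizeFallbackSketch` are simplified, hypothetical stand-ins defined in the block, not the Beam SDK types.

```java
import java.util.OptionalDouble;

/**
 * Hypothetical model of the size fallback order. Progress and HasProgress are simplified
 * stand-ins defined here, not the Beam SDK types.
 */
final class SizeFallbackSketch {
  /** Stand-in for a progress snapshot: work completed and work remaining. */
  static final class Progress {
    final double workCompleted;
    final double workRemaining;

    Progress(double workCompleted, double workRemaining) {
      this.workCompleted = workCompleted;
      this.workRemaining = workRemaining;
    }
  }

  /** Stand-in for a restriction tracker that can report progress. */
  interface HasProgress {
    Progress getProgress();
  }

  private SizeFallbackSketch() {}

  /**
   * Picks the size to report for a restriction.
   *
   * @param userSize value from a DoFn-provided size method, or empty if the DoFn has none
   * @param tracker the restriction tracker, which may or may not implement HasProgress
   */
  static double size(OptionalDouble userSize, Object tracker) {
    if (userSize.isPresent()) {
      return userSize.getAsDouble(); // 1. an explicit size from the DoFn wins
    }
    if (tracker instanceof HasProgress) {
      return ((HasProgress) tracker).getProgress().workRemaining; // 2. work remaining
    }
    return 1.0; // 3. constant default
  }
}
```

Because the work remaining shrinks as the tracker advances, this fallback reports a size that reflects the current state of processing rather than the size of the restriction when it was created.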







[GitHub] [beam] lukecwik commented on a change in pull request #11472: [BEAM-2939] Migrate from HasSize to HasProgress interface for restriction trackers and use the progress value during sizing, splitting and reporting

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11472:
URL: https://github.com/apache/beam/pull/11472#discussion_r413517363



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/Sizes.java
##########
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.splittabledofn;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-
-/** Definitions and convenience methods for reporting sizes for SplittableDoFns. */
-@Experimental(Kind.SPLITTABLE_DO_FN)
-public final class Sizes {
-  /**
-   * {@link RestrictionTracker}s which can provide a size should implement this interface.
-   * Implementations that do not implement this interface will be assumed to have an equivalent
-   * size.
-   */
-  public interface HasSize {

Review comment:
       The intent was to stop using size as a progress signal, which means that all restriction trackers need to implement the slightly more complicated HasProgress interface. Since that is the case, there is no need to keep HasSize around as a fallback: we can use the work remaining as the default size if there is no `@GetSize` override.







[GitHub] [beam] lukecwik commented on a change in pull request #11472: [BEAM-2939] Migrate from HasSize to HasProgress interface for restriction trackers and use the progress value during sizing, splitting and reporting

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11472:
URL: https://github.com/apache/beam/pull/11472#discussion_r413515258



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
##########
@@ -397,12 +396,14 @@ public WatermarkEstimatorStateT getState() {
   }
 
   public static class DefaultGetSize {
-    /** Uses {@link Sizes.HasSize} to produce the size. */
+    /** Uses {@link HasProgress} to produce the size. */
     @SuppressWarnings("unused")
     public static <InputT, OutputT> double invokeGetSize(
         DoFnInvoker.ArgumentProvider<InputT, OutputT> argumentProvider) {
-      if (argumentProvider.restrictionTracker() instanceof HasSize) {
-        return ((HasSize) argumentProvider.restrictionTracker()).getSize();
+      if (argumentProvider.restrictionTracker() instanceof HasProgress) {

Review comment:
       HasSize was removed.
   
   A DoFn can provide a method annotated with `@GetSize`, and the fallback is HasProgress on the RestrictionTracker. This is analogous to Python/Go, where the RestrictionProvider/RProvider can provide the size while the RestrictionTracker/RTracker provides progress.
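To illustrate the tracker side of this split of responsibilities, here is a sketch of a tracker over an offset range that derives both work completed and work remaining from its own position. `OffsetProgressTracker` is hypothetical and is not Beam's `OffsetRangeTracker`; counting the currently claimed offset as unfinished is a choice made for this example.

```java
/**
 * Hypothetical tracker over the half-open offset range [from, to) that reports progress. It only
 * illustrates how a tracker can derive work completed and work remaining from its position.
 */
final class OffsetProgressTracker {
  private final long from;
  private final long to;
  private Long lastClaimed; // null until the first successful claim

  OffsetProgressTracker(long from, long to) {
    if (from > to) {
      throw new IllegalArgumentException("from must be <= to");
    }
    this.from = from;
    this.to = to;
  }

  /** Claims an offset; offsets must be claimed in increasing order, starting at or after from. */
  boolean tryClaim(long offset) {
    if (offset < from || (lastClaimed != null && offset <= lastClaimed)) {
      throw new IllegalArgumentException("offsets must increase and start at or after from");
    }
    if (offset >= to) {
      return false; // past the end of the restriction: nothing more to claim
    }
    lastClaimed = offset;
    return true;
  }

  /** Offsets fully processed before the currently claimed one. */
  double workCompleted() {
    return lastClaimed == null ? 0 : lastClaimed - from;
  }

  /** Offsets still to do, counting the currently claimed offset as not yet finished. */
  double workRemaining() {
    return lastClaimed == null ? to - from : to - lastClaimed;
  }
}
```

For a tracker over `[10, 20)` that has claimed offsets 10 through 12, this reports 2 units completed and 8 remaining; the two always add up to the range length until a split shrinks the range.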







[GitHub] [beam] lukecwik commented on issue #11472: [BEAM-2939] Expose HasProgress interface for restriction trackers and use the progress value during splitting

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11472:
URL: https://github.com/apache/beam/pull/11472#issuecomment-616906179


   @boyuanzz, after your comment on #10897, I realized I needed to fix Read.java; otherwise the splitting calculation would be incorrect, since it would use the "initial size", which can't be used for Java UnboundedSource objects.
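The point about the splitting calculation can be illustrated with a split whose position is computed from the work remaining, not from a size fixed when the restriction was created. This sketch assumes a simple offset range. `RemainderSplitSketch` and its rounding rule are hypothetical and are not the logic in Read.java or any Beam tracker.

```java
/**
 * Hypothetical split calculation for the unprocessed offsets [nextUnclaimed, to). The split point
 * depends only on the work remaining, not on an initial size.
 */
final class RemainderSplitSketch {
  private RemainderSplitSketch() {}

  /**
   * Returns the first offset of the residual when the primary keeps roughly fractionOfRemainder
   * of the unprocessed offsets, or -1 if the residual would be empty.
   */
  static long splitPosition(long nextUnclaimed, long to, double fractionOfRemainder) {
    if (fractionOfRemainder < 0 || fractionOfRemainder > 1) {
      throw new IllegalArgumentException("fractionOfRemainder must be in [0, 1]");
    }
    long remaining = to - nextUnclaimed; // work remaining, measured in offsets
    long split = nextUnclaimed + (long) Math.ceil(remaining * fractionOfRemainder);
    return split < to ? split : -1; // no split if nothing would be handed off
  }
}
```

With offsets 60 through 99 still unprocessed, a fraction of 0.5 splits at 80, and a fraction of 0 splits at 60 (a checkpoint that hands off all remaining work).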





[GitHub] [beam] lukecwik commented on issue #11472: [BEAM-2939] Migrate from HasSize to HasProgress interface for restriction trackers and use the progress value during sizing, splitting and reporting

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11472:
URL: https://github.com/apache/beam/pull/11472#issuecomment-618080046


   > Yes, I believe that `restriction size` is really syntactic sugar for `work remaining`. So in fixed size restrictions, `restriction size` should be able to be used to calculate the work remaining when we start and when we finish which then allows us to compute `work completed`.
   
   Thinking about this more, it doesn't seem like we can use the restriction size, since we don't have access to the current, reduced restriction size while the tracker is making progress. This prevents us from passing it into the get_size methods in Java/Python/Go.
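A small sketch of the problem described here, assuming an offset restriction `[from, to)` that only changes when it is split. A size function that sees only the restriction keeps returning the original value while processing advances, whereas the tracker's position yields the shrinking work remaining. Both helpers are hypothetical.

```java
/**
 * Hypothetical contrast between a size computed from the restriction alone and the work
 * remaining computed from the tracker's position.
 */
final class RestrictionSizeSketch {
  private RestrictionSizeSketch() {}

  /** Size computed from the restriction alone; unchanged until the restriction is split. */
  static double sizeFromRestriction(long from, long to) {
    return to - from;
  }

  /** Work remaining once every offset before nextUnclaimed has been processed. */
  static double workRemaining(long nextUnclaimed, long to) {
    return Math.max(0, to - nextUnclaimed);
  }
}
```

After processing offsets 0 through 59 of `[0, 100)`, `sizeFromRestriction(0, 100)` still returns 100, while `workRemaining(60, 100)` returns 40.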





[GitHub] [beam] boyuanzz commented on a change in pull request #11472: [BEAM-2939] Migrate from HasSize to HasProgress interface for restriction trackers and use the progress value during sizing, splitting and reporting

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11472:
URL: https://github.com/apache/beam/pull/11472#discussion_r413493450



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
##########
@@ -94,9 +94,13 @@ void invokeOnTimer(
   void invokeSplitRestriction(ArgumentProvider<InputT, OutputT> arguments);
 
   /**
-   * Invoke the {@link DoFn.GetSize} method on the bound {@link DoFn}. Falls back to get the size
-   * from the {@link RestrictionTracker} if it supports {@link Sizes.HasSize}, otherwise returns
-   * 1.0.
+   * Invoke the {@link DoFn.GetSize} method on the bound {@link DoFn}. Falls back to:
+   *
+   * <ol>
+   *   <li>get the work remaining from the {@link RestrictionTracker} if it supports {@link
+   *       HasProgress}.
+   *   <li>returning the constant {@link 1.0}.

Review comment:
       Do we want to highlight that this fallback may impact batch autoscaling if `HasProgress` is not implemented correctly?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/Sizes.java
##########
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.splittabledofn;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-
-/** Definitions and convenience methods for reporting sizes for SplittableDoFns. */
-@Experimental(Kind.SPLITTABLE_DO_FN)
-public final class Sizes {
-  /**
-   * {@link RestrictionTracker}s which can provide a size should implement this interface.
-   * Implementations that do not implement this interface will be assumed to have an equivalent
-   * size.
-   */
-  public interface HasSize {

Review comment:
       I thought we still wanted to keep this and make `HasProgress` a fallback?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
##########
@@ -397,12 +396,14 @@ public WatermarkEstimatorStateT getState() {
   }
 
   public static class DefaultGetSize {
-    /** Uses {@link Sizes.HasSize} to produce the size. */
+    /** Uses {@link HasProgress} to produce the size. */
     @SuppressWarnings("unused")
     public static <InputT, OutputT> double invokeGetSize(
         DoFnInvoker.ArgumentProvider<InputT, OutputT> argumentProvider) {
-      if (argumentProvider.restrictionTracker() instanceof HasSize) {
-        return ((HasSize) argumentProvider.restrictionTracker()).getSize();
+      if (argumentProvider.restrictionTracker() instanceof HasProgress) {

Review comment:
       I'm not very familiar with how this invoker works, but I thought we would check whether `restrictionTracker` implements `HasSize` and, if not, fall back to progress.







[GitHub] [beam] lukecwik commented on issue #11472: [BEAM-2939] Migrate from HasSize to HasProgress interface for restriction trackers and use the progress value during sizing, splitting and reporting

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11472:
URL: https://github.com/apache/beam/pull/11472#issuecomment-618414366


   Run Java PreCommit





[GitHub] [beam] lukecwik commented on issue #11472: [BEAM-2939] Migrate from HasSize to HasProgress interface for restriction trackers and use the progress value during sizing, splitting and reporting

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11472:
URL: https://github.com/apache/beam/pull/11472#issuecomment-618135733


   Run Java PreCommit





[GitHub] [beam] boyuanzz commented on issue #11472: [BEAM-2939] Expose HasProgress interface for restriction trackers and use the progress value during splitting

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on issue #11472:
URL: https://github.com/apache/beam/pull/11472#issuecomment-616915514


   > @boyuanzz, after your comment on #10897, I realized I needed to fix Read.java otherwise the splitting calculation would be incorrect since it would be using the "initial size" which can't be used for Java UnboundedSource objects.
   
   Thanks, Luke! It makes me feel like `getSize()` (or `restriction_size` in Python) actually means something different in batch and in streaming. Do we want to make that explicit in the documentation? And are we also going to migrate Python progress in the same way?





[GitHub] [beam] lukecwik removed a comment on issue #11472: [BEAM-2939] Migrate from HasSize to HasProgress interface for restriction trackers and use the progress value during sizing, splitting and reporting

Posted by GitBox <gi...@apache.org>.
lukecwik removed a comment on issue #11472:
URL: https://github.com/apache/beam/pull/11472#issuecomment-618414366


   Run Java PreCommit





[GitHub] [beam] lukecwik commented on issue #11472: [BEAM-2939] Expose HasProgress interface for restriction trackers and use the progress value during splitting

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11472:
URL: https://github.com/apache/beam/pull/11472#issuecomment-617261109


   Yes, I believe that `restriction size` is really syntactic sugar for `work remaining`. So for fixed-size restrictions, `restriction size` can be used to calculate the work remaining when we start and when we finish, which then allows us to compute `work completed`.
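The arithmetic behind this can be sketched as follows, assuming the restriction is not split while it is processed (the fixed-size case). `FixedSizeProgress` is a hypothetical helper, not a Beam API.

```java
/**
 * Hypothetical arithmetic for a fixed-size restriction: the size measured at the start, minus the
 * work remaining reported now, gives the work completed.
 */
final class FixedSizeProgress {
  private FixedSizeProgress() {}

  static double workCompleted(double initialSize, double workRemaining) {
    if (workRemaining < 0 || workRemaining > initialSize) {
      throw new IllegalArgumentException("workRemaining must be in [0, initialSize]");
    }
    return initialSize - workRemaining;
  }

  /** Fraction of the restriction that is done; an empty restriction counts as fully done. */
  static double fractionCompleted(double initialSize, double workRemaining) {
    double completed = workCompleted(initialSize, workRemaining);
    return initialSize == 0 ? 1.0 : completed / initialSize;
  }
}
```

For example, a restriction of size 100 with 40 units remaining has 60 units completed, or a fraction of 0.6. Once a split shrinks the restriction, the initial size no longer describes it, which is why this only holds for fixed-size restrictions.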





[GitHub] [beam] lukecwik commented on issue #11472: [BEAM-2939] Migrate from HasSize to HasProgress interface for restriction trackers and use the progress value during sizing, splitting and reporting

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11472:
URL: https://github.com/apache/beam/pull/11472#issuecomment-618413989


   Run Java PreCommit





[GitHub] [beam] lukecwik commented on a change in pull request #11472: [BEAM-2939] Migrate from HasSize to HasProgress interface for restriction trackers and use the progress value during sizing, splitting and reporting

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11472:
URL: https://github.com/apache/beam/pull/11472#discussion_r413515258



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
##########
@@ -397,12 +396,14 @@ public WatermarkEstimatorStateT getState() {
   }
 
   public static class DefaultGetSize {
-    /** Uses {@link Sizes.HasSize} to produce the size. */
+    /** Uses {@link HasProgress} to produce the size. */
     @SuppressWarnings("unused")
     public static <InputT, OutputT> double invokeGetSize(
         DoFnInvoker.ArgumentProvider<InputT, OutputT> argumentProvider) {
-      if (argumentProvider.restrictionTracker() instanceof HasSize) {
-        return ((HasSize) argumentProvider.restrictionTracker()).getSize();
+      if (argumentProvider.restrictionTracker() instanceof HasProgress) {

Review comment:
       A DoFn can provide a method annotated with `@GetSize`, and the fallback is HasProgress on the RestrictionTracker. This is analogous to Python/Go, where the RestrictionProvider/RProvider can provide the size while the RestrictionTracker/RTracker provides progress.







[GitHub] [beam] lukecwik commented on a change in pull request #11472: [BEAM-2939] Migrate from HasSize to HasProgress interface for restriction trackers and use the progress value during sizing, splitting and reporting

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11472:
URL: https://github.com/apache/beam/pull/11472#discussion_r413517743



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
##########
@@ -94,9 +94,13 @@ void invokeOnTimer(
   void invokeSplitRestriction(ArgumentProvider<InputT, OutputT> arguments);
 
   /**
-   * Invoke the {@link DoFn.GetSize} method on the bound {@link DoFn}. Falls back to get the size
-   * from the {@link RestrictionTracker} if it supports {@link Sizes.HasSize}, otherwise returns
-   * 1.0.
+   * Invoke the {@link DoFn.GetSize} method on the bound {@link DoFn}. Falls back to:
+   *
+   * <ol>
+   *   <li>get the work remaining from the {@link RestrictionTracker} if it supports {@link
+   *       HasProgress}.
+   *   <li>returning the constant {@link 1.0}.

Review comment:
       This is an internal comment for the implementation here.
   
   The user-facing comments are in DoFn.java and RestrictionTracker.java.







[GitHub] [beam] lukecwik commented on a change in pull request #11472: [BEAM-2939, BEAM-5602] Migrate from HasSize to HasProgress interface for restriction trackers and use the progress value during sizing, splitting and reporting

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11472:
URL: https://github.com/apache/beam/pull/11472#discussion_r414031067



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
##########
@@ -311,9 +315,14 @@ private void createRunnerAndConsumersForPTransformRecursively(
       // Get finish bundle Execution Time Metrics.
       response.addAllMonitoringInfos(
           bundleProcessor.getFinishFunctionRegistry().getExecutionTimeMonitoringInfos());
-      // Extract all other MonitoringInfos other than the execution time monitoring infos.
+      // Extract MonitoringInfos that come from the metrics container registry.
       response.addAllMonitoringInfos(
           bundleProcessor.getMetricsContainerRegistry().getMonitoringInfos());
+      // Add any additional monitoring infos that the "runners" report explicitly.
+      for (ProgressRequestCallback progressRequestCallback :
+          bundleProcessor.getProgressRequestCallbacks()) {
+        response.addAllMonitoringInfos(progressRequestCallback.getMonitoringInfos());

Review comment:
       Progress metrics won't appear in the final monitoring infos since there are no active elements. Any residuals will contain their size if a sized SDF element and processing URN were used.
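The reasoning above (progress metrics exist only while an element is active, so a final query after the bundle returns none) can be sketched as follows. `ActiveElementProgressCallback`, `SimpleMetric`, and the `"work_remaining"` label are hypothetical stand-ins, not the harness's `ProgressRequestCallback` or real MonitoringInfo URNs.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a MonitoringInfo: just a name and a value. */
final class SimpleMetric {
  final String name;
  final double value;

  SimpleMetric(String name, double value) {
    this.name = name;
    this.value = value;
  }
}

/**
 * Hypothetical progress callback: reports work remaining only while an element is active, so a
 * query made after the bundle has finished returns an empty list.
 */
final class ActiveElementProgressCallback {
  private Double activeWorkRemaining; // null when no element is being processed

  void startElement(double workRemaining) {
    activeWorkRemaining = workRemaining;
  }

  void finishElement() {
    activeWorkRemaining = null;
  }

  List<SimpleMetric> getMonitoringInfos() {
    if (activeWorkRemaining == null) {
      return Collections.emptyList();
    }
    return Collections.singletonList(new SimpleMetric("work_remaining", activeWorkRemaining));
  }
}
```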







[GitHub] [beam] lukecwik commented on a change in pull request #11472: [BEAM-2939] Migrate from HasSize to HasProgress interface for restriction trackers and use the progress value during sizing, splitting and reporting

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11472:
URL: https://github.com/apache/beam/pull/11472#discussion_r413973266



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
##########
@@ -397,12 +396,14 @@ public WatermarkEstimatorStateT getState() {
   }
 
   public static class DefaultGetSize {
-    /** Uses {@link Sizes.HasSize} to produce the size. */
+    /** Uses {@link HasProgress} to produce the size. */
     @SuppressWarnings("unused")
     public static <InputT, OutputT> double invokeGetSize(
         DoFnInvoker.ArgumentProvider<InputT, OutputT> argumentProvider) {
-      if (argumentProvider.restrictionTracker() instanceof HasSize) {
-        return ((HasSize) argumentProvider.restrictionTracker()).getSize();
+      if (argumentProvider.restrictionTracker() instanceof HasProgress) {

Review comment:
       Yes, because it allows the user to override how the size is computed without needing to implement a restriction tracker. This is similar to how a restriction provider can compute the size.
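A minimal sketch of the point above: sizing as a function of the restriction alone, so a user can override it without writing a tracker. `OffsetRange`, `RestrictionSizing`, and the bytes-per-offset weighting are hypothetical; in Beam the override is the `DoFn.GetSize` method referenced in the Javadoc.

```java
import java.util.function.ToDoubleFunction;

/** Hypothetical restriction: a half-open offset range [from, to). */
final class OffsetRange {
  final long from;
  final long to;

  OffsetRange(long from, long to) {
    this.from = from;
    this.to = to;
  }
}

final class RestrictionSizing {
  /** Default: the number of offsets, i.e. the work an unstarted tracker would report as remaining. */
  static double defaultSize(OffsetRange r) {
    return (double) (r.to - r.from);
  }

  /** Uses the user's size function when one is supplied (non-null), otherwise the default. */
  static double size(OffsetRange r, ToDoubleFunction<OffsetRange> userGetSize) {
    return userGetSize != null ? userGetSize.applyAsDouble(r) : defaultSize(r);
  }
}
```

For example, the range [10, 20) has a default size of 10.0, while a user function that assumes 128 bytes per offset sizes it at 1280.0; neither needs a tracker instance.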







[GitHub] [beam] lukecwik commented on issue #11472: [BEAM-2939] Expose HasProgress interface for restriction trackers and use the progress value during splitting

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11472:
URL: https://github.com/apache/beam/pull/11472#issuecomment-616848148


   R: @boyuanzz @ihji 
   CC: @youngoli @chamikaramj 





[GitHub] [beam] lukecwik commented on issue #11472: [BEAM-2939] Migrate from HasSize to HasProgress interface for restriction trackers and use the progress value during sizing, splitting and reporting

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11472:
URL: https://github.com/apache/beam/pull/11472#issuecomment-618517667


   Run Java PreCommit





[GitHub] [beam] lukecwik removed a comment on issue #11472: [BEAM-2939] Migrate from HasSize to HasProgress interface for restriction trackers and use the progress value during sizing, splitting and reporting

Posted by GitBox <gi...@apache.org>.
lukecwik removed a comment on issue #11472:
URL: https://github.com/apache/beam/pull/11472#issuecomment-618413989


   Run Java PreCommit





[GitHub] [beam] boyuanzz commented on a change in pull request #11472: [BEAM-2939, BEAM-5602] Migrate from HasSize to HasProgress interface for restriction trackers and use the progress value during sizing, splitting and reporting

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11472:
URL: https://github.com/apache/beam/pull/11472#discussion_r414022861



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
##########
@@ -311,9 +315,14 @@ private void createRunnerAndConsumersForPTransformRecursively(
       // Get finish bundle Execution Time Metrics.
       response.addAllMonitoringInfos(
           bundleProcessor.getFinishFunctionRegistry().getExecutionTimeMonitoringInfos());
-      // Extract all other MonitoringInfos other than the execution time monitoring infos.
+      // Extract MonitoringInfos that come from the metrics container registry.
       response.addAllMonitoringInfos(
           bundleProcessor.getMetricsContainerRegistry().getMonitoringInfos());
+      // Add any additional monitoring infos that the "runners" report explicitly.
+      for (ProgressRequestCallback progressRequestCallback :
+          bundleProcessor.getProgressRequestCallbacks()) {
+        response.addAllMonitoringInfos(progressRequestCallback.getMonitoringInfos());

Review comment:
       When a bundle finishes, will `progress.workRemaining` always be 0?
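Whether work remaining ends at 0 depends on how a tracker measures progress. The sketch below assumes progress is measured from the last attempted offset and clamped at zero; `SimpleOffsetTracker` is hypothetical and much simpler than Beam's offset range tracker. In this model, work remaining reaches 0 only after a claim at or past the end of the range has been attempted.

```java
/**
 * Hypothetical offset-range tracker over [from, to), used only to show how work remaining can
 * reach 0 once the range is exhausted.
 */
final class SimpleOffsetTracker {
  private final long from;
  private final long to;
  private Long lastAttempted; // null until the first tryClaim

  SimpleOffsetTracker(long from, long to) {
    this.from = from;
    this.to = to;
  }

  /** Claims offsets in strictly increasing order; returns false once the offset is past the range. */
  boolean tryClaim(long offset) {
    if (offset < from || (lastAttempted != null && offset <= lastAttempted)) {
      throw new IllegalArgumentException("offsets must be >= from and strictly increasing");
    }
    lastAttempted = offset;
    return offset < to;
  }

  /** Returns {workCompleted, workRemaining}, measured from the last attempted offset. */
  double[] getProgress() {
    double total = to - from;
    if (lastAttempted == null) {
      return new double[] {0.0, total};
    }
    double remaining = Math.max(0L, to - lastAttempted);
    return new double[] {total - remaining, remaining};
  }
}
```

For [10, 20): after claiming 15 the progress is 5.0 completed and 5.0 remaining; after the failed claim of 20 it is 10.0 completed and 0.0 remaining.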







[GitHub] [beam] boyuanzz commented on a change in pull request #11472: [BEAM-2939] Migrate from HasSize to HasProgress interface for restriction trackers and use the progress value during sizing, splitting and reporting

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11472:
URL: https://github.com/apache/beam/pull/11472#discussion_r413971768



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
##########
@@ -397,12 +396,14 @@ public WatermarkEstimatorStateT getState() {
   }
 
   public static class DefaultGetSize {
-    /** Uses {@link Sizes.HasSize} to produce the size. */
+    /** Uses {@link HasProgress} to produce the size. */
     @SuppressWarnings("unused")
     public static <InputT, OutputT> double invokeGetSize(
         DoFnInvoker.ArgumentProvider<InputT, OutputT> argumentProvider) {
-      if (argumentProvider.restrictionTracker() instanceof HasSize) {
-        return ((HasSize) argumentProvider.restrictionTracker()).getSize();
+      if (argumentProvider.restrictionTracker() instanceof HasProgress) {

Review comment:
       Thanks for explaining this! If so, do we still need to keep `GetSize`? It seems to overlap heavily with `HasProgress`.



