Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/06/25 22:28:17 UTC

[GitHub] [beam] lukecwik opened a new pull request #12093: [BEAM-10303] Add support for the non-window observing optimization to DoFn execution.

lukecwik opened a new pull request #12093:
URL: https://github.com/apache/beam/pull/12093


   This covers all but the splittable DoFn processElements call since I wanted to limit the size of the change.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #12093: [BEAM-10303] Add support for the non-window observing optimization to DoFn execution.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12093:
URL: https://github.com/apache/beam/pull/12093#issuecomment-649862731









[GitHub] [beam] ibzib commented on a change in pull request #12093: [BEAM-10303] Add support for the non-window observing optimization to DoFn execution in portable Beam Java

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #12093:
URL: https://github.com/apache/beam/pull/12093#discussion_r446418399



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -637,6 +737,28 @@ private void processElementForParDo(WindowedValue<InputT> elem) {
   }
 
   private void processElementForPairWithRestriction(WindowedValue<InputT> elem) {
+    currentElement = elem;
+    try {
+      currentRestriction = doFnInvoker.invokeGetInitialRestriction(processContext);
+      outputTo(
+          mainOutputConsumers,
+          (WindowedValue)
+              elem.withValue(
+                  KV.of(
+                      elem.getValue(),
+                      KV.of(
+                          currentRestriction,
+                          doFnInvoker.invokeGetInitialWatermarkEstimatorState(processContext)))));
+    } finally {
+      currentElement = null;
+      currentRestriction = null;

Review comment:
       I noticed currentWindow isn't set to null here, where it was before. Same with processElementForWindowObservingPairWithRestriction. Why is that?







[GitHub] [beam] lukecwik commented on a change in pull request #12093: [BEAM-10303] Add support for the non-window observing optimization to DoFn execution in portable Beam Java

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12093:
URL: https://github.com/apache/beam/pull/12093#discussion_r447119477



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -488,60 +438,201 @@ public void accept(WindowedValue input) throws Exception {
     this.doFnInvoker.invokeSetup();
 
     this.startBundleArgumentProvider = new StartBundleArgumentProvider();
+    // Register the appropriate handlers.
+    switch (pTransform.getSpec().getUrn()) {
+      case PTransformTranslation.PAR_DO_TRANSFORM_URN:
+      case PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN:
+        startFunctionRegistry.register(pTransformId, this::startBundle);
+        break;
+      case PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN:
+        // startBundle should not be invoked
+      case PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN:
+        // startBundle should not be invoked
+      default:
+        // no-op
+    }
+
+    String mainInput;
+    try {
+      mainInput = ParDoTranslation.getMainInputName(pTransform);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    final FnDataReceiver<WindowedValue> mainInputConsumer;
+    switch (pTransform.getSpec().getUrn()) {
+      case PTransformTranslation.PAR_DO_TRANSFORM_URN:
+        if (doFnSignature.processElement().observesWindow() || !sideInputMapping.isEmpty()) {
+          mainInputConsumer = this::processElementForWindowObservingParDo;
+          this.processContext = new WindowObservingProcessBundleContext();
+        } else {
+          mainInputConsumer = this::processElementForParDo;
+          this.processContext = new NonWindowObservingProcessBundleContext();
+        }
+        break;
+      case PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN:
+        if (doFnSignature.getInitialRestriction().observesWindow()
+            || (doFnSignature.getInitialWatermarkEstimatorState() != null
+                && doFnSignature.getInitialWatermarkEstimatorState().observesWindow())
+            || !sideInputMapping.isEmpty()) {
+          mainInputConsumer = this::processElementForWindowObservingPairWithRestriction;
+          this.processContext = new WindowObservingProcessBundleContext();
+        } else {
+          mainInputConsumer = this::processElementForPairWithRestriction;
+          this.processContext = new NonWindowObservingProcessBundleContext();
+        }
+        break;
+      case PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN:
+        if ((doFnSignature.splitRestriction() != null
+                && doFnSignature.splitRestriction().observesWindow())
+            || (doFnSignature.newTracker() != null && doFnSignature.newTracker().observesWindow())
+            || (doFnSignature.getSize() != null && doFnSignature.getSize().observesWindow())
+            || !sideInputMapping.isEmpty()) {
+          mainInputConsumer = this::processElementForWindowObservingSplitRestriction;
+          // OutputT == RestrictionT
+          this.processContext =
+              new WindowObservingProcessBundleContext() {
+                @Override
+                public void outputWithTimestamp(OutputT output, Instant timestamp) {
+                  double size =
+                      doFnInvoker.invokeGetSize(
+                          new DelegatingArgumentProvider<InputT, OutputT>(
+                              this,
+                              PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN
+                                  + "/GetSize") {
+                            @Override
+                            public Object restriction() {
+                              return output;
+                            }
+
+                            @Override
+                            public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+                              return timestamp;
+                            }
+
+                            @Override
+                            public RestrictionTracker<?, ?> restrictionTracker() {
+                              return doFnInvoker.invokeNewTracker(this);
+                            }
+                          });
+
+                  outputTo(
+                      mainOutputConsumers,
+                      (WindowedValue<OutputT>)
+                          WindowedValue.of(
+                              KV.of(
+                                  KV.of(
+                                      currentElement.getValue(),
+                                      KV.of(output, currentWatermarkEstimatorState)),
+                                  size),
+                              timestamp,
+                              currentWindow,
+                              currentElement.getPane()));
+                }
+              };
+        } else {
+          mainInputConsumer = this::processElementForSplitRestriction;
+          // OutputT == RestrictionT
+          this.processContext =
+              new NonWindowObservingProcessBundleContext() {
+                @Override
+                public void outputWithTimestamp(OutputT output, Instant timestamp) {
+                  double size =
+                      doFnInvoker.invokeGetSize(
+                          new DelegatingArgumentProvider<InputT, OutputT>(
+                              this,
+                              PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN
+                                  + "/GetSize") {
+                            @Override
+                            public Object restriction() {
+                              return output;
+                            }
+
+                            @Override
+                            public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+                              return timestamp;
+                            }
+
+                            @Override
+                            public RestrictionTracker<?, ?> restrictionTracker() {
+                              return doFnInvoker.invokeNewTracker(this);
+                            }
+                          });
+
+                  outputTo(
+                      mainOutputConsumers,
+                      (WindowedValue<OutputT>)
+                          WindowedValue.of(
+                              KV.of(
+                                  KV.of(
+                                      currentElement.getValue(),
+                                      KV.of(output, currentWatermarkEstimatorState)),
+                                  size),
+                              timestamp,
+                              currentElement.getWindows(),
+                              currentElement.getPane()));
+                }
+              };
+        }
+        break;
+      case PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN:
+        if (doFnSignature.processElement().observesWindow()
+            || (doFnSignature.newTracker() != null && doFnSignature.newTracker().observesWindow())
+            || (doFnSignature.getSize() != null && doFnSignature.getSize().observesWindow())
+            || (doFnSignature.newWatermarkEstimator() != null
+                && doFnSignature.newWatermarkEstimator().observesWindow())
+            || !sideInputMapping.isEmpty()) {
+          mainInputConsumer =
+              new SplittableFnDataReceiver() {
+                @Override
+                public void accept(WindowedValue input) throws Exception {
+                  processElementForWindowObservingSizedElementAndRestriction(input);
+                }
+              };
+          this.processContext = new WindowObservingProcessBundleContext();
+        } else {
+          mainInputConsumer =
+              new SplittableFnDataReceiver() {
+                @Override
+                public void accept(WindowedValue input) throws Exception {
+                  // TODO(BEAM-10303): Create a variant which is optimized to not observe the
+                  // windows.
+                  processElementForWindowObservingSizedElementAndRestriction(input);
+                }
+              };
+          this.processContext = new WindowObservingProcessBundleContext();
+        }
+        break;
+      default:
+        throw new IllegalStateException("Unknown urn: " + pTransform.getSpec().getUrn());
+    }
+    pCollectionConsumerRegistry.register(
+        pTransform.getInputsOrThrow(mainInput), pTransformId, (FnDataReceiver) mainInputConsumer);
+
     switch (pTransform.getSpec().getUrn()) {

Review comment:
       Thanks, I'll remove it in the second half of this change, which updates SDFs to have the non-window-observing optimization as well.
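
For readers skimming the hunk above, the selection rule it applies in each `case` can be reduced to a small sketch. This is only an illustration under assumptions, not the code in `FnApiDoFnRunner`: the class `HandlerSelectionSketch`, the enum `Handler`, and the `select` method are hypothetical names, and the boolean parameter stands in for the chain of `observesWindow()` checks on the relevant `doFnSignature` methods.

```java
import java.util.Map;

// Hypothetical illustration of the rule used in the diff: fall back to the
// window-observing path if any relevant DoFn method observes the window or
// if the transform has side inputs; otherwise use the optimized path.
final class HandlerSelectionSketch {
  enum Handler {
    WINDOW_OBSERVING,
    NON_WINDOW_OBSERVING
  }

  // anyMethodObservesWindow stands in for the OR of the observesWindow() checks.
  // sideInputMapping stands in for the runner's side input mapping.
  static Handler select(boolean anyMethodObservesWindow, Map<String, ?> sideInputMapping) {
    if (anyMethodObservesWindow || !sideInputMapping.isEmpty()) {
      return Handler.WINDOW_OBSERVING;
    }
    return Handler.NON_WINDOW_OBSERVING;
  }
}
```

Under this rule a transform with any side input takes the window-observing path even when none of its methods declares a window parameter, which matches the `|| !sideInputMapping.isEmpty()` clause in every branch of the diff.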







[GitHub] [beam] lukecwik commented on a change in pull request #12093: [BEAM-10303] Add support for the non-window observing optimization to DoFn execution in portable Beam Java

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12093:
URL: https://github.com/apache/beam/pull/12093#discussion_r446447782



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -637,6 +737,28 @@ private void processElementForParDo(WindowedValue<InputT> elem) {
   }
 
   private void processElementForPairWithRestriction(WindowedValue<InputT> elem) {
+    currentElement = elem;
+    try {
+      currentRestriction = doFnInvoker.invokeGetInitialRestriction(processContext);
+      outputTo(
+          mainOutputConsumers,
+          (WindowedValue)
+              elem.withValue(
+                  KV.of(
+                      elem.getValue(),
+                      KV.of(
+                          currentRestriction,
+                          doFnInvoker.invokeGetInitialWatermarkEstimatorState(processContext)))));
+    } finally {
+      currentElement = null;
+      currentRestriction = null;

Review comment:
       `currentWindow` is never set or used in these methods, so it simply remains null. All output is instead produced using the entire set of windows for the current element.
   
   The window-observing versions of these methods do set it and clear it.
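
The difference described in this comment can be sketched in isolation. The code below is a simplified model under assumptions, not the runner itself: `Elem` is a hypothetical stand-in for Beam's `WindowedValue`, windows are plain strings, and the `invocations` list stands in for calls into the user's DoFn.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for WindowedValue: a value plus the windows it belongs to.
final class Elem {
  final String value;
  final List<String> windows;

  Elem(String value, List<String> windows) {
    this.value = value;
    this.windows = windows;
  }
}

final class WindowDispatchSketch {
  // Each entry models one invocation of the user function.
  final List<String> invocations = new ArrayList<>();
  // Plays the role of currentWindow in the runner.
  String currentWindow = null;

  // Window-observing path: one invocation per window, with currentWindow
  // set for each invocation and cleared afterwards.
  void processWindowObserving(Elem elem) {
    try {
      for (String window : elem.windows) {
        currentWindow = window;
        invocations.add(elem.value + "@" + currentWindow);
      }
    } finally {
      currentWindow = null;
    }
  }

  // Non-window-observing path: a single invocation carrying the element's
  // whole window set. currentWindow is never touched and stays null.
  void processNonWindowObserving(Elem elem) {
    invocations.add(elem.value + "@" + elem.windows);
  }
}
```

For an element in three windows, the first method records three invocations and the second records one, which is where the optimization saves work when nothing needs to inspect the window.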







[GitHub] [beam] lukecwik commented on pull request #12093: [BEAM-10303] Add support for the non-window observing optimization to DoFn execution.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12093:
URL: https://github.com/apache/beam/pull/12093#issuecomment-649850539









[GitHub] [beam] lukecwik commented on pull request #12093: [BEAM-10303] Add support for the non-window observing optimization to DoFn execution.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12093:
URL: https://github.com/apache/beam/pull/12093#issuecomment-649849369


   R: @ibzib 





[GitHub] [beam] boyuanzz commented on a change in pull request #12093: [BEAM-10303] Add support for the non-window observing optimization to DoFn execution in portable Beam Java

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #12093:
URL: https://github.com/apache/beam/pull/12093#discussion_r446475260



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -488,60 +438,201 @@ public void accept(WindowedValue input) throws Exception {
     this.doFnInvoker.invokeSetup();
 
     this.startBundleArgumentProvider = new StartBundleArgumentProvider();
+    // Register the appropriate handlers.
+    switch (pTransform.getSpec().getUrn()) {
+      case PTransformTranslation.PAR_DO_TRANSFORM_URN:
+      case PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN:
+        startFunctionRegistry.register(pTransformId, this::startBundle);
+        break;
+      case PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN:
+        // startBundle should not be invoked
+      case PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN:
+        // startBundle should not be invoked
+      default:
+        // no-op
+    }
+
+    String mainInput;
+    try {
+      mainInput = ParDoTranslation.getMainInputName(pTransform);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    final FnDataReceiver<WindowedValue> mainInputConsumer;
+    switch (pTransform.getSpec().getUrn()) {
+      case PTransformTranslation.PAR_DO_TRANSFORM_URN:
+        if (doFnSignature.processElement().observesWindow() || !sideInputMapping.isEmpty()) {
+          mainInputConsumer = this::processElementForWindowObservingParDo;
+          this.processContext = new WindowObservingProcessBundleContext();
+        } else {
+          mainInputConsumer = this::processElementForParDo;
+          this.processContext = new NonWindowObservingProcessBundleContext();
+        }
+        break;
+      case PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN:
+        if (doFnSignature.getInitialRestriction().observesWindow()
+            || (doFnSignature.getInitialWatermarkEstimatorState() != null
+                && doFnSignature.getInitialWatermarkEstimatorState().observesWindow())
+            || !sideInputMapping.isEmpty()) {
+          mainInputConsumer = this::processElementForWindowObservingPairWithRestriction;
+          this.processContext = new WindowObservingProcessBundleContext();
+        } else {
+          mainInputConsumer = this::processElementForPairWithRestriction;
+          this.processContext = new NonWindowObservingProcessBundleContext();
+        }
+        break;
+      case PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN:
+        if ((doFnSignature.splitRestriction() != null
+                && doFnSignature.splitRestriction().observesWindow())
+            || (doFnSignature.newTracker() != null && doFnSignature.newTracker().observesWindow())
+            || (doFnSignature.getSize() != null && doFnSignature.getSize().observesWindow())
+            || !sideInputMapping.isEmpty()) {
+          mainInputConsumer = this::processElementForWindowObservingSplitRestriction;
+          // OutputT == RestrictionT
+          this.processContext =
+              new WindowObservingProcessBundleContext() {
+                @Override
+                public void outputWithTimestamp(OutputT output, Instant timestamp) {
+                  double size =
+                      doFnInvoker.invokeGetSize(
+                          new DelegatingArgumentProvider<InputT, OutputT>(
+                              this,
+                              PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN
+                                  + "/GetSize") {
+                            @Override
+                            public Object restriction() {
+                              return output;
+                            }
+
+                            @Override
+                            public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+                              return timestamp;
+                            }
+
+                            @Override
+                            public RestrictionTracker<?, ?> restrictionTracker() {
+                              return doFnInvoker.invokeNewTracker(this);
+                            }
+                          });
+
+                  outputTo(
+                      mainOutputConsumers,
+                      (WindowedValue<OutputT>)
+                          WindowedValue.of(
+                              KV.of(
+                                  KV.of(
+                                      currentElement.getValue(),
+                                      KV.of(output, currentWatermarkEstimatorState)),
+                                  size),
+                              timestamp,
+                              currentWindow,
+                              currentElement.getPane()));
+                }
+              };
+        } else {
+          mainInputConsumer = this::processElementForSplitRestriction;
+          // OutputT == RestrictionT
+          this.processContext =
+              new NonWindowObservingProcessBundleContext() {
+                @Override
+                public void outputWithTimestamp(OutputT output, Instant timestamp) {
+                  double size =
+                      doFnInvoker.invokeGetSize(
+                          new DelegatingArgumentProvider<InputT, OutputT>(
+                              this,
+                              PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN
+                                  + "/GetSize") {
+                            @Override
+                            public Object restriction() {
+                              return output;
+                            }
+
+                            @Override
+                            public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+                              return timestamp;
+                            }
+
+                            @Override
+                            public RestrictionTracker<?, ?> restrictionTracker() {
+                              return doFnInvoker.invokeNewTracker(this);
+                            }
+                          });
+
+                  outputTo(
+                      mainOutputConsumers,
+                      (WindowedValue<OutputT>)
+                          WindowedValue.of(
+                              KV.of(
+                                  KV.of(
+                                      currentElement.getValue(),
+                                      KV.of(output, currentWatermarkEstimatorState)),
+                                  size),
+                              timestamp,
+                              currentElement.getWindows(),
+                              currentElement.getPane()));
+                }
+              };
+        }
+        break;
+      case PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN:
+        if (doFnSignature.processElement().observesWindow()
+            || (doFnSignature.newTracker() != null && doFnSignature.newTracker().observesWindow())
+            || (doFnSignature.getSize() != null && doFnSignature.getSize().observesWindow())
+            || (doFnSignature.newWatermarkEstimator() != null
+                && doFnSignature.newWatermarkEstimator().observesWindow())
+            || !sideInputMapping.isEmpty()) {
+          mainInputConsumer =
+              new SplittableFnDataReceiver() {
+                @Override
+                public void accept(WindowedValue input) throws Exception {
+                  processElementForWindowObservingSizedElementAndRestriction(input);
+                }
+              };
+          this.processContext = new WindowObservingProcessBundleContext();
+        } else {
+          mainInputConsumer =
+              new SplittableFnDataReceiver() {
+                @Override
+                public void accept(WindowedValue input) throws Exception {
+                  // TODO(BEAM-10303): Create a variant which is optimized to not observe the
+                  // windows.
+                  processElementForWindowObservingSizedElementAndRestriction(input);
+                }
+              };
+          this.processContext = new WindowObservingProcessBundleContext();
+        }
+        break;
+      default:
+        throw new IllegalStateException("Unknown urn: " + pTransform.getSpec().getUrn());
+    }
+    pCollectionConsumerRegistry.register(
+        pTransform.getInputsOrThrow(mainInput), pTransformId, (FnDataReceiver) mainInputConsumer);
+
     switch (pTransform.getSpec().getUrn()) {

Review comment:
       It seems like we should remove this duplicate part.
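
For context on the hunk quoted above: it picks the main-input handler once, at construction time, based on whether the DoFn methods observe the window or side inputs are present. The following is a minimal, Beam-free sketch of that dispatch idea, not the code in `FnApiDoFnRunner`. The class `WindowDispatchSketch`, its `Element` type, and the methods `processPerWindow` and `processOnce` are hypothetical names introduced only for illustration; the assumption is that a window-observing handler is invoked once per window, while the non-observing one is invoked once per element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch: choose the element handler once instead of branching per element. */
public class WindowDispatchSketch {

  /** Stand-in for a windowed value: a payload plus the windows it belongs to. */
  public static final class Element {
    public final String value;
    public final List<String> windows;

    public Element(String value, List<String> windows) {
      this.value = value;
      this.windows = windows;
    }
  }

  /** Records each simulated user-function invocation so the behavior can be inspected. */
  public final List<String> invocations = new ArrayList<>();

  private final Consumer<Element> mainInputConsumer;

  public WindowDispatchSketch(boolean observesWindow, boolean hasSideInputs) {
    // Mirrors the shape of the condition in the diff: any window observation or
    // any side input forces the per-window path.
    if (observesWindow || hasSideInputs) {
      mainInputConsumer = this::processPerWindow;
    } else {
      mainInputConsumer = this::processOnce;
    }
  }

  public void accept(Element element) {
    mainInputConsumer.accept(element);
  }

  // Window-observing path: one invocation per window the element is in.
  private void processPerWindow(Element element) {
    for (String window : element.windows) {
      invocations.add(element.value + "@" + window);
    }
  }

  // Non-window-observing path: a single invocation regardless of window count.
  private void processOnce(Element element) {
    invocations.add(element.value);
  }

  public static void main(String[] args) {
    Element element = new Element("x", Arrays.asList("w1", "w2", "w3"));

    WindowDispatchSketch observing = new WindowDispatchSketch(true, false);
    observing.accept(element);
    System.out.println(observing.invocations.size());

    WindowDispatchSketch nonObserving = new WindowDispatchSketch(false, false);
    nonObserving.accept(element);
    System.out.println(nonObserving.invocations.size());
  }
}
```

In this sketch the saving comes from skipping the per-window loop when nothing in the user code can tell the windows apart; whether the real runner sees a similar benefit depends on how many windows each element is assigned to.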




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik merged pull request #12093: [BEAM-10303] Add support for the non-window observing optimization to DoFn execution in portable Beam Java

Posted by GitBox <gi...@apache.org>.
lukecwik merged pull request #12093:
URL: https://github.com/apache/beam/pull/12093


   





[GitHub] [beam] lukecwik commented on pull request #12093: [BEAM-10303] Add support for the non-window observing optimization to DoFn execution.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12093:
URL: https://github.com/apache/beam/pull/12093#issuecomment-649863134


   CC: @reuvenlax 





[GitHub] [beam] lukecwik commented on a change in pull request #12093: [BEAM-10303] Add support for the non-window observing optimization to DoFn execution in portable Beam Java

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12093:
URL: https://github.com/apache/beam/pull/12093#discussion_r447119477



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -488,60 +438,201 @@ public void accept(WindowedValue input) throws Exception {
     this.doFnInvoker.invokeSetup();
 
     this.startBundleArgumentProvider = new StartBundleArgumentProvider();
+    // Register the appropriate handlers.
+    switch (pTransform.getSpec().getUrn()) {
+      case PTransformTranslation.PAR_DO_TRANSFORM_URN:
+      case PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN:
+        startFunctionRegistry.register(pTransformId, this::startBundle);
+        break;
+      case PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN:
+        // startBundle should not be invoked
+      case PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN:
+        // startBundle should not be invoked
+      default:
+        // no-op
+    }
+
+    String mainInput;
+    try {
+      mainInput = ParDoTranslation.getMainInputName(pTransform);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    final FnDataReceiver<WindowedValue> mainInputConsumer;
+    switch (pTransform.getSpec().getUrn()) {
+      case PTransformTranslation.PAR_DO_TRANSFORM_URN:
+        if (doFnSignature.processElement().observesWindow() || !sideInputMapping.isEmpty()) {
+          mainInputConsumer = this::processElementForWindowObservingParDo;
+          this.processContext = new WindowObservingProcessBundleContext();
+        } else {
+          mainInputConsumer = this::processElementForParDo;
+          this.processContext = new NonWindowObservingProcessBundleContext();
+        }
+        break;
+      case PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN:
+        if (doFnSignature.getInitialRestriction().observesWindow()
+            || (doFnSignature.getInitialWatermarkEstimatorState() != null
+                && doFnSignature.getInitialWatermarkEstimatorState().observesWindow())
+            || !sideInputMapping.isEmpty()) {
+          mainInputConsumer = this::processElementForWindowObservingPairWithRestriction;
+          this.processContext = new WindowObservingProcessBundleContext();
+        } else {
+          mainInputConsumer = this::processElementForPairWithRestriction;
+          this.processContext = new NonWindowObservingProcessBundleContext();
+        }
+        break;
+      case PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN:
+        if ((doFnSignature.splitRestriction() != null
+                && doFnSignature.splitRestriction().observesWindow())
+            || (doFnSignature.newTracker() != null && doFnSignature.newTracker().observesWindow())
+            || (doFnSignature.getSize() != null && doFnSignature.getSize().observesWindow())
+            || !sideInputMapping.isEmpty()) {
+          mainInputConsumer = this::processElementForWindowObservingSplitRestriction;
+          // OutputT == RestrictionT
+          this.processContext =
+              new WindowObservingProcessBundleContext() {
+                @Override
+                public void outputWithTimestamp(OutputT output, Instant timestamp) {
+                  double size =
+                      doFnInvoker.invokeGetSize(
+                          new DelegatingArgumentProvider<InputT, OutputT>(
+                              this,
+                              PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN
+                                  + "/GetSize") {
+                            @Override
+                            public Object restriction() {
+                              return output;
+                            }
+
+                            @Override
+                            public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+                              return timestamp;
+                            }
+
+                            @Override
+                            public RestrictionTracker<?, ?> restrictionTracker() {
+                              return doFnInvoker.invokeNewTracker(this);
+                            }
+                          });
+
+                  outputTo(
+                      mainOutputConsumers,
+                      (WindowedValue<OutputT>)
+                          WindowedValue.of(
+                              KV.of(
+                                  KV.of(
+                                      currentElement.getValue(),
+                                      KV.of(output, currentWatermarkEstimatorState)),
+                                  size),
+                              timestamp,
+                              currentWindow,
+                              currentElement.getPane()));
+                }
+              };
+        } else {
+          mainInputConsumer = this::processElementForSplitRestriction;
+          // OutputT == RestrictionT
+          this.processContext =
+              new NonWindowObservingProcessBundleContext() {
+                @Override
+                public void outputWithTimestamp(OutputT output, Instant timestamp) {
+                  double size =
+                      doFnInvoker.invokeGetSize(
+                          new DelegatingArgumentProvider<InputT, OutputT>(
+                              this,
+                              PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN
+                                  + "/GetSize") {
+                            @Override
+                            public Object restriction() {
+                              return output;
+                            }
+
+                            @Override
+                            public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+                              return timestamp;
+                            }
+
+                            @Override
+                            public RestrictionTracker<?, ?> restrictionTracker() {
+                              return doFnInvoker.invokeNewTracker(this);
+                            }
+                          });
+
+                  outputTo(
+                      mainOutputConsumers,
+                      (WindowedValue<OutputT>)
+                          WindowedValue.of(
+                              KV.of(
+                                  KV.of(
+                                      currentElement.getValue(),
+                                      KV.of(output, currentWatermarkEstimatorState)),
+                                  size),
+                              timestamp,
+                              currentElement.getWindows(),
+                              currentElement.getPane()));
+                }
+              };
+        }
+        break;
+      case PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN:
+        if (doFnSignature.processElement().observesWindow()
+            || (doFnSignature.newTracker() != null && doFnSignature.newTracker().observesWindow())
+            || (doFnSignature.getSize() != null && doFnSignature.getSize().observesWindow())
+            || (doFnSignature.newWatermarkEstimator() != null
+                && doFnSignature.newWatermarkEstimator().observesWindow())
+            || !sideInputMapping.isEmpty()) {
+          mainInputConsumer =
+              new SplittableFnDataReceiver() {
+                @Override
+                public void accept(WindowedValue input) throws Exception {
+                  processElementForWindowObservingSizedElementAndRestriction(input);
+                }
+              };
+          this.processContext = new WindowObservingProcessBundleContext();
+        } else {
+          mainInputConsumer =
+              new SplittableFnDataReceiver() {
+                @Override
+                public void accept(WindowedValue input) throws Exception {
+                  // TODO(BEAM-10303): Create a variant which is optimized to not observe the
+                  // windows.
+                  processElementForWindowObservingSizedElementAndRestriction(input);
+                }
+              };
+          this.processContext = new WindowObservingProcessBundleContext();
+        }
+        break;
+      default:
+        throw new IllegalStateException("Unknown urn: " + pTransform.getSpec().getUrn());
+    }
+    pCollectionConsumerRegistry.register(
+        pTransform.getInputsOrThrow(mainInput), pTransformId, (FnDataReceiver) mainInputConsumer);
+
     switch (pTransform.getSpec().getUrn()) {

Review comment:
       Thanks, I'll remove it in a follow-up.
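
As an aside on the quoted hunk: its conditions repeat the pattern `x != null && x.observesWindow()` for each optional DoFn method. One possible way to express that check, shown purely as a sketch, is a small null-tolerant helper. `ObservesWindowSketch`, `MethodInfo`, and `anyObservesWindow` are hypothetical names and are not part of Beam's `DoFnSignature` API; this is not necessarily the cleanup that the follow-up mentioned above refers to.

```java
import java.util.Arrays;
import java.util.Objects;

/** Hypothetical sketch of a null-tolerant "does any method observe the window" check. */
public class ObservesWindowSketch {

  /** Stand-in for a method entry in a DoFn signature; not a Beam type. */
  public interface MethodInfo {
    boolean observesWindow();
  }

  /** Returns true if at least one non-null method reports that it observes the window. */
  public static boolean anyObservesWindow(MethodInfo... methods) {
    return Arrays.stream(methods)
        .filter(Objects::nonNull) // optional methods may be absent
        .anyMatch(MethodInfo::observesWindow);
  }

  public static void main(String[] args) {
    MethodInfo observing = () -> true;
    MethodInfo nonObserving = () -> false;
    MethodInfo absent = null;

    System.out.println(anyObservesWindow(nonObserving, absent, observing));
    System.out.println(anyObservesWindow(nonObserving, absent));
  }
}
```

A call site would still need to combine the result with the side-input check (`!sideInputMapping.isEmpty()` in the diff), which this helper deliberately leaves out.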



