Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/16 23:48:28 UTC

[GitHub] [beam] boyuanzz opened a new pull request #12287: [BEAM-10341] Support drain in Java SDK

boyuanzz opened a new pull request #12287:
URL: https://github.com/apache/beam/pull/12287




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #12287: [BEAM-10341] Support drain in Java SDK

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12287:
URL: https://github.com/apache/beam/pull/12287#issuecomment-660130566


   Run Python PreCommit





[GitHub] [beam] boyuanzz commented on pull request #12287: [BEAM-10341] Support drain in Java SDK

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #12287:
URL: https://github.com/apache/beam/pull/12287#issuecomment-660215487


   Run Python PreCommit





[GitHub] [beam] boyuanzz commented on pull request #12287: [BEAM-10341] Support drain in Java SDK

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #12287:
URL: https://github.com/apache/beam/pull/12287#issuecomment-659835411


   Run Python PreCommit





[GitHub] [beam] lukecwik commented on pull request #12287: [BEAM-10341] Support drain in Java SDK

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12287:
URL: https://github.com/apache/beam/pull/12287#issuecomment-660167706


   Run Python PreCommit





[GitHub] [beam] boyuanzz commented on a change in pull request #12287: [BEAM-10341] Support drain in Java SDK

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #12287:
URL: https://github.com/apache/beam/pull/12287#discussion_r456196686



##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SplittableParDoExpander.java
##########
@@ -69,6 +69,31 @@ public static TransformReplacement createSizedReplacement() {
     return SizedReplacement.INSTANCE;
   }
 
+  /**
+   * Returns a transform replacement in drain mode which expands a splittable ParDo from:
+   *
+   * <pre>{@code
+   * sideInputA ---------\
+   * sideInputB ---------V
+   * mainInput ---> SplittableParDo --> outputA
+   *                                \-> outputB
+   * }</pre>
+   *
+   * into:
+   *
+   * <pre>{@code
+   * sideInputA ---------\---------------------\----------------------\--------------------------\
+   * sideInputB ---------V---------------------V----------------------V--------------------------V
+   * mainInput ---> PairWithRestriction --> SplitAndSize --> TruncateAndSize --> ProcessSizedElementsAndRestriction --> outputA
+   *                                                                                                                \-> outputB
+   * }</pre>
+   *
+   * .

Review comment:
       This is required by checkstyleMain: the first sentence of a Javadoc comment must end with a period.
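
A possible alternative to the standalone `.` line, sketched under the assumption that the check only needs the summary sentence itself to end with a period: close the first sentence before the diagram and introduce the diagram in a separate paragraph. The class and method names below are hypothetical and the diagram is shortened for illustration.

```java
public class JavadocSummaryExample {

  /**
   * Returns a description of the drain-mode expansion of a splittable ParDo.
   *
   * <p>The summary sentence above ends with a period, so the diagram can follow
   * in its own paragraph without a trailing "." line:
   *
   * <pre>{@code
   * mainInput ---> PairWithRestriction --> SplitAndSize --> TruncateAndSize
   * }</pre>
   */
  public static String describeDrainExpansion() {
    // Hypothetical placeholder body; only the Javadoc layout matters here.
    return "PairWithRestriction -> SplitAndSize -> TruncateAndSize";
  }

  public static void main(String[] args) {
    System.out.println(describeDrainExpansion());
  }
}
```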







[GitHub] [beam] boyuanzz commented on pull request #12287: [BEAM-10341] Support drain in Java SDK

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #12287:
URL: https://github.com/apache/beam/pull/12287#issuecomment-659753444


   Run PythonDocker PreCommit





[GitHub] [beam] lukecwik commented on a change in pull request #12287: [BEAM-10341] Support drain in Java SDK

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12287:
URL: https://github.com/apache/beam/pull/12287#discussion_r456154654



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -488,90 +493,77 @@
             || (doFnSignature.getSize() != null && doFnSignature.getSize().observesWindow())
             || !sideInputMapping.isEmpty()) {
           mainInputConsumer = this::processElementForWindowObservingSplitRestriction;
-          // OutputT == RestrictionT
           this.processContext =
-              new WindowObservingProcessBundleContext() {
-                @Override
-                public void outputWithTimestamp(OutputT output, Instant timestamp) {
-                  double size =
-                      doFnInvoker.invokeGetSize(
-                          new DelegatingArgumentProvider<InputT, OutputT>(
-                              this,
-                              PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN
-                                  + "/GetSize") {
-                            @Override
-                            public Object restriction() {
-                              return output;
-                            }
-
-                            @Override
-                            public Instant timestamp(DoFn<InputT, OutputT> doFn) {
-                              return timestamp;
-                            }
-
-                            @Override
-                            public RestrictionTracker<?, ?> restrictionTracker() {
-                              return doFnInvoker.invokeNewTracker(this);
-                            }
-                          });
-
-                  outputTo(
-                      mainOutputConsumers,
-                      (WindowedValue<OutputT>)
-                          WindowedValue.of(
-                              KV.of(
-                                  KV.of(
-                                      currentElement.getValue(),
-                                      KV.of(output, currentWatermarkEstimatorState)),
-                                  size),
-                              timestamp,
-                              currentWindow,
-                              currentElement.getPane()));
-                }
-              };
+              new SizedRestrictionWindowObservingProcessBundleContext(
+                  PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN);
+
         } else {
           mainInputConsumer = this::processElementForSplitRestriction;
-          // OutputT == RestrictionT
           this.processContext =
-              new NonWindowObservingProcessBundleContext() {
-                @Override
-                public void outputWithTimestamp(OutputT output, Instant timestamp) {
-                  double size =
-                      doFnInvoker.invokeGetSize(
-                          new DelegatingArgumentProvider<InputT, OutputT>(
-                              this,
-                              PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN
-                                  + "/GetSize") {
-                            @Override
-                            public Object restriction() {
-                              return output;
-                            }
-
-                            @Override
-                            public Instant timestamp(DoFn<InputT, OutputT> doFn) {
-                              return timestamp;
-                            }
-
-                            @Override
-                            public RestrictionTracker<?, ?> restrictionTracker() {
-                              return doFnInvoker.invokeNewTracker(this);
-                            }
-                          });
-
-                  outputTo(
-                      mainOutputConsumers,
-                      (WindowedValue<OutputT>)
-                          WindowedValue.of(
-                              KV.of(
-                                  KV.of(
-                                      currentElement.getValue(),
-                                      KV.of(output, currentWatermarkEstimatorState)),
-                                  size),
-                              timestamp,
-                              currentElement.getWindows(),
-                              currentElement.getPane()));
-                }
-              };
+              new SizedRestrictionNonWindowObservingProcessBundleContext(
+                  PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN);
+        }
+        break;
+      case PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN:
+        if ((doFnSignature.truncateRestriction() != null
+                && doFnSignature.truncateRestriction().observesWindow())
+            || (doFnSignature.newTracker() != null && doFnSignature.newTracker().observesWindow())
+            || (doFnSignature.getSize() != null && doFnSignature.getSize().observesWindow())
+            || !sideInputMapping.isEmpty()) {
+          if (Iterables.get(mainOutputConsumers, 0) instanceof HandlesSplits) {

Review comment:
       This should only handle splits if `mainOutputConsumers` contains exactly one consumer.
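
The guard requested here can be sketched as follows. This is a minimal, self-contained illustration, not the harness code: `HandlesSplits` and `Receiver` are hypothetical stand-ins declared locally, and `shouldDelegateSplits` is an assumed helper name. The point is that the size check comes before the `instanceof` test, so split handling is forwarded only when there is a single downstream consumer.

```java
import java.util.Arrays;
import java.util.Collection;

public class SingleConsumerSplitCheck {

  /** Hypothetical stand-in for a marker interface implemented by split-aware consumers. */
  interface HandlesSplits {}

  /** Hypothetical stand-in for a downstream output consumer. */
  interface Receiver {}

  static final class SplittableReceiver implements Receiver, HandlesSplits {}

  static final class PlainReceiver implements Receiver {}

  /** Returns true only when there is exactly one consumer and it handles splits. */
  static boolean shouldDelegateSplits(Collection<? extends Receiver> consumers) {
    // Check the size first: with several consumers, inspecting only the first
    // one would silently ignore the others.
    return consumers.size() == 1 && consumers.iterator().next() instanceof HandlesSplits;
  }

  public static void main(String[] args) {
    // Exactly one split-aware consumer.
    System.out.println(
        shouldDelegateSplits(Arrays.<Receiver>asList(new SplittableReceiver())));
    // Two consumers, the first of which is split-aware.
    System.out.println(
        shouldDelegateSplits(
            Arrays.<Receiver>asList(new SplittableReceiver(), new PlainReceiver())));
    // Exactly one consumer that is not split-aware.
    System.out.println(shouldDelegateSplits(Arrays.<Receiver>asList(new PlainReceiver())));
  }
}
```

Only the first call returns `true`; the other two fall through to the plain, non-splitting consumer path.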

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -488,90 +493,77 @@
             || (doFnSignature.getSize() != null && doFnSignature.getSize().observesWindow())
             || !sideInputMapping.isEmpty()) {
           mainInputConsumer = this::processElementForWindowObservingSplitRestriction;
-          // OutputT == RestrictionT
           this.processContext =
-              new WindowObservingProcessBundleContext() {
-                @Override
-                public void outputWithTimestamp(OutputT output, Instant timestamp) {
-                  double size =
-                      doFnInvoker.invokeGetSize(
-                          new DelegatingArgumentProvider<InputT, OutputT>(
-                              this,
-                              PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN
-                                  + "/GetSize") {
-                            @Override
-                            public Object restriction() {
-                              return output;
-                            }
-
-                            @Override
-                            public Instant timestamp(DoFn<InputT, OutputT> doFn) {
-                              return timestamp;
-                            }
-
-                            @Override
-                            public RestrictionTracker<?, ?> restrictionTracker() {
-                              return doFnInvoker.invokeNewTracker(this);
-                            }
-                          });
-
-                  outputTo(
-                      mainOutputConsumers,
-                      (WindowedValue<OutputT>)
-                          WindowedValue.of(
-                              KV.of(
-                                  KV.of(
-                                      currentElement.getValue(),
-                                      KV.of(output, currentWatermarkEstimatorState)),
-                                  size),
-                              timestamp,
-                              currentWindow,
-                              currentElement.getPane()));
-                }
-              };
+              new SizedRestrictionWindowObservingProcessBundleContext(
+                  PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN);
+
         } else {
           mainInputConsumer = this::processElementForSplitRestriction;
-          // OutputT == RestrictionT
           this.processContext =
-              new NonWindowObservingProcessBundleContext() {
-                @Override
-                public void outputWithTimestamp(OutputT output, Instant timestamp) {
-                  double size =
-                      doFnInvoker.invokeGetSize(
-                          new DelegatingArgumentProvider<InputT, OutputT>(
-                              this,
-                              PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN
-                                  + "/GetSize") {
-                            @Override
-                            public Object restriction() {
-                              return output;
-                            }
-
-                            @Override
-                            public Instant timestamp(DoFn<InputT, OutputT> doFn) {
-                              return timestamp;
-                            }
-
-                            @Override
-                            public RestrictionTracker<?, ?> restrictionTracker() {
-                              return doFnInvoker.invokeNewTracker(this);
-                            }
-                          });
-
-                  outputTo(
-                      mainOutputConsumers,
-                      (WindowedValue<OutputT>)
-                          WindowedValue.of(
-                              KV.of(
-                                  KV.of(
-                                      currentElement.getValue(),
-                                      KV.of(output, currentWatermarkEstimatorState)),
-                                  size),
-                              timestamp,
-                              currentElement.getWindows(),
-                              currentElement.getPane()));
-                }
-              };
+              new SizedRestrictionNonWindowObservingProcessBundleContext(
+                  PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN);
+        }
+        break;
+      case PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN:
+        if ((doFnSignature.truncateRestriction() != null
+                && doFnSignature.truncateRestriction().observesWindow())
+            || (doFnSignature.newTracker() != null && doFnSignature.newTracker().observesWindow())
+            || (doFnSignature.getSize() != null && doFnSignature.getSize().observesWindow())
+            || !sideInputMapping.isEmpty()) {
+          if (Iterables.get(mainOutputConsumers, 0) instanceof HandlesSplits) {
+            mainInputConsumer =
+                new SplittableFnDataReceiver() {
+                  @Override
+                  public void accept(WindowedValue input) throws Exception {
+                    processElementForWindowObservingTruncateRestriction(input);
+                  }
+
+                  // TODO(BEAM-10303): Split should work with window observing optimization.
+                  @Override
+                  public SplitResult trySplit(double fractionOfRemainder) {
+                    return null;
+                  }
+
+                  // TODO(BEAM-10303): Progress should work with window observing optimization.
+                  @Override
+                  public double getProgress() {
+                    return 0;
+                  }
+                };
+          } else {
+            mainInputConsumer = this::processElementForWindowObservingTruncateRestriction;
+          }
+          this.processContext =
+              new SizedRestrictionWindowObservingProcessBundleContext(
+                  PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN);
+        } else {
+          if (Iterables.get(mainOutputConsumers, 0) instanceof HandlesSplits) {

Review comment:
       Ditto: only handle splits if there is exactly one output consumer.







[GitHub] [beam] boyuanzz commented on pull request #12287: [BEAM-10341] Support drain in Java SDK

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #12287:
URL: https://github.com/apache/beam/pull/12287#issuecomment-659783451









[GitHub] [beam] boyuanzz merged pull request #12287: [BEAM-10341] Support drain in Java SDK

Posted by GitBox <gi...@apache.org>.
boyuanzz merged pull request #12287:
URL: https://github.com/apache/beam/pull/12287


   





[GitHub] [beam] lukecwik commented on a change in pull request #12287: [BEAM-10341] Support drain in Java SDK

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12287:
URL: https://github.com/apache/beam/pull/12287#discussion_r456193528



##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SplittableParDoExpander.java
##########
@@ -69,6 +69,31 @@ public static TransformReplacement createSizedReplacement() {
     return SizedReplacement.INSTANCE;
   }
 
+  /**
+   * Returns a transform replacement in drain mode which expands a splittable ParDo from:
+   *
+   * <pre>{@code
+   * sideInputA ---------\
+   * sideInputB ---------V
+   * mainInput ---> SplittableParDo --> outputA
+   *                                \-> outputB
+   * }</pre>
+   *
+   * into:
+   *
+   * <pre>{@code
+   * sideInputA ---------\---------------------\----------------------\--------------------------\
+   * sideInputB ---------V---------------------V----------------------V--------------------------V
+   * mainInput ---> PairWithRestriction --> SplitAndSize --> TruncateAndSize --> ProcessSizedElementsAndRestriction --> outputA
+   *                                                                                                                \-> outputB
+   * }</pre>
+   *
+   * .

Review comment:
       ```suggestion
   ```







[GitHub] [beam] lukecwik commented on pull request #12287: [BEAM-10341] Support drain in Java SDK

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12287:
URL: https://github.com/apache/beam/pull/12287#issuecomment-660250360


   Run Python PreCommit





[GitHub] [beam] boyuanzz commented on pull request #12287: [BEAM-10341] Support drain in Java SDK

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #12287:
URL: https://github.com/apache/beam/pull/12287#issuecomment-659854733


   Run Python PreCommit

