Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/17 18:50:35 UTC

[GitHub] [beam] lukecwik opened a new pull request #11448: [BEAM-3836] Enable dynamic splitting/checkpointing within the Java SDK harness.

lukecwik opened a new pull request #11448: [BEAM-3836] Enable dynamic splitting/checkpointing within the Java SDK harness.
URL: https://github.com/apache/beam/pull/11448
 
 
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make the review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
   Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/) | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/)
   XLang | --- | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/)
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- | Java | Python | Go | Website
   --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/) 
   Portable | --- | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   


[GitHub] [beam] lukecwik commented on issue #11448: [BEAM-3836] Enable dynamic splitting/checkpointing within the Java SDK harness.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11448: [BEAM-3836] Enable dynamic splitting/checkpointing within the Java SDK harness.
URL: https://github.com/apache/beam/pull/11448#issuecomment-615407301
 
 
   R: @boyuanzz @ihji 
   CC: @youngoli 


[GitHub] [beam] lukecwik commented on a change in pull request #11448: [BEAM-3836] Enable dynamic splitting/checkpointing within the Java SDK harness.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11448: [BEAM-3836] Enable dynamic splitting/checkpointing within the Java SDK harness.
URL: https://github.com/apache/beam/pull/11448#discussion_r410556926
 
 

 ##########
 File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
 ##########
 @@ -1246,16 +1249,230 @@ public void process(ProcessContext c) {
               StateRequestHandler.unsupported(),
               BundleProgressHandler.ignored())) {
         Iterables.getOnlyElement(bundle.getInputReceivers().values())
-            .accept(
-                WindowedValue.valueInGlobalWindow(
-                    CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "X")));
+            .accept(valueInGlobalWindow(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "X")));
       }
     }
     assertThat(
         outputValues,
         containsInAnyOrder(
-            WindowedValue.valueInGlobalWindow(KV.of("stream1X", "")),
-            WindowedValue.valueInGlobalWindow(KV.of("stream2X", ""))));
+            valueInGlobalWindow(KV.of("stream1X", "")),
+            valueInGlobalWindow(KV.of("stream2X", ""))));
+  }
+
+  /**
+   * A restriction tracker that blocks progress on {@link #WAIT_TILL_SPLIT} until {@link
+   * #trySplit} is invoked.
+   */
+  private static class WaitingTillSplitRestrictionTracker extends RestrictionTracker<String, Void> {
+    private static final String WAIT_TILL_SPLIT = "WaitTillSplit";
+    private static final String PRIMARY = "Primary";
+    private static final String RESIDUAL = "Residual";
+
+    private String currentRestriction;
+
+    private WaitingTillSplitRestrictionTracker(String restriction) {
+      this.currentRestriction = restriction;
+    }
+
+    @Override
+    public boolean tryClaim(Void position) {
+      return needsSplitting();
+    }
+
+    @Override
+    public String currentRestriction() {
+      return currentRestriction;
+    }
+
+    @Override
+    public SplitResult<String> trySplit(double fractionOfRemainder) {
+      if (!needsSplitting()) {
+        return null;
+      }
+      this.currentRestriction = PRIMARY;
+      return SplitResult.of(currentRestriction, RESIDUAL);
+    }
+
+    private boolean needsSplitting() {
+      return WAIT_TILL_SPLIT.equals(currentRestriction);
+    }
+
+    @Override
+    public void checkDone() throws IllegalStateException {
+      checkState(!needsSplitting(), "Expected this restriction to have been split.");
+    }
+  }
+
+  @Test(timeout = 60000L)
+  public void testSplit() throws Exception {
+    Pipeline p = Pipeline.create();
+    p.apply("impulse", Impulse.create())
+        .apply(
+            "create",
+            ParDo.of(
+                new DoFn<byte[], String>() {
+                  @ProcessElement
+                  public void process(ProcessContext ctxt) {
+                    ctxt.output("zero");
+                    ctxt.output(WaitingTillSplitRestrictionTracker.WAIT_TILL_SPLIT);
+                    ctxt.output("two");
+                  }
+                }))
+        .apply(
+            "forceSplit",
+            ParDo.of(
+                new DoFn<String, String>() {
+                  @GetInitialRestriction
+                  public String getInitialRestriction(@Element String element) {
+                    return element;
+                  }
+
+                  @NewTracker
+                  public WaitingTillSplitRestrictionTracker newTracker(
+                      @Restriction String restriction) {
+                    return new WaitingTillSplitRestrictionTracker(restriction);
+                  }
+
+                  @ProcessElement
+                  public void process(
+                      RestrictionTracker<String, Void> tracker, ProcessContext context) {
+                    while (tracker.tryClaim(null)) {}
+                    context.output(tracker.currentRestriction());
+                  }
+                }))
+        .apply("addKeys", WithKeys.of("foo"))
+        // Use some unknown coders
+        .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+        // Force the output to be materialized
+        .apply("gbk", GroupByKey.create());
+
+    RunnerApi.Pipeline pipeline = PipelineTranslation.toProto(p);
+    // Expand any splittable DoFns within the graph to enable sizing and splitting of bundles.
+    RunnerApi.Pipeline pipelineWithSdfExpanded =
+        ProtoOverrides.updateTransform(
+            PTransformTranslation.PAR_DO_TRANSFORM_URN,
+            pipeline,
+            SplittableParDoExpander.createSizedReplacement());
+    FusedPipeline fused = GreedyPipelineFuser.fuse(pipelineWithSdfExpanded);
+
+    // Find the fused stage with the SDF ProcessSizedElementAndRestriction transform
+    Optional<ExecutableStage> optionalStage =
+        Iterables.tryFind(
+            fused.getFusedStages(),
+            (ExecutableStage stage) ->
+                Iterables.filter(
+                        stage.getTransforms(),
+                        (PTransformNode node) ->
+                            PTransformTranslation
+                                .SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN
+                                .equals(node.getTransform().getSpec().getUrn()))
+                    .iterator()
+                    .hasNext());
+    checkState(
+        optionalStage.isPresent(), "Expected a stage with SDF ProcessSizedElementAndRestriction.");
+    ExecutableStage stage = optionalStage.get();
+
+    ExecutableProcessBundleDescriptor descriptor =
+        ProcessBundleDescriptors.fromExecutableStage(
+            "my_stage", stage, dataServer.getApiServiceDescriptor());
+
+    BundleProcessor processor =
+        controlClient.getProcessor(
+            descriptor.getProcessBundleDescriptor(), descriptor.getRemoteInputDestinations());
+    Map<String, ? super Coder<WindowedValue<?>>> remoteOutputCoders =
+        descriptor.getRemoteOutputCoders();
+    Map<String, Collection<? super WindowedValue<?>>> outputValues = new HashMap<>();
+    Map<String, RemoteOutputReceiver<?>> outputReceivers = new HashMap<>();
+    for (Entry<String, ? super Coder<WindowedValue<?>>> remoteOutputCoder :
+        remoteOutputCoders.entrySet()) {
+      List<? super WindowedValue<?>> outputContents =
+          Collections.synchronizedList(new ArrayList<>());
+      outputValues.put(remoteOutputCoder.getKey(), outputContents);
+      outputReceivers.put(
+          remoteOutputCoder.getKey(),
+          RemoteOutputReceiver.of(
+              (Coder) remoteOutputCoder.getValue(),
+              (FnDataReceiver<? super WindowedValue<?>>) outputContents::add));
+    }
+
+    List<ProcessBundleSplitResponse> splitResponses = new ArrayList<>();
+    List<ProcessBundleResponse> checkpointResponses = new ArrayList<>();
+    List<String> requestsFinalization = new ArrayList<>();
+
+    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+    ScheduledFuture<Object> future;
+
+    // Execute the remote bundle.
+    try (RemoteBundle bundle =
+        processor.newBundle(
+            outputReceivers,
+            Collections.emptyMap(),
+            StateRequestHandler.unsupported(),
+            BundleProgressHandler.ignored(),
+            splitResponses::add,
+            checkpointResponses::add,
+            requestsFinalization::add)) {
+      Iterables.getOnlyElement(bundle.getInputReceivers().values())
+          .accept(valueInGlobalWindow(sdfSizedElementAndRestrictionForTest("zero")));
+      Iterables.getOnlyElement(bundle.getInputReceivers().values())
+          .accept(
+              valueInGlobalWindow(
+                  sdfSizedElementAndRestrictionForTest(
+                      WaitingTillSplitRestrictionTracker.WAIT_TILL_SPLIT)));
+      Iterables.getOnlyElement(bundle.getInputReceivers().values())
+          .accept(valueInGlobalWindow(sdfSizedElementAndRestrictionForTest("two")));
+      // Keep sending splits until the bundle terminates. We specifically use 0.5 so that we will
+      // choose a split point before the end of WAIT_TILL_SPLIT regardless of where we are during
+      // processing.
+      future =
+          (ScheduledFuture)
+              executor.scheduleWithFixedDelay(
+                  () -> bundle.split(0.5), 0L, 100L, TimeUnit.MILLISECONDS);
+    }
+    future.cancel(false);
+    executor.shutdown();
+
+    assertTrue(requestsFinalization.isEmpty());
+    assertTrue(checkpointResponses.isEmpty());
+
+    List<WindowedValue<KV<String, String>>> expectedOutputs = new ArrayList<>();
+
+    // We only validate the last split response since it is the only one that could possibly
+    // contain the SDF split; all others will be a reduction in the ChannelSplit.
+    assertFalse(splitResponses.isEmpty());
+    ProcessBundleSplitResponse splitResponse = splitResponses.get(splitResponses.size() - 1);
+    ChannelSplit channelSplit = Iterables.getOnlyElement(splitResponse.getChannelSplitsList());
+
+    // There are only a few outcomes that could happen with splitting due to timing:
 
 Review comment:
   That's a good idea. Updated the PR to reflect it.


[GitHub] [beam] lukecwik merged pull request #11448: [BEAM-3836] Enable dynamic splitting/checkpointing within the Java SDK harness.

Posted by GitBox <gi...@apache.org>.
lukecwik merged pull request #11448: [BEAM-3836] Enable dynamic splitting/checkpointing within the Java SDK harness.
URL: https://github.com/apache/beam/pull/11448
 
 
   


[GitHub] [beam] boyuanzz commented on a change in pull request #11448: [BEAM-3836] Enable dynamic splitting/checkpointing within the Java SDK harness.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11448: [BEAM-3836] Enable dynamic splitting/checkpointing within the Java SDK harness.
URL: https://github.com/apache/beam/pull/11448#discussion_r410528812
 
 

 ##########
 File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
 ##########
 @@ -1246,16 +1249,230 @@ public void process(ProcessContext c) {
               StateRequestHandler.unsupported(),
               BundleProgressHandler.ignored())) {
         Iterables.getOnlyElement(bundle.getInputReceivers().values())
-            .accept(
-                WindowedValue.valueInGlobalWindow(
-                    CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "X")));
+            .accept(valueInGlobalWindow(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "X")));
       }
     }
     assertThat(
         outputValues,
         containsInAnyOrder(
-            WindowedValue.valueInGlobalWindow(KV.of("stream1X", "")),
-            WindowedValue.valueInGlobalWindow(KV.of("stream2X", ""))));
+            valueInGlobalWindow(KV.of("stream1X", "")),
+            valueInGlobalWindow(KV.of("stream2X", ""))));
+  }
+
+  /**
+   * A restriction tracker that blocks progress on {@link #WAIT_TILL_SPLIT} until {@link
+   * #trySplit} is invoked.
+   */
+  private static class WaitingTillSplitRestrictionTracker extends RestrictionTracker<String, Void> {
+    private static final String WAIT_TILL_SPLIT = "WaitTillSplit";
+    private static final String PRIMARY = "Primary";
+    private static final String RESIDUAL = "Residual";
+
+    private String currentRestriction;
+
+    private WaitingTillSplitRestrictionTracker(String restriction) {
+      this.currentRestriction = restriction;
+    }
+
+    @Override
+    public boolean tryClaim(Void position) {
+      return needsSplitting();
+    }
+
+    @Override
+    public String currentRestriction() {
+      return currentRestriction;
+    }
+
+    @Override
+    public SplitResult<String> trySplit(double fractionOfRemainder) {
+      if (!needsSplitting()) {
+        return null;
+      }
+      this.currentRestriction = PRIMARY;
+      return SplitResult.of(currentRestriction, RESIDUAL);
+    }
+
+    private boolean needsSplitting() {
+      return WAIT_TILL_SPLIT.equals(currentRestriction);
+    }
+
+    @Override
+    public void checkDone() throws IllegalStateException {
+      checkState(!needsSplitting(), "Expected this restriction to have been split.");
+    }
+  }
+
+  @Test(timeout = 60000L)
+  public void testSplit() throws Exception {
+    Pipeline p = Pipeline.create();
+    p.apply("impulse", Impulse.create())
+        .apply(
+            "create",
+            ParDo.of(
+                new DoFn<byte[], String>() {
+                  @ProcessElement
+                  public void process(ProcessContext ctxt) {
+                    ctxt.output("zero");
+                    ctxt.output(WaitingTillSplitRestrictionTracker.WAIT_TILL_SPLIT);
+                    ctxt.output("two");
+                  }
+                }))
+        .apply(
+            "forceSplit",
+            ParDo.of(
+                new DoFn<String, String>() {
+                  @GetInitialRestriction
+                  public String getInitialRestriction(@Element String element) {
+                    return element;
+                  }
+
+                  @NewTracker
+                  public WaitingTillSplitRestrictionTracker newTracker(
+                      @Restriction String restriction) {
+                    return new WaitingTillSplitRestrictionTracker(restriction);
+                  }
+
+                  @ProcessElement
+                  public void process(
+                      RestrictionTracker<String, Void> tracker, ProcessContext context) {
+                    while (tracker.tryClaim(null)) {}
+                    context.output(tracker.currentRestriction());
+                  }
+                }))
+        .apply("addKeys", WithKeys.of("foo"))
+        // Use some unknown coders
+        .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+        // Force the output to be materialized
+        .apply("gbk", GroupByKey.create());
+
+    RunnerApi.Pipeline pipeline = PipelineTranslation.toProto(p);
+    // Expand any splittable DoFns within the graph to enable sizing and splitting of bundles.
+    RunnerApi.Pipeline pipelineWithSdfExpanded =
+        ProtoOverrides.updateTransform(
+            PTransformTranslation.PAR_DO_TRANSFORM_URN,
+            pipeline,
+            SplittableParDoExpander.createSizedReplacement());
+    FusedPipeline fused = GreedyPipelineFuser.fuse(pipelineWithSdfExpanded);
+
+    // Find the fused stage with the SDF ProcessSizedElementAndRestriction transform
+    Optional<ExecutableStage> optionalStage =
+        Iterables.tryFind(
+            fused.getFusedStages(),
+            (ExecutableStage stage) ->
+                Iterables.filter(
+                        stage.getTransforms(),
+                        (PTransformNode node) ->
+                            PTransformTranslation
+                                .SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN
+                                .equals(node.getTransform().getSpec().getUrn()))
+                    .iterator()
+                    .hasNext());
+    checkState(
+        optionalStage.isPresent(), "Expected a stage with SDF ProcessSizedElementAndRestriction.");
+    ExecutableStage stage = optionalStage.get();
+
+    ExecutableProcessBundleDescriptor descriptor =
+        ProcessBundleDescriptors.fromExecutableStage(
+            "my_stage", stage, dataServer.getApiServiceDescriptor());
+
+    BundleProcessor processor =
+        controlClient.getProcessor(
+            descriptor.getProcessBundleDescriptor(), descriptor.getRemoteInputDestinations());
+    Map<String, ? super Coder<WindowedValue<?>>> remoteOutputCoders =
+        descriptor.getRemoteOutputCoders();
+    Map<String, Collection<? super WindowedValue<?>>> outputValues = new HashMap<>();
+    Map<String, RemoteOutputReceiver<?>> outputReceivers = new HashMap<>();
+    for (Entry<String, ? super Coder<WindowedValue<?>>> remoteOutputCoder :
+        remoteOutputCoders.entrySet()) {
+      List<? super WindowedValue<?>> outputContents =
+          Collections.synchronizedList(new ArrayList<>());
+      outputValues.put(remoteOutputCoder.getKey(), outputContents);
+      outputReceivers.put(
+          remoteOutputCoder.getKey(),
+          RemoteOutputReceiver.of(
+              (Coder) remoteOutputCoder.getValue(),
+              (FnDataReceiver<? super WindowedValue<?>>) outputContents::add));
+    }
+
+    List<ProcessBundleSplitResponse> splitResponses = new ArrayList<>();
+    List<ProcessBundleResponse> checkpointResponses = new ArrayList<>();
+    List<String> requestsFinalization = new ArrayList<>();
+
+    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+    ScheduledFuture<Object> future;
+
+    // Execute the remote bundle.
+    try (RemoteBundle bundle =
+        processor.newBundle(
+            outputReceivers,
+            Collections.emptyMap(),
+            StateRequestHandler.unsupported(),
+            BundleProgressHandler.ignored(),
+            splitResponses::add,
+            checkpointResponses::add,
+            requestsFinalization::add)) {
+      Iterables.getOnlyElement(bundle.getInputReceivers().values())
+          .accept(valueInGlobalWindow(sdfSizedElementAndRestrictionForTest("zero")));
+      Iterables.getOnlyElement(bundle.getInputReceivers().values())
+          .accept(
+              valueInGlobalWindow(
+                  sdfSizedElementAndRestrictionForTest(
+                      WaitingTillSplitRestrictionTracker.WAIT_TILL_SPLIT)));
+      Iterables.getOnlyElement(bundle.getInputReceivers().values())
+          .accept(valueInGlobalWindow(sdfSizedElementAndRestrictionForTest("two")));
+      // Keep sending splits until the bundle terminates. We specifically use 0.5 so that we will
+      // choose a split point before the end of WAIT_TILL_SPLIT regardless of where we are during
+      // processing.
+      future =
+          (ScheduledFuture)
+              executor.scheduleWithFixedDelay(
+                  () -> bundle.split(0.5), 0L, 100L, TimeUnit.MILLISECONDS);
+    }
+    future.cancel(false);
+    executor.shutdown();
+
+    assertTrue(requestsFinalization.isEmpty());
+    assertTrue(checkpointResponses.isEmpty());
+
+    List<WindowedValue<KV<String, String>>> expectedOutputs = new ArrayList<>();
+
+    // We only validate the last split response since it is the only one that could possibly
+    // contain the SDF split; all others will be a reduction in the ChannelSplit.
+    assertFalse(splitResponses.isEmpty());
+    ProcessBundleSplitResponse splitResponse = splitResponses.get(splitResponses.size() - 1);
+    ChannelSplit channelSplit = Iterables.getOnlyElement(splitResponse.getChannelSplitsList());
+
+    // There are only a few outcomes that could happen with splitting due to timing:
 
 Review comment:
   Can we force a split at an element by only feeding in `WAIT_TILL_SPLIT`?
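   
   A rough sketch of what that variant of the test could look like, reusing the declarations from the quoted `testSplit` body above (hypothetical; the actual update made to the PR may differ): feed only the blocking element, so that any successful split has to land on that element rather than on another channel position.
   
   ```java
   // Hypothetical variant of the testSplit input section: feed only the element whose
   // restriction tracker blocks until trySplit succeeds, forcing the split to happen at
   // that element.
   try (RemoteBundle bundle =
       processor.newBundle(
           outputReceivers,
           Collections.emptyMap(),
           StateRequestHandler.unsupported(),
           BundleProgressHandler.ignored(),
           splitResponses::add,
           checkpointResponses::add,
           requestsFinalization::add)) {
     Iterables.getOnlyElement(bundle.getInputReceivers().values())
         .accept(
             valueInGlobalWindow(
                 sdfSizedElementAndRestrictionForTest(
                     WaitingTillSplitRestrictionTracker.WAIT_TILL_SPLIT)));
     // Keep requesting splits until the bundle terminates; the blocking tracker guarantees
     // the element cannot finish until one of these split attempts succeeds.
     future =
         (ScheduledFuture)
             executor.scheduleWithFixedDelay(
                 () -> bundle.split(0.5), 0L, 100L, TimeUnit.MILLISECONDS);
   }
   ```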


[GitHub] [beam] lukecwik commented on a change in pull request #11448: [BEAM-3836] Enable dynamic splitting/checkpointing within the Java SDK harness.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11448: [BEAM-3836] Enable dynamic splitting/checkpointing within the Java SDK harness.
URL: https://github.com/apache/beam/pull/11448#discussion_r410553899
 
 

 ##########
 File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
 ##########
 @@ -340,7 +347,12 @@ private void createRunnerAndConsumersForPTransformRecursively(
               "Unable to find active bundle for instruction id %s.",
               request.getProcessBundleSplit().getInstructionId()));
     }
-    throw new UnsupportedOperationException("TODO: BEAM-3836, support splitting within SDK.");
+    BeamFnApi.ProcessBundleSplitResponse.Builder response =
+        BeamFnApi.ProcessBundleSplitResponse.newBuilder();
+    for (BeamFnDataReadRunner channelRoot : bundleProcessor.getChannelRoots()) {
+      channelRoot.trySplit(request.getProcessBundleSplit(), response);
 
 Review comment:
   Yes, each root gets the full split request and extracts the desired split information for itself, if there is any specific to it.
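   
   A minimal sketch of that pattern, for readers skimming the thread: every channel root is handed the same `ProcessBundleSplitRequest` and only acts on the desired split keyed by its own transform id. The class, constructor, and field names below are hypothetical (this is not the actual `BeamFnDataReadRunner` code), and it assumes the Fn API request exposes its `desired_splits` map keyed by PTransform id.
   
   ```java
   import org.apache.beam.model.fnexecution.v1.BeamFnApi;
   
   /** Illustrative only: how a channel root might filter the shared split request. */
   class ChannelRootSplitSketch {
     // Assumed: the id of the gRPC read PTransform this root serves.
     private final String pTransformId;
   
     ChannelRootSplitSketch(String pTransformId) {
       this.pTransformId = pTransformId;
     }
   
     void trySplit(
         BeamFnApi.ProcessBundleSplitRequest request,
         BeamFnApi.ProcessBundleSplitResponse.Builder response) {
       BeamFnApi.ProcessBundleSplitRequest.DesiredSplit desiredSplit =
           request.getDesiredSplitsMap().get(pTransformId);
       if (desiredSplit == null) {
         return; // Nothing in this request is addressed to this root.
       }
       // ... compute this root's channel split (and any SDF element split) using
       // desiredSplit.getFractionOfRemainder() and add it to the shared response builder ...
     }
   }
   ```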


[GitHub] [beam] boyuanzz commented on a change in pull request #11448: [BEAM-3836] Enable dynamic splitting/checkpointing within the Java SDK harness.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11448: [BEAM-3836] Enable dynamic splitting/checkpointing within the Java SDK harness.
URL: https://github.com/apache/beam/pull/11448#discussion_r410536424
 
 

 ##########
 File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
 ##########
 @@ -340,7 +347,12 @@ private void createRunnerAndConsumersForPTransformRecursively(
               "Unable to find active bundle for instruction id %s.",
               request.getProcessBundleSplit().getInstructionId()));
     }
-    throw new UnsupportedOperationException("TODO: BEAM-3836, support splitting within SDK.");
+    BeamFnApi.ProcessBundleSplitResponse.Builder response =
+        BeamFnApi.ProcessBundleSplitResponse.newBuilder();
+    for (BeamFnDataReadRunner channelRoot : bundleProcessor.getChannelRoots()) {
+      channelRoot.trySplit(request.getProcessBundleSplit(), response);
 
 Review comment:
   Is it intended that all roots process the same split request?
