You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/18 01:25:19 UTC

[GitHub] [beam] boyuanzz commented on a change in pull request #11448: [BEAM-3836] Enable dynamic splitting/checkpointing within the Java SDK harness.

boyuanzz commented on a change in pull request #11448: [BEAM-3836] Enable dynamic splitting/checkpointing within the Java SDK harness.
URL: https://github.com/apache/beam/pull/11448#discussion_r410528812
 
 

 ##########
 File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
 ##########
 @@ -1246,16 +1249,230 @@ public void process(ProcessContext c) {
               StateRequestHandler.unsupported(),
               BundleProgressHandler.ignored())) {
         Iterables.getOnlyElement(bundle.getInputReceivers().values())
-            .accept(
-                WindowedValue.valueInGlobalWindow(
-                    CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "X")));
+            .accept(valueInGlobalWindow(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "X")));
       }
     }
     assertThat(
         outputValues,
         containsInAnyOrder(
-            WindowedValue.valueInGlobalWindow(KV.of("stream1X", "")),
-            WindowedValue.valueInGlobalWindow(KV.of("stream2X", ""))));
+            valueInGlobalWindow(KV.of("stream1X", "")),
+            valueInGlobalWindow(KV.of("stream2X", ""))));
+  }
+
+  /**
+   * A restriction tracker that will block making progress on {@link #WAIT_TILL_SPLIT} until a try
+   * split is invoked.
+   */
+  private static class WaitingTillSplitRestrictionTracker extends RestrictionTracker<String, Void> {
+    private static final String WAIT_TILL_SPLIT = "WaitTillSplit";
+    private static final String PRIMARY = "Primary";
+    private static final String RESIDUAL = "Residual";
+
+    private String currentRestriction;
+
+    private WaitingTillSplitRestrictionTracker(String restriction) {
+      this.currentRestriction = restriction;
+    }
+
+    @Override
+    public boolean tryClaim(Void position) {
+      return needsSplitting();
+    }
+
+    @Override
+    public String currentRestriction() {
+      return currentRestriction;
+    }
+
+    @Override
+    public SplitResult<String> trySplit(double fractionOfRemainder) {
+      if (!needsSplitting()) {
+        return null;
+      }
+      this.currentRestriction = PRIMARY;
+      return SplitResult.of(currentRestriction, RESIDUAL);
+    }
+
+    private boolean needsSplitting() {
+      return WAIT_TILL_SPLIT.equals(currentRestriction);
+    }
+
+    @Override
+    public void checkDone() throws IllegalStateException {
+      checkState(!needsSplitting(), "Expected for this restriction to have been split.");
+    }
+  }
+
+  @Test(timeout = 60000L)
+  public void testSplit() throws Exception {
+    Pipeline p = Pipeline.create();
+    p.apply("impulse", Impulse.create())
+        .apply(
+            "create",
+            ParDo.of(
+                new DoFn<byte[], String>() {
+                  @ProcessElement
+                  public void process(ProcessContext ctxt) {
+                    ctxt.output("zero");
+                    ctxt.output(WaitingTillSplitRestrictionTracker.WAIT_TILL_SPLIT);
+                    ctxt.output("two");
+                  }
+                }))
+        .apply(
+            "forceSplit",
+            ParDo.of(
+                new DoFn<String, String>() {
+                  @GetInitialRestriction
+                  public String getInitialRestriction(@Element String element) {
+                    return element;
+                  }
+
+                  @NewTracker
+                  public WaitingTillSplitRestrictionTracker newTracker(
+                      @Restriction String restriction) {
+                    return new WaitingTillSplitRestrictionTracker(restriction);
+                  }
+
+                  @ProcessElement
+                  public void process(
+                      RestrictionTracker<String, Void> tracker, ProcessContext context) {
+                    while (tracker.tryClaim(null)) {}
+                    context.output(tracker.currentRestriction());
+                  }
+                }))
+        .apply("addKeys", WithKeys.of("foo"))
+        // Use some unknown coders
+        .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+        // Force the output to be materialized
+        .apply("gbk", GroupByKey.create());
+
+    RunnerApi.Pipeline pipeline = PipelineTranslation.toProto(p);
+    // Expand any splittable DoFns within the graph to enable sizing and splitting of bundles.
+    RunnerApi.Pipeline pipelineWithSdfExpanded =
+        ProtoOverrides.updateTransform(
+            PTransformTranslation.PAR_DO_TRANSFORM_URN,
+            pipeline,
+            SplittableParDoExpander.createSizedReplacement());
+    FusedPipeline fused = GreedyPipelineFuser.fuse(pipelineWithSdfExpanded);
+
+    // Find the fused stage with the SDF ProcessSizedElementAndRestriction transform
+    Optional<ExecutableStage> optionalStage =
+        Iterables.tryFind(
+            fused.getFusedStages(),
+            (ExecutableStage stage) ->
+                Iterables.filter(
+                        stage.getTransforms(),
+                        (PTransformNode node) ->
+                            PTransformTranslation
+                                .SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN
+                                .equals(node.getTransform().getSpec().getUrn()))
+                    .iterator()
+                    .hasNext());
+    checkState(
+        optionalStage.isPresent(), "Expected a stage with SDF ProcessSizedElementAndRestriction.");
+    ExecutableStage stage = optionalStage.get();
+
+    ExecutableProcessBundleDescriptor descriptor =
+        ProcessBundleDescriptors.fromExecutableStage(
+            "my_stage", stage, dataServer.getApiServiceDescriptor());
+
+    BundleProcessor processor =
+        controlClient.getProcessor(
+            descriptor.getProcessBundleDescriptor(), descriptor.getRemoteInputDestinations());
+    Map<String, ? super Coder<WindowedValue<?>>> remoteOutputCoders =
+        descriptor.getRemoteOutputCoders();
+    Map<String, Collection<? super WindowedValue<?>>> outputValues = new HashMap<>();
+    Map<String, RemoteOutputReceiver<?>> outputReceivers = new HashMap<>();
+    for (Entry<String, ? super Coder<WindowedValue<?>>> remoteOutputCoder :
+        remoteOutputCoders.entrySet()) {
+      List<? super WindowedValue<?>> outputContents =
+          Collections.synchronizedList(new ArrayList<>());
+      outputValues.put(remoteOutputCoder.getKey(), outputContents);
+      outputReceivers.put(
+          remoteOutputCoder.getKey(),
+          RemoteOutputReceiver.of(
+              (Coder) remoteOutputCoder.getValue(),
+              (FnDataReceiver<? super WindowedValue<?>>) outputContents::add));
+    }
+
+    List<ProcessBundleSplitResponse> splitResponses = new ArrayList<>();
+    List<ProcessBundleResponse> checkpointResponses = new ArrayList<>();
+    List<String> requestsFinalization = new ArrayList<>();
+
+    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+    ScheduledFuture<Object> future;
+
+    // Execute the remote bundle.
+    try (RemoteBundle bundle =
+        processor.newBundle(
+            outputReceivers,
+            Collections.emptyMap(),
+            StateRequestHandler.unsupported(),
+            BundleProgressHandler.ignored(),
+            splitResponses::add,
+            checkpointResponses::add,
+            requestsFinalization::add)) {
+      Iterables.getOnlyElement(bundle.getInputReceivers().values())
+          .accept(valueInGlobalWindow(sdfSizedElementAndRestrictionForTest("zero")));
+      Iterables.getOnlyElement(bundle.getInputReceivers().values())
+          .accept(
+              valueInGlobalWindow(
+                  sdfSizedElementAndRestrictionForTest(
+                      WaitingTillSplitRestrictionTracker.WAIT_TILL_SPLIT)));
+      Iterables.getOnlyElement(bundle.getInputReceivers().values())
+          .accept(valueInGlobalWindow(sdfSizedElementAndRestrictionForTest("two")));
+      // Keep sending splits until the bundle terminates, we specifically use 0.5 so that we will
+      // choose a split point before the end of WAIT_TILL_SPLIT regardless of where we are during
+      // processing.
+      future =
+          (ScheduledFuture)
+              executor.scheduleWithFixedDelay(
+                  () -> bundle.split(0.5), 0L, 100L, TimeUnit.MILLISECONDS);
+    }
+    future.cancel(false);
+    executor.shutdown();
+
+    assertTrue(requestsFinalization.isEmpty());
+    assertTrue(checkpointResponses.isEmpty());
+
+    List<WindowedValue<KV<String, String>>> expectedOutputs = new ArrayList<>();
+
+    // We only validate the last split response since it is the only one that could possibly
+    // contain the SDF split, all others will be a reduction in the ChannelSplit
+    assertFalse(splitResponses.isEmpty());
+    ProcessBundleSplitResponse splitResponse = splitResponses.get(splitResponses.size() - 1);
+    ChannelSplit channelSplit = Iterables.getOnlyElement(splitResponse.getChannelSplitsList());
+
+    // There are only a few outcomes that could happen with splitting due to timing:
 
 Review comment:
   Can we force a split at element by only feeding in `WAIT_TILL_SPLIT `?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services