Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/06 22:29:56 UTC

[GitHub] [beam] lukecwik opened a new pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

lukecwik opened a new pull request #12488:
URL: https://github.com/apache/beam/pull/12488


   This is a step towards making all UnboundedSources execute as splittable DoFns within the direct runner using the SDF unbounded source wrapper, which relies on bundle finalization to handle checkpoints.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on a change in pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12488:
URL: https://github.com/apache/beam/pull/12488#discussion_r467358405



##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
##########
@@ -183,6 +188,15 @@ public void initialize(
         committedResult.getUnprocessedInputs().orElse(null),
         committedResult.getOutputs(),
         result.getWatermarkHold());
+
+    // Callback and requested bundle finalizations
+    for (InMemoryBundleFinalizer.Finalization finalization : result.getBundleFinalizations()) {
+      try {
+        finalization.getCallback().onBundleSuccess();
+      } catch (Exception e) {

Review comment:
       This catch is for user-thrown exceptions. The bundle finalization callback is user-written code, which is why the catch is so wide and can't be narrowed.
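
A minimal sketch of the pattern being described, assuming a simplified callback interface. `FinalizationCallbackSketch` and its nested `Callback` are hypothetical names for illustration and are not the classes in this PR; the point is only that a failure in one user callback is caught broadly and does not prevent the remaining callbacks from running.

```java
import java.util.ArrayList;
import java.util.List;

public class FinalizationCallbackSketch {
  /** Hypothetical stand-in for a user-supplied finalization callback. */
  public interface Callback {
    void onBundleSuccess() throws Exception;
  }

  /** Runs every callback; a failure in one is recorded and does not stop the rest. */
  public static List<Exception> runAll(List<Callback> callbacks) {
    List<Exception> failures = new ArrayList<>();
    for (Callback callback : callbacks) {
      try {
        callback.onBundleSuccess();
      } catch (Exception e) {
        // User code may throw any exception type, so the catch cannot be narrowed.
        failures.add(e);
      }
    }
    return failures;
  }

  public static void main(String[] args) {
    List<Callback> callbacks = new ArrayList<>();
    callbacks.add(() -> { throw new IllegalStateException("user bug"); });
    callbacks.add(() -> System.out.println("second callback still ran"));
    System.out.println("failures: " + runAll(callbacks).size());
  }
}
```

The sketch collects the failures instead of logging them so that it stays free of a logging dependency; the diff above logs a warning instead.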







[GitHub] [beam] ibzib commented on a change in pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #12488:
URL: https://github.com/apache/beam/pull/12488#discussion_r467313292



##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -1499,6 +1504,146 @@ public void populateDisplayData(Builder builder) {
     }
   }
 
+  @RunWith(JUnit4.class)
+  public static class BundleFinalizationTests extends SharedTestBase implements Serializable {
+    private abstract static class BundleFinalizingDoFn extends DoFn<KV<String, Long>, String> {
+      private static final long MAX_ATTEMPTS = 3000;
+      // We use the UUID to uniquely identify this DoFn in case this test is run with
+      // other tests in the same JVM.
+      private static final Map<UUID, AtomicBoolean> WAS_FINALIZED = new HashMap();
+      private final UUID uuid = UUID.randomUUID();
+
+      public void testFinalization(BundleFinalizer bundleFinalizer, OutputReceiver<String> output)
+          throws Exception {
+        if (WAS_FINALIZED.computeIfAbsent(uuid, (unused) -> new AtomicBoolean()).get()) {
+          output.output("bundle was finalized");
+          return;
+        }
+        bundleFinalizer.afterBundleCommit(
+            Instant.now().plus(Duration.standardSeconds(MAX_ATTEMPTS)),
+            () -> WAS_FINALIZED.computeIfAbsent(uuid, (unused) -> new AtomicBoolean()).set(true));
+        // We sleep here to give time for the runner to perform any prior callbacks.
+        sleep(100L);
+      }
+    }
+
+    private static class BasicBundleFinalizingDoFn extends BundleFinalizingDoFn {
+      @ProcessElement
+      public void processElement(BundleFinalizer bundleFinalizer, OutputReceiver<String> output)
+          throws Exception {
+        testFinalization(bundleFinalizer, output);
+      }
+    }
+
+    private static class BundleFinalizerOutputChecker
+        implements SerializableFunction<Iterable<String>, Void> {
+      @Override
+      public Void apply(Iterable<String> input) {
+        assertTrue(
+            "Expected to have received one callback enabling output to be produced but received none.",

Review comment:
       In other words, I'd like to clarify the assertion we're making about `input` and what it means.







[GitHub] [beam] lukecwik commented on pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12488:
URL: https://github.com/apache/beam/pull/12488#issuecomment-670724401









[GitHub] [beam] lukecwik commented on pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12488:
URL: https://github.com/apache/beam/pull/12488#issuecomment-670729248


   The test failure is a known issue that is also failing in postcommit: https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/7773/testReport/ and https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/7805/





[GitHub] [beam] lukecwik commented on a change in pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12488:
URL: https://github.com/apache/beam/pull/12488#discussion_r467324258



##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -1499,6 +1504,146 @@ public void populateDisplayData(Builder builder) {
     }
   }
 
+  @RunWith(JUnit4.class)
+  public static class BundleFinalizationTests extends SharedTestBase implements Serializable {
+    private abstract static class BundleFinalizingDoFn extends DoFn<KV<String, Long>, String> {
+      private static final long MAX_ATTEMPTS = 3000;
+      // We use the UUID to uniquely identify this DoFn in case this test is run with
+      // other tests in the same JVM.
+      private static final Map<UUID, AtomicBoolean> WAS_FINALIZED = new HashMap();
+      private final UUID uuid = UUID.randomUUID();
+
+      public void testFinalization(BundleFinalizer bundleFinalizer, OutputReceiver<String> output)
+          throws Exception {
+        if (WAS_FINALIZED.computeIfAbsent(uuid, (unused) -> new AtomicBoolean()).get()) {
+          output.output("bundle was finalized");
+          return;
+        }
+        bundleFinalizer.afterBundleCommit(
+            Instant.now().plus(Duration.standardSeconds(MAX_ATTEMPTS)),
+            () -> WAS_FINALIZED.computeIfAbsent(uuid, (unused) -> new AtomicBoolean()).set(true));
+        // We sleep here to give time for the runner to perform any prior callbacks.
+        sleep(100L);
+      }
+    }
+
+    private static class BasicBundleFinalizingDoFn extends BundleFinalizingDoFn {
+      @ProcessElement
+      public void processElement(BundleFinalizer bundleFinalizer, OutputReceiver<String> output)
+          throws Exception {
+        testFinalization(bundleFinalizer, output);
+      }
+    }
+
+    private static class BundleFinalizerOutputChecker
+        implements SerializableFunction<Iterable<String>, Void> {
+      @Override
+      public Void apply(Iterable<String> input) {
+        assertTrue(
+            "Expected to have received one callback enabling output to be produced but received none.",

Review comment:
       Done







[GitHub] [beam] ibzib commented on a change in pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #12488:
URL: https://github.com/apache/beam/pull/12488#discussion_r467307061



##########
File path: runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
##########
@@ -126,23 +128,30 @@ public void outputWindowedValue(
             NullSideInputReader.empty(),
             Executors.newSingleThreadScheduledExecutor(),
             1000,
-            Duration.standardSeconds(3));
-
-    return invoker.invokeProcessElement(
-        DoFnInvokers.invokerFor(fn),
-        WindowedValue.of(null, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING),
-        new OffsetRangeTracker(initialRestriction),
-        new WatermarkEstimator<Void>() {
-          @Override
-          public Instant currentWatermark() {
-            return GlobalWindow.TIMESTAMP_MIN_VALUE;
-          }
+            Duration.standardSeconds(3),
+            () -> bundleFinalizer);
+
+    SplittableProcessElementInvoker.Result rval =
+        invoker.invokeProcessElement(
+            DoFnInvokers.invokerFor(fn),
+            WindowedValue.of(null, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING),
+            new OffsetRangeTracker(initialRestriction),
+            new WatermarkEstimator<Void>() {
+              @Override
+              public Instant currentWatermark() {
+                return GlobalWindow.TIMESTAMP_MIN_VALUE;
+              }
 
-          @Override
-          public Void getState() {
-            return null;
-          }
-        });
+              @Override
+              public Void getState() {
+                return null;
+              }
+            });
+    for (InMemoryBundleFinalizer.Finalization finalization :
+        bundleFinalizer.getAndClearFinalizations()) {
+      finalization.getCallback().onBundleSuccess();

Review comment:
       Why do we need to trigger the callbacks here?

##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
##########
@@ -183,6 +188,15 @@ public void initialize(
         committedResult.getUnprocessedInputs().orElse(null),
         committedResult.getOutputs(),
         result.getWatermarkHold());
+
+    // Callback and requested bundle finalizations
+    for (InMemoryBundleFinalizer.Finalization finalization : result.getBundleFinalizations()) {
+      try {
+        finalization.getCallback().onBundleSuccess();
+      } catch (Exception e) {
+        LOG.warn("Failed to finalize requested bundle {}", finalization, e);

Review comment:
       I don't think logging only the finalization tells us enough. Can we log more information about the bundle itself?

##########
File path: runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
##########
@@ -126,23 +128,30 @@ public void outputWindowedValue(
             NullSideInputReader.empty(),
             Executors.newSingleThreadScheduledExecutor(),
             1000,
-            Duration.standardSeconds(3));
-
-    return invoker.invokeProcessElement(
-        DoFnInvokers.invokerFor(fn),
-        WindowedValue.of(null, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING),
-        new OffsetRangeTracker(initialRestriction),
-        new WatermarkEstimator<Void>() {
-          @Override
-          public Instant currentWatermark() {
-            return GlobalWindow.TIMESTAMP_MIN_VALUE;
-          }
+            Duration.standardSeconds(3),
+            () -> bundleFinalizer);
+
+    SplittableProcessElementInvoker.Result rval =
+        invoker.invokeProcessElement(
+            DoFnInvokers.invokerFor(fn),
+            WindowedValue.of(null, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING),
+            new OffsetRangeTracker(initialRestriction),
+            new WatermarkEstimator<Void>() {
+              @Override
+              public Instant currentWatermark() {
+                return GlobalWindow.TIMESTAMP_MIN_VALUE;
+              }
 
-          @Override
-          public Void getState() {
-            return null;
-          }
-        });
+              @Override
+              public Void getState() {
+                return null;
+              }
+            });
+    for (InMemoryBundleFinalizer.Finalization finalization :

Review comment:
       Can we make any assertions about the finalizations? I.e., can we expect that they're not empty?

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -1499,6 +1504,146 @@ public void populateDisplayData(Builder builder) {
     }
   }
 
+  @RunWith(JUnit4.class)
+  public static class BundleFinalizationTests extends SharedTestBase implements Serializable {
+    private abstract static class BundleFinalizingDoFn extends DoFn<KV<String, Long>, String> {
+      private static final long MAX_ATTEMPTS = 3000;
+      // We use the UUID to uniquely identify this DoFn in case this test is run with
+      // other tests in the same JVM.
+      private static final Map<UUID, AtomicBoolean> WAS_FINALIZED = new HashMap();
+      private final UUID uuid = UUID.randomUUID();
+
+      public void testFinalization(BundleFinalizer bundleFinalizer, OutputReceiver<String> output)
+          throws Exception {
+        if (WAS_FINALIZED.computeIfAbsent(uuid, (unused) -> new AtomicBoolean()).get()) {
+          output.output("bundle was finalized");
+          return;
+        }
+        bundleFinalizer.afterBundleCommit(
+            Instant.now().plus(Duration.standardSeconds(MAX_ATTEMPTS)),
+            () -> WAS_FINALIZED.computeIfAbsent(uuid, (unused) -> new AtomicBoolean()).set(true));
+        // We sleep here to give time for the runner to perform any prior callbacks.
+        sleep(100L);
+      }
+    }
+
+    private static class BasicBundleFinalizingDoFn extends BundleFinalizingDoFn {
+      @ProcessElement
+      public void processElement(BundleFinalizer bundleFinalizer, OutputReceiver<String> output)
+          throws Exception {
+        testFinalization(bundleFinalizer, output);
+      }
+    }
+
+    private static class BundleFinalizerOutputChecker
+        implements SerializableFunction<Iterable<String>, Void> {
+      @Override
+      public Void apply(Iterable<String> input) {
+        assertTrue(
+            "Expected to have received one callback enabling output to be produced but received none.",

Review comment:
       Multiple callbacks are allowed, right? (If so, please reword this as "at least one callback.")
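
A possible shape for the reworded check, as a sketch only. `AtLeastOneCallbackCheck` is a hypothetical helper and not part of the PR; it accepts any non-empty output and fails only when no element was produced, which matches the "at least one callback" wording.

```java
import java.util.Arrays;
import java.util.Collections;

public class AtLeastOneCallbackCheck {
  static final String MESSAGE =
      "Expected at least one callback enabling output to be produced but received none.";

  /** Passes for one or more outputs; throws only when the input is empty. */
  public static void check(Iterable<String> input) {
    if (!input.iterator().hasNext()) {
      throw new AssertionError(MESSAGE);
    }
  }

  public static void main(String[] args) {
    // Two outputs are acceptable because more than one callback may have fired.
    check(Arrays.asList("bundle was finalized", "bundle was finalized"));
    try {
      check(Collections.<String>emptyList());
    } catch (AssertionError expected) {
      System.out.println(expected.getMessage());
    }
  }
}
```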

##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
##########
@@ -81,6 +89,22 @@ public DirectTimerInternals timerInternals() {
       return timerInternals;
     }
 
+    @Override
+    public BundleFinalizer bundleFinalizer() {
+      if (bundleFinalizer == null) {
+        bundleFinalizer = new InMemoryBundleFinalizer();

Review comment:
       Why lazy-initialize? Is this just to save memory?
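
For reference, the pattern in question looks roughly like the sketch below. `LazyHolder` is a hypothetical name and not a Beam class. The object is only built on first access, so callers that never ask for it pay nothing; whether that saving matters here depends on how many contexts never request a finalizer, which the diff doesn't show.

```java
import java.util.function.Supplier;

public class LazyHolder<T> {
  private final Supplier<T> factory;
  private T value; // stays null until the first call to get()

  public LazyHolder(Supplier<T> factory) {
    this.factory = factory;
  }

  /** Creates the value on first use and returns the same instance afterwards. */
  public synchronized T get() {
    if (value == null) {
      value = factory.get();
    }
    return value;
  }

  /** Lets a caller skip work entirely when the value was never requested. */
  public synchronized boolean isInitialized() {
    return value != null;
  }

  public static void main(String[] args) {
    LazyHolder<StringBuilder> holder = new LazyHolder<>(StringBuilder::new);
    System.out.println("initialized before use: " + holder.isInitialized());
    holder.get().append("x");
    System.out.println("initialized after use: " + holder.isInitialized());
  }
}
```

The `synchronized` methods are an addition for safety in the sketch; the snippet in the diff performs an unsynchronized null check.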







[GitHub] [beam] lukecwik commented on pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12488:
URL: https://github.com/apache/beam/pull/12488#issuecomment-670785654









[GitHub] [beam] amaliujia commented on a change in pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #12488:
URL: https://github.com/apache/beam/pull/12488#discussion_r467331551



##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -1499,6 +1504,146 @@ public void populateDisplayData(Builder builder) {
     }
   }
 
+  @RunWith(JUnit4.class)
+  public static class BundleFinalizationTests extends SharedTestBase implements Serializable {
+    private abstract static class BundleFinalizingDoFn extends DoFn<KV<String, Long>, String> {
+      private static final long MAX_ATTEMPTS = 3000;
+      // We use the UUID to uniquely identify this DoFn in case this test is run with
+      // other tests in the same JVM.
+      private static final Map<UUID, AtomicBoolean> WAS_FINALIZED = new HashMap();
+      private final UUID uuid = UUID.randomUUID();
+
+      public void testFinalization(BundleFinalizer bundleFinalizer, OutputReceiver<String> output)
+          throws Exception {
+        if (WAS_FINALIZED.computeIfAbsent(uuid, (unused) -> new AtomicBoolean()).get()) {
+          output.output("bundle was finalized");
+          return;
+        }
+        bundleFinalizer.afterBundleCommit(
+            Instant.now().plus(Duration.standardSeconds(MAX_ATTEMPTS)),
+            () -> WAS_FINALIZED.computeIfAbsent(uuid, (unused) -> new AtomicBoolean()).set(true));
+        // We sleep here to give time for the runner to perform any prior callbacks.
+        sleep(100L);

Review comment:
       nit: I am not sure if there are other ways, but such a `sleep` might be a source of flakiness, e.g. if prior callbacks have not finished after `100L`.
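
One common alternative to a fixed sleep, sketched under assumptions: wait on a latch with a generous timeout so the wait ends as soon as the event happens. `AwaitInsteadOfSleep` is a hypothetical helper. It assumes the callback and the waiter share a JVM, and it may not fit this DoFn as written, since the callback registered here only runs after the bundle commits.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AwaitInsteadOfSleep {
  /** Returns true if the latch was released before the timeout, false otherwise. */
  public static boolean awaitCallback(CountDownLatch done, long timeoutMillis)
      throws InterruptedException {
    return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
  }

  public static void main(String[] args) throws InterruptedException {
    CountDownLatch done = new CountDownLatch(1);
    // Stand-in for a callback that completes on another thread.
    Thread callback = new Thread(done::countDown);
    callback.start();
    // Returns as soon as the callback has run instead of always waiting a fixed time.
    System.out.println("callback observed: " + awaitCallback(done, 5_000));
  }
}
```

The timeout bounds the worst case, while the common case returns immediately, so a slow machine makes the test slower instead of making it fail.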







[GitHub] [beam] lukecwik commented on pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12488:
URL: https://github.com/apache/beam/pull/12488#issuecomment-670223324


   R: @amaliujia @ibzib 
   CC: @boyuanzz This should enable testing unbounded SDFs using the direct runner with bundle finalization callbacks.





[GitHub] [beam] amaliujia commented on pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #12488:
URL: https://github.com/apache/beam/pull/12488#issuecomment-670758669


   Thanks @lukecwik 
   
   Will take a look at this PR soon.





[GitHub] [beam] lukecwik commented on pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12488:
URL: https://github.com/apache/beam/pull/12488#issuecomment-670608589









[GitHub] [beam] amaliujia commented on a change in pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #12488:
URL: https://github.com/apache/beam/pull/12488#discussion_r467331678



##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
##########
@@ -839,19 +845,20 @@ public ProcessContinuation process(
         RestrictionTracker<OffsetRange, Long> tracker,
         BundleFinalizer bundleFinalizer)
         throws InterruptedException {
-      if (wasFinalized.get()) {
+      if (WAS_FINALIZED.computeIfAbsent(uuid, (unused) -> new AtomicBoolean()).get()) {
+        tracker.tryClaim(tracker.currentRestriction().getFrom() + 1);
+        receiver.output(element);
         // Claim beyond the end now that we know we have been finalized.
         tracker.tryClaim(Long.MAX_VALUE);
-        receiver.output(element);
         return stop();
       }
       if (tracker.tryClaim(tracker.currentRestriction().getFrom() + 1)) {
         bundleFinalizer.afterBundleCommit(
             Instant.now().plus(Duration.standardSeconds(MAX_ATTEMPTS)),
-            () -> wasFinalized.set(true));
+            () -> WAS_FINALIZED.computeIfAbsent(uuid, (unused) -> new AtomicBoolean()).set(true));
         // We sleep here instead of setting a resume time since the resume time doesn't need to
         // be honored.
-        sleep(1000L); // 1 second
+        sleep(100L);

Review comment:
       nit: same here. 







[GitHub] [beam] lukecwik merged pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
lukecwik merged pull request #12488:
URL: https://github.com/apache/beam/pull/12488


   





[GitHub] [beam] lukecwik commented on pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12488:
URL: https://github.com/apache/beam/pull/12488#issuecomment-670784514


   Run Flink ValidatesRunner





[GitHub] [beam] amaliujia commented on a change in pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #12488:
URL: https://github.com/apache/beam/pull/12488#discussion_r467330787



##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
##########
@@ -183,6 +188,15 @@ public void initialize(
         committedResult.getUnprocessedInputs().orElse(null),
         committedResult.getOutputs(),
         result.getWatermarkHold());
+
+    // Callback and requested bundle finalizations
+    for (InMemoryBundleFinalizer.Finalization finalization : result.getBundleFinalizations()) {
+      try {
+        finalization.getCallback().onBundleSuccess();
+      } catch (Exception e) {

Review comment:
       Is it possible to make this Exception more specific? Or would that not add much value?







[GitHub] [beam] lukecwik commented on a change in pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12488:
URL: https://github.com/apache/beam/pull/12488#discussion_r467324202



##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
##########
@@ -81,6 +89,22 @@ public DirectTimerInternals timerInternals() {
       return timerInternals;
     }
 
+    @Override
+    public BundleFinalizer bundleFinalizer() {
+      if (bundleFinalizer == null) {
+        bundleFinalizer = new InMemoryBundleFinalizer();

Review comment:
       Yup, since most transforms won't use the bundle finalizer.
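
A minimal sketch of the lazy-creation idea discussed here, written as a standalone illustration rather than the direct runner's actual code. `LazyHolder` is a hypothetical name, not a Beam class. It allocates its value only when first requested, and like the code in the diff it does no synchronization, so it assumes single-threaded access.

```java
import java.util.function.Supplier;

// Hypothetical holder, not a Beam class: creates its value on first use only.
final class LazyHolder<T> {
  private final Supplier<T> factory;
  // Plain field with no synchronization: safe only under single-threaded access.
  private T value;

  LazyHolder(Supplier<T> factory) {
    this.factory = factory;
  }

  // Returns the cached value, invoking the factory the first time only.
  T get() {
    if (value == null) {
      value = factory.get();
    }
    return value;
  }
}
```

If the holder is never asked for its value, the factory never runs, which is the saving when most callers do not need the object.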







[GitHub] [beam] lukecwik commented on pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12488:
URL: https://github.com/apache/beam/pull/12488#issuecomment-670609180


   Run Spark ValidatesRunner





[GitHub] [beam] lukecwik commented on pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12488:
URL: https://github.com/apache/beam/pull/12488#issuecomment-670746769


   Expanded the existing JIRA issue https://issues.apache.org/jira/browse/BEAM-8460 and opened #12503 to disable the failing test category for Spark and Flink.





[GitHub] [beam] lukecwik commented on pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12488:
URL: https://github.com/apache/beam/pull/12488#issuecomment-670609229


   Run Flink ValidatesRunner





[GitHub] [beam] lukecwik commented on pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12488:
URL: https://github.com/apache/beam/pull/12488#issuecomment-670224938


   retest this please





[GitHub] [beam] lukecwik commented on pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12488:
URL: https://github.com/apache/beam/pull/12488#issuecomment-670246745


   Run Java Precommit





[GitHub] [beam] lukecwik commented on a change in pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12488:
URL: https://github.com/apache/beam/pull/12488#discussion_r467358540



##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
##########
@@ -839,19 +845,20 @@ public ProcessContinuation process(
         RestrictionTracker<OffsetRange, Long> tracker,
         BundleFinalizer bundleFinalizer)
         throws InterruptedException {
-      if (wasFinalized.get()) {
+      if (WAS_FINALIZED.computeIfAbsent(uuid, (unused) -> new AtomicBoolean()).get()) {
+        tracker.tryClaim(tracker.currentRestriction().getFrom() + 1);
+        receiver.output(element);
         // Claim beyond the end now that we know we have been finalized.
         tracker.tryClaim(Long.MAX_VALUE);
-        receiver.output(element);
         return stop();
       }
       if (tracker.tryClaim(tracker.currentRestriction().getFrom() + 1)) {
         bundleFinalizer.afterBundleCommit(
             Instant.now().plus(Duration.standardSeconds(MAX_ATTEMPTS)),
-            () -> wasFinalized.set(true));
+            () -> WAS_FINALIZED.computeIfAbsent(uuid, (unused) -> new AtomicBoolean()).set(true));
         // We sleep here instead of setting a resume time since the resume time doesn't need to
         // be honored.
-        sleep(1000L); // 1 second
+        sleep(100L);

Review comment:
       We also effectively have a 300-second timeout here, so I don't believe it should flake. If it does, I'll have to figure out a different way to design this test.







[GitHub] [beam] lukecwik commented on pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12488:
URL: https://github.com/apache/beam/pull/12488#issuecomment-670811848


   retest this please





[GitHub] [beam] lukecwik commented on a change in pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12488:
URL: https://github.com/apache/beam/pull/12488#discussion_r467358540



##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
##########
@@ -839,19 +845,20 @@ public ProcessContinuation process(
         RestrictionTracker<OffsetRange, Long> tracker,
         BundleFinalizer bundleFinalizer)
         throws InterruptedException {
-      if (wasFinalized.get()) {
+      if (WAS_FINALIZED.computeIfAbsent(uuid, (unused) -> new AtomicBoolean()).get()) {
+        tracker.tryClaim(tracker.currentRestriction().getFrom() + 1);
+        receiver.output(element);
         // Claim beyond the end now that we know we have been finalized.
         tracker.tryClaim(Long.MAX_VALUE);
-        receiver.output(element);
         return stop();
       }
       if (tracker.tryClaim(tracker.currentRestriction().getFrom() + 1)) {
         bundleFinalizer.afterBundleCommit(
             Instant.now().plus(Duration.standardSeconds(MAX_ATTEMPTS)),
-            () -> wasFinalized.set(true));
+            () -> WAS_FINALIZED.computeIfAbsent(uuid, (unused) -> new AtomicBoolean()).set(true));
         // We sleep here instead of setting a resume time since the resume time doesn't need to
         // be honored.
-        sleep(1000L); // 1 second
+        sleep(100L);

Review comment:
       MAX_ATTEMPTS * timeout >= 300 seconds. I don't believe it should flake, but if it does, I'll have to figure out a different way to design this test.







[GitHub] [beam] lukecwik commented on a change in pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12488:
URL: https://github.com/apache/beam/pull/12488#discussion_r467326841



##########
File path: runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
##########
@@ -126,23 +128,30 @@ public void outputWindowedValue(
             NullSideInputReader.empty(),
             Executors.newSingleThreadScheduledExecutor(),
             1000,
-            Duration.standardSeconds(3));
-
-    return invoker.invokeProcessElement(
-        DoFnInvokers.invokerFor(fn),
-        WindowedValue.of(null, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING),
-        new OffsetRangeTracker(initialRestriction),
-        new WatermarkEstimator<Void>() {
-          @Override
-          public Instant currentWatermark() {
-            return GlobalWindow.TIMESTAMP_MIN_VALUE;
-          }
+            Duration.standardSeconds(3),
+            () -> bundleFinalizer);
+
+    SplittableProcessElementInvoker.Result rval =
+        invoker.invokeProcessElement(
+            DoFnInvokers.invokerFor(fn),
+            WindowedValue.of(null, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING),
+            new OffsetRangeTracker(initialRestriction),
+            new WatermarkEstimator<Void>() {
+              @Override
+              public Instant currentWatermark() {
+                return GlobalWindow.TIMESTAMP_MIN_VALUE;
+              }
 
-          @Override
-          public Void getState() {
-            return null;
-          }
-        });
+              @Override
+              public Void getState() {
+                return null;
+              }
+            });
+    for (InMemoryBundleFinalizer.Finalization finalization :

Review comment:
       Not relevant anymore.







[GitHub] [beam] lukecwik commented on a change in pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12488:
URL: https://github.com/apache/beam/pull/12488#discussion_r467358516



##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -1499,6 +1504,146 @@ public void populateDisplayData(Builder builder) {
     }
   }
 
+  @RunWith(JUnit4.class)
+  public static class BundleFinalizationTests extends SharedTestBase implements Serializable {
+    private abstract static class BundleFinalizingDoFn extends DoFn<KV<String, Long>, String> {
+      private static final long MAX_ATTEMPTS = 3000;
+      // We use the UUID to uniquely identify this DoFn in case this test is run with
+      // other tests in the same JVM.
+      private static final Map<UUID, AtomicBoolean> WAS_FINALIZED = new HashMap();
+      private final UUID uuid = UUID.randomUUID();
+
+      public void testFinalization(BundleFinalizer bundleFinalizer, OutputReceiver<String> output)
+          throws Exception {
+        if (WAS_FINALIZED.computeIfAbsent(uuid, (unused) -> new AtomicBoolean()).get()) {
+          output.output("bundle was finalized");
+          return;
+        }
+        bundleFinalizer.afterBundleCommit(
+            Instant.now().plus(Duration.standardSeconds(MAX_ATTEMPTS)),
+            () -> WAS_FINALIZED.computeIfAbsent(uuid, (unused) -> new AtomicBoolean()).set(true));
+        // We sleep here to give time for the runner to perform any prior callbacks.
+        sleep(100L);

Review comment:
       MAX_ATTEMPTS * timeout >= 300 seconds. I don't believe it should flake, but if it does, I'll have to figure out a different way to design this test.
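
One reading of the arithmetic in this comment, as a small sketch: with `MAX_ATTEMPTS = 3000` and `sleep(100L)` from the diff above, the sleeps alone add up to 300 seconds if each attempt sleeps once. The class name `FinalizationWaitBudget` and the assumption of one sleep per attempt are illustrative, not taken from the PR.

```java
import java.time.Duration;

// Hypothetical helper; the constants mirror the values visible in the diff.
final class FinalizationWaitBudget {
  static final long MAX_ATTEMPTS = 3000; // MAX_ATTEMPTS in the diff
  static final long SLEEP_MILLIS = 100L; // sleep(100L) in the diff

  // Total time spent sleeping, assuming each attempt sleeps exactly once.
  static Duration totalWait() {
    return Duration.ofMillis(MAX_ATTEMPTS * SLEEP_MILLIS);
  }
}
```

Here `FinalizationWaitBudget.totalWait().getSeconds()` evaluates to 300, matching the bound stated in the comment.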







[GitHub] [beam] lukecwik removed a comment on pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
lukecwik removed a comment on pull request #12488:
URL: https://github.com/apache/beam/pull/12488#issuecomment-670246745









[GitHub] [beam] lukecwik commented on pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12488:
URL: https://github.com/apache/beam/pull/12488#issuecomment-670784538


   Run Spark ValidatesRunner





[GitHub] [beam] lukecwik commented on pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12488:
URL: https://github.com/apache/beam/pull/12488#issuecomment-670608102


   retest this please





[GitHub] [beam] amaliujia commented on a change in pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #12488:
URL: https://github.com/apache/beam/pull/12488#discussion_r467330599



##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
##########
@@ -81,6 +89,22 @@ public DirectTimerInternals timerInternals() {
       return timerInternals;
     }
 
+    @Override
+    public BundleFinalizer bundleFinalizer() {
+      if (bundleFinalizer == null) {
+        bundleFinalizer = new InMemoryBundleFinalizer();

Review comment:
       Line#37 says the implementation is not thread-safe. So I assume there is no need to consider concurrent access to this initialization, right?







[GitHub] [beam] lukecwik commented on pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12488:
URL: https://github.com/apache/beam/pull/12488#issuecomment-670612443


   Run Spark ValidatesRunner





[GitHub] [beam] lukecwik commented on a change in pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12488:
URL: https://github.com/apache/beam/pull/12488#discussion_r467326804



##########
File path: runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
##########
@@ -126,23 +128,30 @@ public void outputWindowedValue(
             NullSideInputReader.empty(),
             Executors.newSingleThreadScheduledExecutor(),
             1000,
-            Duration.standardSeconds(3));
-
-    return invoker.invokeProcessElement(
-        DoFnInvokers.invokerFor(fn),
-        WindowedValue.of(null, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING),
-        new OffsetRangeTracker(initialRestriction),
-        new WatermarkEstimator<Void>() {
-          @Override
-          public Instant currentWatermark() {
-            return GlobalWindow.TIMESTAMP_MIN_VALUE;
-          }
+            Duration.standardSeconds(3),
+            () -> bundleFinalizer);
+
+    SplittableProcessElementInvoker.Result rval =
+        invoker.invokeProcessElement(
+            DoFnInvokers.invokerFor(fn),
+            WindowedValue.of(null, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING),
+            new OffsetRangeTracker(initialRestriction),
+            new WatermarkEstimator<Void>() {
+              @Override
+              public Instant currentWatermark() {
+                return GlobalWindow.TIMESTAMP_MIN_VALUE;
+              }
 
-          @Override
-          public Void getState() {
-            return null;
-          }
-        });
+              @Override
+              public Void getState() {
+                return null;
+              }
+            });
+    for (InMemoryBundleFinalizer.Finalization finalization :
+        bundleFinalizer.getAndClearFinalizations()) {
+      finalization.getCallback().onBundleSuccess();

Review comment:
       We don't. I originally thought we did for some reason since I was changing this code together. I removed it and instead throw if it is ever accessed.







[GitHub] [beam] lukecwik commented on a change in pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12488:
URL: https://github.com/apache/beam/pull/12488#discussion_r467328411



##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
##########
@@ -183,6 +188,15 @@ public void initialize(
         committedResult.getUnprocessedInputs().orElse(null),
         committedResult.getOutputs(),
         result.getWatermarkHold());
+
+    // Callback and requested bundle finalizations
+    for (InMemoryBundleFinalizer.Finalization finalization : result.getBundleFinalizations()) {
+      try {
+        finalization.getCallback().onBundleSuccess();
+      } catch (Exception e) {
+        LOG.warn("Failed to finalize requested bundle {}", finalization, e);

Review comment:
       Added the completed bundle to the output.
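
The loop in the diff above runs each finalization callback inside its own try/catch, so one failing callback is logged and the rest still run. A self-contained sketch of that pattern follows. `FinalizationRunner` and its nested `Callback` interface are hypothetical stand-ins, and failures are collected in a list here instead of being logged.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the runner's finalization loop; not Beam code.
final class FinalizationRunner {
  // Hypothetical callback type: user code that may throw any checked exception.
  interface Callback {
    void onBundleSuccess() throws Exception;
  }

  // Runs every callback in order. A failure is recorded and does not stop the loop.
  static List<String> runAll(List<Callback> callbacks) {
    List<String> failures = new ArrayList<>();
    for (Callback callback : callbacks) {
      try {
        callback.onBundleSuccess();
      } catch (Exception e) {
        // The diff logs a warning at this point; this sketch records the message.
        failures.add(e.getMessage());
      }
    }
    return failures;
  }
}
```

Because the callback type in this sketch declares `throws Exception`, the catch clause has to be at least that broad to compile.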







[GitHub] [beam] lukecwik commented on pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12488:
URL: https://github.com/apache/beam/pull/12488#issuecomment-670306369


   Run Java PreCommit

