Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/07 22:30:29 UTC

[GitHub] [beam] ibzib commented on a change in pull request #12488: [BEAM-10656] Enable bundle finalization within the Java direct runner.

ibzib commented on a change in pull request #12488:
URL: https://github.com/apache/beam/pull/12488#discussion_r467307061



##########
File path: runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
##########
@@ -126,23 +128,30 @@ public void outputWindowedValue(
             NullSideInputReader.empty(),
             Executors.newSingleThreadScheduledExecutor(),
             1000,
-            Duration.standardSeconds(3));
-
-    return invoker.invokeProcessElement(
-        DoFnInvokers.invokerFor(fn),
-        WindowedValue.of(null, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING),
-        new OffsetRangeTracker(initialRestriction),
-        new WatermarkEstimator<Void>() {
-          @Override
-          public Instant currentWatermark() {
-            return GlobalWindow.TIMESTAMP_MIN_VALUE;
-          }
+            Duration.standardSeconds(3),
+            () -> bundleFinalizer);
+
+    SplittableProcessElementInvoker.Result rval =
+        invoker.invokeProcessElement(
+            DoFnInvokers.invokerFor(fn),
+            WindowedValue.of(null, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING),
+            new OffsetRangeTracker(initialRestriction),
+            new WatermarkEstimator<Void>() {
+              @Override
+              public Instant currentWatermark() {
+                return GlobalWindow.TIMESTAMP_MIN_VALUE;
+              }
 
-          @Override
-          public Void getState() {
-            return null;
-          }
-        });
+              @Override
+              public Void getState() {
+                return null;
+              }
+            });
+    for (InMemoryBundleFinalizer.Finalization finalization :
+        bundleFinalizer.getAndClearFinalizations()) {
+      finalization.getCallback().onBundleSuccess();

Review comment:
       Why do we need to trigger the callbacks here?
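For context on the question: as the diff reads, the DoFn is handed an `InMemoryBundleFinalizer`, which records finalization requests. Nothing runs those callbacks unless the owner of the finalizer drains and invokes them, which is what the added loop does. A minimal sketch of that record-then-drain pattern in plain Java, assuming hypothetical `RecordingFinalizer`, `Callback`, and `Finalization` types that only mimic the shape of the calls in the diff (they are not Beam's classes):

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an in-memory bundle finalizer; not Beam's API.
class RecordingFinalizer {
  // Callback to run once the bundle's output has been committed.
  interface Callback {
    void onBundleSuccess() throws Exception;
  }

  // One recorded request: an expiry time plus the callback to run.
  // This sketch stores the expiry but does not enforce it.
  static final class Finalization {
    final Instant expiry;
    final Callback callback;

    Finalization(Instant expiry, Callback callback) {
      this.expiry = expiry;
      this.callback = callback;
    }
  }

  private final List<Finalization> pending = new ArrayList<>();

  // Called from user code during processing; only records the request.
  void afterBundleCommit(Instant expiry, Callback callback) {
    pending.add(new Finalization(expiry, callback));
  }

  // Returns the recorded requests and empties the internal list.
  List<Finalization> getAndClearFinalizations() {
    List<Finalization> result = new ArrayList<>(pending);
    pending.clear();
    return result;
  }

  // Runs every drained callback in order; returns how many were invoked.
  static int runAll(List<Finalization> finalizations) throws Exception {
    int count = 0;
    for (Finalization f : finalizations) {
      f.callback.onBundleSuccess();
      count++;
    }
    return count;
  }
}
```

In this shape, a test that never drains the finalizer would never observe the callback's side effect.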

##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
##########
@@ -183,6 +188,15 @@ public void initialize(
         committedResult.getUnprocessedInputs().orElse(null),
         committedResult.getOutputs(),
         result.getWatermarkHold());
+
+    // Invoke the callbacks for any requested bundle finalizations
+    for (InMemoryBundleFinalizer.Finalization finalization : result.getBundleFinalizations()) {
+      try {
+        finalization.getCallback().onBundleSuccess();
+      } catch (Exception e) {
+        LOG.warn("Failed to finalize requested bundle {}", finalization, e);

Review comment:
       I don't think logging only the finalization tells us enough. Can we log more information about the bundle itself?
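One possible direction, sketched under assumptions: carry a description of the bundle alongside each finalization and include it in the warning. The `stepName` and `bundleId` fields, and the `FinalizationRunner` and `TaggedFinalization` names, are hypothetical; the sketch uses `java.util.logging` rather than whatever logger `EvaluationContext` actually uses, and, like the loop in the diff, keeps going after a failed callback so one failure does not skip the rest.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch; the types and fields are illustrative, not Beam's.
class FinalizationRunner {
  private static final Logger LOG = Logger.getLogger(FinalizationRunner.class.getName());

  interface Callback {
    void onBundleSuccess() throws Exception;
  }

  // A finalization tagged with enough context to identify its bundle in logs.
  static final class TaggedFinalization {
    final String stepName; // hypothetical: transform that produced the bundle
    final String bundleId; // hypothetical: identifier of the committed bundle
    final Callback callback;

    TaggedFinalization(String stepName, String bundleId, Callback callback) {
      this.stepName = stepName;
      this.bundleId = bundleId;
      this.callback = callback;
    }
  }

  // Invokes every callback; logs each failure with bundle context and
  // returns the ids of the bundles whose callbacks failed.
  static List<String> runAll(List<TaggedFinalization> finalizations) {
    List<String> failedBundles = new ArrayList<>();
    for (TaggedFinalization f : finalizations) {
      try {
        f.callback.onBundleSuccess();
      } catch (Exception e) {
        failedBundles.add(f.bundleId);
        LOG.log(
            Level.WARNING,
            String.format("Failed to finalize bundle %s of step %s", f.bundleId, f.stepName),
            e);
      }
    }
    return failedBundles;
  }
}
```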

##########
File path: runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
##########
@@ -126,23 +128,30 @@ public void outputWindowedValue(
             NullSideInputReader.empty(),
             Executors.newSingleThreadScheduledExecutor(),
             1000,
-            Duration.standardSeconds(3));
-
-    return invoker.invokeProcessElement(
-        DoFnInvokers.invokerFor(fn),
-        WindowedValue.of(null, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING),
-        new OffsetRangeTracker(initialRestriction),
-        new WatermarkEstimator<Void>() {
-          @Override
-          public Instant currentWatermark() {
-            return GlobalWindow.TIMESTAMP_MIN_VALUE;
-          }
+            Duration.standardSeconds(3),
+            () -> bundleFinalizer);
+
+    SplittableProcessElementInvoker.Result rval =
+        invoker.invokeProcessElement(
+            DoFnInvokers.invokerFor(fn),
+            WindowedValue.of(null, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING),
+            new OffsetRangeTracker(initialRestriction),
+            new WatermarkEstimator<Void>() {
+              @Override
+              public Instant currentWatermark() {
+                return GlobalWindow.TIMESTAMP_MIN_VALUE;
+              }
 
-          @Override
-          public Void getState() {
-            return null;
-          }
-        });
+              @Override
+              public Void getState() {
+                return null;
+              }
+            });
+    for (InMemoryBundleFinalizer.Finalization finalization :

Review comment:
       Can we make any assertions about the finalizations? For example, can we expect that they're non-empty?
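If the DoFn under test is expected to request finalization, the test could check that before running the callbacks (in a JUnit test, something like `assertFalse(finalizations.isEmpty())`). A minimal self-contained sketch, using a hypothetical `FinalizationAssertions.assertNonEmptyAndRun` helper with plain `Runnable` callbacks standing in for the recorded finalizations:

```java
import java.util.List;

// Hypothetical test helper; Runnable stands in for a finalization callback.
class FinalizationAssertions {
  // Fails if nothing requested finalization, otherwise runs every callback
  // and returns how many ran.
  static int assertNonEmptyAndRun(List<Runnable> finalizations) {
    if (finalizations.isEmpty()) {
      throw new AssertionError("Expected at least one bundle finalization to be requested");
    }
    for (Runnable callback : finalizations) {
      callback.run();
    }
    return finalizations.size();
  }
}
```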

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -1499,6 +1504,146 @@ public void populateDisplayData(Builder builder) {
     }
   }
 
+  @RunWith(JUnit4.class)
+  public static class BundleFinalizationTests extends SharedTestBase implements Serializable {
+    private abstract static class BundleFinalizingDoFn extends DoFn<KV<String, Long>, String> {
+      private static final long MAX_ATTEMPTS = 3000;
+      // We use the UUID to uniquely identify this DoFn in case this test is run with
+      // other tests in the same JVM.
+      private static final Map<UUID, AtomicBoolean> WAS_FINALIZED = new HashMap();
+      private final UUID uuid = UUID.randomUUID();
+
+      public void testFinalization(BundleFinalizer bundleFinalizer, OutputReceiver<String> output)
+          throws Exception {
+        if (WAS_FINALIZED.computeIfAbsent(uuid, (unused) -> new AtomicBoolean()).get()) {
+          output.output("bundle was finalized");
+          return;
+        }
+        bundleFinalizer.afterBundleCommit(
+            Instant.now().plus(Duration.standardSeconds(MAX_ATTEMPTS)),
+            () -> WAS_FINALIZED.computeIfAbsent(uuid, (unused) -> new AtomicBoolean()).set(true));
+        // We sleep here to give time for the runner to perform any prior callbacks.
+        sleep(100L);
+      }
+    }
+
+    private static class BasicBundleFinalizingDoFn extends BundleFinalizingDoFn {
+      @ProcessElement
+      public void processElement(BundleFinalizer bundleFinalizer, OutputReceiver<String> output)
+          throws Exception {
+        testFinalization(bundleFinalizer, output);
+      }
+    }
+
+    private static class BundleFinalizerOutputChecker
+        implements SerializableFunction<Iterable<String>, Void> {
+      @Override
+      public Void apply(Iterable<String> input) {
+        assertTrue(
+            "Expected to have received one callback enabling output to be produced but received none.",

Review comment:
       Multiple callbacks are allowed, right? (If so, please reword this as "at least one callback.")
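If multiple callbacks are permitted, the check can assert a lower bound instead of an exact count. A sketch in plain Java with the reworded message; the real checker implements Beam's `SerializableFunction` and uses JUnit's `assertTrue`, whereas this hypothetical `AtLeastOneOutputChecker` uses a static method and an explicit `AssertionError` so it stands alone. It assumes the output to look for is the `"bundle was finalized"` string emitted in the diff.

```java
// Hypothetical sketch of the output check with "at least one" wording.
class AtLeastOneOutputChecker {
  static final String EXPECTED = "bundle was finalized";

  // Passes when one or more elements equal EXPECTED; returns the match count.
  static int check(Iterable<String> input) {
    int matches = 0;
    for (String s : input) {
      if (EXPECTED.equals(s)) {
        matches++;
      }
    }
    if (matches < 1) {
      throw new AssertionError(
          "Expected to have received at least one callback enabling output to be produced"
              + " but received none.");
    }
    return matches;
  }
}
```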

##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java
##########
@@ -81,6 +89,22 @@ public DirectTimerInternals timerInternals() {
       return timerInternals;
     }
 
+    @Override
+    public BundleFinalizer bundleFinalizer() {
+      if (bundleFinalizer == null) {
+        bundleFinalizer = new InMemoryBundleFinalizer();

Review comment:
       Why lazy-initialize? Is this just to save memory?
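For reference, with the lazy pattern in the diff no finalizer object is allocated for contexts whose DoFns never ask for one; a null field could also let the runner skip finalization work entirely. A sketch of that trade-off with a hypothetical `StepContext` class (not the direct runner's actual class); like the snippet in the diff, it is not synchronized, so it assumes single-threaded access.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of lazy initialization; names are illustrative only.
class StepContext {
  // Stand-in for a bundle finalizer that accumulates callbacks.
  static final class Finalizer {
    final List<Runnable> callbacks = new ArrayList<>();
  }

  // Stays null until something asks for a finalizer.
  private Finalizer finalizer;

  // Creates the finalizer on first request and reuses it afterwards.
  // Not thread-safe: assumes a single thread per context.
  Finalizer bundleFinalizer() {
    if (finalizer == null) {
      finalizer = new Finalizer();
    }
    return finalizer;
  }

  // Lets a caller skip finalization handling when none was requested.
  boolean hasFinalizer() {
    return finalizer != null;
  }
}
```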



