You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/10/12 21:55:01 UTC

[beam] branch master updated: Add unsupported BundleFinalizationHandler to portable batch Flink.

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b00b0b  Add unsupported BundleFinalizationHandler to portable batch Flink.
     new c0755cd  Merge pull request #13054 from [BEAM-11021] Add unsupported BundleFinalizationHandler to portable batch Flink.
6b00b0b is described below

commit 6b00b0bcd83dac89bf178cca3657e3d95305c5b5
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Thu Oct 8 16:41:58 2020 -0700

    Add unsupported BundleFinalizationHandler to portable batch Flink.
---
 .../functions/FlinkExecutableStageFunction.java    | 11 ++++++++-
 .../FlinkExecutableStageFunctionTest.java          | 27 +++++++++++++++++++---
 .../fnexecution/control/SdkHarnessClient.java      |  5 ++--
 .../fnexecution/control/StageBundleFactory.java    | 10 ++++++++
 .../SparkExecutableStageFunctionTest.java          |  8 ++++---
 .../runners/portability/flink_runner_test.py       |  4 ++--
 6 files changed, 54 insertions(+), 11 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index f28d526..5081951 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -33,6 +33,7 @@ import org.apache.beam.runners.core.construction.Timer;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
+import org.apache.beam.runners.fnexecution.control.BundleFinalizationHandler;
 import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
 import org.apache.beam.runners.fnexecution.control.ExecutableStageContext;
 import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
@@ -101,6 +102,7 @@ public class FlinkExecutableStageFunction<InputT> extends AbstractRichFunction
   private transient ExecutableStageContext stageContext;
   private transient StageBundleFactory stageBundleFactory;
   private transient BundleProgressHandler progressHandler;
+  private transient BundleFinalizationHandler finalizationHandler;
   // Only initialized when the ExecutableStage is stateful
   private transient InMemoryBagUserStateFactory bagUserStateHandlerFactory;
   private transient ExecutableStage executableStage;
@@ -153,6 +155,12 @@ public class FlinkExecutableStageFunction<InputT> extends AbstractRichFunction
             metricContainer.updateMetrics(stepName, response.getMonitoringInfosList());
           }
         };
+    // TODO(BEAM-11021): Support bundle finalization in portable batch.
+    finalizationHandler =
+        bundleId -> {
+          throw new UnsupportedOperationException(
+              "Portable Flink runner doesn't support bundle finalization in batch mode. For more details, please refer to https://issues.apache.org/jira/browse/BEAM-11021.");
+        };
   }
 
   private StateRequestHandler getStateRequestHandler(
@@ -199,7 +207,8 @@ public class FlinkExecutableStageFunction<InputT> extends AbstractRichFunction
 
     ReceiverFactory receiverFactory = new ReceiverFactory(collector, outputMap);
     try (RemoteBundle bundle =
-        stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, progressHandler)) {
+        stageBundleFactory.getBundle(
+            receiverFactory, stateRequestHandler, progressHandler, finalizationHandler)) {
       processElements(iterable, bundle);
     }
   }
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
index 3142e1c..20f6f64 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
@@ -115,7 +115,18 @@ public class FlinkExecutableStageFunctionTest {
     when(runtimeContext.getDistributedCache()).thenReturn(distributedCache);
     when(stageContext.getStageBundleFactory(any())).thenReturn(stageBundleFactory);
     RemoteBundle remoteBundle = Mockito.mock(RemoteBundle.class);
-    when(stageBundleFactory.getBundle(any(), any(), any())).thenReturn(remoteBundle);
+    when(stageBundleFactory.getBundle(
+            any(),
+            any(StateRequestHandler.class),
+            any(BundleProgressHandler.class),
+            any(BundleFinalizationHandler.class)))
+        .thenReturn(remoteBundle);
+    when(stageBundleFactory.getBundle(
+            any(),
+            any(TimerReceiverFactory.class),
+            any(StateRequestHandler.class),
+            any(BundleProgressHandler.class)))
+        .thenReturn(remoteBundle);
     ImmutableMap input =
         ImmutableMap.builder().put("input", Mockito.mock(FnDataReceiver.class)).build();
     when(remoteBundle.getInputReceivers()).thenReturn(input);
@@ -129,7 +140,12 @@ public class FlinkExecutableStageFunctionTest {
 
     @SuppressWarnings("unchecked")
     RemoteBundle bundle = Mockito.mock(RemoteBundle.class);
-    when(stageBundleFactory.getBundle(any(), any(), any())).thenReturn(bundle);
+    when(stageBundleFactory.getBundle(
+            any(),
+            any(StateRequestHandler.class),
+            any(BundleProgressHandler.class),
+            any(BundleFinalizationHandler.class)))
+        .thenReturn(bundle);
 
     @SuppressWarnings("unchecked")
     FnDataReceiver<WindowedValue<?>> receiver = Mockito.mock(FnDataReceiver.class);
@@ -148,7 +164,12 @@ public class FlinkExecutableStageFunctionTest {
 
     @SuppressWarnings("unchecked")
     RemoteBundle bundle = Mockito.mock(RemoteBundle.class);
-    when(stageBundleFactory.getBundle(any(), any(), any())).thenReturn(bundle);
+    when(stageBundleFactory.getBundle(
+            any(),
+            any(StateRequestHandler.class),
+            any(BundleProgressHandler.class),
+            any(BundleFinalizationHandler.class)))
+        .thenReturn(bundle);
 
     @SuppressWarnings("unchecked")
     FnDataReceiver<WindowedValue<?>> receiver = Mockito.mock(FnDataReceiver.class);
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
index f6a93d8..47c694f 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
@@ -179,8 +179,9 @@ public class SdkHarnessClient implements AutoCloseable {
      * }
      * }</pre>
      *
-     * <p>An exception during {@link #close()} will be thrown if the bundle requests finalization or
-     * attempts to checkpoint by returning a {@link BeamFnApi.DelayedBundleApplication}.
+     * <p>An exception will be thrown during {@link #close()} if the bundle requests finalization
+     * while the {@link BundleFinalizationHandler} is {@code null}, or if it attempts to checkpoint
+     * by returning a {@link BeamFnApi.DelayedBundleApplication}.
      */
     public ActiveBundle newBundle(
         Map<String, RemoteOutputReceiver<?>> outputReceivers,
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StageBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StageBundleFactory.java
index 459649b..0a31fc8 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StageBundleFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StageBundleFactory.java
@@ -37,6 +37,16 @@ public interface StageBundleFactory extends AutoCloseable {
     return getBundle(outputReceiverFactory, null, stateRequestHandler, progressHandler);
   }
 
+  default RemoteBundle getBundle(
+      OutputReceiverFactory outputReceiverFactory,
+      StateRequestHandler stateRequestHandler,
+      BundleProgressHandler progressHandler,
+      BundleFinalizationHandler finalizationHandler)
+      throws Exception {
+    return getBundle(
+        outputReceiverFactory, null, stateRequestHandler, progressHandler, finalizationHandler);
+  }
+
   /** Get a new {@link RemoteBundle bundle} for processing the data in an executable stage. */
   default RemoteBundle getBundle(
       OutputReceiverFactory outputReceiverFactory,
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
index 6304616..ca23d3d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
@@ -93,7 +93,8 @@ public class SparkExecutableStageFunctionTest {
     MockitoAnnotations.initMocks(this);
     when(contextFactory.get(any())).thenReturn(stageContext);
     when(stageContext.getStageBundleFactory(any())).thenReturn(stageBundleFactory);
-    when(stageBundleFactory.getBundle(any(), any(), any(), any())).thenReturn(remoteBundle);
+    when(stageBundleFactory.getBundle(any(), any(), any(), any(BundleProgressHandler.class)))
+        .thenReturn(remoteBundle);
     @SuppressWarnings("unchecked")
     ImmutableMap<String, FnDataReceiver> inputReceiver =
         ImmutableMap.of("input", Mockito.mock(FnDataReceiver.class));
@@ -116,7 +117,8 @@ public class SparkExecutableStageFunctionTest {
     SparkExecutableStageFunction<Integer, ?> function = getFunction(Collections.emptyMap());
 
     RemoteBundle bundle = Mockito.mock(RemoteBundle.class);
-    when(stageBundleFactory.getBundle(any(), any(), any(), any())).thenReturn(bundle);
+    when(stageBundleFactory.getBundle(any(), any(), any(), any(BundleProgressHandler.class)))
+        .thenReturn(bundle);
 
     @SuppressWarnings("unchecked")
     FnDataReceiver<WindowedValue<?>> receiver = Mockito.mock(FnDataReceiver.class);
@@ -235,7 +237,7 @@ public class SparkExecutableStageFunctionTest {
     List<WindowedValue<Integer>> inputs = new ArrayList<>();
     inputs.add(WindowedValue.valueInGlobalWindow(0));
     function.call(inputs.iterator());
-    verify(stageBundleFactory).getBundle(any(), any(), any(), any());
+    verify(stageBundleFactory).getBundle(any(), any(), any(), any(BundleProgressHandler.class));
     verify(stageBundleFactory).getProcessBundleDescriptor();
     verify(stageBundleFactory).close();
     verifyNoMoreInteractions(stageBundleFactory);
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index 1160691..a2beedd 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -393,10 +393,10 @@ class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
     raise unittest.SkipTest("BEAM-2939")
 
   def test_callbacks_with_exception(self):
-    raise unittest.SkipTest("BEAM-6868")
+    raise unittest.SkipTest("BEAM-11021")
 
   def test_register_finalizations(self):
-    raise unittest.SkipTest("BEAM-6868")
+    raise unittest.SkipTest("BEAM-11021")
 
   # Inherits all other tests.