You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/10/12 21:55:01 UTC
[beam] branch master updated: Add unsupported
BundleFinalizationHandler to portable batch Flink.
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6b00b0b Add unsupported BundleFinalizationHandler to portable batch Flink.
new c0755cd Merge pull request #13054 from [BEAM-11021] Add unsupported BundleFinalizationHandler to portable batch Flink.
6b00b0b is described below
commit 6b00b0bcd83dac89bf178cca3657e3d95305c5b5
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Thu Oct 8 16:41:58 2020 -0700
Add unsupported BundleFinalizationHandler to portable batch Flink.
---
.../functions/FlinkExecutableStageFunction.java | 11 ++++++++-
.../FlinkExecutableStageFunctionTest.java | 27 +++++++++++++++++++---
.../fnexecution/control/SdkHarnessClient.java | 5 ++--
.../fnexecution/control/StageBundleFactory.java | 10 ++++++++
.../SparkExecutableStageFunctionTest.java | 8 ++++---
.../runners/portability/flink_runner_test.py | 4 ++--
6 files changed, 54 insertions(+), 11 deletions(-)
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index f28d526..5081951 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -33,6 +33,7 @@ import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
+import org.apache.beam.runners.fnexecution.control.BundleFinalizationHandler;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.ExecutableStageContext;
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
@@ -101,6 +102,7 @@ public class FlinkExecutableStageFunction<InputT> extends AbstractRichFunction
private transient ExecutableStageContext stageContext;
private transient StageBundleFactory stageBundleFactory;
private transient BundleProgressHandler progressHandler;
+ private transient BundleFinalizationHandler finalizationHandler;
// Only initialized when the ExecutableStage is stateful
private transient InMemoryBagUserStateFactory bagUserStateHandlerFactory;
private transient ExecutableStage executableStage;
@@ -153,6 +155,12 @@ public class FlinkExecutableStageFunction<InputT> extends AbstractRichFunction
metricContainer.updateMetrics(stepName, response.getMonitoringInfosList());
}
};
+ // TODO(BEAM-11021): Support bundle finalization in portable batch.
+ finalizationHandler =
+ bundleId -> {
+ throw new UnsupportedOperationException(
+ "Portable Flink runner doesn't support bundle finalization in batch mode. For more details, please refer to https://issues.apache.org/jira/browse/BEAM-11021.");
+ };
}
private StateRequestHandler getStateRequestHandler(
@@ -199,7 +207,8 @@ public class FlinkExecutableStageFunction<InputT> extends AbstractRichFunction
ReceiverFactory receiverFactory = new ReceiverFactory(collector, outputMap);
try (RemoteBundle bundle =
- stageBundleFactory.getBundle(receiverFactory, stateRequestHandler, progressHandler)) {
+ stageBundleFactory.getBundle(
+ receiverFactory, stateRequestHandler, progressHandler, finalizationHandler)) {
processElements(iterable, bundle);
}
}
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
index 3142e1c..20f6f64 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
@@ -115,7 +115,18 @@ public class FlinkExecutableStageFunctionTest {
when(runtimeContext.getDistributedCache()).thenReturn(distributedCache);
when(stageContext.getStageBundleFactory(any())).thenReturn(stageBundleFactory);
RemoteBundle remoteBundle = Mockito.mock(RemoteBundle.class);
- when(stageBundleFactory.getBundle(any(), any(), any())).thenReturn(remoteBundle);
+ when(stageBundleFactory.getBundle(
+ any(),
+ any(StateRequestHandler.class),
+ any(BundleProgressHandler.class),
+ any(BundleFinalizationHandler.class)))
+ .thenReturn(remoteBundle);
+ when(stageBundleFactory.getBundle(
+ any(),
+ any(TimerReceiverFactory.class),
+ any(StateRequestHandler.class),
+ any(BundleProgressHandler.class)))
+ .thenReturn(remoteBundle);
ImmutableMap input =
ImmutableMap.builder().put("input", Mockito.mock(FnDataReceiver.class)).build();
when(remoteBundle.getInputReceivers()).thenReturn(input);
@@ -129,7 +140,12 @@ public class FlinkExecutableStageFunctionTest {
@SuppressWarnings("unchecked")
RemoteBundle bundle = Mockito.mock(RemoteBundle.class);
- when(stageBundleFactory.getBundle(any(), any(), any())).thenReturn(bundle);
+ when(stageBundleFactory.getBundle(
+ any(),
+ any(StateRequestHandler.class),
+ any(BundleProgressHandler.class),
+ any(BundleFinalizationHandler.class)))
+ .thenReturn(bundle);
@SuppressWarnings("unchecked")
FnDataReceiver<WindowedValue<?>> receiver = Mockito.mock(FnDataReceiver.class);
@@ -148,7 +164,12 @@ public class FlinkExecutableStageFunctionTest {
@SuppressWarnings("unchecked")
RemoteBundle bundle = Mockito.mock(RemoteBundle.class);
- when(stageBundleFactory.getBundle(any(), any(), any())).thenReturn(bundle);
+ when(stageBundleFactory.getBundle(
+ any(),
+ any(StateRequestHandler.class),
+ any(BundleProgressHandler.class),
+ any(BundleFinalizationHandler.class)))
+ .thenReturn(bundle);
@SuppressWarnings("unchecked")
FnDataReceiver<WindowedValue<?>> receiver = Mockito.mock(FnDataReceiver.class);
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
index f6a93d8..47c694f 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
@@ -179,8 +179,9 @@ public class SdkHarnessClient implements AutoCloseable {
* }
* }</pre>
*
- * <p>An exception during {@link #close()} will be thrown if the bundle requests finalization or
- * attempts to checkpoint by returning a {@link BeamFnApi.DelayedBundleApplication}.
+ * <p>An exception during {@link #close()} will be thrown if the bundle requests finalization if
+ * {@link BundleFinalizationHandler} is {@code null} or attempts to checkpoint by returning a
+ * {@link BeamFnApi.DelayedBundleApplication} .
*/
public ActiveBundle newBundle(
Map<String, RemoteOutputReceiver<?>> outputReceivers,
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StageBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StageBundleFactory.java
index 459649b..0a31fc8 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StageBundleFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StageBundleFactory.java
@@ -37,6 +37,16 @@ public interface StageBundleFactory extends AutoCloseable {
return getBundle(outputReceiverFactory, null, stateRequestHandler, progressHandler);
}
+ default RemoteBundle getBundle(
+ OutputReceiverFactory outputReceiverFactory,
+ StateRequestHandler stateRequestHandler,
+ BundleProgressHandler progressHandler,
+ BundleFinalizationHandler finalizationHandler)
+ throws Exception {
+ return getBundle(
+ outputReceiverFactory, null, stateRequestHandler, progressHandler, finalizationHandler);
+ }
+
/** Get a new {@link RemoteBundle bundle} for processing the data in an executable stage. */
default RemoteBundle getBundle(
OutputReceiverFactory outputReceiverFactory,
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
index 6304616..ca23d3d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunctionTest.java
@@ -93,7 +93,8 @@ public class SparkExecutableStageFunctionTest {
MockitoAnnotations.initMocks(this);
when(contextFactory.get(any())).thenReturn(stageContext);
when(stageContext.getStageBundleFactory(any())).thenReturn(stageBundleFactory);
- when(stageBundleFactory.getBundle(any(), any(), any(), any())).thenReturn(remoteBundle);
+ when(stageBundleFactory.getBundle(any(), any(), any(), any(BundleProgressHandler.class)))
+ .thenReturn(remoteBundle);
@SuppressWarnings("unchecked")
ImmutableMap<String, FnDataReceiver> inputReceiver =
ImmutableMap.of("input", Mockito.mock(FnDataReceiver.class));
@@ -116,7 +117,8 @@ public class SparkExecutableStageFunctionTest {
SparkExecutableStageFunction<Integer, ?> function = getFunction(Collections.emptyMap());
RemoteBundle bundle = Mockito.mock(RemoteBundle.class);
- when(stageBundleFactory.getBundle(any(), any(), any(), any())).thenReturn(bundle);
+ when(stageBundleFactory.getBundle(any(), any(), any(), any(BundleProgressHandler.class)))
+ .thenReturn(bundle);
@SuppressWarnings("unchecked")
FnDataReceiver<WindowedValue<?>> receiver = Mockito.mock(FnDataReceiver.class);
@@ -235,7 +237,7 @@ public class SparkExecutableStageFunctionTest {
List<WindowedValue<Integer>> inputs = new ArrayList<>();
inputs.add(WindowedValue.valueInGlobalWindow(0));
function.call(inputs.iterator());
- verify(stageBundleFactory).getBundle(any(), any(), any(), any());
+ verify(stageBundleFactory).getBundle(any(), any(), any(), any(BundleProgressHandler.class));
verify(stageBundleFactory).getProcessBundleDescriptor();
verify(stageBundleFactory).close();
verifyNoMoreInteractions(stageBundleFactory);
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index 1160691..a2beedd 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -393,10 +393,10 @@ class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
raise unittest.SkipTest("BEAM-2939")
def test_callbacks_with_exception(self):
- raise unittest.SkipTest("BEAM-6868")
+ raise unittest.SkipTest("BEAM-11021")
def test_register_finalizations(self):
- raise unittest.SkipTest("BEAM-6868")
+ raise unittest.SkipTest("BEAM-11021")
# Inherits all other tests.