You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/08/19 22:22:05 UTC
[beam] branch master updated: [BEAM-9979] Support a reset call that
happens after the bundle is done allowing resetting of internal state of a
transform runner.
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ca2e7e7 [BEAM-9979] Support a reset call that happens after the bundle is done allowing resetting of internal state of a transform runner.
new 1c6b07c Merge pull request #12636 from lukecwik/beam9979
ca2e7e7 is described below
commit ca2e7e79e1f2631da44bbb62cedc745b3727a990
Author: Luke Cwik <lc...@google.com>
AuthorDate: Wed Aug 19 14:41:05 2020 -0700
[BEAM-9979] Support a reset call that happens after the bundle is done allowing resetting of internal state of a transform runner.
This is preliminary work needed to be able to reset the BeamFnDataReadRunner to solve the race condition.
---
.../beam/fn/harness/BeamFnDataReadRunner.java | 1 +
.../beam/fn/harness/BeamFnDataWriteRunner.java | 1 +
.../beam/fn/harness/BoundedSourceRunner.java | 1 +
.../org/apache/beam/fn/harness/CombineRunners.java | 1 +
.../org/apache/beam/fn/harness/FlattenRunner.java | 1 +
.../apache/beam/fn/harness/FnApiDoFnRunner.java | 1 +
.../org/apache/beam/fn/harness/MapFnRunners.java | 1 +
.../beam/fn/harness/PTransformRunnerFactory.java | 20 ++++++++--
.../fn/harness/control/ProcessBundleHandler.java | 27 +++++++++++--
.../beam/fn/harness/AssignWindowsRunnerTest.java | 5 ++-
.../beam/fn/harness/BeamFnDataReadRunnerTest.java | 4 ++
.../beam/fn/harness/BeamFnDataWriteRunnerTest.java | 1 +
.../beam/fn/harness/BoundedSourceRunnerTest.java | 1 +
.../apache/beam/fn/harness/CombineRunnersTest.java | 15 ++++---
.../apache/beam/fn/harness/FlattenRunnerTest.java | 10 +++--
.../beam/fn/harness/FnApiDoFnRunnerTest.java | 19 +++++++++
.../apache/beam/fn/harness/MapFnRunnersTest.java | 3 ++
.../harness/control/ProcessBundleHandlerTest.java | 46 ++++++++++++++++++++--
18 files changed, 136 insertions(+), 22 deletions(-)
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
index 34cdcb4..fe52877 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
@@ -102,6 +102,7 @@ public class BeamFnDataReadRunner<OutputT> {
PCollectionConsumerRegistry pCollectionConsumerRegistry,
PTransformFunctionRegistry startFunctionRegistry,
PTransformFunctionRegistry finishFunctionRegistry,
+ Consumer<ThrowingRunnable> addResetFunction,
Consumer<ThrowingRunnable> tearDownFunctions,
Consumer<ProgressRequestCallback> addProgressRequestCallback,
BundleSplitListener splitListener,
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
index 684f399..0de2d3f 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
@@ -92,6 +92,7 @@ public class BeamFnDataWriteRunner<InputT> {
PCollectionConsumerRegistry pCollectionConsumerRegistry,
PTransformFunctionRegistry startFunctionRegistry,
PTransformFunctionRegistry finishFunctionRegistry,
+ Consumer<ThrowingRunnable> addResetFunction,
Consumer<ThrowingRunnable> tearDownFunctions,
Consumer<ProgressRequestCallback> addProgressRequestCallback,
BundleSplitListener splitListener,
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
index 9500d21..ee63a6f 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
@@ -85,6 +85,7 @@ public class BoundedSourceRunner<InputT extends BoundedSource<OutputT>, OutputT>
PCollectionConsumerRegistry pCollectionConsumerRegistry,
PTransformFunctionRegistry startFunctionRegistry,
PTransformFunctionRegistry finishFunctionRegistry,
+ Consumer<ThrowingRunnable> addResetFunction,
Consumer<ThrowingRunnable> tearDownFunctions,
Consumer<ProgressRequestCallback> addProgressRequestCallback,
BundleSplitListener splitListener,
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
index 9cf80cb..e32ff38 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
@@ -131,6 +131,7 @@ public class CombineRunners {
PCollectionConsumerRegistry pCollectionConsumerRegistry,
PTransformFunctionRegistry startFunctionRegistry,
PTransformFunctionRegistry finishFunctionRegistry,
+ Consumer<ThrowingRunnable> addResetFunction,
Consumer<ThrowingRunnable> tearDownFunctions,
Consumer<ProgressRequestCallback> addProgressRequestCallback,
BundleSplitListener splitListener,
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FlattenRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FlattenRunner.java
index 405d98b..18d398f 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FlattenRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FlattenRunner.java
@@ -70,6 +70,7 @@ public class FlattenRunner<InputT> {
PCollectionConsumerRegistry pCollectionConsumerRegistry,
PTransformFunctionRegistry startFunctionRegistry,
PTransformFunctionRegistry finishFunctionRegistry,
+ Consumer<ThrowingRunnable> addResetFunction,
Consumer<ThrowingRunnable> tearDownFunctions,
Consumer<ProgressRequestCallback> addProgressRequestCallback,
BundleSplitListener splitListener,
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 4bb6765..de479d5 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -169,6 +169,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
PCollectionConsumerRegistry pCollectionConsumerRegistry,
PTransformFunctionRegistry startFunctionRegistry,
PTransformFunctionRegistry finishFunctionRegistry,
+ Consumer<ThrowingRunnable> addResetFunction,
Consumer<ThrowingRunnable> tearDownFunctions,
Consumer<ProgressRequestCallback> addProgressRequestCallback,
BundleSplitListener splitListener,
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunners.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunners.java
index aa3fe49..9321326 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunners.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunners.java
@@ -112,6 +112,7 @@ public abstract class MapFnRunners {
PCollectionConsumerRegistry pCollectionConsumerRegistry,
PTransformFunctionRegistry startFunctionRegistry,
PTransformFunctionRegistry finishFunctionRegistry,
+ Consumer<ThrowingRunnable> addResetFunction,
Consumer<ThrowingRunnable> tearDownFunctions,
Consumer<ProgressRequestCallback> addProgressRequestCallback,
BundleSplitListener splitListener,
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
index ce334c7..1ad006f 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
@@ -42,7 +42,7 @@ public interface PTransformRunnerFactory<T> {
/**
* Creates and returns a handler for a given PTransform. Note that the handler must support
* processing multiple bundles. The handler will be discarded if an error is thrown during element
- * processing, or during execution of start/finish.
+ * processing, or during execution of start/finish/reset.
*
* @param pipelineOptions Pipeline options
* @param beamFnDataClient A client for handling inbound and outbound data streams.
@@ -62,10 +62,21 @@ public interface PTransformRunnerFactory<T> {
* registered within this multimap.
* @param startFunctionRegistry A class to register a start bundle handler with.
* @param finishFunctionRegistry A class to register a finish bundle handler with.
- * @param addTearDownFunction A consumer to register a tear down handler with.
+ * @param addResetFunction A consumer to register any reset methods. Reset methods must not invoke
+ * user code; user code should instead run via the {@code finishFunctionRegistry}. The reset
+ * method is guaranteed to be invoked after the bundle completes successfully and after {@code
+ * T} becomes ineligible to receive method calls registered with {@code
+ * addProgressRequestCallback} or {@code splitListener}.
+ * @param addTearDownFunction A consumer to register a tear down handler with. This method will be
+ * invoked before {@code T} is eligible to become garbage collected.
* @param addProgressRequestCallback A consumer to register a callback whenever progress is being
- * requested.
- * @param splitListener A listener to be invoked when the PTransform splits itself.
+ * requested. This method will be called concurrently to any methods registered with {@code
+ * pCollectionConsumerRegistry}, {@code startFunctionRegistry}, and {@code
+ * finishFunctionRegistry}.
+ * @param splitListener A listener to be invoked when the PTransform splits itself. This method
+ * will be called concurrently to any methods registered with {@code
+ * pCollectionConsumerRegistry}, {@code startFunctionRegistry}, and {@code
+ * finishFunctionRegistry}.
* @param bundleFinalizer Register callbacks that will be invoked when the runner completes the
* bundle. The specified instant provides the timeout on how long the finalization callback is
* valid for.
@@ -84,6 +95,7 @@ public interface PTransformRunnerFactory<T> {
PCollectionConsumerRegistry pCollectionConsumerRegistry,
PTransformFunctionRegistry startFunctionRegistry,
PTransformFunctionRegistry finishFunctionRegistry,
+ Consumer<ThrowingRunnable> addResetFunction,
Consumer<ThrowingRunnable> addTearDownFunction,
Consumer<ProgressRequestCallback> addProgressRequestCallback,
BundleSplitListener splitListener,
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 40105ba..a664ea2 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -183,6 +183,7 @@ public class ProcessBundleHandler {
Set<String> processedPTransformIds,
PTransformFunctionRegistry startFunctionRegistry,
PTransformFunctionRegistry finishFunctionRegistry,
+ Consumer<ThrowingRunnable> addResetFunction,
Consumer<ThrowingRunnable> addTearDownFunction,
Consumer<ProgressRequestCallback> addProgressRequestCallback,
BundleSplitListener splitListener,
@@ -209,6 +210,7 @@ public class ProcessBundleHandler {
processedPTransformIds,
startFunctionRegistry,
finishFunctionRegistry,
+ addResetFunction,
addTearDownFunction,
addProgressRequestCallback,
splitListener,
@@ -248,6 +250,7 @@ public class ProcessBundleHandler {
pCollectionConsumerRegistry,
startFunctionRegistry,
finishFunctionRegistry,
+ addResetFunction,
addTearDownFunction,
addProgressRequestCallback,
splitListener,
@@ -428,6 +431,7 @@ public class ProcessBundleHandler {
PTransformFunctionRegistry finishFunctionRegistry =
new PTransformFunctionRegistry(
metricsContainerRegistry, stateTracker, ExecutionStateTracker.FINISH_STATE_NAME);
+ List<ThrowingRunnable> resetFunctions = new ArrayList<>();
List<ThrowingRunnable> tearDownFunctions = new ArrayList<>();
List<ProgressRequestCallback> progressRequestCallbacks = new ArrayList<>();
@@ -472,6 +476,7 @@ public class ProcessBundleHandler {
BundleProcessor.create(
startFunctionRegistry,
finishFunctionRegistry,
+ resetFunctions,
tearDownFunctions,
progressRequestCallbacks,
splitListener,
@@ -510,6 +515,7 @@ public class ProcessBundleHandler {
processedPTransformIds,
startFunctionRegistry,
finishFunctionRegistry,
+ resetFunctions::add,
tearDownFunctions::add,
progressRequestCallbacks::add,
splitListener,
@@ -580,8 +586,15 @@ public class ProcessBundleHandler {
*/
void release(String bundleDescriptorId, BundleProcessor bundleProcessor) {
activeBundleProcessors.remove(bundleProcessor.getInstructionId());
- bundleProcessor.reset();
- cachedBundleProcessors.get(bundleDescriptorId).add(bundleProcessor);
+ try {
+ bundleProcessor.reset();
+ cachedBundleProcessors.get(bundleDescriptorId).add(bundleProcessor);
+ } catch (Exception e) {
+ LOG.warn(
+ "Was unable to reset bundle processor safely. Bundle processor will be discarded and re-instantiated on next bundle for descriptor {}.",
+ bundleDescriptorId,
+ e);
+ }
}
/** Shutdown all the cached {@link BundleProcessor}s, running the tearDown() functions. */
@@ -605,6 +618,7 @@ public class ProcessBundleHandler {
public static BundleProcessor create(
PTransformFunctionRegistry startFunctionRegistry,
PTransformFunctionRegistry finishFunctionRegistry,
+ List<ThrowingRunnable> resetFunctions,
List<ThrowingRunnable> tearDownFunctions,
List<ProgressRequestCallback> progressRequestCallbacks,
BundleSplitListener.InMemory splitListener,
@@ -617,6 +631,7 @@ public class ProcessBundleHandler {
return new AutoValue_ProcessBundleHandler_BundleProcessor(
startFunctionRegistry,
finishFunctionRegistry,
+ resetFunctions,
tearDownFunctions,
progressRequestCallbacks,
splitListener,
@@ -635,6 +650,8 @@ public class ProcessBundleHandler {
abstract PTransformFunctionRegistry getFinishFunctionRegistry();
+ abstract List<ThrowingRunnable> getResetFunctions();
+
abstract List<ThrowingRunnable> getTearDownFunctions();
abstract List<ProgressRequestCallback> getProgressRequestCallbacks();
@@ -663,7 +680,7 @@ public class ProcessBundleHandler {
this.instructionId = instructionId;
}
- void reset() {
+ void reset() throws Exception {
getStartFunctionRegistry().reset();
getFinishFunctionRegistry().reset();
getSplitListener().clear();
@@ -672,6 +689,9 @@ public class ProcessBundleHandler {
getStateTracker().reset();
ExecutionStateSampler.instance().reset();
getBundleFinalizationCallbackRegistrations().clear();
+ for (ThrowingRunnable resetFunction : getResetFunctions()) {
+ resetFunction.run();
+ }
}
}
@@ -789,6 +809,7 @@ public class ProcessBundleHandler {
PCollectionConsumerRegistry pCollectionConsumerRegistry,
PTransformFunctionRegistry startFunctionRegistry,
PTransformFunctionRegistry finishFunctionRegistry,
+ Consumer<ThrowingRunnable> addResetFunction,
Consumer<ThrowingRunnable> tearDownFunctions,
Consumer<ProgressRequestCallback> addProgressRequestCallback,
BundleSplitListener splitListener,
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/AssignWindowsRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/AssignWindowsRunnerTest.java
index 17ca2a6..ae6c432 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/AssignWindowsRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/AssignWindowsRunnerTest.java
@@ -205,8 +205,9 @@ public class AssignWindowsRunnerTest implements Serializable {
null /* windowingStrategies */,
pCollectionConsumerRegistry,
null /* startFunctionRegistry */,
- null, /* finishFunctionRegistry */
- null, /* tearDownRegistry */
+ null /* finishFunctionRegistry */,
+ null /* addResetFunction */,
+ null /* tearDownRegistry */,
null /* addProgressRequestCallback */,
null /* splitListener */,
null /* bundleFinalizer */);
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
index c1bfa46..779fd0e 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
@@ -163,6 +163,7 @@ public class BeamFnDataReadRunnerTest {
PTransformFunctionRegistry finishFunctionRegistry =
new PTransformFunctionRegistry(
mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "finish");
+ List<ThrowingRunnable> resetFunctions = new ArrayList<>();
List<ThrowingRunnable> teardownFunctions = new ArrayList<>();
RunnerApi.PTransform pTransform =
@@ -185,6 +186,7 @@ public class BeamFnDataReadRunnerTest {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
+ resetFunctions::add,
teardownFunctions::add,
(PTransformRunnerFactory.ProgressRequestCallback callback) -> {},
null /* splitListener */,
@@ -696,6 +698,7 @@ public class BeamFnDataReadRunnerTest {
PTransformFunctionRegistry finishFunctionRegistry =
new PTransformFunctionRegistry(
mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "finish");
+ List<ThrowingRunnable> resetFunctions = new ArrayList<>();
List<ThrowingRunnable> teardownFunctions = new ArrayList<>();
RunnerApi.PTransform pTransform =
@@ -718,6 +721,7 @@ public class BeamFnDataReadRunnerTest {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
+ resetFunctions::add,
teardownFunctions::add,
(PTransformRunnerFactory.ProgressRequestCallback callback) -> {},
null /* splitListener */,
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
index a2f4c8f..c59e97d 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
@@ -149,6 +149,7 @@ public class BeamFnDataWriteRunnerTest {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
+ null /* addResetFunction */,
teardownFunctions::add,
null /* addProgressRequestCallback */,
null /* splitListener */,
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
index 77b592a..99528f5 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
@@ -173,6 +173,7 @@ public class BoundedSourceRunnerTest {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
+ null /* addResetFunction */,
teardownFunctions::add,
null /* addProgressRequestCallback */,
null /* splitListener */,
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java
index c16dffb..90457fa 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java
@@ -157,7 +157,8 @@ public class CombineRunnersTest {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
- null, /* tearDownRegistry */
+ null /* addResetFunction */,
+ null /* tearDownRegistry */,
null /* addProgressRequestCallback */,
null /* splitListener */,
null /* bundleFinalizer */);
@@ -235,7 +236,8 @@ public class CombineRunnersTest {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
- null, /* tearDownRegistry */
+ null /* addResetFunction */,
+ null /* tearDownRegistry */,
null /* addProgressRequestCallback */,
null /* splitListener */,
null /* bundleFinalizer */);
@@ -301,7 +303,8 @@ public class CombineRunnersTest {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
- null, /* tearDownRegistry */
+ null /* addResetFunction */,
+ null /* tearDownRegistry */,
null /* addProgressRequestCallback */,
null /* splitListener */,
null /* bundleFinalizer */);
@@ -367,7 +370,8 @@ public class CombineRunnersTest {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
- null, /* tearDownRegistry */
+ null /* addResetFunction */,
+ null /* tearDownRegistry */,
null /* addProgressRequestCallback */,
null /* splitListener */,
null /* bundleFinalizer */);
@@ -432,7 +436,8 @@ public class CombineRunnersTest {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
- null, /* tearDownRegistry */
+ null /* addResetFunction */,
+ null /* tearDownRegistry */,
null /* addProgressRequestCallback */,
null /* splitListener */,
null /* bundleFinalizer */);
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FlattenRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FlattenRunnerTest.java
index e57ad4e..f9f093d 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FlattenRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FlattenRunnerTest.java
@@ -91,8 +91,9 @@ public class FlattenRunnerTest {
Collections.emptyMap(),
consumers,
null /* startFunctionRegistry */,
- null, /* finishFunctionRegistry */
- null, /* tearDownRegistry */
+ null /* finishFunctionRegistry */,
+ null /* addResetFunction */,
+ null /* tearDownRegistry */,
null /* addProgressRequestCallback */,
null /* splitListener */,
null /* bundleFinalizer */);
@@ -162,8 +163,9 @@ public class FlattenRunnerTest {
Collections.emptyMap(),
consumers,
null /* startFunctionRegistry */,
- null, /* finishFunctionRegistry */
- null, /* tearDownRegistry */
+ null /* finishFunctionRegistry */,
+ null /* addResetFunction */,
+ null /* tearDownRegistry */,
null /* addProgressRequestCallback */,
null /* splitListener */,
null /* bundleFinalizer */);
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index 63e784a..8e36ed9 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -269,6 +269,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
+ null /* addResetFunction */,
teardownFunctions::add,
null /* addProgressRequestCallback */,
null /* splitListener */,
@@ -450,6 +451,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
+ null /* addResetFunction */,
teardownFunctions::add,
null /* addProgressRequestCallback */,
null /* splitListener */,
@@ -575,6 +577,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
+ null /* addResetFunction */,
teardownFunctions::add,
null /* addProgressRequestCallback */,
null /* splitListener */,
@@ -727,6 +730,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
+ null /* addResetFunction */,
teardownFunctions::add,
null /* addProgressRequestCallback */,
null /* splitListener */,
@@ -843,6 +847,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
+ null /* addResetFunction */,
teardownFunctions::add,
null /* addProgressRequestCallback */,
null /* splitListener */,
@@ -970,6 +975,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
+ null /* addResetFunction */,
teardownFunctions::add,
null /* addProgressRequestCallback */,
null /* splitListener */,
@@ -1590,6 +1596,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
+ null /* addResetFunction */,
teardownFunctions::add,
progressRequestCallbacks::add,
splitListener,
@@ -1900,6 +1907,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
+ null /* addResetFunction */,
teardownFunctions::add,
progressRequestCallbacks::add,
splitListener,
@@ -2252,6 +2260,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
+ null /* addResetFunction */,
teardownFunctions::add,
null /* addProgressRequestCallback */,
null /* bundleSplitListener */,
@@ -2347,6 +2356,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
+ null /* addResetFunction */,
teardownFunctions::add,
null /* addProgressRequestCallback */,
null /* bundleSplitListener */,
@@ -2460,6 +2470,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
+ null /* addResetFunction */,
teardownFunctions::add,
null /* addProgressRequestCallback */,
null /* bundleSplitListener */,
@@ -2565,6 +2576,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
+ null /* addResetFunction */,
teardownFunctions::add,
null /* addProgressRequestCallback */,
null /* bundleSplitListener */,
@@ -2677,6 +2689,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
+ null /* addResetFunction */,
teardownFunctions::add,
null /* addProgressRequestCallback */,
null /* bundleSplitListener */,
@@ -2834,6 +2847,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
+ null /* addResetFunction */,
teardownFunctions::add,
null /* addProgressRequestCallback */,
null /* bundleSplitListener */,
@@ -3012,6 +3026,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
+ null /* addResetFunction */,
teardownFunctions::add,
null /* addProgressRequestCallback */,
null /* bundleSplitListener */,
@@ -3087,6 +3102,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
+ null /* addResetFunction */,
teardownFunctions::add,
null /* addProgressRequestCallback */,
null /* bundleSplitListener */,
@@ -3165,6 +3181,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
+ null /* addResetFunction */,
teardownFunctions::add,
null /* addProgressRequestCallback */,
null /* bundleSplitListener */,
@@ -3275,6 +3292,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
+ null /* addResetFunction */,
teardownFunctions::add,
null /* addProgressRequestCallback */,
null /* bundleSplitListener */,
@@ -3408,6 +3426,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
+ null /* addResetFunction */,
teardownFunctions::add,
null /* addProgressRequestCallback */,
null /* bundleSplitListener */,
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnersTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnersTest.java
index bec72be..559a71c 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnersTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnersTest.java
@@ -93,6 +93,7 @@ public class MapFnRunnersTest {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
+ null /* addResetFunction */,
teardownFunctions::add,
null /* addProgressRequestCallback */,
null /* splitListener */,
@@ -141,6 +142,7 @@ public class MapFnRunnersTest {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
+ null /* addResetFunction */,
teardownFunctions::add,
null /* addProgressRequestCallback */,
null /* splitListener */,
@@ -188,6 +190,7 @@ public class MapFnRunnersTest {
consumers,
startFunctionRegistry,
finishFunctionRegistry,
+ null /* addResetFunction */,
teardownFunctions::add,
null /* addProgressRequestCallback */,
null /* splitListener */,
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index 257f5e7..4ff83d1 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -123,6 +123,7 @@ public class ProcessBundleHandlerTest {
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
+ TestBundleProcessor.resetCnt = 0;
}
private static class TestDoFn extends DoFn<String, String> {
@@ -193,6 +194,11 @@ public class ProcessBundleHandlerTest {
}
@Override
+ List<ThrowingRunnable> getResetFunctions() {
+ return wrappedBundleProcessor.getResetFunctions();
+ }
+
+ @Override
List<ThrowingRunnable> getTearDownFunctions() {
return wrappedBundleProcessor.getTearDownFunctions();
}
@@ -243,7 +249,7 @@ public class ProcessBundleHandlerTest {
}
@Override
- void reset() {
+ void reset() throws Exception {
resetCnt++;
wrappedBundleProcessor.reset();
}
@@ -356,6 +362,7 @@ public class ProcessBundleHandlerTest {
pCollectionConsumerRegistry,
startFunctionRegistry,
finishFunctionRegistry,
+ addResetFunction,
addTearDownFunction,
addProgressRequestCallback,
splitListener,
@@ -489,8 +496,9 @@ public class ProcessBundleHandlerTest {
pCollectionConsumerRegistry,
startFunctionRegistry,
finishFunctionRegistry,
- addProgressRequestCallback,
+ addResetFunction,
addTearDownFunction,
+ addProgressRequestCallback,
splitListener,
bundleFinalizer) -> null);
@@ -562,8 +570,9 @@ public class ProcessBundleHandlerTest {
pCollectionConsumerRegistry,
startFunctionRegistry,
finishFunctionRegistry,
- addProgressRequestCallback,
+ addResetFunction,
addTearDownFunction,
+ addProgressRequestCallback,
splitListener,
bundleFinalizer) -> null),
new TestBundleProcessorCache());
@@ -584,6 +593,25 @@ public class ProcessBundleHandlerTest {
assertThat(handler.bundleProcessorCache.getCachedBundleProcessors().size(), equalTo(1));
assertThat(
handler.bundleProcessorCache.getCachedBundleProcessors().get("1L").size(), equalTo(1));
+
+ // Add a reset handler that throws to test discarding the bundle processor on reset failure.
+ Iterables.getOnlyElement(handler.bundleProcessorCache.getCachedBundleProcessors().get("1L"))
+ .getResetFunctions()
+ .add(
+ () -> {
+ throw new IllegalStateException("ResetFailed");
+ });
+
+ handler.processBundle(
+ BeamFnApi.InstructionRequest.newBuilder()
+ .setInstructionId("999L")
+ .setProcessBundle(
+ BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L"))
+ .build());
+
+ // BundleProcessor is discarded instead of being added back to the BundleProcessorCache
+ assertThat(
+ handler.bundleProcessorCache.getCachedBundleProcessors().get("1L").size(), equalTo(0));
}
@Test
@@ -605,7 +633,7 @@ public class ProcessBundleHandlerTest {
}
@Test
- public void testBundleProcessorReset() {
+ public void testBundleProcessorReset() throws Exception {
PTransformFunctionRegistry startFunctionRegistry = mock(PTransformFunctionRegistry.class);
PTransformFunctionRegistry finishFunctionRegistry = mock(PTransformFunctionRegistry.class);
BundleSplitListener.InMemory splitListener = mock(BundleSplitListener.InMemory.class);
@@ -617,10 +645,12 @@ public class ProcessBundleHandlerTest {
ProcessBundleHandler.HandleStateCallsForBundle beamFnStateClient =
mock(ProcessBundleHandler.HandleStateCallsForBundle.class);
QueueingBeamFnDataClient queueingClient = mock(QueueingBeamFnDataClient.class);
+ ThrowingRunnable resetFunction = mock(ThrowingRunnable.class);
BundleProcessor bundleProcessor =
BundleProcessor.create(
startFunctionRegistry,
finishFunctionRegistry,
+ Collections.singletonList(resetFunction),
new ArrayList<>(),
new ArrayList<>(),
splitListener,
@@ -639,6 +669,7 @@ public class ProcessBundleHandlerTest {
verify(metricsContainerRegistry, times(1)).reset();
verify(stateTracker, times(1)).reset();
verify(bundleFinalizationCallbacks, times(1)).clear();
+ verify(resetFunction, times(1)).run();
}
@Test
@@ -675,6 +706,7 @@ public class ProcessBundleHandlerTest {
pCollectionConsumerRegistry,
startFunctionRegistry,
finishFunctionRegistry,
+ addResetFunction,
addTearDownFunction,
addProgressRequestCallback,
splitListener,
@@ -728,6 +760,7 @@ public class ProcessBundleHandlerTest {
pCollectionConsumerRegistry,
startFunctionRegistry,
finishFunctionRegistry,
+ addResetFunction,
addTearDownFunction,
addProgressRequestCallback,
splitListener,
@@ -796,6 +829,7 @@ public class ProcessBundleHandlerTest {
pCollectionConsumerRegistry,
startFunctionRegistry,
finishFunctionRegistry,
+ addResetFunction,
addTearDownFunction,
addProgressRequestCallback,
splitListener,
@@ -854,6 +888,7 @@ public class ProcessBundleHandlerTest {
pCollectionConsumerRegistry,
startFunctionRegistry,
finishFunctionRegistry,
+ addResetFunction,
addTearDownFunction,
addProgressRequestCallback,
splitListener,
@@ -950,6 +985,7 @@ public class ProcessBundleHandlerTest {
PCollectionConsumerRegistry pCollectionConsumerRegistry,
PTransformFunctionRegistry startFunctionRegistry,
PTransformFunctionRegistry finishFunctionRegistry,
+ Consumer<ThrowingRunnable> addResetFunction,
Consumer<ThrowingRunnable> addTearDownFunction,
Consumer<ProgressRequestCallback> addProgressRequestCallback,
BundleSplitListener splitListener,
@@ -1015,6 +1051,7 @@ public class ProcessBundleHandlerTest {
PCollectionConsumerRegistry pCollectionConsumerRegistry,
PTransformFunctionRegistry startFunctionRegistry,
PTransformFunctionRegistry finishFunctionRegistry,
+ Consumer<ThrowingRunnable> addResetFunction,
Consumer<ThrowingRunnable> addTearDownFunction,
Consumer<ProgressRequestCallback> addProgressRequestCallback,
BundleSplitListener splitListener,
@@ -1078,6 +1115,7 @@ public class ProcessBundleHandlerTest {
PCollectionConsumerRegistry pCollectionConsumerRegistry,
PTransformFunctionRegistry startFunctionRegistry,
PTransformFunctionRegistry finishFunctionRegistry,
+ Consumer<ThrowingRunnable> addResetFunction,
Consumer<ThrowingRunnable> addTearDownFunction,
Consumer<ProgressRequestCallback> addProgressRequestCallback,
BundleSplitListener splitListener,