You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/08/19 22:22:05 UTC

[beam] branch master updated: [BEAM-9979] Support a reset call that happens after the bundle is done allowing resetting of internal state of a transform runner.

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ca2e7e7  [BEAM-9979] Support a reset call that happens after the bundle is done allowing resetting of internal state of a transform runner.
     new 1c6b07c  Merge pull request #12636 from lukecwik/beam9979
ca2e7e7 is described below

commit ca2e7e79e1f2631da44bbb62cedc745b3727a990
Author: Luke Cwik <lc...@google.com>
AuthorDate: Wed Aug 19 14:41:05 2020 -0700

    [BEAM-9979] Support a reset call that happens after the bundle is done allowing resetting of internal state of a transform runner.
    
    This is preliminary work needed to be able to reset the BeamFnDataReadRunner to solve the race condition.
---
 .../beam/fn/harness/BeamFnDataReadRunner.java      |  1 +
 .../beam/fn/harness/BeamFnDataWriteRunner.java     |  1 +
 .../beam/fn/harness/BoundedSourceRunner.java       |  1 +
 .../org/apache/beam/fn/harness/CombineRunners.java |  1 +
 .../org/apache/beam/fn/harness/FlattenRunner.java  |  1 +
 .../apache/beam/fn/harness/FnApiDoFnRunner.java    |  1 +
 .../org/apache/beam/fn/harness/MapFnRunners.java   |  1 +
 .../beam/fn/harness/PTransformRunnerFactory.java   | 20 ++++++++--
 .../fn/harness/control/ProcessBundleHandler.java   | 27 +++++++++++--
 .../beam/fn/harness/AssignWindowsRunnerTest.java   |  5 ++-
 .../beam/fn/harness/BeamFnDataReadRunnerTest.java  |  4 ++
 .../beam/fn/harness/BeamFnDataWriteRunnerTest.java |  1 +
 .../beam/fn/harness/BoundedSourceRunnerTest.java   |  1 +
 .../apache/beam/fn/harness/CombineRunnersTest.java | 15 ++++---
 .../apache/beam/fn/harness/FlattenRunnerTest.java  | 10 +++--
 .../beam/fn/harness/FnApiDoFnRunnerTest.java       | 19 +++++++++
 .../apache/beam/fn/harness/MapFnRunnersTest.java   |  3 ++
 .../harness/control/ProcessBundleHandlerTest.java  | 46 ++++++++++++++++++++--
 18 files changed, 136 insertions(+), 22 deletions(-)

diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
index 34cdcb4..fe52877 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
@@ -102,6 +102,7 @@ public class BeamFnDataReadRunner<OutputT> {
         PCollectionConsumerRegistry pCollectionConsumerRegistry,
         PTransformFunctionRegistry startFunctionRegistry,
         PTransformFunctionRegistry finishFunctionRegistry,
+        Consumer<ThrowingRunnable> addResetFunction,
         Consumer<ThrowingRunnable> tearDownFunctions,
         Consumer<ProgressRequestCallback> addProgressRequestCallback,
         BundleSplitListener splitListener,
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
index 684f399..0de2d3f 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
@@ -92,6 +92,7 @@ public class BeamFnDataWriteRunner<InputT> {
         PCollectionConsumerRegistry pCollectionConsumerRegistry,
         PTransformFunctionRegistry startFunctionRegistry,
         PTransformFunctionRegistry finishFunctionRegistry,
+        Consumer<ThrowingRunnable> addResetFunction,
         Consumer<ThrowingRunnable> tearDownFunctions,
         Consumer<ProgressRequestCallback> addProgressRequestCallback,
         BundleSplitListener splitListener,
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
index 9500d21..ee63a6f 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
@@ -85,6 +85,7 @@ public class BoundedSourceRunner<InputT extends BoundedSource<OutputT>, OutputT>
         PCollectionConsumerRegistry pCollectionConsumerRegistry,
         PTransformFunctionRegistry startFunctionRegistry,
         PTransformFunctionRegistry finishFunctionRegistry,
+        Consumer<ThrowingRunnable> addResetFunction,
         Consumer<ThrowingRunnable> tearDownFunctions,
         Consumer<ProgressRequestCallback> addProgressRequestCallback,
         BundleSplitListener splitListener,
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
index 9cf80cb..e32ff38 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
@@ -131,6 +131,7 @@ public class CombineRunners {
         PCollectionConsumerRegistry pCollectionConsumerRegistry,
         PTransformFunctionRegistry startFunctionRegistry,
         PTransformFunctionRegistry finishFunctionRegistry,
+        Consumer<ThrowingRunnable> addResetFunction,
         Consumer<ThrowingRunnable> tearDownFunctions,
         Consumer<ProgressRequestCallback> addProgressRequestCallback,
         BundleSplitListener splitListener,
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FlattenRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FlattenRunner.java
index 405d98b..18d398f 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FlattenRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FlattenRunner.java
@@ -70,6 +70,7 @@ public class FlattenRunner<InputT> {
         PCollectionConsumerRegistry pCollectionConsumerRegistry,
         PTransformFunctionRegistry startFunctionRegistry,
         PTransformFunctionRegistry finishFunctionRegistry,
+        Consumer<ThrowingRunnable> addResetFunction,
         Consumer<ThrowingRunnable> tearDownFunctions,
         Consumer<ProgressRequestCallback> addProgressRequestCallback,
         BundleSplitListener splitListener,
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 4bb6765..de479d5 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -169,6 +169,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
             PCollectionConsumerRegistry pCollectionConsumerRegistry,
             PTransformFunctionRegistry startFunctionRegistry,
             PTransformFunctionRegistry finishFunctionRegistry,
+            Consumer<ThrowingRunnable> addResetFunction,
             Consumer<ThrowingRunnable> tearDownFunctions,
             Consumer<ProgressRequestCallback> addProgressRequestCallback,
             BundleSplitListener splitListener,
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunners.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunners.java
index aa3fe49..9321326 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunners.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunners.java
@@ -112,6 +112,7 @@ public abstract class MapFnRunners {
         PCollectionConsumerRegistry pCollectionConsumerRegistry,
         PTransformFunctionRegistry startFunctionRegistry,
         PTransformFunctionRegistry finishFunctionRegistry,
+        Consumer<ThrowingRunnable> addResetFunction,
         Consumer<ThrowingRunnable> tearDownFunctions,
         Consumer<ProgressRequestCallback> addProgressRequestCallback,
         BundleSplitListener splitListener,
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
index ce334c7..1ad006f 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
@@ -42,7 +42,7 @@ public interface PTransformRunnerFactory<T> {
   /**
    * Creates and returns a handler for a given PTransform. Note that the handler must support
    * processing multiple bundles. The handler will be discarded if an error is thrown during element
-   * processing, or during execution of start/finish.
+   * processing, or during execution of start/finish/reset.
    *
    * @param pipelineOptions Pipeline options
    * @param beamFnDataClient A client for handling inbound and outbound data streams.
@@ -62,10 +62,21 @@ public interface PTransformRunnerFactory<T> {
    *     registered within this multimap.
    * @param startFunctionRegistry A class to register a start bundle handler with.
    * @param finishFunctionRegistry A class to register a finish bundle handler with.
-   * @param addTearDownFunction A consumer to register a tear down handler with.
+   * @param addResetFunction A consumer to register any reset methods. Reset methods should not
+   *     invoke any user code; user code should instead be registered using the {@code
+   *     finishFunctionRegistry}. The reset methods are guaranteed to be invoked after the bundle
+   *     completes successfully and after {@code T} becomes ineligible to receive method calls
+   *     registered with {@code addProgressRequestCallback} or {@code splitListener}.
+   * @param addTearDownFunction A consumer to register a tear down handler with. The handler will
+   *     be invoked before {@code T} becomes eligible for garbage collection.
    * @param addProgressRequestCallback A consumer to register a callback whenever progress is being
-   *     requested.
-   * @param splitListener A listener to be invoked when the PTransform splits itself.
+   *     requested. The callback will be called concurrently with any methods registered with
+   *     {@code pCollectionConsumerRegistry}, {@code startFunctionRegistry}, and {@code
+   *     finishFunctionRegistry}.
+   * @param splitListener A listener to be invoked when the PTransform splits itself. This method
+   *     will be called concurrently with any methods registered with {@code
+   *     pCollectionConsumerRegistry}, {@code startFunctionRegistry}, and {@code
+   *     finishFunctionRegistry}.
    * @param bundleFinalizer Register callbacks that will be invoked when the runner completes the
    *     bundle. The specified instant provides the timeout on how long the finalization callback is
    *     valid for.
@@ -84,6 +95,7 @@ public interface PTransformRunnerFactory<T> {
       PCollectionConsumerRegistry pCollectionConsumerRegistry,
       PTransformFunctionRegistry startFunctionRegistry,
       PTransformFunctionRegistry finishFunctionRegistry,
+      Consumer<ThrowingRunnable> addResetFunction,
       Consumer<ThrowingRunnable> addTearDownFunction,
       Consumer<ProgressRequestCallback> addProgressRequestCallback,
       BundleSplitListener splitListener,
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 40105ba..a664ea2 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -183,6 +183,7 @@ public class ProcessBundleHandler {
       Set<String> processedPTransformIds,
       PTransformFunctionRegistry startFunctionRegistry,
       PTransformFunctionRegistry finishFunctionRegistry,
+      Consumer<ThrowingRunnable> addResetFunction,
       Consumer<ThrowingRunnable> addTearDownFunction,
       Consumer<ProgressRequestCallback> addProgressRequestCallback,
       BundleSplitListener splitListener,
@@ -209,6 +210,7 @@ public class ProcessBundleHandler {
             processedPTransformIds,
             startFunctionRegistry,
             finishFunctionRegistry,
+            addResetFunction,
             addTearDownFunction,
             addProgressRequestCallback,
             splitListener,
@@ -248,6 +250,7 @@ public class ProcessBundleHandler {
                   pCollectionConsumerRegistry,
                   startFunctionRegistry,
                   finishFunctionRegistry,
+                  addResetFunction,
                   addTearDownFunction,
                   addProgressRequestCallback,
                   splitListener,
@@ -428,6 +431,7 @@ public class ProcessBundleHandler {
     PTransformFunctionRegistry finishFunctionRegistry =
         new PTransformFunctionRegistry(
             metricsContainerRegistry, stateTracker, ExecutionStateTracker.FINISH_STATE_NAME);
+    List<ThrowingRunnable> resetFunctions = new ArrayList<>();
     List<ThrowingRunnable> tearDownFunctions = new ArrayList<>();
     List<ProgressRequestCallback> progressRequestCallbacks = new ArrayList<>();
 
@@ -472,6 +476,7 @@ public class ProcessBundleHandler {
         BundleProcessor.create(
             startFunctionRegistry,
             finishFunctionRegistry,
+            resetFunctions,
             tearDownFunctions,
             progressRequestCallbacks,
             splitListener,
@@ -510,6 +515,7 @@ public class ProcessBundleHandler {
           processedPTransformIds,
           startFunctionRegistry,
           finishFunctionRegistry,
+          resetFunctions::add,
           tearDownFunctions::add,
           progressRequestCallbacks::add,
           splitListener,
@@ -580,8 +586,15 @@ public class ProcessBundleHandler {
      */
     void release(String bundleDescriptorId, BundleProcessor bundleProcessor) {
       activeBundleProcessors.remove(bundleProcessor.getInstructionId());
-      bundleProcessor.reset();
-      cachedBundleProcessors.get(bundleDescriptorId).add(bundleProcessor);
+      try {
+        bundleProcessor.reset();
+        cachedBundleProcessors.get(bundleDescriptorId).add(bundleProcessor);
+      } catch (Exception e) {
+        LOG.warn(
+            "Was unable to reset bundle processor safely. Bundle processor will be discarded and re-instantiated on next bundle for descriptor {}.",
+            bundleDescriptorId,
+            e);
+      }
     }
 
     /** Shutdown all the cached {@link BundleProcessor}s, running the tearDown() functions. */
@@ -605,6 +618,7 @@ public class ProcessBundleHandler {
     public static BundleProcessor create(
         PTransformFunctionRegistry startFunctionRegistry,
         PTransformFunctionRegistry finishFunctionRegistry,
+        List<ThrowingRunnable> resetFunctions,
         List<ThrowingRunnable> tearDownFunctions,
         List<ProgressRequestCallback> progressRequestCallbacks,
         BundleSplitListener.InMemory splitListener,
@@ -617,6 +631,7 @@ public class ProcessBundleHandler {
       return new AutoValue_ProcessBundleHandler_BundleProcessor(
           startFunctionRegistry,
           finishFunctionRegistry,
+          resetFunctions,
           tearDownFunctions,
           progressRequestCallbacks,
           splitListener,
@@ -635,6 +650,8 @@ public class ProcessBundleHandler {
 
     abstract PTransformFunctionRegistry getFinishFunctionRegistry();
 
+    abstract List<ThrowingRunnable> getResetFunctions();
+
     abstract List<ThrowingRunnable> getTearDownFunctions();
 
     abstract List<ProgressRequestCallback> getProgressRequestCallbacks();
@@ -663,7 +680,7 @@ public class ProcessBundleHandler {
       this.instructionId = instructionId;
     }
 
-    void reset() {
+    void reset() throws Exception {
       getStartFunctionRegistry().reset();
       getFinishFunctionRegistry().reset();
       getSplitListener().clear();
@@ -672,6 +689,9 @@ public class ProcessBundleHandler {
       getStateTracker().reset();
       ExecutionStateSampler.instance().reset();
       getBundleFinalizationCallbackRegistrations().clear();
+      for (ThrowingRunnable resetFunction : getResetFunctions()) {
+        resetFunction.run();
+      }
     }
   }
 
@@ -789,6 +809,7 @@ public class ProcessBundleHandler {
         PCollectionConsumerRegistry pCollectionConsumerRegistry,
         PTransformFunctionRegistry startFunctionRegistry,
         PTransformFunctionRegistry finishFunctionRegistry,
+        Consumer<ThrowingRunnable> addResetFunction,
         Consumer<ThrowingRunnable> tearDownFunctions,
         Consumer<ProgressRequestCallback> addProgressRequestCallback,
         BundleSplitListener splitListener,
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/AssignWindowsRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/AssignWindowsRunnerTest.java
index 17ca2a6..ae6c432 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/AssignWindowsRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/AssignWindowsRunnerTest.java
@@ -205,8 +205,9 @@ public class AssignWindowsRunnerTest implements Serializable {
             null /* windowingStrategies */,
             pCollectionConsumerRegistry,
             null /* startFunctionRegistry */,
-            null, /* finishFunctionRegistry */
-            null, /* tearDownRegistry */
+            null /* finishFunctionRegistry */,
+            null /* addResetFunction */,
+            null /* tearDownRegistry */,
             null /* addProgressRequestCallback */,
             null /* splitListener */,
             null /* bundleFinalizer */);
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
index c1bfa46..779fd0e 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
@@ -163,6 +163,7 @@ public class BeamFnDataReadRunnerTest {
       PTransformFunctionRegistry finishFunctionRegistry =
           new PTransformFunctionRegistry(
               mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "finish");
+      List<ThrowingRunnable> resetFunctions = new ArrayList<>();
       List<ThrowingRunnable> teardownFunctions = new ArrayList<>();
 
       RunnerApi.PTransform pTransform =
@@ -185,6 +186,7 @@ public class BeamFnDataReadRunnerTest {
               consumers,
               startFunctionRegistry,
               finishFunctionRegistry,
+              resetFunctions::add,
               teardownFunctions::add,
               (PTransformRunnerFactory.ProgressRequestCallback callback) -> {},
               null /* splitListener */,
@@ -696,6 +698,7 @@ public class BeamFnDataReadRunnerTest {
     PTransformFunctionRegistry finishFunctionRegistry =
         new PTransformFunctionRegistry(
             mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "finish");
+    List<ThrowingRunnable> resetFunctions = new ArrayList<>();
     List<ThrowingRunnable> teardownFunctions = new ArrayList<>();
 
     RunnerApi.PTransform pTransform =
@@ -718,6 +721,7 @@ public class BeamFnDataReadRunnerTest {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
+            resetFunctions::add,
             teardownFunctions::add,
             (PTransformRunnerFactory.ProgressRequestCallback callback) -> {},
             null /* splitListener */,
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
index a2f4c8f..c59e97d 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
@@ -149,6 +149,7 @@ public class BeamFnDataWriteRunnerTest {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
+            null /* addResetFunction */,
             teardownFunctions::add,
             null /* addProgressRequestCallback */,
             null /* splitListener */,
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
index 77b592a..99528f5 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
@@ -173,6 +173,7 @@ public class BoundedSourceRunnerTest {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
+            null /* addResetFunction */,
             teardownFunctions::add,
             null /* addProgressRequestCallback */,
             null /* splitListener */,
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java
index c16dffb..90457fa 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java
@@ -157,7 +157,8 @@ public class CombineRunnersTest {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
-            null, /* tearDownRegistry */
+            null /* addResetFunction */,
+            null /* tearDownRegistry */,
             null /* addProgressRequestCallback */,
             null /* splitListener */,
             null /* bundleFinalizer */);
@@ -235,7 +236,8 @@ public class CombineRunnersTest {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
-            null, /* tearDownRegistry */
+            null /* addResetFunction */,
+            null /* tearDownRegistry */,
             null /* addProgressRequestCallback */,
             null /* splitListener */,
             null /* bundleFinalizer */);
@@ -301,7 +303,8 @@ public class CombineRunnersTest {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
-            null, /* tearDownRegistry */
+            null /* addResetFunction */,
+            null /* tearDownRegistry */,
             null /* addProgressRequestCallback */,
             null /* splitListener */,
             null /* bundleFinalizer */);
@@ -367,7 +370,8 @@ public class CombineRunnersTest {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
-            null, /* tearDownRegistry */
+            null /* addResetFunction */,
+            null /* tearDownRegistry */,
             null /* addProgressRequestCallback */,
             null /* splitListener */,
             null /* bundleFinalizer */);
@@ -432,7 +436,8 @@ public class CombineRunnersTest {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
-            null, /* tearDownRegistry */
+            null /* addResetFunction */,
+            null /* tearDownRegistry */,
             null /* addProgressRequestCallback */,
             null /* splitListener */,
             null /* bundleFinalizer */);
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FlattenRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FlattenRunnerTest.java
index e57ad4e..f9f093d 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FlattenRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FlattenRunnerTest.java
@@ -91,8 +91,9 @@ public class FlattenRunnerTest {
             Collections.emptyMap(),
             consumers,
             null /* startFunctionRegistry */,
-            null, /* finishFunctionRegistry */
-            null, /* tearDownRegistry */
+            null /* finishFunctionRegistry */,
+            null /* addResetFunction */,
+            null /* tearDownRegistry */,
             null /* addProgressRequestCallback */,
             null /* splitListener */,
             null /* bundleFinalizer */);
@@ -162,8 +163,9 @@ public class FlattenRunnerTest {
             Collections.emptyMap(),
             consumers,
             null /* startFunctionRegistry */,
-            null, /* finishFunctionRegistry */
-            null, /* tearDownRegistry */
+            null /* finishFunctionRegistry */,
+            null /* addResetFunction */,
+            null /* tearDownRegistry */,
             null /* addProgressRequestCallback */,
             null /* splitListener */,
             null /* bundleFinalizer */);
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index 63e784a..8e36ed9 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -269,6 +269,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
+            null /* addResetFunction */,
             teardownFunctions::add,
             null /* addProgressRequestCallback */,
             null /* splitListener */,
@@ -450,6 +451,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
+            null /* addResetFunction */,
             teardownFunctions::add,
             null /* addProgressRequestCallback */,
             null /* splitListener */,
@@ -575,6 +577,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
+            null /* addResetFunction */,
             teardownFunctions::add,
             null /* addProgressRequestCallback */,
             null /* splitListener */,
@@ -727,6 +730,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
+            null /* addResetFunction */,
             teardownFunctions::add,
             null /* addProgressRequestCallback */,
             null /* splitListener */,
@@ -843,6 +847,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
+            null /* addResetFunction */,
             teardownFunctions::add,
             null /* addProgressRequestCallback */,
             null /* splitListener */,
@@ -970,6 +975,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
+            null /* addResetFunction */,
             teardownFunctions::add,
             null /* addProgressRequestCallback */,
             null /* splitListener */,
@@ -1590,6 +1596,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
+            null /* addResetFunction */,
             teardownFunctions::add,
             progressRequestCallbacks::add,
             splitListener,
@@ -1900,6 +1907,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
+            null /* addResetFunction */,
             teardownFunctions::add,
             progressRequestCallbacks::add,
             splitListener,
@@ -2252,6 +2260,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
+            null /* addResetFunction */,
             teardownFunctions::add,
             null /* addProgressRequestCallback */,
             null /* bundleSplitListener */,
@@ -2347,6 +2356,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
+            null /* addResetFunction */,
             teardownFunctions::add,
             null /* addProgressRequestCallback */,
             null /* bundleSplitListener */,
@@ -2460,6 +2470,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
+            null /* addResetFunction */,
             teardownFunctions::add,
             null /* addProgressRequestCallback */,
             null /* bundleSplitListener */,
@@ -2565,6 +2576,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
+            null /* addResetFunction */,
             teardownFunctions::add,
             null /* addProgressRequestCallback */,
             null /* bundleSplitListener */,
@@ -2677,6 +2689,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
+            null /* addResetFunction */,
             teardownFunctions::add,
             null /* addProgressRequestCallback */,
             null /* bundleSplitListener */,
@@ -2834,6 +2847,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
+            null /* addResetFunction */,
             teardownFunctions::add,
             null /* addProgressRequestCallback */,
             null /* bundleSplitListener */,
@@ -3012,6 +3026,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
+            null /* addResetFunction */,
             teardownFunctions::add,
             null /* addProgressRequestCallback */,
             null /* bundleSplitListener */,
@@ -3087,6 +3102,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
+            null /* addResetFunction */,
             teardownFunctions::add,
             null /* addProgressRequestCallback */,
             null /* bundleSplitListener */,
@@ -3165,6 +3181,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
+            null /* addResetFunction */,
             teardownFunctions::add,
             null /* addProgressRequestCallback */,
             null /* bundleSplitListener */,
@@ -3275,6 +3292,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
+            null /* addResetFunction */,
             teardownFunctions::add,
             null /* addProgressRequestCallback */,
             null /* bundleSplitListener */,
@@ -3408,6 +3426,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
+            null /* addResetFunction */,
             teardownFunctions::add,
             null /* addProgressRequestCallback */,
             null /* bundleSplitListener */,
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnersTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnersTest.java
index bec72be..559a71c 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnersTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnersTest.java
@@ -93,6 +93,7 @@ public class MapFnRunnersTest {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
+            null /* addResetFunction */,
             teardownFunctions::add,
             null /* addProgressRequestCallback */,
             null /* splitListener */,
@@ -141,6 +142,7 @@ public class MapFnRunnersTest {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
+            null /* addResetFunction */,
             teardownFunctions::add,
             null /* addProgressRequestCallback */,
             null /* splitListener */,
@@ -188,6 +190,7 @@ public class MapFnRunnersTest {
             consumers,
             startFunctionRegistry,
             finishFunctionRegistry,
+            null /* addResetFunction */,
             teardownFunctions::add,
             null /* addProgressRequestCallback */,
             null /* splitListener */,
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index 257f5e7..4ff83d1 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -123,6 +123,7 @@ public class ProcessBundleHandlerTest {
   @Before
   public void setUp() {
     MockitoAnnotations.initMocks(this);
+    TestBundleProcessor.resetCnt = 0;
   }
 
   private static class TestDoFn extends DoFn<String, String> {
@@ -193,6 +194,11 @@ public class ProcessBundleHandlerTest {
     }
 
     @Override
+    List<ThrowingRunnable> getResetFunctions() {
+      return wrappedBundleProcessor.getResetFunctions();
+    }
+
+    @Override
     List<ThrowingRunnable> getTearDownFunctions() {
       return wrappedBundleProcessor.getTearDownFunctions();
     }
@@ -243,7 +249,7 @@ public class ProcessBundleHandlerTest {
     }
 
     @Override
-    void reset() {
+    void reset() throws Exception {
       resetCnt++;
       wrappedBundleProcessor.reset();
     }
@@ -356,6 +362,7 @@ public class ProcessBundleHandlerTest {
             pCollectionConsumerRegistry,
             startFunctionRegistry,
             finishFunctionRegistry,
+            addResetFunction,
             addTearDownFunction,
             addProgressRequestCallback,
             splitListener,
@@ -489,8 +496,9 @@ public class ProcessBundleHandlerTest {
             pCollectionConsumerRegistry,
             startFunctionRegistry,
             finishFunctionRegistry,
-            addProgressRequestCallback,
+            addResetFunction,
             addTearDownFunction,
+            addProgressRequestCallback,
             splitListener,
             bundleFinalizer) -> null);
 
@@ -562,8 +570,9 @@ public class ProcessBundleHandlerTest {
                     pCollectionConsumerRegistry,
                     startFunctionRegistry,
                     finishFunctionRegistry,
-                    addProgressRequestCallback,
+                    addResetFunction,
                     addTearDownFunction,
+                    addProgressRequestCallback,
                     splitListener,
                     bundleFinalizer) -> null),
             new TestBundleProcessorCache());
@@ -584,6 +593,25 @@ public class ProcessBundleHandlerTest {
     assertThat(handler.bundleProcessorCache.getCachedBundleProcessors().size(), equalTo(1));
     assertThat(
         handler.bundleProcessorCache.getCachedBundleProcessors().get("1L").size(), equalTo(1));
+
+    // Add a reset handler that throws to test discarding the bundle processor on reset failure.
+    Iterables.getOnlyElement(handler.bundleProcessorCache.getCachedBundleProcessors().get("1L"))
+        .getResetFunctions()
+        .add(
+            () -> {
+              throw new IllegalStateException("ResetFailed");
+            });
+
+    handler.processBundle(
+        BeamFnApi.InstructionRequest.newBuilder()
+            .setInstructionId("999L")
+            .setProcessBundle(
+                BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("1L"))
+            .build());
+
+    // BundleProcessor is discarded instead of being added back to the BundleProcessorCache
+    assertThat(
+        handler.bundleProcessorCache.getCachedBundleProcessors().get("1L").size(), equalTo(0));
   }
 
   @Test
@@ -605,7 +633,7 @@ public class ProcessBundleHandlerTest {
   }
 
   @Test
-  public void testBundleProcessorReset() {
+  public void testBundleProcessorReset() throws Exception {
     PTransformFunctionRegistry startFunctionRegistry = mock(PTransformFunctionRegistry.class);
     PTransformFunctionRegistry finishFunctionRegistry = mock(PTransformFunctionRegistry.class);
     BundleSplitListener.InMemory splitListener = mock(BundleSplitListener.InMemory.class);
@@ -617,10 +645,12 @@ public class ProcessBundleHandlerTest {
     ProcessBundleHandler.HandleStateCallsForBundle beamFnStateClient =
         mock(ProcessBundleHandler.HandleStateCallsForBundle.class);
     QueueingBeamFnDataClient queueingClient = mock(QueueingBeamFnDataClient.class);
+    ThrowingRunnable resetFunction = mock(ThrowingRunnable.class);
     BundleProcessor bundleProcessor =
         BundleProcessor.create(
             startFunctionRegistry,
             finishFunctionRegistry,
+            Collections.singletonList(resetFunction),
             new ArrayList<>(),
             new ArrayList<>(),
             splitListener,
@@ -639,6 +669,7 @@ public class ProcessBundleHandlerTest {
     verify(metricsContainerRegistry, times(1)).reset();
     verify(stateTracker, times(1)).reset();
     verify(bundleFinalizationCallbacks, times(1)).clear();
+    verify(resetFunction, times(1)).run();
   }
 
   @Test
@@ -675,6 +706,7 @@ public class ProcessBundleHandlerTest {
                     pCollectionConsumerRegistry,
                     startFunctionRegistry,
                     finishFunctionRegistry,
+                    addResetFunction,
                     addTearDownFunction,
                     addProgressRequestCallback,
                     splitListener,
@@ -728,6 +760,7 @@ public class ProcessBundleHandlerTest {
                         pCollectionConsumerRegistry,
                         startFunctionRegistry,
                         finishFunctionRegistry,
+                        addResetFunction,
                         addTearDownFunction,
                         addProgressRequestCallback,
                         splitListener,
@@ -796,6 +829,7 @@ public class ProcessBundleHandlerTest {
                         pCollectionConsumerRegistry,
                         startFunctionRegistry,
                         finishFunctionRegistry,
+                        addResetFunction,
                         addTearDownFunction,
                         addProgressRequestCallback,
                         splitListener,
@@ -854,6 +888,7 @@ public class ProcessBundleHandlerTest {
                         pCollectionConsumerRegistry,
                         startFunctionRegistry,
                         finishFunctionRegistry,
+                        addResetFunction,
                         addTearDownFunction,
                         addProgressRequestCallback,
                         splitListener,
@@ -950,6 +985,7 @@ public class ProcessBundleHandlerTest {
                       PCollectionConsumerRegistry pCollectionConsumerRegistry,
                       PTransformFunctionRegistry startFunctionRegistry,
                       PTransformFunctionRegistry finishFunctionRegistry,
+                      Consumer<ThrowingRunnable> addResetFunction,
                       Consumer<ThrowingRunnable> addTearDownFunction,
                       Consumer<ProgressRequestCallback> addProgressRequestCallback,
                       BundleSplitListener splitListener,
@@ -1015,6 +1051,7 @@ public class ProcessBundleHandlerTest {
                       PCollectionConsumerRegistry pCollectionConsumerRegistry,
                       PTransformFunctionRegistry startFunctionRegistry,
                       PTransformFunctionRegistry finishFunctionRegistry,
+                      Consumer<ThrowingRunnable> addResetFunction,
                       Consumer<ThrowingRunnable> addTearDownFunction,
                       Consumer<ProgressRequestCallback> addProgressRequestCallback,
                       BundleSplitListener splitListener,
@@ -1078,6 +1115,7 @@ public class ProcessBundleHandlerTest {
                       PCollectionConsumerRegistry pCollectionConsumerRegistry,
                       PTransformFunctionRegistry startFunctionRegistry,
                       PTransformFunctionRegistry finishFunctionRegistry,
+                      Consumer<ThrowingRunnable> addResetFunction,
                       Consumer<ThrowingRunnable> addTearDownFunction,
                       Consumer<ProgressRequestCallback> addProgressRequestCallback,
                       BundleSplitListener splitListener,