You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/12/21 19:45:58 UTC

[beam] branch master updated: Release bundle processor when any exception occurs during processing.

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e9fa621  Release bundle processor when any exceptions during processing.
     new 060b083  Merge pull request #13568 from [BEAM-3245] Release bundle processor when any exceptions during processing.
e9fa621 is described below

commit e9fa621a10ce6a84179fba1d54b413535a1a46a7
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Wed Dec 16 10:44:57 2020 -0800

    Release bundle processor when any exception occurs during processing.
---
 .../fn/harness/control/ProcessBundleHandler.java   | 87 ++++++++++++----------
 1 file changed, 46 insertions(+), 41 deletions(-)

diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index e93c67b..4ecb5f5 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -293,55 +293,60 @@ public class ProcessBundleHandler {
     ExecutionStateTracker stateTracker = bundleProcessor.getStateTracker();
     QueueingBeamFnDataClient queueingClient = bundleProcessor.getQueueingClient();
 
-    try (HandleStateCallsForBundle beamFnStateClient = bundleProcessor.getBeamFnStateClient()) {
-      try (Closeable closeTracker = stateTracker.activate()) {
-        // Already in reverse topological order so we don't need to do anything.
-        for (ThrowingRunnable startFunction : startFunctionRegistry.getFunctions()) {
-          LOG.debug("Starting function {}", startFunction);
-          startFunction.run();
-        }
+    try {
+      try (HandleStateCallsForBundle beamFnStateClient = bundleProcessor.getBeamFnStateClient()) {
+        try (Closeable closeTracker = stateTracker.activate()) {
+          // Already in reverse topological order so we don't need to do anything.
+          for (ThrowingRunnable startFunction : startFunctionRegistry.getFunctions()) {
+            LOG.debug("Starting function {}", startFunction);
+            startFunction.run();
+          }
 
-        queueingClient.drainAndBlock();
+          queueingClient.drainAndBlock();
 
-        // Need to reverse this since we want to call finish in topological order.
-        for (ThrowingRunnable finishFunction :
-            Lists.reverse(finishFunctionRegistry.getFunctions())) {
-          LOG.debug("Finishing function {}", finishFunction);
-          finishFunction.run();
+          // Need to reverse this since we want to call finish in topological order.
+          for (ThrowingRunnable finishFunction :
+              Lists.reverse(finishFunctionRegistry.getFunctions())) {
+            LOG.debug("Finishing function {}", finishFunction);
+            finishFunction.run();
+          }
         }
-      }
 
-      // Add all checkpointed residuals to the response.
-      response.addAllResidualRoots(bundleProcessor.getSplitListener().getResidualRoots());
-
-      // TODO(BEAM-6597): This should be reporting monitoring infos using the short id system.
-      // Get start bundle Execution Time Metrics.
-      response.addAllMonitoringInfos(
-          bundleProcessor.getStartFunctionRegistry().getExecutionTimeMonitoringInfos());
-      // Get process bundle Execution Time Metrics.
-      response.addAllMonitoringInfos(
-          bundleProcessor.getpCollectionConsumerRegistry().getExecutionTimeMonitoringInfos());
-      // Get finish bundle Execution Time Metrics.
-      response.addAllMonitoringInfos(
-          bundleProcessor.getFinishFunctionRegistry().getExecutionTimeMonitoringInfos());
-      // Extract MonitoringInfos that come from the metrics container registry.
-      response.addAllMonitoringInfos(
-          bundleProcessor.getMetricsContainerRegistry().getMonitoringInfos());
-      // Add any additional monitoring infos that the "runners" report explicitly.
-      for (ProgressRequestCallback progressRequestCallback :
-          bundleProcessor.getProgressRequestCallbacks()) {
-        response.addAllMonitoringInfos(progressRequestCallback.getMonitoringInfos());
-      }
+        // Add all checkpointed residuals to the response.
+        response.addAllResidualRoots(bundleProcessor.getSplitListener().getResidualRoots());
+
+        // TODO(BEAM-6597): This should be reporting monitoring infos using the short id system.
+        // Get start bundle Execution Time Metrics.
+        response.addAllMonitoringInfos(
+            bundleProcessor.getStartFunctionRegistry().getExecutionTimeMonitoringInfos());
+        // Get process bundle Execution Time Metrics.
+        response.addAllMonitoringInfos(
+            bundleProcessor.getpCollectionConsumerRegistry().getExecutionTimeMonitoringInfos());
+        // Get finish bundle Execution Time Metrics.
+        response.addAllMonitoringInfos(
+            bundleProcessor.getFinishFunctionRegistry().getExecutionTimeMonitoringInfos());
+        // Extract MonitoringInfos that come from the metrics container registry.
+        response.addAllMonitoringInfos(
+            bundleProcessor.getMetricsContainerRegistry().getMonitoringInfos());
+        // Add any additional monitoring infos that the "runners" report explicitly.
+        for (ProgressRequestCallback progressRequestCallback :
+            bundleProcessor.getProgressRequestCallbacks()) {
+          response.addAllMonitoringInfos(progressRequestCallback.getMonitoringInfos());
+        }
 
-      if (!bundleProcessor.getBundleFinalizationCallbackRegistrations().isEmpty()) {
-        finalizeBundleHandler.registerCallbacks(
-            bundleProcessor.getInstructionId(),
-            ImmutableList.copyOf(bundleProcessor.getBundleFinalizationCallbackRegistrations()));
-        response.setRequiresFinalization(true);
+        if (!bundleProcessor.getBundleFinalizationCallbackRegistrations().isEmpty()) {
+          finalizeBundleHandler.registerCallbacks(
+              bundleProcessor.getInstructionId(),
+              ImmutableList.copyOf(bundleProcessor.getBundleFinalizationCallbackRegistrations()));
+          response.setRequiresFinalization(true);
+        }
       }
-
       bundleProcessorCache.release(
           request.getProcessBundle().getProcessBundleDescriptorId(), bundleProcessor);
+    } catch (Exception e) {
+      bundleProcessorCache.release(
+          request.getProcessBundle().getProcessBundleDescriptorId(), bundleProcessor);
+      throw e;
     }
     return BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response);
   }