You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/12/21 19:45:58 UTC
[beam] branch master updated: Release bundle processor when any
exceptions during processing.
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e9fa621 Release bundle processor when any exceptions during processing.
new 060b083 Merge pull request #13568 from [BEAM-3245] Release bundle processor when any exceptions during processing.
e9fa621 is described below
commit e9fa621a10ce6a84179fba1d54b413535a1a46a7
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Wed Dec 16 10:44:57 2020 -0800
Release bundle processor when any exceptions during processing.
---
.../fn/harness/control/ProcessBundleHandler.java | 87 ++++++++++++----------
1 file changed, 46 insertions(+), 41 deletions(-)
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index e93c67b..4ecb5f5 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -293,55 +293,60 @@ public class ProcessBundleHandler {
ExecutionStateTracker stateTracker = bundleProcessor.getStateTracker();
QueueingBeamFnDataClient queueingClient = bundleProcessor.getQueueingClient();
- try (HandleStateCallsForBundle beamFnStateClient = bundleProcessor.getBeamFnStateClient()) {
- try (Closeable closeTracker = stateTracker.activate()) {
- // Already in reverse topological order so we don't need to do anything.
- for (ThrowingRunnable startFunction : startFunctionRegistry.getFunctions()) {
- LOG.debug("Starting function {}", startFunction);
- startFunction.run();
- }
+ try {
+ try (HandleStateCallsForBundle beamFnStateClient = bundleProcessor.getBeamFnStateClient()) {
+ try (Closeable closeTracker = stateTracker.activate()) {
+ // Already in reverse topological order so we don't need to do anything.
+ for (ThrowingRunnable startFunction : startFunctionRegistry.getFunctions()) {
+ LOG.debug("Starting function {}", startFunction);
+ startFunction.run();
+ }
- queueingClient.drainAndBlock();
+ queueingClient.drainAndBlock();
- // Need to reverse this since we want to call finish in topological order.
- for (ThrowingRunnable finishFunction :
- Lists.reverse(finishFunctionRegistry.getFunctions())) {
- LOG.debug("Finishing function {}", finishFunction);
- finishFunction.run();
+ // Need to reverse this since we want to call finish in topological order.
+ for (ThrowingRunnable finishFunction :
+ Lists.reverse(finishFunctionRegistry.getFunctions())) {
+ LOG.debug("Finishing function {}", finishFunction);
+ finishFunction.run();
+ }
}
- }
- // Add all checkpointed residuals to the response.
- response.addAllResidualRoots(bundleProcessor.getSplitListener().getResidualRoots());
-
- // TODO(BEAM-6597): This should be reporting monitoring infos using the short id system.
- // Get start bundle Execution Time Metrics.
- response.addAllMonitoringInfos(
- bundleProcessor.getStartFunctionRegistry().getExecutionTimeMonitoringInfos());
- // Get process bundle Execution Time Metrics.
- response.addAllMonitoringInfos(
- bundleProcessor.getpCollectionConsumerRegistry().getExecutionTimeMonitoringInfos());
- // Get finish bundle Execution Time Metrics.
- response.addAllMonitoringInfos(
- bundleProcessor.getFinishFunctionRegistry().getExecutionTimeMonitoringInfos());
- // Extract MonitoringInfos that come from the metrics container registry.
- response.addAllMonitoringInfos(
- bundleProcessor.getMetricsContainerRegistry().getMonitoringInfos());
- // Add any additional monitoring infos that the "runners" report explicitly.
- for (ProgressRequestCallback progressRequestCallback :
- bundleProcessor.getProgressRequestCallbacks()) {
- response.addAllMonitoringInfos(progressRequestCallback.getMonitoringInfos());
- }
+ // Add all checkpointed residuals to the response.
+ response.addAllResidualRoots(bundleProcessor.getSplitListener().getResidualRoots());
+
+ // TODO(BEAM-6597): This should be reporting monitoring infos using the short id system.
+ // Get start bundle Execution Time Metrics.
+ response.addAllMonitoringInfos(
+ bundleProcessor.getStartFunctionRegistry().getExecutionTimeMonitoringInfos());
+ // Get process bundle Execution Time Metrics.
+ response.addAllMonitoringInfos(
+ bundleProcessor.getpCollectionConsumerRegistry().getExecutionTimeMonitoringInfos());
+ // Get finish bundle Execution Time Metrics.
+ response.addAllMonitoringInfos(
+ bundleProcessor.getFinishFunctionRegistry().getExecutionTimeMonitoringInfos());
+ // Extract MonitoringInfos that come from the metrics container registry.
+ response.addAllMonitoringInfos(
+ bundleProcessor.getMetricsContainerRegistry().getMonitoringInfos());
+ // Add any additional monitoring infos that the "runners" report explicitly.
+ for (ProgressRequestCallback progressRequestCallback :
+ bundleProcessor.getProgressRequestCallbacks()) {
+ response.addAllMonitoringInfos(progressRequestCallback.getMonitoringInfos());
+ }
- if (!bundleProcessor.getBundleFinalizationCallbackRegistrations().isEmpty()) {
- finalizeBundleHandler.registerCallbacks(
- bundleProcessor.getInstructionId(),
- ImmutableList.copyOf(bundleProcessor.getBundleFinalizationCallbackRegistrations()));
- response.setRequiresFinalization(true);
+ if (!bundleProcessor.getBundleFinalizationCallbackRegistrations().isEmpty()) {
+ finalizeBundleHandler.registerCallbacks(
+ bundleProcessor.getInstructionId(),
+ ImmutableList.copyOf(bundleProcessor.getBundleFinalizationCallbackRegistrations()));
+ response.setRequiresFinalization(true);
+ }
}
-
bundleProcessorCache.release(
request.getProcessBundle().getProcessBundleDescriptorId(), bundleProcessor);
+ } catch (Exception e) {
+ bundleProcessorCache.release(
+ request.getProcessBundle().getProcessBundleDescriptorId(), bundleProcessor);
+ throw e;
}
return BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response);
}