You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hop.apache.org by ha...@apache.org on 2022/05/14 12:05:13 UTC
[hop] branch master updated: Safely call methods for PipelineResult
This is an automated email from the ASF dual-hosted git repository.
hansva pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hop.git
The following commit(s) were added to refs/heads/master by this push:
new cc7fa65c6c Safely call methods for PipelineResult
new ee46cccfa7 Merge pull request #1486 from shlxue/master
cc7fa65c6c is described below
commit cc7fa65c6cb6a8c86d95f2479502109a429b3e21
Author: Shl Xue <xu...@gmail.com>
AuthorDate: Fri May 13 18:37:19 2022 +0800
Safely call methods for PipelineResult
---
.../hop/beam/engines/BeamPipelineEngine.java | 32 ++++++++++++++++++----
1 file changed, 26 insertions(+), 6 deletions(-)
diff --git a/plugins/engines/beam/src/main/java/org/apache/hop/beam/engines/BeamPipelineEngine.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/engines/BeamPipelineEngine.java
index 57f312ee6b..0cac12a8e2 100644
--- a/plugins/engines/beam/src/main/java/org/apache/hop/beam/engines/BeamPipelineEngine.java
+++ b/plugins/engines/beam/src/main/java/org/apache/hop/beam/engines/BeamPipelineEngine.java
@@ -17,6 +17,7 @@
package org.apache.hop.beam.engines;
+import org.apache.beam.runners.core.metrics.DefaultMetricResults;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.flink.FlinkRunner;
@@ -27,6 +28,7 @@ import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.util.ThrowingSupplier;
import org.apache.hop.beam.metadata.RunnerType;
import org.apache.hop.beam.pipeline.HopPipelineMetaToBeamPipelineConverter;
import org.apache.hop.core.IRowSet;
@@ -52,6 +54,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
public abstract class BeamPipelineEngine extends Variables
implements IPipelineEngine<PipelineMeta> {
+ static MetricResults EMPTY_METRIC_RESULTS =
+ new DefaultMetricResults(
+ Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
/**
* Constant specifying a filename containing XML to inject into a ZIP file created during resource
* export.
@@ -375,7 +380,7 @@ public abstract class BeamPipelineEngine extends Variables
if (beamPipelineResults != null) {
Set<String> transformNames = new HashSet<>(Arrays.asList(pipelineMeta.getTransformNames()));
Map<String, EngineComponent> componentsMap = new HashMap<>();
- MetricResults metrics = beamPipelineResults.metrics();
+ MetricResults metrics = safelyCall(() -> beamPipelineResults.metrics(), EMPTY_METRIC_RESULTS);
MetricQueryResults allResults = metrics.queryMetrics(MetricsFilter.builder().build());
for (MetricResult<Long> result : allResults.getCounters()) {
@@ -418,7 +423,7 @@ public abstract class BeamPipelineEngine extends Variables
// Set the transform status to reflect the pipeline status.
//
- switch (beamPipelineResults.getState()) {
+ switch (safelyCall(() -> beamPipelineResults.getState(), PipelineResult.State.UNKNOWN)) {
case DONE:
engineComponent.setRunning(false);
engineComponent.setStatus(ComponentExecutionStatus.STATUS_FINISHED);
@@ -476,14 +481,14 @@ public abstract class BeamPipelineEngine extends Variables
}
protected synchronized void evaluatePipelineStatus() throws HopException {
- if (beamPipelineResults == null || beamPipelineResults.getState() == null) {
+ if (beamPipelineResults == null || safelyCall(() -> beamPipelineResults.getState()) == null) {
statusDescription = "";
return;
}
// This seems to be the most reliable way of checking the state...
//
- PipelineResult.State pipelineState = beamPipelineResults.waitUntilFinish(Duration.millis(1));
+ PipelineResult.State pipelineState = safelyCall(() -> beamPipelineResults.waitUntilFinish(Duration.millis(1)));
if (pipelineState != null) {
boolean cancelPipeline = false;
boolean cancelRefreshTimer = false;
@@ -530,7 +535,7 @@ public abstract class BeamPipelineEngine extends Variables
if (cancelPipeline) {
try {
- beamPipelineResults.cancel();
+ safelyCall(() -> beamPipelineResults.cancel());
logChannel.logBasic("Pipeline execution cancelled");
} catch (Exception e) {
logChannel.logError("Cancellation of pipeline failed", e);
@@ -598,7 +603,7 @@ public abstract class BeamPipelineEngine extends Variables
public void stopAll() {
try {
if (beamPipelineResults != null) {
- beamPipelineResults.cancel();
+ safelyCall(() -> beamPipelineResults.cancel());
evaluatePipelineStatus();
}
} catch (Exception e) {
@@ -1532,4 +1537,19 @@ public abstract class BeamPipelineEngine extends Variables
IBeamPipelineEngineRunConfiguration beamEngineRunConfiguration) {
this.beamEngineRunConfiguration = beamEngineRunConfiguration;
}
+
+ private <R> R safelyCall(ThrowingSupplier<R> supplier) {
+ return safelyCall(supplier, null);
+ }
+
+ private <R> R safelyCall(ThrowingSupplier<R> supplier, R defaultValue) {
+ try {
+ return supplier.get();
+ } catch (UnsupportedOperationException e) {
+ logChannel.logBasic(e.getMessage());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return defaultValue;
+ }
}