You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hop.apache.org by ha...@apache.org on 2022/05/14 12:05:13 UTC

[hop] branch master updated: Safely call methods for PipelineResult

This is an automated email from the ASF dual-hosted git repository.

hansva pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hop.git


The following commit(s) were added to refs/heads/master by this push:
     new cc7fa65c6c Safely call methods for PipelineResult
     new ee46cccfa7 Merge pull request #1486 from shlxue/master
cc7fa65c6c is described below

commit cc7fa65c6cb6a8c86d95f2479502109a429b3e21
Author: Shl Xue <xu...@gmail.com>
AuthorDate: Fri May 13 18:37:19 2022 +0800

    Safely call methods for PipelineResult
---
 .../hop/beam/engines/BeamPipelineEngine.java       | 32 ++++++++++++++++++----
 1 file changed, 26 insertions(+), 6 deletions(-)

diff --git a/plugins/engines/beam/src/main/java/org/apache/hop/beam/engines/BeamPipelineEngine.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/engines/BeamPipelineEngine.java
index 57f312ee6b..0cac12a8e2 100644
--- a/plugins/engines/beam/src/main/java/org/apache/hop/beam/engines/BeamPipelineEngine.java
+++ b/plugins/engines/beam/src/main/java/org/apache/hop/beam/engines/BeamPipelineEngine.java
@@ -17,6 +17,7 @@
 
 package org.apache.hop.beam.engines;
 
+import org.apache.beam.runners.core.metrics.DefaultMetricResults;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.direct.DirectRunner;
 import org.apache.beam.runners.flink.FlinkRunner;
@@ -27,6 +28,7 @@ import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.metrics.MetricResult;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.util.ThrowingSupplier;
 import org.apache.hop.beam.metadata.RunnerType;
 import org.apache.hop.beam.pipeline.HopPipelineMetaToBeamPipelineConverter;
 import org.apache.hop.core.IRowSet;
@@ -52,6 +54,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public abstract class BeamPipelineEngine extends Variables
     implements IPipelineEngine<PipelineMeta> {
 
+  static MetricResults EMPTY_METRIC_RESULTS =
+      new DefaultMetricResults(
+          Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
   /**
    * Constant specifying a filename containing XML to inject into a ZIP file created during resource
    * export.
@@ -375,7 +380,7 @@ public abstract class BeamPipelineEngine extends Variables
       if (beamPipelineResults != null) {
         Set<String> transformNames = new HashSet<>(Arrays.asList(pipelineMeta.getTransformNames()));
         Map<String, EngineComponent> componentsMap = new HashMap<>();
-        MetricResults metrics = beamPipelineResults.metrics();
+        MetricResults metrics = safelyCall(() -> beamPipelineResults.metrics(), EMPTY_METRIC_RESULTS);
         MetricQueryResults allResults = metrics.queryMetrics(MetricsFilter.builder().build());
 
         for (MetricResult<Long> result : allResults.getCounters()) {
@@ -418,7 +423,7 @@ public abstract class BeamPipelineEngine extends Variables
 
             // Set the transform status to reflect the pipeline status.
             //
-            switch (beamPipelineResults.getState()) {
+            switch (safelyCall(() -> beamPipelineResults.getState(), PipelineResult.State.UNKNOWN)) {
               case DONE:
                 engineComponent.setRunning(false);
                 engineComponent.setStatus(ComponentExecutionStatus.STATUS_FINISHED);
@@ -476,14 +481,14 @@ public abstract class BeamPipelineEngine extends Variables
   }
 
   protected synchronized void evaluatePipelineStatus() throws HopException {
-    if (beamPipelineResults == null || beamPipelineResults.getState() == null) {
+    if (beamPipelineResults == null || safelyCall(() -> beamPipelineResults.getState()) == null) {
       statusDescription = "";
       return;
     }
 
     // This seems to be the most reliable way of checking the state...
     //
-    PipelineResult.State pipelineState = beamPipelineResults.waitUntilFinish(Duration.millis(1));
+    PipelineResult.State pipelineState = safelyCall(() -> beamPipelineResults.waitUntilFinish(Duration.millis(1)));
     if (pipelineState != null) {
       boolean cancelPipeline = false;
       boolean cancelRefreshTimer = false;
@@ -530,7 +535,7 @@ public abstract class BeamPipelineEngine extends Variables
 
       if (cancelPipeline) {
         try {
-          beamPipelineResults.cancel();
+          safelyCall(() -> beamPipelineResults.cancel());
           logChannel.logBasic("Pipeline execution cancelled");
         } catch (Exception e) {
           logChannel.logError("Cancellation of pipeline failed", e);
@@ -598,7 +603,7 @@ public abstract class BeamPipelineEngine extends Variables
   public void stopAll() {
     try {
       if (beamPipelineResults != null) {
-        beamPipelineResults.cancel();
+        safelyCall(() -> beamPipelineResults.cancel());
         evaluatePipelineStatus();
       }
     } catch (Exception e) {
@@ -1532,4 +1537,19 @@ public abstract class BeamPipelineEngine extends Variables
       IBeamPipelineEngineRunConfiguration beamEngineRunConfiguration) {
     this.beamEngineRunConfiguration = beamEngineRunConfiguration;
   }
+
+  private <R> R safelyCall(ThrowingSupplier<R> supplier) {
+    return safelyCall(supplier, null);
+  }
+
+  private <R> R safelyCall(ThrowingSupplier<R> supplier, R defaultValue) {
+    try {
+      return supplier.get();
+    } catch (UnsupportedOperationException e) {
+      logChannel.logBasic(e.getMessage());
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return defaultValue;
+  }
 }