You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/05/26 15:19:58 UTC

[incubator-streampipes] 02/02: [STREAMPIPES-372] Fix pipeline health check

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 461e165eea4b758fe16bd4823f894b42b8c613dd
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Wed May 26 17:19:41 2021 +0200

    [STREAMPIPES-372] Fix pipeline health check
---
 .../manager/health/PipelineHealthCheck.java        | 25 ++++++++++++++++------
 1 file changed, 19 insertions(+), 6 deletions(-)

diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
index 245110e..50739fb 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
@@ -31,12 +31,13 @@ import java.io.IOException;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 public class PipelineHealthCheck implements Runnable {
 
   private static final Logger LOG = LoggerFactory.getLogger(PipelineHealthCheck.class);
-  private static final int MAX_FAILED_ATTEMPTS = 3;
+  private static final int MAX_FAILED_ATTEMPTS = 10;
 
   private static final Map<String, Integer> failedRestartAttempts = new HashMap<>();
 
@@ -45,6 +46,7 @@ public class PipelineHealthCheck implements Runnable {
   }
 
   public void checkAndRestorePipelineElements() {
+    LOG.info("Performing health check...");
     List<Pipeline> runningPipelines = getRunningPipelines();
 
     if (runningPipelines.size() > 0) {
@@ -52,15 +54,19 @@ public class PipelineHealthCheck implements Runnable {
       List<String> allRunningInstances = findRunningInstances(endpointMap.keySet());
 
       runningPipelines.forEach(pipeline -> {
+        AtomicBoolean shouldUpdatePipeline = new AtomicBoolean(false);
+        List<String> failedInstances = new ArrayList<>();
+        List<String> recoveredInstances = new ArrayList<>();
         List<String> pipelineNotifications = new ArrayList<>();
         List<InvocableStreamPipesEntity> graphs = TemporaryGraphStorage.graphStorage.get(pipeline.getPipelineId());
         graphs.forEach(graph -> {
           String instanceId = extractInstanceId(graph);
           if (allRunningInstances.stream().noneMatch(runningInstanceId -> runningInstanceId.equals(instanceId))) {
             if (shouldRetry(instanceId)) {
+              shouldUpdatePipeline.set(true);
               boolean success = new HttpRequestBuilder(graph, graph.getBelongsTo()).invoke().isSuccess();
               if (!success) {
-                pipeline.setHealthStatus(PipelineHealthStatus.FAILURE);
+                failedInstances.add(instanceId);
                 addFailedAttemptNotification(pipelineNotifications, graph);
                 increaseFailedAttempt(instanceId);
                 LOG.info("Could not restore pipeline element {} of pipeline {} ({}/{})",
@@ -69,16 +75,23 @@ public class PipelineHealthCheck implements Runnable {
                         failedRestartAttempts.get(instanceId),
                         MAX_FAILED_ATTEMPTS);
               } else {
-                pipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
+                recoveredInstances.add(instanceId);
                 addSuccessfulRestoreNotification(pipelineNotifications, graph);
                 resetFailedAttempts(instanceId);
                 LOG.info("Successfully restored pipeline element {} of pipeline {}", graph.getName(), pipeline.getName());
               }
-              pipeline.setPipelineNotifications(pipelineNotifications);
-              StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(pipeline);
             }
           }
         });
+        if (shouldUpdatePipeline.get()) {
+          if (failedInstances.size() > 0) {
+            pipeline.setHealthStatus(PipelineHealthStatus.FAILURE);
+          } else if (recoveredInstances.size() > 0) {
+            pipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
+          }
+          pipeline.setPipelineNotifications(pipelineNotifications);
+          StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(pipeline);
+        }
       });
     }
   }
@@ -87,7 +100,7 @@ public class PipelineHealthCheck implements Runnable {
     if (!failedRestartAttempts.containsKey(instanceId)) {
       return true;
     } else {
-      return failedRestartAttempts.get(instanceId) <= MAX_FAILED_ATTEMPTS;
+      return failedRestartAttempts.get(instanceId) < MAX_FAILED_ATTEMPTS;
     }
   }