You are viewing a plain-text version of this content; the link to its canonical version is not preserved in this rendering.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/05/17 20:48:59 UTC

[incubator-streampipes] branch dev updated: [STREAMPIPES-74] Initiate multiple attempts to start pipelines upon startup

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new d04a5b7  [STREAMPIPES-74] Initiate multiple attempts to start pipelines upon startup
d04a5b7 is described below

commit d04a5b7ed5b01c3de9b55b0d79082b0733848f9c
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Mon May 17 22:48:44 2021 +0200

    [STREAMPIPES-74] Initiate multiple attempts to start pipelines upon startup
---
 .../backend/StreamPipesBackendApplication.java     | 90 +++++++++++++++++-----
 1 file changed, 70 insertions(+), 20 deletions(-)

diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
index 899eb8d..53cd75b 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
@@ -38,10 +38,13 @@ import org.springframework.context.annotation.Import;
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.servlet.ServletContextListener;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 @Configuration
 @EnableAutoConfiguration
@@ -50,6 +53,12 @@ public class StreamPipesBackendApplication {
 
   private static final Logger LOG = LoggerFactory.getLogger(StreamPipesBackendApplication.class.getCanonicalName());
 
+  private static final int MAX_PIPELINE_START_RETRIES = 3;
+  private static final int WAIT_TIME_AFTER_FAILURE_IN_SECONDS = 10;
+
+  private ScheduledExecutorService executorService;
+  private Map<String, Integer> failedPipelines = new HashMap<>();
+
   public static void main(String[] args) {
     System.setProperty("org.apache.tomcat.util.buf.UDecoder.ALLOW_ENCODED_SLASH", "true");
     SpringApplication.run(StreamPipesBackendApplication.class, args);
@@ -57,22 +66,32 @@ public class StreamPipesBackendApplication {
 
   @PostConstruct
   public void init() {
-    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+    this.executorService = Executors.newSingleThreadScheduledExecutor();
 
     executorService.schedule(this::startAllPreviouslyStoppedPipelines, 5, TimeUnit.SECONDS);
   }
 
+  private void schedulePipelineStart(Pipeline pipeline, boolean restartOnReboot) {
+    executorService.schedule(() -> {
+      startPipeline(pipeline, restartOnReboot);
+    }, WAIT_TIME_AFTER_FAILURE_IN_SECONDS, TimeUnit.SECONDS);
+  }
+
   @PreDestroy
   public void onExit() {
     LOG.info("Shutting down StreamPipes...");
     LOG.info("Flagging currently running pipelines for restart...");
-    getAllPipelines()
+    List<Pipeline> pipelinesToStop = getAllPipelines()
             .stream()
             .filter(Pipeline::isRunning)
-            .forEach(pipeline -> {
-              pipeline.setRestartOnSystemReboot(true);
-              StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(pipeline);
-            });
+            .collect(Collectors.toList());
+
+    LOG.info("Found {} running pipelines which will be stopped...", pipelinesToStop.size());
+
+    pipelinesToStop.forEach(pipeline -> {
+      pipeline.setRestartOnSystemReboot(true);
+      StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(pipeline);
+    });
 
     LOG.info("Gracefully stopping all running pipelines...");
     List<PipelineOperationStatus> status = Operations.stopAllPipelines();
@@ -89,35 +108,66 @@ public class StreamPipesBackendApplication {
 
   private void startAllPreviouslyStoppedPipelines() {
     LOG.info("Checking for orphaned pipelines...");
-    getAllPipelines()
+    List<Pipeline> orphanedPipelines = getAllPipelines()
             .stream()
             .filter(Pipeline::isRunning)
-            .forEach(pipeline -> {
-              LOG.info("Restoring orphaned pipeline {}", pipeline.getName());
-              startPipeline(pipeline);
-            });
+            .collect(Collectors.toList());
+
+    LOG.info("Found {} orphaned pipelines", orphanedPipelines.size());
+
+    orphanedPipelines.forEach(pipeline -> {
+      LOG.info("Restoring orphaned pipeline {}", pipeline.getName());
+      startPipeline(pipeline, false);
+    });
 
     LOG.info("Checking for gracefully shut down pipelines to be restarted...");
 
-    getAllPipelines()
+    List<Pipeline> pipelinesToRestart = getAllPipelines()
             .stream()
-            .filter(p -> ! (p.isRunning()))
+            .filter(p -> !(p.isRunning()))
             .filter(Pipeline::isRestartOnSystemReboot)
-            .forEach(pipeline -> {
-              startPipeline(pipeline);
-              pipeline.setRestartOnSystemReboot(false);
-              StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(pipeline);
-            });
+            .collect(Collectors.toList());
+
+    LOG.info("Found {} pipelines that we are attempting to restart...", pipelinesToRestart.size());
+
+    pipelinesToRestart.forEach(pipeline -> {
+      startPipeline(pipeline, false);
+    });
 
     LOG.info("No more pipelines to restore...");
   }
 
-  private void startPipeline(Pipeline pipeline) {
+  private void startPipeline(Pipeline pipeline, boolean restartOnReboot) {
     PipelineOperationStatus status = Operations.startPipeline(pipeline);
     if (status.isSuccess()) {
       LOG.info("Pipeline {} successfully restarted", status.getPipelineName());
+      pipeline.setRestartOnSystemReboot(restartOnReboot);
+      StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(pipeline);
+    } else {
+      storeFailedRestartAttempt(pipeline);
+      int failedAttemptCount = failedPipelines.get(pipeline.getPipelineId());
+      if (failedAttemptCount <= MAX_PIPELINE_START_RETRIES) {
+        LOG.error("Pipeline {} could not be restarted - I'll try again in {} seconds ({}/{} failed attempts)",
+                pipeline.getName(),
+                WAIT_TIME_AFTER_FAILURE_IN_SECONDS,
+                failedAttemptCount,
+                MAX_PIPELINE_START_RETRIES);
+
+        schedulePipelineStart(pipeline, restartOnReboot);
+      } else {
+        LOG.error("Pipeline {} could not be restarted - are all pipeline element containers running?",
+                status.getPipelineName());
+      }
+    }
+  }
+
+  private void storeFailedRestartAttempt(Pipeline pipeline) {
+    String pipelineId = pipeline.getPipelineId();
+    if (!failedPipelines.containsKey(pipelineId)) {
+      failedPipelines.put(pipelineId, 1);
     } else {
-      LOG.error("Pipeline {} could not be restarted - are all pipeline element containers running?", status.getPipelineName());
+      int failedAttempts = failedPipelines.get(pipelineId) + 1;
+      failedPipelines.put(pipelineId, failedAttempts);
     }
   }