You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/05/17 20:48:59 UTC
[incubator-streampipes] branch dev updated: [STREAMPIPES-74] Initiate multiple attempts to start pipelines upon startup
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new d04a5b7 [STREAMPIPES-74] Initiate multiple attempts to start pipelines upon startup
d04a5b7 is described below
commit d04a5b7ed5b01c3de9b55b0d79082b0733848f9c
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Mon May 17 22:48:44 2021 +0200
[STREAMPIPES-74] Initiate multiple attempts to start pipelines upon startup
---
.../backend/StreamPipesBackendApplication.java | 90 +++++++++++++++++-----
1 file changed, 70 insertions(+), 20 deletions(-)
diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
index 899eb8d..53cd75b 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesBackendApplication.java
@@ -38,10 +38,13 @@ import org.springframework.context.annotation.Import;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.servlet.ServletContextListener;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
@Configuration
@EnableAutoConfiguration
@@ -50,6 +53,12 @@ public class StreamPipesBackendApplication {
private static final Logger LOG = LoggerFactory.getLogger(StreamPipesBackendApplication.class.getCanonicalName());
+ private static final int MAX_PIPELINE_START_RETRIES = 3;
+ private static final int WAIT_TIME_AFTER_FAILURE_IN_SECONDS = 10;
+
+ private ScheduledExecutorService executorService;
+ private Map<String, Integer> failedPipelines = new HashMap<>();
+
public static void main(String[] args) {
System.setProperty("org.apache.tomcat.util.buf.UDecoder.ALLOW_ENCODED_SLASH", "true");
SpringApplication.run(StreamPipesBackendApplication.class, args);
@@ -57,22 +66,32 @@ public class StreamPipesBackendApplication {
@PostConstruct
public void init() {
- ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+ this.executorService = Executors.newSingleThreadScheduledExecutor();
executorService.schedule(this::startAllPreviouslyStoppedPipelines, 5, TimeUnit.SECONDS);
}
+ private void schedulePipelineStart(Pipeline pipeline, boolean restartOnReboot) {
+ executorService.schedule(() -> {
+ startPipeline(pipeline, restartOnReboot);
+ }, WAIT_TIME_AFTER_FAILURE_IN_SECONDS, TimeUnit.SECONDS);
+ }
+
@PreDestroy
public void onExit() {
LOG.info("Shutting down StreamPipes...");
LOG.info("Flagging currently running pipelines for restart...");
- getAllPipelines()
+ List<Pipeline> pipelinesToStop = getAllPipelines()
.stream()
.filter(Pipeline::isRunning)
- .forEach(pipeline -> {
- pipeline.setRestartOnSystemReboot(true);
- StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(pipeline);
- });
+ .collect(Collectors.toList());
+
+ LOG.info("Found {} running pipelines which will be stopped...", pipelinesToStop.size());
+
+ pipelinesToStop.forEach(pipeline -> {
+ pipeline.setRestartOnSystemReboot(true);
+ StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(pipeline);
+ });
LOG.info("Gracefully stopping all running pipelines...");
List<PipelineOperationStatus> status = Operations.stopAllPipelines();
@@ -89,35 +108,66 @@ public class StreamPipesBackendApplication {
private void startAllPreviouslyStoppedPipelines() {
LOG.info("Checking for orphaned pipelines...");
- getAllPipelines()
+ List<Pipeline> orphanedPipelines = getAllPipelines()
.stream()
.filter(Pipeline::isRunning)
- .forEach(pipeline -> {
- LOG.info("Restoring orphaned pipeline {}", pipeline.getName());
- startPipeline(pipeline);
- });
+ .collect(Collectors.toList());
+
+ LOG.info("Found {} orphaned pipelines", orphanedPipelines.size());
+
+ orphanedPipelines.forEach(pipeline -> {
+ LOG.info("Restoring orphaned pipeline {}", pipeline.getName());
+ startPipeline(pipeline, false);
+ });
LOG.info("Checking for gracefully shut down pipelines to be restarted...");
- getAllPipelines()
+ List<Pipeline> pipelinesToRestart = getAllPipelines()
.stream()
- .filter(p -> ! (p.isRunning()))
+ .filter(p -> !(p.isRunning()))
.filter(Pipeline::isRestartOnSystemReboot)
- .forEach(pipeline -> {
- startPipeline(pipeline);
- pipeline.setRestartOnSystemReboot(false);
- StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(pipeline);
- });
+ .collect(Collectors.toList());
+
+ LOG.info("Found {} pipelines that we are attempting to restart...", pipelinesToRestart.size());
+
+ pipelinesToRestart.forEach(pipeline -> {
+ startPipeline(pipeline, false);
+ });
LOG.info("No more pipelines to restore...");
}
- private void startPipeline(Pipeline pipeline) {
+ private void startPipeline(Pipeline pipeline, boolean restartOnReboot) {
PipelineOperationStatus status = Operations.startPipeline(pipeline);
if (status.isSuccess()) {
LOG.info("Pipeline {} successfully restarted", status.getPipelineName());
+ pipeline.setRestartOnSystemReboot(restartOnReboot);
+ StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(pipeline);
+ } else {
+ storeFailedRestartAttempt(pipeline);
+ int failedAttemptCount = failedPipelines.get(pipeline.getPipelineId());
+ if (failedAttemptCount <= MAX_PIPELINE_START_RETRIES) {
+ LOG.error("Pipeline {} could not be restarted - I'll try again in {} seconds ({}/{} failed attempts)",
+ pipeline.getName(),
+ WAIT_TIME_AFTER_FAILURE_IN_SECONDS,
+ failedAttemptCount,
+ MAX_PIPELINE_START_RETRIES);
+
+ schedulePipelineStart(pipeline, restartOnReboot);
+ } else {
+ LOG.error("Pipeline {} could not be restarted - are all pipeline element containers running?",
+ status.getPipelineName());
+ }
+ }
+ }
+
+ private void storeFailedRestartAttempt(Pipeline pipeline) {
+ String pipelineId = pipeline.getPipelineId();
+ if (!failedPipelines.containsKey(pipelineId)) {
+ failedPipelines.put(pipelineId, 1);
} else {
- LOG.error("Pipeline {} could not be restarted - are all pipeline element containers running?", status.getPipelineName());
+ int failedAttempts = failedPipelines.get(pipelineId) + 1;
+ failedPipelines.put(pipelineId, failedAttempts);
}
}