You are viewing a plain-text version of this content; the canonical link to the original page was not preserved in this copy.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/05/26 15:19:58 UTC
[incubator-streampipes] 02/02: [STREAMPIPES-372] Fix pipeline health check
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 461e165eea4b758fe16bd4823f894b42b8c613dd
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Wed May 26 17:19:41 2021 +0200
[STREAMPIPES-372] Fix pipeline health check
---
.../manager/health/PipelineHealthCheck.java | 25 ++++++++++++++++------
1 file changed, 19 insertions(+), 6 deletions(-)
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
index 245110e..50739fb 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java
@@ -31,12 +31,13 @@ import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
public class PipelineHealthCheck implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(PipelineHealthCheck.class);
- private static final int MAX_FAILED_ATTEMPTS = 3;
+ private static final int MAX_FAILED_ATTEMPTS = 10;
private static final Map<String, Integer> failedRestartAttempts = new HashMap<>();
@@ -45,6 +46,7 @@ public class PipelineHealthCheck implements Runnable {
}
public void checkAndRestorePipelineElements() {
+ LOG.info("Performing health check...");
List<Pipeline> runningPipelines = getRunningPipelines();
if (runningPipelines.size() > 0) {
@@ -52,15 +54,19 @@ public class PipelineHealthCheck implements Runnable {
List<String> allRunningInstances = findRunningInstances(endpointMap.keySet());
runningPipelines.forEach(pipeline -> {
+ AtomicBoolean shouldUpdatePipeline = new AtomicBoolean(false);
+ List<String> failedInstances = new ArrayList<>();
+ List<String> recoveredInstances = new ArrayList<>();
List<String> pipelineNotifications = new ArrayList<>();
List<InvocableStreamPipesEntity> graphs = TemporaryGraphStorage.graphStorage.get(pipeline.getPipelineId());
graphs.forEach(graph -> {
String instanceId = extractInstanceId(graph);
if (allRunningInstances.stream().noneMatch(runningInstanceId -> runningInstanceId.equals(instanceId))) {
if (shouldRetry(instanceId)) {
+ shouldUpdatePipeline.set(true);
boolean success = new HttpRequestBuilder(graph, graph.getBelongsTo()).invoke().isSuccess();
if (!success) {
- pipeline.setHealthStatus(PipelineHealthStatus.FAILURE);
+ failedInstances.add(instanceId);
addFailedAttemptNotification(pipelineNotifications, graph);
increaseFailedAttempt(instanceId);
LOG.info("Could not restore pipeline element {} of pipeline {} ({}/{})",
@@ -69,16 +75,23 @@ public class PipelineHealthCheck implements Runnable {
failedRestartAttempts.get(instanceId),
MAX_FAILED_ATTEMPTS);
} else {
- pipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
+ recoveredInstances.add(instanceId);
addSuccessfulRestoreNotification(pipelineNotifications, graph);
resetFailedAttempts(instanceId);
LOG.info("Successfully restored pipeline element {} of pipeline {}", graph.getName(), pipeline.getName());
}
- pipeline.setPipelineNotifications(pipelineNotifications);
- StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(pipeline);
}
}
});
+ if (shouldUpdatePipeline.get()) {
+ if (failedInstances.size() > 0) {
+ pipeline.setHealthStatus(PipelineHealthStatus.FAILURE);
+ } else if (recoveredInstances.size() > 0) {
+ pipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
+ }
+ pipeline.setPipelineNotifications(pipelineNotifications);
+ StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updatePipeline(pipeline);
+ }
});
}
}
@@ -87,7 +100,7 @@ public class PipelineHealthCheck implements Runnable {
if (!failedRestartAttempts.containsKey(instanceId)) {
return true;
} else {
- return failedRestartAttempts.get(instanceId) <= MAX_FAILED_ATTEMPTS;
+ return failedRestartAttempts.get(instanceId) < MAX_FAILED_ATTEMPTS;
}
}