You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2022/10/21 13:58:03 UTC
[brooklyn-server] 11/11: workflow basic expiry, and more workflow fixes
This is an automated email from the ASF dual-hosted git repository.
heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit b35bab6dcd8f74ea727f9a1ea0fe9da90a260802
Author: Alex Heneveld <al...@cloudsoft.io>
AuthorDate: Fri Oct 21 14:55:06 2022 +0100
workflow basic expiry, and more workflow fixes
---
.../core/workflow/WorkflowErrorHandling.java | 2 +
.../core/workflow/WorkflowExecutionContext.java | 59 ++++++++++++++++++++--
.../core/workflow/WorkflowStepDefinition.java | 3 ++
.../core/workflow/steps/RetryWorkflowStep.java | 6 ++-
.../store/WorkflowStatePersistenceViaSensors.java | 29 +++++++++--
.../brooklyn/util/core/task/TaskBuilder.java | 5 +-
.../workflow/WorkflowPersistReplayErrorsTest.java | 4 +-
7 files changed, 96 insertions(+), 12 deletions(-)
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowErrorHandling.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowErrorHandling.java
index f17d25792d..532ab13ee4 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowErrorHandling.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowErrorHandling.java
@@ -52,6 +52,7 @@ public class WorkflowErrorHandling implements Callable<WorkflowErrorHandling.Wor
log.debug("Encountered error in step "+context.getWorkflowStepReference()+" '" + stepTask.getDisplayName() + "' (handler present): " + Exceptions.collapseText(error));
Task<WorkflowErrorHandlingResult> task = Tasks.<WorkflowErrorHandlingResult>builder().dynamic(true).displayName(context.getWorkflowStepReference()+"-error-handler")
.tag(BrooklynTaskTags.tagForWorkflowStepErrorHandler(context, null, context.getTaskId()))
+ .tag(BrooklynTaskTags.WORKFLOW_TAG)
.body(new WorkflowErrorHandling(step.getOnError(), context.getWorkflowExectionContext(), context.getWorkflowExectionContext().currentStepIndex, stepTask, error))
.build();
TaskTags.addTagDynamically(stepTask, BrooklynTaskTags.tagForErrorHandledBy(task));
@@ -64,6 +65,7 @@ public class WorkflowErrorHandling implements Callable<WorkflowErrorHandling.Wor
log.debug("Encountered error in workflow "+context.getWorkflowId()+"/"+context.getTaskId()+" '" + workflowTask.getDisplayName() + "' (handler present): " + Exceptions.collapseText(error));
Task<WorkflowErrorHandlingResult> task = Tasks.<WorkflowErrorHandlingResult>builder().dynamic(true).displayName(context.getWorkflowId()+"-error-handler")
.tag(BrooklynTaskTags.tagForWorkflowStepErrorHandler(context))
+ .tag(BrooklynTaskTags.WORKFLOW_TAG)
.body(new WorkflowErrorHandling(context.onError, context, null, workflowTask, error))
.build();
TaskTags.addTagDynamically(workflowTask, BrooklynTaskTags.tagForErrorHandledBy(task));
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
index 76752c672f..2d2eb1688e 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
@@ -20,7 +20,9 @@ package org.apache.brooklyn.core.workflow;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.google.common.collect.Iterables;
import com.google.common.reflect.TypeToken;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.mgmt.ManagementContext;
@@ -60,6 +62,7 @@ import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -121,6 +124,9 @@ public class WorkflowExecutionContext {
String taskId;
transient Task<Object> task;
+ @JsonProperty("expiryKey")
+ String expiryKey;
+
/** all tasks created for this workflow */
Set<WorkflowReplayUtils.WorkflowReplayRecord> replays = MutableSet.of();
transient WorkflowReplayUtils.WorkflowReplayRecord replayCurrent = null;
@@ -239,7 +245,7 @@ public class WorkflowExecutionContext {
TaskBuilder<Object> tb = Tasks.builder().dynamic(true);
if (optionalTaskFlags!=null) tb.flags(optionalTaskFlags);
- else tb.displayName(name);
+ if (Strings.isBlank(tb.getDisplayName())) tb.displayName(name);
task = tb.body(new Body()).build();
WorkflowReplayUtils.updateOnWorkflowStartOrReplay(this, task, "initial run", false);
workflowId = taskId = task.getId();
@@ -564,6 +570,49 @@ public class WorkflowExecutionContext {
return errorHandlerContext;
}
+ @JsonIgnore
+ public String getExpiryKey() {
+ if (Strings.isNonBlank(expiryKey)) return expiryKey;
+ if (Strings.isNonBlank(getName())) return getName();
+ return "anonymous-workflow";
+ }
+
+ /** look in tasks, steps, and replays to find most recent activity */
+ public long getMostRecentActivityTime() {
+ AtomicLong result = new AtomicLong(-1);
+
+ Consumer<Long> consider = l -> {
+ if (l!=null && l>result.get()) result.set(l);
+ };
+ Consumer<Task> considerTask = task -> {
+ if (task!=null) {
+ consider.accept(task.getEndTimeUtc());
+ consider.accept(task.getStartTimeUtc());
+ consider.accept(task.getSubmitTimeUtc());
+ }
+ };
+ considerTask.accept(getTask(false).orNull());
+
+ Consumer<WorkflowReplayUtils.WorkflowReplayRecord> considerReplay = replay -> {
+ if (replay!=null) {
+ consider.accept(replay.endTimeUtc);
+ consider.accept(replay.startTimeUtc);
+ consider.accept(replay.submitTimeUtc);
+ }
+ };
+ if (replayCurrent!=null) {
+ considerReplay.accept(replayCurrent);
+ } else if (!replays.isEmpty()) {
+ considerReplay.accept(Iterables.getLast(replays));
+ }
+
+ if (currentStepInstance!=null) {
+ considerTask.accept(getManagementContext().getExecutionManager().getTask(currentStepInstance.getTaskId()));
+ }
+
+ return result.get();
+ }
+
public List<Object> getStepsDefinition() {
return MutableList.copyOf(stepsDefinition).asUnmodifiable();
}
@@ -771,12 +820,12 @@ public class WorkflowExecutionContext {
boolean errorHandled = false;
if (isTimeout) {
// don't run error handler
- log.debug("Timeout in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + " '" + task.getDisplayName() + "', throwing: " + Exceptions.collapseText(e));
+ log.debug("Timeout in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + ", throwing: " + Exceptions.collapseText(e));
} else if (onError != null && !onError.isEmpty() && provisionalStatus.persistable) {
WorkflowErrorHandling.WorkflowErrorHandlingResult result = null;
try {
- log.debug("Error in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + " '" + task.getDisplayName() + "', running error handler");
+ log.debug("Error in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + ", running error handler");
Task<WorkflowErrorHandling.WorkflowErrorHandlingResult> workflowErrorHandlerTask = WorkflowErrorHandling.createWorkflowErrorHandlerTask(WorkflowExecutionContext.this, task, e);
errorHandlerTaskId = workflowErrorHandlerTask.getId();
result = DynamicTasks.queue(workflowErrorHandlerTask).getUnchecked();
@@ -797,13 +846,13 @@ public class WorkflowExecutionContext {
}
} catch (Exception e2) {
- log.warn("Error in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + " '" + task.getDisplayName() + "' error handler for -- " + Exceptions.collapseText(e) + " -- threw another error (rethrowing): " + Exceptions.collapseText(e2));
+ log.warn("Error in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + " error handler for -- " + Exceptions.collapseText(e) + " -- threw another error (rethrowing): " + Exceptions.collapseText(e2));
log.debug("Full trace of original error was: " + e, e);
e = e2;
}
} else {
- log.debug("Error in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + " '" + task.getDisplayName() + "', no error handler so rethrowing: " + Exceptions.collapseText(e));
+ log.debug("Error in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + ", no error handler so rethrowing: " + Exceptions.collapseText(e));
}
if (!errorHandled) {
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepDefinition.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepDefinition.java
index 9a2d5b2d05..2026721992 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepDefinition.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepDefinition.java
@@ -58,6 +58,9 @@ public abstract class WorkflowStepDefinition {
public String getName() {
return name;
}
+ public void setName(String name) {
+ this.name = name;
+ }
protected String userSuppliedShorthand;
protected String shorthandTypeName;
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/RetryWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/RetryWorkflowStep.java
index 62746d2806..444aba31b4 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/RetryWorkflowStep.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/RetryWorkflowStep.java
@@ -53,6 +53,10 @@ public class RetryWorkflowStep extends WorkflowStepDefinition {
public static final ConfigKey<List<RetryLimit>> LIMIT = ConfigKeys.newConfigKey(new TypeToken<List<RetryLimit>>() {}, "limit");
public static final ConfigKey<RetryBackoff> BACKOFF = ConfigKeys.newConfigKey(RetryBackoff.class, "backoff");
+ // if multiple retry steps declare the same key, their counts will be combined; used if the same error might be handled in different ways
+ // note that the limits and backoff _instructions_ apply only at the step where they are defing, so they may need to be defined at each step
+ public static final ConfigKey<String> KEY = ConfigKeys.newStringConfigKey("key");
+
public enum RetryReplayOption { TRUE, FALSE, FORCE }
public static class RetryLimit {
@@ -188,7 +192,7 @@ public class RetryWorkflowStep extends WorkflowStepDefinition {
@Override
protected Object doTaskBody(WorkflowStepInstanceExecutionContext context) {
- String key = context.getWorkflowExectionContext().getWorkflowStepReference(Tasks.current());
+ String key = Strings.firstNonBlank(context.getInput(KEY), context.getWorkflowExectionContext().getWorkflowStepReference(Tasks.current()));
List<Instant> retries = context.getWorkflowExectionContext().getRetryRecords().compute(key, (k, v) -> v != null ? v : MutableList.of());
List<RetryLimit> limit = context.getInput(LIMIT);
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java
index 6874ea357e..47e4390f17 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java
@@ -26,17 +26,22 @@ import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.workflow.WorkflowErrorHandling;
import org.apache.brooklyn.core.workflow.WorkflowExecutionContext;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.guava.Maybe;
+import org.apache.brooklyn.util.text.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
+import java.util.stream.Collectors;
public class WorkflowStatePersistenceViaSensors {
+ private static final Logger log = LoggerFactory.getLogger(WorkflowStatePersistenceViaSensors.class);
+
public static final AttributeSensor<Map<String,WorkflowExecutionContext>> INTERNAL_WORKFLOWS = Sensors.newSensor(new TypeToken<Map<String, WorkflowExecutionContext>>() {}, "internals.brooklyn.workflow");
@@ -54,6 +59,24 @@ public class WorkflowStatePersistenceViaSensors {
entity.sensors().modify(INTERNAL_WORKFLOWS, v -> {
if (v == null) v = MutableMap.of();
v.put(context.getWorkflowId(), context);
+ String k = Strings.firstNonBlank(context.getExpiryKey(), "empty-expiry-key"); //should always be set
+ // TODO follow expiry instructions; for now, just keep 3 latest, apart from this one
+ List<WorkflowExecutionContext> finishedTwins = v.values().stream()
+ .filter(c -> k.equals(c.getExpiryKey()))
+ .filter(c -> c.getStatus()!=null && c.getStatus().ended)
+ .filter(c -> !c.equals(context))
+ .collect(Collectors.toList());
+ if (finishedTwins.size()>3) {
+ finishedTwins = MutableList.copyOf(finishedTwins);
+ Collections.sort(finishedTwins, (t1,t2) -> Long.compare(t2.getMostRecentActivityTime(), t1.getMostRecentActivityTime()));
+ Iterator<WorkflowExecutionContext> ti = finishedTwins.iterator();
+ for (int i=0; i<3; i++) ti.next();
+ while (ti.hasNext()) {
+ WorkflowExecutionContext w = ti.next();
+ log.debug("Expiring old workflow "+w+" because it is finished and there are newer ones");
+ v.remove(w.getWorkflowId());
+ }
+ }
return Maybe.of(v);
});
mgmt.getRebindManager().forcePersistNow(false, null);
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/TaskBuilder.java b/core/src/main/java/org/apache/brooklyn/util/core/task/TaskBuilder.java
index 79624125d0..069903f728 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/TaskBuilder.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/TaskBuilder.java
@@ -71,7 +71,10 @@ public class TaskBuilder<T> {
this.displayName = displayName;
return this;
}
-
+ public String getDisplayName() {
+ return displayName;
+ }
+
public TaskBuilder<T> description(String description) {
this.description = description;
return this;
diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java
index 049a69235d..b68d8bbb83 100644
--- a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java
@@ -573,8 +573,8 @@ public class WorkflowPersistReplayErrorsTest extends RebindTestFixture<BasicAppl
m -> m.matches("Starting workflow 'Workflow for effector myWorkflow', moving to first step .*-1"),
m -> m.matches("Starting step .*-1 in task .*"),
m -> m.matches("Error in step '1 - invoke-effector does-not-exist', no error handler so rethrowing: No effector matching 'does-not-exist'"),
- m -> m.matches("Error in workflow 'Workflow for effector myWorkflow' around step .*-1 'myWorkflow', running error handler"),
- m -> m.matches("Encountered error in workflow .*/.* 'myWorkflow' .handler present.: No effector matching 'does-not-exist'"),
+ m -> m.matches("Error in workflow 'Workflow for effector myWorkflow' around step .*-1, running error handler"),
+ m -> m.matches("Encountered error in workflow .*/.* 'Workflow for effector myWorkflow' .handler present.: No effector matching 'does-not-exist'"),
m -> m.matches("Creating workflow .* error handler .*-error-handler in task .*"),
m -> m.matches("Starting .*-error-handler with 1 handler in task .*"),
m -> m.matches("Creating handler .*-error-handler-1 'no-op' in task .*"),