You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2022/10/21 13:58:03 UTC

[brooklyn-server] 11/11: workflow basic expiry, and more workflow fixes

This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit b35bab6dcd8f74ea727f9a1ea0fe9da90a260802
Author: Alex Heneveld <al...@cloudsoft.io>
AuthorDate: Fri Oct 21 14:55:06 2022 +0100

    workflow basic expiry, and more workflow fixes
---
 .../core/workflow/WorkflowErrorHandling.java       |  2 +
 .../core/workflow/WorkflowExecutionContext.java    | 59 ++++++++++++++++++++--
 .../core/workflow/WorkflowStepDefinition.java      |  3 ++
 .../core/workflow/steps/RetryWorkflowStep.java     |  6 ++-
 .../store/WorkflowStatePersistenceViaSensors.java  | 29 +++++++++--
 .../brooklyn/util/core/task/TaskBuilder.java       |  5 +-
 .../workflow/WorkflowPersistReplayErrorsTest.java  |  4 +-
 7 files changed, 96 insertions(+), 12 deletions(-)

diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowErrorHandling.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowErrorHandling.java
index f17d25792d..532ab13ee4 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowErrorHandling.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowErrorHandling.java
@@ -52,6 +52,7 @@ public class WorkflowErrorHandling implements Callable<WorkflowErrorHandling.Wor
         log.debug("Encountered error in step "+context.getWorkflowStepReference()+" '" + stepTask.getDisplayName() + "' (handler present): " + Exceptions.collapseText(error));
         Task<WorkflowErrorHandlingResult> task = Tasks.<WorkflowErrorHandlingResult>builder().dynamic(true).displayName(context.getWorkflowStepReference()+"-error-handler")
                 .tag(BrooklynTaskTags.tagForWorkflowStepErrorHandler(context, null, context.getTaskId()))
+                .tag(BrooklynTaskTags.WORKFLOW_TAG)
                 .body(new WorkflowErrorHandling(step.getOnError(), context.getWorkflowExectionContext(), context.getWorkflowExectionContext().currentStepIndex, stepTask, error))
                 .build();
         TaskTags.addTagDynamically(stepTask, BrooklynTaskTags.tagForErrorHandledBy(task));
@@ -64,6 +65,7 @@ public class WorkflowErrorHandling implements Callable<WorkflowErrorHandling.Wor
         log.debug("Encountered error in workflow "+context.getWorkflowId()+"/"+context.getTaskId()+" '" + workflowTask.getDisplayName() + "' (handler present): " + Exceptions.collapseText(error));
         Task<WorkflowErrorHandlingResult> task = Tasks.<WorkflowErrorHandlingResult>builder().dynamic(true).displayName(context.getWorkflowId()+"-error-handler")
                 .tag(BrooklynTaskTags.tagForWorkflowStepErrorHandler(context))
+                .tag(BrooklynTaskTags.WORKFLOW_TAG)
                 .body(new WorkflowErrorHandling(context.onError, context, null, workflowTask, error))
                 .build();
         TaskTags.addTagDynamically(workflowTask, BrooklynTaskTags.tagForErrorHandledBy(task));
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
index 76752c672f..2d2eb1688e 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
@@ -20,7 +20,9 @@ package org.apache.brooklyn.core.workflow;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.google.common.collect.Iterables;
 import com.google.common.reflect.TypeToken;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.mgmt.ManagementContext;
@@ -60,6 +62,7 @@ import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -121,6 +124,9 @@ public class WorkflowExecutionContext {
     String taskId;
     transient Task<Object> task;
 
+    @JsonProperty("expiryKey")
+    String expiryKey;
+
     /** all tasks created for this workflow */
     Set<WorkflowReplayUtils.WorkflowReplayRecord> replays = MutableSet.of();
     transient WorkflowReplayUtils.WorkflowReplayRecord replayCurrent = null;
@@ -239,7 +245,7 @@ public class WorkflowExecutionContext {
 
         TaskBuilder<Object> tb = Tasks.builder().dynamic(true);
         if (optionalTaskFlags!=null) tb.flags(optionalTaskFlags);
-        else tb.displayName(name);
+        if (Strings.isBlank(tb.getDisplayName())) tb.displayName(name);
         task = tb.body(new Body()).build();
         WorkflowReplayUtils.updateOnWorkflowStartOrReplay(this, task, "initial run", false);
         workflowId = taskId = task.getId();
@@ -564,6 +570,49 @@ public class WorkflowExecutionContext {
         return errorHandlerContext;
     }
 
+    @JsonIgnore
+    public String getExpiryKey() {
+        if (Strings.isNonBlank(expiryKey)) return expiryKey;
+        if (Strings.isNonBlank(getName())) return getName();
+        return "anonymous-workflow";
+    }
+
+    /** look in tasks, steps, and replays to find most recent activity */
+    public long getMostRecentActivityTime() {
+        AtomicLong result = new AtomicLong(-1);
+
+        Consumer<Long> consider = l -> {
+            if (l!=null && l>result.get()) result.set(l);
+        };
+        Consumer<Task> considerTask = task -> {
+            if (task!=null) {
+                consider.accept(task.getEndTimeUtc());
+                consider.accept(task.getStartTimeUtc());
+                consider.accept(task.getSubmitTimeUtc());
+            }
+        };
+        considerTask.accept(getTask(false).orNull());
+
+        Consumer<WorkflowReplayUtils.WorkflowReplayRecord> considerReplay = replay -> {
+            if (replay!=null) {
+                consider.accept(replay.endTimeUtc);
+                consider.accept(replay.startTimeUtc);
+                consider.accept(replay.submitTimeUtc);
+            }
+        };
+        if (replayCurrent!=null) {
+            considerReplay.accept(replayCurrent);
+        } else if (!replays.isEmpty()) {
+            considerReplay.accept(Iterables.getLast(replays));
+        }
+
+        if (currentStepInstance!=null) {
+            considerTask.accept(getManagementContext().getExecutionManager().getTask(currentStepInstance.getTaskId()));
+        }
+
+        return result.get();
+    }
+
     public List<Object> getStepsDefinition() {
         return MutableList.copyOf(stepsDefinition).asUnmodifiable();
     }
@@ -771,12 +820,12 @@ public class WorkflowExecutionContext {
                         boolean errorHandled = false;
                         if (isTimeout) {
                             // don't run error handler
-                            log.debug("Timeout in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + " '" + task.getDisplayName() + "', throwing: " + Exceptions.collapseText(e));
+                            log.debug("Timeout in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + ", throwing: " + Exceptions.collapseText(e));
 
                         } else if (onError != null && !onError.isEmpty() && provisionalStatus.persistable) {
                             WorkflowErrorHandling.WorkflowErrorHandlingResult result = null;
                             try {
-                                log.debug("Error in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + " '" + task.getDisplayName() + "', running error handler");
+                                log.debug("Error in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + ", running error handler");
                                 Task<WorkflowErrorHandling.WorkflowErrorHandlingResult> workflowErrorHandlerTask = WorkflowErrorHandling.createWorkflowErrorHandlerTask(WorkflowExecutionContext.this, task, e);
                                 errorHandlerTaskId = workflowErrorHandlerTask.getId();
                                 result = DynamicTasks.queue(workflowErrorHandlerTask).getUnchecked();
@@ -797,13 +846,13 @@ public class WorkflowExecutionContext {
                                 }
 
                             } catch (Exception e2) {
-                                log.warn("Error in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + " '" + task.getDisplayName() + "' error handler for -- " + Exceptions.collapseText(e) + " -- threw another error (rethrowing): " + Exceptions.collapseText(e2));
+                                log.warn("Error in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + " error handler for -- " + Exceptions.collapseText(e) + " -- threw another error (rethrowing): " + Exceptions.collapseText(e2));
                                 log.debug("Full trace of original error was: " + e, e);
                                 e = e2;
                             }
 
                         } else {
-                            log.debug("Error in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + " '" + task.getDisplayName() + "', no error handler so rethrowing: " + Exceptions.collapseText(e));
+                            log.debug("Error in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + ", no error handler so rethrowing: " + Exceptions.collapseText(e));
                         }
 
                         if (!errorHandled) {
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepDefinition.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepDefinition.java
index 9a2d5b2d05..2026721992 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepDefinition.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepDefinition.java
@@ -58,6 +58,9 @@ public abstract class WorkflowStepDefinition {
     public String getName() {
         return name;
     }
+    public void setName(String name) {
+        this.name = name;
+    }
 
     protected String userSuppliedShorthand;
     protected String shorthandTypeName;
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/RetryWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/RetryWorkflowStep.java
index 62746d2806..444aba31b4 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/RetryWorkflowStep.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/RetryWorkflowStep.java
@@ -53,6 +53,10 @@ public class RetryWorkflowStep extends WorkflowStepDefinition {
     public static final ConfigKey<List<RetryLimit>> LIMIT = ConfigKeys.newConfigKey(new TypeToken<List<RetryLimit>>() {}, "limit");
     public static final ConfigKey<RetryBackoff> BACKOFF = ConfigKeys.newConfigKey(RetryBackoff.class, "backoff");
 
+    // if multiple retry steps declare the same key, their counts will be combined; used if the same error might be handled in different ways
+    // note that the limits and backoff _instructions_ apply only at the step where they are defing, so they may need to be defined at each step
+    public static final ConfigKey<String> KEY = ConfigKeys.newStringConfigKey("key");
+
     public enum RetryReplayOption { TRUE, FALSE, FORCE }
 
     public static class RetryLimit {
@@ -188,7 +192,7 @@ public class RetryWorkflowStep extends WorkflowStepDefinition {
     @Override
     protected Object doTaskBody(WorkflowStepInstanceExecutionContext context) {
 
-        String key = context.getWorkflowExectionContext().getWorkflowStepReference(Tasks.current());
+        String key = Strings.firstNonBlank(context.getInput(KEY), context.getWorkflowExectionContext().getWorkflowStepReference(Tasks.current()));
         List<Instant> retries = context.getWorkflowExectionContext().getRetryRecords().compute(key, (k, v) -> v != null ? v : MutableList.of());
 
         List<RetryLimit> limit = context.getInput(LIMIT);
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java
index 6874ea357e..47e4390f17 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/store/WorkflowStatePersistenceViaSensors.java
@@ -26,17 +26,22 @@ import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.workflow.WorkflowErrorHandling;
 import org.apache.brooklyn.core.workflow.WorkflowExecutionContext;
 import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.guava.Maybe;
+import org.apache.brooklyn.util.text.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
+import java.util.stream.Collectors;
 
 public class WorkflowStatePersistenceViaSensors {
 
+    private static final Logger log = LoggerFactory.getLogger(WorkflowStatePersistenceViaSensors.class);
+
     public static final AttributeSensor<Map<String,WorkflowExecutionContext>> INTERNAL_WORKFLOWS = Sensors.newSensor(new TypeToken<Map<String, WorkflowExecutionContext>>() {}, "internals.brooklyn.workflow");
 
 
@@ -54,6 +59,24 @@ public class WorkflowStatePersistenceViaSensors {
             entity.sensors().modify(INTERNAL_WORKFLOWS, v -> {
                 if (v == null) v = MutableMap.of();
                 v.put(context.getWorkflowId(), context);
+                String k = Strings.firstNonBlank(context.getExpiryKey(), "empty-expiry-key");  //should always be set
+                // TODO follow expiry instructions; for now, just keep 3 latest, apart from this one
+                List<WorkflowExecutionContext> finishedTwins = v.values().stream()
+                        .filter(c -> k.equals(c.getExpiryKey()))
+                        .filter(c -> c.getStatus()!=null && c.getStatus().ended)
+                        .filter(c -> !c.equals(context))
+                        .collect(Collectors.toList());
+                if (finishedTwins.size()>3) {
+                    finishedTwins = MutableList.copyOf(finishedTwins);
+                    Collections.sort(finishedTwins, (t1,t2) -> Long.compare(t2.getMostRecentActivityTime(), t1.getMostRecentActivityTime()));
+                    Iterator<WorkflowExecutionContext> ti = finishedTwins.iterator();
+                    for (int i=0; i<3; i++) ti.next();
+                    while (ti.hasNext()) {
+                        WorkflowExecutionContext w = ti.next();
+                        log.debug("Expiring old workflow "+w+" because it is finished and there are newer ones");
+                        v.remove(w.getWorkflowId());
+                    }
+                }
                 return Maybe.of(v);
             });
             mgmt.getRebindManager().forcePersistNow(false, null);
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/TaskBuilder.java b/core/src/main/java/org/apache/brooklyn/util/core/task/TaskBuilder.java
index 79624125d0..069903f728 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/TaskBuilder.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/TaskBuilder.java
@@ -71,7 +71,10 @@ public class TaskBuilder<T> {
         this.displayName = displayName;
         return this;
     }
-    
+    public String getDisplayName() {
+        return displayName;
+    }
+
     public TaskBuilder<T> description(String description) {
         this.description = description;
         return this;
diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java
index 049a69235d..b68d8bbb83 100644
--- a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowPersistReplayErrorsTest.java
@@ -573,8 +573,8 @@ public class WorkflowPersistReplayErrorsTest extends RebindTestFixture<BasicAppl
                     m -> m.matches("Starting workflow 'Workflow for effector myWorkflow', moving to first step .*-1"),
                     m -> m.matches("Starting step .*-1 in task .*"),
                     m -> m.matches("Error in step '1 - invoke-effector does-not-exist', no error handler so rethrowing: No effector matching 'does-not-exist'"),
-                    m -> m.matches("Error in workflow 'Workflow for effector myWorkflow' around step .*-1 'myWorkflow', running error handler"),
-                    m -> m.matches("Encountered error in workflow .*/.* 'myWorkflow' .handler present.: No effector matching 'does-not-exist'"),
+                    m -> m.matches("Error in workflow 'Workflow for effector myWorkflow' around step .*-1, running error handler"),
+                    m -> m.matches("Encountered error in workflow .*/.* 'Workflow for effector myWorkflow' .handler present.: No effector matching 'does-not-exist'"),
                     m -> m.matches("Creating workflow .* error handler .*-error-handler in task .*"),
                     m -> m.matches("Starting .*-error-handler with 1 handler in task .*"),
                     m -> m.matches("Creating handler .*-error-handler-1 'no-op' in task .*"),