You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2022/10/21 13:58:02 UTC

[brooklyn-server] 10/11: misc workflow fixes - more details, including error and rest calls

This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit 4062c3ab7deb7a06718991e64a95f5e12b735b9f
Author: Alex Heneveld <al...@cloudsoft.io>
AuthorDate: Fri Oct 21 11:42:04 2022 +0100

    misc workflow fixes - more details, including error and rest calls
---
 .../brooklyn/core/workflow/WorkflowExecutionContext.java  | 13 +++++++++++--
 .../brooklyn/core/workflow/WorkflowReplayUtils.java       | 14 +++++++++++++-
 .../workflow/WorkflowStepInstanceExecutionContext.java    |  3 +++
 .../brooklyn/core/workflow/steps/CustomWorkflowStep.java  | 14 ++++++++++++--
 .../brooklyn/core/workflow/steps/SshWorkflowStep.java     |  4 ++--
 .../org/apache/brooklyn/util/core/task/TaskBuilder.java   | 15 ++++++++++-----
 .../apache/brooklyn/rest/resources/EntityResource.java    |  5 ++++-
 .../software/base/WorkflowSoftwareProcessSshDriver.java   |  2 +-
 8 files changed, 56 insertions(+), 14 deletions(-)

diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
index 90e32abd10..76752c672f 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
@@ -134,6 +134,10 @@ public class WorkflowExecutionContext {
     String previousStepTaskId;
 
     WorkflowStepInstanceExecutionContext currentStepInstance;
+
+    /** set if an error handler is the last thing which ran */
+    String errorHandlerTaskId;
+    /** set for the last _step_ inside the error handler */
     WorkflowStepInstanceExecutionContext errorHandlerContext;
 
     Map<Integer, OldStepRecord> oldStepInfo = MutableMap.of();
@@ -773,7 +777,9 @@ public class WorkflowExecutionContext {
                             WorkflowErrorHandling.WorkflowErrorHandlingResult result = null;
                             try {
                                 log.debug("Error in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + " '" + task.getDisplayName() + "', running error handler");
-                                result = DynamicTasks.queue(WorkflowErrorHandling.createWorkflowErrorHandlerTask(WorkflowExecutionContext.this, task, e)).getUnchecked();
+                                Task<WorkflowErrorHandling.WorkflowErrorHandlingResult> workflowErrorHandlerTask = WorkflowErrorHandling.createWorkflowErrorHandlerTask(WorkflowExecutionContext.this, task, e);
+                                errorHandlerTaskId = workflowErrorHandlerTask.getId();
+                                result = DynamicTasks.queue(workflowErrorHandlerTask).getUnchecked();
                                 if (result != null) {
                                     errorHandled = true;
 
@@ -899,6 +905,7 @@ public class WorkflowExecutionContext {
             // and update replayable info
             WorkflowReplayUtils.updateOnWorkflowStepChange(currentStepRecord, currentStepInstance, step);
             errorHandlerContext = null;
+            errorHandlerTaskId = null;
 
             persist();
 
@@ -929,7 +936,9 @@ public class WorkflowExecutionContext {
                 if (!step.onError.isEmpty()) {
                     WorkflowErrorHandling.WorkflowErrorHandlingResult result;
                     try {
-                        result = DynamicTasks.queue(WorkflowErrorHandling.createStepErrorHandlerTask(step, currentStepInstance, t, e)).getUnchecked();
+                        Task<WorkflowErrorHandling.WorkflowErrorHandlingResult> stepErrorHandlerTask = WorkflowErrorHandling.createStepErrorHandlerTask(step, currentStepInstance, t, e);
+                        currentStepInstance.errorHandlerTaskId = stepErrorHandlerTask.getId();
+                        result = DynamicTasks.queue(stepErrorHandlerTask).getUnchecked();
                         if (result!=null) {
                             if (Strings.isNonBlank(result.next)) customNext.set(result.next);
                             saveOutput.accept(result.output);
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowReplayUtils.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowReplayUtils.java
index 30112ea2c3..d1449cff01 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowReplayUtils.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowReplayUtils.java
@@ -22,10 +22,12 @@ import com.fasterxml.jackson.annotation.JsonInclude;
 import com.google.common.collect.Iterables;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Instant;
 import java.util.List;
 import java.util.Objects;
 import java.util.function.Supplier;
@@ -75,8 +77,18 @@ public class WorkflowReplayUtils {
                 log.warn("Mismatch in workflow replays for "+ctx+": "+ctx.replayCurrent +" vs "+task);
                 return;
             }
-            ctx.replayCurrent.submittedByTaskId = task.getSubmittedByTaskId();
+
+            // try hard to get submitter data in case tasks go awol before execution
+            if (task.getSubmittedByTaskId()!=null) {
+                ctx.replayCurrent.submittedByTaskId = task.getSubmittedByTaskId();
+            } else if (ctx.replayCurrent.submittedByTaskId==null && Tasks.current()!=null && !Tasks.current().equals(task)) {
+                ctx.replayCurrent.submittedByTaskId = Tasks.current().getId();
+            }
             ctx.replayCurrent.submitTimeUtc = task.getSubmitTimeUtc();
+            // fake this because we won't see the real value until we also see the start value.
+            // however we need to ensure any workflow that is created is intended to be run.
+            if (ctx.replayCurrent.submitTimeUtc<=0) ctx.replayCurrent.submitTimeUtc = Instant.now().toEpochMilli();
+
             ctx.replayCurrent.startTimeUtc = task.getStartTimeUtc();
             ctx.replayCurrent.endTimeUtc = task.getEndTimeUtc();
             ctx.replayCurrent.status = task.getStatusSummary();
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java
index b3bc5065f5..d9ea83f774 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java
@@ -61,6 +61,9 @@ public class WorkflowStepInstanceExecutionContext {
     /** set if the step is in an error handler context, containing the error being handled */
     Throwable error;
 
+    /** set if there was an error handled locally */
+    String errorHandlerTaskId;
+
     public String getTaskId() {
         return taskId;
     }
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
index 4f36809a5e..c1e313f78c 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
@@ -28,6 +28,7 @@ import com.google.common.reflect.TypeToken;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.mgmt.ManagementContext;
 import org.apache.brooklyn.api.mgmt.classloading.BrooklynClassLoadingContext;
+import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.resolve.jackson.BeanWithTypeUtils;
 import org.apache.brooklyn.core.resolve.jackson.JsonPassThroughDeserializer;
@@ -168,14 +169,23 @@ public class CustomWorkflowStep extends WorkflowStepDefinition implements Workfl
         return result;
     }
 
+    /** Returns a top-level workflow running the workflow defined here */
     public WorkflowExecutionContext newWorkflowExecution(Entity entity, String name, ConfigBag extraConfig) {
+        return newWorkflowExecution(entity, name, extraConfig, null);
+    }
+    public WorkflowExecutionContext newWorkflowExecution(Entity entity, String name, ConfigBag extraConfig, Map extraTaskFlags) {
         return WorkflowExecutionContext.newInstancePersisted(entity, name,
                 ConfigBag.newInstance()
                         .configure(WorkflowCommonConfig.PARAMETER_DEFS, parameters)
                         .configure(WorkflowCommonConfig.STEPS, steps)
-                        .configure(WorkflowCommonConfig.OUTPUT, workflowOutput),
+                        .configure(WorkflowCommonConfig.INPUT, input)
+                        .configure(WorkflowCommonConfig.OUTPUT, workflowOutput)
+                        .configure(WorkflowCommonConfig.REPLAYABLE, replayable)
+                        .configure(WorkflowCommonConfig.ON_ERROR, onError)
+                        .configure(WorkflowCommonConfig.TIMEOUT, timeout)
+                        .configure((ConfigKey) WorkflowCommonConfig.CONDITION, condition),
                 null,
-                ConfigBag.newInstance(getInput()).putAll(extraConfig), null);
+                ConfigBag.newInstance(getInput()).putAll(extraConfig), extraTaskFlags);
     }
 
     @VisibleForTesting
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/SshWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/SshWorkflowStep.java
index ba03791962..e16b4559b0 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/SshWorkflowStep.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/SshWorkflowStep.java
@@ -87,7 +87,7 @@ public class SshWorkflowStep extends WorkflowStepDefinition {
 
     protected static void checkExitCode(ProcessTaskWrapper<?> ptw, DslPredicates.DslPredicate<Integer> exitcode) {
         if (exitcode==null) {
-            if (ptw.getExitCode()!=0) throw new IllegalStateException("Invalid exit code '"+ptw.getExitCode()+"'");
+            if (ptw.getExitCode()!=0) throw new IllegalStateException("Invalid exit code "+ptw.getExitCode());
             return;
         }
 
@@ -103,7 +103,7 @@ public class SshWorkflowStep extends WorkflowStepDefinition {
             // ranges still require `exit-code: { range: [0, 4] }`, same with `exit-code: { less-than: 5 }`.
         }
         if (!exitcode.apply(ptw.getExitCode())) {
-            throw new IllegalStateException("Invalid exit code '"+ptw.getExitCode()+"'; does not match explicit exit-code requirement");
+            throw new IllegalStateException("Invalid exit code "+ptw.getExitCode()+"; does not match explicit exit-code requirement");
         }
     }
 
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/TaskBuilder.java b/core/src/main/java/org/apache/brooklyn/util/core/task/TaskBuilder.java
index c6177999bb..79624125d0 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/TaskBuilder.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/TaskBuilder.java
@@ -18,10 +18,7 @@
  */
 package org.apache.brooklyn.util.core.task;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.Callable;
 
 import org.apache.brooklyn.api.mgmt.Task;
@@ -34,10 +31,14 @@ import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.collections.MutableSet;
 
 import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Convenience for creating tasks; note that DynamicSequentialTask is the default */
 public class TaskBuilder<T> {
 
+    private static final Logger log = LoggerFactory.getLogger(TaskBuilder.class);
+
     String displayName = null;
     String description = null;
     Callable<T> body = null;
@@ -152,7 +153,11 @@ public class TaskBuilder<T> {
         MutableMap<String, Object> taskFlags = MutableMap.copyOf(flags);
         if (displayName!=null) taskFlags.put("displayName", displayName);
         if (description!=null) taskFlags.put("description", description);
-        if (!tags.isEmpty()) taskFlags.put("tags", tags);
+        if (!tags.isEmpty()) {
+            Object otherTags = taskFlags.put("tags", tags);
+            if (otherTags instanceof Collection) tags.addAll((Collection)otherTags);
+            else log.warn("Ignoring unexpected 'tags' flag in task: "+otherTags);
+        }
         
         if (Boolean.FALSE.equals(dynamic) && children.isEmpty()) {
             if (swallowChildrenFailures!=null)
diff --git a/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/EntityResource.java b/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/EntityResource.java
index 84495ea37b..81cacb01fd 100644
--- a/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/EntityResource.java
+++ b/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/EntityResource.java
@@ -64,6 +64,7 @@ import org.apache.brooklyn.rest.transform.TaskTransformer;
 import org.apache.brooklyn.rest.util.EntityRelationUtils;
 import org.apache.brooklyn.rest.util.WebResourceUtils;
 import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.core.ClassLoaderUtils;
 import org.apache.brooklyn.util.core.ResourceUtils;
 import org.apache.brooklyn.util.core.flags.TypeCoercions;
@@ -425,7 +426,9 @@ public class EntityResource extends AbstractBrooklynRestResource implements Enti
         }
 
         WorkflowExecutionContext execution = workflow.newWorkflowExecution(target,
-                Strings.firstNonBlank(workflow.getName(), workflow.getId(), "API workflow invocation"), null);
+                Strings.firstNonBlank(workflow.getName(), workflow.getId(), "API workflow invocation"),
+                null,
+                MutableMap.of("tags", MutableMap.of("workflow_yaml", yaml)));
 
         Task<Object> task = Entities.submit(target, execution.getTask(true).get());
         task.blockUntilEnded(timeoutS==null ? Duration.millis(20) : Duration.of(timeoutS));
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/WorkflowSoftwareProcessSshDriver.java b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/WorkflowSoftwareProcessSshDriver.java
index c8bb664ea2..6a7108eb11 100644
--- a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/WorkflowSoftwareProcessSshDriver.java
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/WorkflowSoftwareProcessSshDriver.java
@@ -93,7 +93,7 @@ public class WorkflowSoftwareProcessSshDriver extends AbstractSoftwareProcessSsh
         WorkflowExecutionContext workflowContext = workflow.newWorkflowExecution(entity, key.getName().toLowerCase(),
                 null /* could getInput from workflow, and merge shell environment here */);
 
-        return Maybe.of(DynamicTasks.queue( workflowContext.getTask(true).get() ).getUnchecked());
+        return Maybe.of(DynamicTasks.queueIfPossible( workflowContext.getTask(true).get() ).orSubmitAsync(entity).getTask().getUnchecked());
     }
 
     @Override