You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2022/10/21 13:58:02 UTC
[brooklyn-server] 10/11: misc workflow fixes - more details, including error and rest calls
This is an automated email from the ASF dual-hosted git repository.
heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 4062c3ab7deb7a06718991e64a95f5e12b735b9f
Author: Alex Heneveld <al...@cloudsoft.io>
AuthorDate: Fri Oct 21 11:42:04 2022 +0100
misc workflow fixes - more details, including error and rest calls
---
.../brooklyn/core/workflow/WorkflowExecutionContext.java | 13 +++++++++++--
.../brooklyn/core/workflow/WorkflowReplayUtils.java | 14 +++++++++++++-
.../workflow/WorkflowStepInstanceExecutionContext.java | 3 +++
.../brooklyn/core/workflow/steps/CustomWorkflowStep.java | 14 ++++++++++++--
.../brooklyn/core/workflow/steps/SshWorkflowStep.java | 4 ++--
.../org/apache/brooklyn/util/core/task/TaskBuilder.java | 15 ++++++++++-----
.../apache/brooklyn/rest/resources/EntityResource.java | 5 ++++-
.../software/base/WorkflowSoftwareProcessSshDriver.java | 2 +-
8 files changed, 56 insertions(+), 14 deletions(-)
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
index 90e32abd10..76752c672f 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExecutionContext.java
@@ -134,6 +134,10 @@ public class WorkflowExecutionContext {
String previousStepTaskId;
WorkflowStepInstanceExecutionContext currentStepInstance;
+
+ /** set if an error handler is the last thing which ran */
+ String errorHandlerTaskId;
+ /** set for the last _step_ inside the error handler */
WorkflowStepInstanceExecutionContext errorHandlerContext;
Map<Integer, OldStepRecord> oldStepInfo = MutableMap.of();
@@ -773,7 +777,9 @@ public class WorkflowExecutionContext {
WorkflowErrorHandling.WorkflowErrorHandlingResult result = null;
try {
log.debug("Error in workflow '" + getName() + "' around step " + workflowStepReference(currentStepIndex) + " '" + task.getDisplayName() + "', running error handler");
- result = DynamicTasks.queue(WorkflowErrorHandling.createWorkflowErrorHandlerTask(WorkflowExecutionContext.this, task, e)).getUnchecked();
+ Task<WorkflowErrorHandling.WorkflowErrorHandlingResult> workflowErrorHandlerTask = WorkflowErrorHandling.createWorkflowErrorHandlerTask(WorkflowExecutionContext.this, task, e);
+ errorHandlerTaskId = workflowErrorHandlerTask.getId();
+ result = DynamicTasks.queue(workflowErrorHandlerTask).getUnchecked();
if (result != null) {
errorHandled = true;
@@ -899,6 +905,7 @@ public class WorkflowExecutionContext {
// and update replayable info
WorkflowReplayUtils.updateOnWorkflowStepChange(currentStepRecord, currentStepInstance, step);
errorHandlerContext = null;
+ errorHandlerTaskId = null;
persist();
@@ -929,7 +936,9 @@ public class WorkflowExecutionContext {
if (!step.onError.isEmpty()) {
WorkflowErrorHandling.WorkflowErrorHandlingResult result;
try {
- result = DynamicTasks.queue(WorkflowErrorHandling.createStepErrorHandlerTask(step, currentStepInstance, t, e)).getUnchecked();
+ Task<WorkflowErrorHandling.WorkflowErrorHandlingResult> stepErrorHandlerTask = WorkflowErrorHandling.createStepErrorHandlerTask(step, currentStepInstance, t, e);
+ currentStepInstance.errorHandlerTaskId = stepErrorHandlerTask.getId();
+ result = DynamicTasks.queue(stepErrorHandlerTask).getUnchecked();
if (result!=null) {
if (Strings.isNonBlank(result.next)) customNext.set(result.next);
saveOutput.accept(result.output);
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowReplayUtils.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowReplayUtils.java
index 30112ea2c3..d1449cff01 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowReplayUtils.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowReplayUtils.java
@@ -22,10 +22,12 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import com.google.common.collect.Iterables;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
@@ -75,8 +77,18 @@ public class WorkflowReplayUtils {
log.warn("Mismatch in workflow replays for "+ctx+": "+ctx.replayCurrent +" vs "+task);
return;
}
- ctx.replayCurrent.submittedByTaskId = task.getSubmittedByTaskId();
+
+ // try hard to get submitter data in case tasks go awol before execution
+ if (task.getSubmittedByTaskId()!=null) {
+ ctx.replayCurrent.submittedByTaskId = task.getSubmittedByTaskId();
+ } else if (ctx.replayCurrent.submittedByTaskId==null && Tasks.current()!=null && !Tasks.current().equals(task)) {
+ ctx.replayCurrent.submittedByTaskId = Tasks.current().getId();
+ }
ctx.replayCurrent.submitTimeUtc = task.getSubmitTimeUtc();
+ // fake this because we won't see the real value until we also see the start value.
+ // however we need to ensure any workflow that is created is intended to be run.
+ if (ctx.replayCurrent.submitTimeUtc<=0) ctx.replayCurrent.submitTimeUtc = Instant.now().toEpochMilli();
+
ctx.replayCurrent.startTimeUtc = task.getStartTimeUtc();
ctx.replayCurrent.endTimeUtc = task.getEndTimeUtc();
ctx.replayCurrent.status = task.getStatusSummary();
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java
index b3bc5065f5..d9ea83f774 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepInstanceExecutionContext.java
@@ -61,6 +61,9 @@ public class WorkflowStepInstanceExecutionContext {
/** set if the step is in an error handler context, containing the error being handled */
Throwable error;
+ /** set if there was an error handled locally */
+ String errorHandlerTaskId;
+
public String getTaskId() {
return taskId;
}
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
index 4f36809a5e..c1e313f78c 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/CustomWorkflowStep.java
@@ -28,6 +28,7 @@ import com.google.common.reflect.TypeToken;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.mgmt.ManagementContext;
import org.apache.brooklyn.api.mgmt.classloading.BrooklynClassLoadingContext;
+import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.resolve.jackson.BeanWithTypeUtils;
import org.apache.brooklyn.core.resolve.jackson.JsonPassThroughDeserializer;
@@ -168,14 +169,23 @@ public class CustomWorkflowStep extends WorkflowStepDefinition implements Workfl
return result;
}
+ /** Returns a top-level workflow running the workflow defined here */
public WorkflowExecutionContext newWorkflowExecution(Entity entity, String name, ConfigBag extraConfig) {
+ return newWorkflowExecution(entity, name, extraConfig, null);
+ }
+ public WorkflowExecutionContext newWorkflowExecution(Entity entity, String name, ConfigBag extraConfig, Map extraTaskFlags) {
return WorkflowExecutionContext.newInstancePersisted(entity, name,
ConfigBag.newInstance()
.configure(WorkflowCommonConfig.PARAMETER_DEFS, parameters)
.configure(WorkflowCommonConfig.STEPS, steps)
- .configure(WorkflowCommonConfig.OUTPUT, workflowOutput),
+ .configure(WorkflowCommonConfig.INPUT, input)
+ .configure(WorkflowCommonConfig.OUTPUT, workflowOutput)
+ .configure(WorkflowCommonConfig.REPLAYABLE, replayable)
+ .configure(WorkflowCommonConfig.ON_ERROR, onError)
+ .configure(WorkflowCommonConfig.TIMEOUT, timeout)
+ .configure((ConfigKey) WorkflowCommonConfig.CONDITION, condition),
null,
- ConfigBag.newInstance(getInput()).putAll(extraConfig), null);
+ ConfigBag.newInstance(getInput()).putAll(extraConfig), extraTaskFlags);
}
@VisibleForTesting
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/SshWorkflowStep.java b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/SshWorkflowStep.java
index ba03791962..e16b4559b0 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/steps/SshWorkflowStep.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/steps/SshWorkflowStep.java
@@ -87,7 +87,7 @@ public class SshWorkflowStep extends WorkflowStepDefinition {
protected static void checkExitCode(ProcessTaskWrapper<?> ptw, DslPredicates.DslPredicate<Integer> exitcode) {
if (exitcode==null) {
- if (ptw.getExitCode()!=0) throw new IllegalStateException("Invalid exit code '"+ptw.getExitCode()+"'");
+ if (ptw.getExitCode()!=0) throw new IllegalStateException("Invalid exit code "+ptw.getExitCode());
return;
}
@@ -103,7 +103,7 @@ public class SshWorkflowStep extends WorkflowStepDefinition {
// ranges still require `exit-code: { range: [0, 4] }`, same with `exit-code: { less-than: 5 }`.
}
if (!exitcode.apply(ptw.getExitCode())) {
- throw new IllegalStateException("Invalid exit code '"+ptw.getExitCode()+"'; does not match explicit exit-code requirement");
+ throw new IllegalStateException("Invalid exit code "+ptw.getExitCode()+"; does not match explicit exit-code requirement");
}
}
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/TaskBuilder.java b/core/src/main/java/org/apache/brooklyn/util/core/task/TaskBuilder.java
index c6177999bb..79624125d0 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/TaskBuilder.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/TaskBuilder.java
@@ -18,10 +18,7 @@
*/
package org.apache.brooklyn.util.core.task;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.Callable;
import org.apache.brooklyn.api.mgmt.Task;
@@ -34,10 +31,14 @@ import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.collections.MutableSet;
import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** Convenience for creating tasks; note that DynamicSequentialTask is the default */
public class TaskBuilder<T> {
+ private static final Logger log = LoggerFactory.getLogger(TaskBuilder.class);
+
String displayName = null;
String description = null;
Callable<T> body = null;
@@ -152,7 +153,11 @@ public class TaskBuilder<T> {
MutableMap<String, Object> taskFlags = MutableMap.copyOf(flags);
if (displayName!=null) taskFlags.put("displayName", displayName);
if (description!=null) taskFlags.put("description", description);
- if (!tags.isEmpty()) taskFlags.put("tags", tags);
+ if (!tags.isEmpty()) {
+ Object otherTags = taskFlags.put("tags", tags);
+ if (otherTags instanceof Collection) tags.addAll((Collection)otherTags);
+ else log.warn("Ignoring unexpected 'tags' flag in task: "+otherTags);
+ }
if (Boolean.FALSE.equals(dynamic) && children.isEmpty()) {
if (swallowChildrenFailures!=null)
diff --git a/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/EntityResource.java b/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/EntityResource.java
index 84495ea37b..81cacb01fd 100644
--- a/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/EntityResource.java
+++ b/rest/rest-resources/src/main/java/org/apache/brooklyn/rest/resources/EntityResource.java
@@ -64,6 +64,7 @@ import org.apache.brooklyn.rest.transform.TaskTransformer;
import org.apache.brooklyn.rest.util.EntityRelationUtils;
import org.apache.brooklyn.rest.util.WebResourceUtils;
import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.ClassLoaderUtils;
import org.apache.brooklyn.util.core.ResourceUtils;
import org.apache.brooklyn.util.core.flags.TypeCoercions;
@@ -425,7 +426,9 @@ public class EntityResource extends AbstractBrooklynRestResource implements Enti
}
WorkflowExecutionContext execution = workflow.newWorkflowExecution(target,
- Strings.firstNonBlank(workflow.getName(), workflow.getId(), "API workflow invocation"), null);
+ Strings.firstNonBlank(workflow.getName(), workflow.getId(), "API workflow invocation"),
+ null,
+ MutableMap.of("tags", MutableMap.of("workflow_yaml", yaml)));
Task<Object> task = Entities.submit(target, execution.getTask(true).get());
task.blockUntilEnded(timeoutS==null ? Duration.millis(20) : Duration.of(timeoutS));
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/WorkflowSoftwareProcessSshDriver.java b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/WorkflowSoftwareProcessSshDriver.java
index c8bb664ea2..6a7108eb11 100644
--- a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/WorkflowSoftwareProcessSshDriver.java
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/WorkflowSoftwareProcessSshDriver.java
@@ -93,7 +93,7 @@ public class WorkflowSoftwareProcessSshDriver extends AbstractSoftwareProcessSsh
WorkflowExecutionContext workflowContext = workflow.newWorkflowExecution(entity, key.getName().toLowerCase(),
null /* could getInput from workflow, and merge shell environment here */);
- return Maybe.of(DynamicTasks.queue( workflowContext.getTask(true).get() ).getUnchecked());
+ return Maybe.of(DynamicTasks.queueIfPossible( workflowContext.getTask(true).get() ).orSubmitAsync(entity).getTask().getUnchecked());
}
@Override