You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2024/04/01 15:38:17 UTC
(brooklyn-server) 01/06: record the workflow resolution context better
This is an automated email from the ASF dual-hosted git repository.
heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 931caf0bf03e306d7cd15e296a7ffdcb6ef04b5a
Author: Alex Heneveld <al...@cloudsoft.io>
AuthorDate: Fri Mar 29 16:27:06 2024 +0000
record the workflow resolution context better
allows better tracing, and the ability to use that context when coercing
---
.../workflow/WorkflowExpressionResolution.java | 247 ++++++++++++---------
.../brooklyn/util/core/text/TemplateProcessor.java | 6 +-
.../util/collections/ThreadLocalStack.java | 23 +-
3 files changed, 163 insertions(+), 113 deletions(-)
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
index c8e32abad5..658aeb1b4d 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
@@ -20,7 +20,6 @@ package org.apache.brooklyn.core.workflow;
import java.time.Instant;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -29,6 +28,9 @@ import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import javax.annotation.Nullable;
import com.google.common.annotations.Beta;
import com.google.common.reflect.TypeToken;
@@ -188,7 +190,7 @@ public class WorkflowExpressionResolution {
}
} catch (Throwable t) {
Exceptions.propagateIfFatal(t);
- if (stage==WorkflowExpressionStage.STEP_INPUT && WorkflowVariableResolutionStackEntry.isStackForSettingVariable(RESOLVE_STACK.getAll(true), key) && Exceptions.getFirstThrowableOfType(t, WorkflowVariableRecursiveReference.class)!=null) {
+ if (stage==WorkflowExpressionStage.STEP_INPUT && isSettingVariable(key) && Exceptions.getFirstThrowableOfType(t, WorkflowVariableRecursiveReference.class)!=null) {
// input evaluation can look at local input, and will gracefully handle some recursive references.
// this is needed so we can handle things like env:=${env} in input, and also {message:="Hi ${name}", name:="Bob"}.
@@ -401,94 +403,119 @@ public class WorkflowExpressionResolution {
/** does not use templates */
public <T> T resolveCoercingOnly(Object expression, TypeToken<T> type) {
if (expression==null) return null;
- boolean triedCoercion = false;
- List<Exception> exceptions = MutableList.of();
- if (expression instanceof String) {
- try {
- // prefer simple coercion if it's a string coming in
- return TypeCoercions.coerce(expression, type);
- } catch (Exception e) {
- Exceptions.propagateIfFatal(e);
- exceptions.add(e);
- triedCoercion = true;
+ return inResolveStackEntry("resolve-coercing", expression, () -> {
+ boolean triedCoercion = false;
+ List<Exception> exceptions = MutableList.of();
+ if (expression instanceof String) {
+ try {
+ // prefer simple coercion if it's a string coming in
+ return TypeCoercions.coerce(expression, type);
+ } catch (Exception e) {
+ Exceptions.propagateIfFatal(e);
+ exceptions.add(e);
+ triedCoercion = true;
+ }
}
- }
- if (Jsonya.isJsonPrimitiveDeep(expression) && !(expression instanceof Set)) {
- try {
- // next try yaml coercion for anything complex, as values are normally set from yaml and will be raw at this stage (but not if they are from a DSL)
- return BeanWithTypeUtils.convert(context.getManagementContext(), expression, type, true,
- RegisteredTypes.getClassLoadingContext(context.getEntity()), true /* needed for wrapped resolved holders */);
- } catch (Exception e) {
- Exceptions.propagateIfFatal(e);
- exceptions.add(e);
+ if (Jsonya.isJsonPrimitiveDeep(expression) && !(expression instanceof Set)) {
+ try {
+ // next try yaml coercion for anything complex, as values are normally set from yaml and will be raw at this stage (but not if they are from a DSL)
+ return BeanWithTypeUtils.convert(context.getManagementContext(), expression, type, true,
+ RegisteredTypes.getClassLoadingContext(context.getEntity()), true /* needed for wrapped resolved holders */);
+ } catch (Exception e) {
+ Exceptions.propagateIfFatal(e);
+ exceptions.add(e);
+ }
}
- }
- if (!triedCoercion) {
- try {
- // fallback to simple coercion
- return TypeCoercions.coerce(expression, type);
- } catch (Exception e) {
- Exceptions.propagateIfFatal(e);
- exceptions.add(e);
- triedCoercion = true;
+ if (!triedCoercion) {
+ try {
+ // fallback to simple coercion
+ return TypeCoercions.coerce(expression, type);
+ } catch (Exception e) {
+ Exceptions.propagateIfFatal(e);
+ exceptions.add(e);
+ triedCoercion = true;
+ }
}
- }
- throw Exceptions.propagate(exceptions.iterator().next());
+ throw Exceptions.propagate(exceptions.iterator().next());
+ });
}
- static class WorkflowVariableResolutionStackEntry {
+ public static class WorkflowResolutionStackEntry {
+ // resolver is null if caller has indicated evaluation before resolution
+ @Nullable WorkflowExpressionResolution resolver;
WorkflowExecutionContext context;
WorkflowExpressionStage stage;
- Object object;
+ String callPointUid;
+ Object expression;
String settingVariable;
- public static WorkflowVariableResolutionStackEntry of(WorkflowExecutionContext context, WorkflowExpressionStage stage, Object expression) {
- WorkflowVariableResolutionStackEntry result = new WorkflowVariableResolutionStackEntry();
+ public static WorkflowResolutionStackEntry of(WorkflowExpressionResolution resolver, String callPointUid, Object expression) {
+ WorkflowResolutionStackEntry result = of(resolver == null ? null : resolver.context, resolver.stage, callPointUid, expression);
+ result.resolver = resolver;
+ return result;
+ }
+ public static WorkflowResolutionStackEntry of(WorkflowExecutionContext context, WorkflowExpressionStage stage, String callPointUid, Object expression) {
+ WorkflowResolutionStackEntry result = new WorkflowResolutionStackEntry();
result.context = context;
+ result.callPointUid = callPointUid;
result.stage = stage;
- result.object = expression;
+ result.expression = expression;
return result;
}
- public static WorkflowVariableResolutionStackEntry setting(WorkflowExecutionContext context, WorkflowExpressionStage stage, String settingVariable) {
- WorkflowVariableResolutionStackEntry result = new WorkflowVariableResolutionStackEntry();
- result.context = context;
- result.stage = stage;
+ public static WorkflowResolutionStackEntry settingVariable(WorkflowExecutionContext context, WorkflowExpressionStage stage, String settingVariable) {
+ WorkflowResolutionStackEntry result = of(context, stage, "setting-variable", null);
result.settingVariable = settingVariable;
return result;
}
- public static boolean isStackForSettingVariable(Collection<WorkflowVariableResolutionStackEntry> stack, String key) {
+ public static boolean isStackForSettingVariable(Stream<WorkflowResolutionStackEntry> stack, String key) {
if (stack==null) return true;
- MutableList<WorkflowVariableResolutionStackEntry> s2 = MutableList.copyOf(stack);
- Collections.reverse(s2);
- Optional<WorkflowVariableResolutionStackEntry> s = s2.stream().filter(si -> si.settingVariable != null).findFirst();
+ Optional<WorkflowResolutionStackEntry> s = stack.filter(si -> si.settingVariable != null).findFirst();
if (!s.isPresent()) return false;
return s.get().settingVariable.equals(key);
}
+ public String getWorkflowId() {
+ WorkflowExecutionContext ctx = getWorkflowExecutionContext();
+ return ctx == null ? null : ctx.getWorkflowId();
+ }
+ public WorkflowExpressionResolution getWorkflowExpressionResolution() {
+ return resolver;
+ }
+ public WorkflowExecutionContext getWorkflowExecutionContext() {
+ return context;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
- WorkflowVariableResolutionStackEntry that = (WorkflowVariableResolutionStackEntry) o;
+ WorkflowResolutionStackEntry that = (WorkflowResolutionStackEntry) o;
- if (context != null && that.context != null ? !Objects.equals(context.getWorkflowId(), that.context.getWorkflowId()) : !Objects.equals(context, that.context)) return false;
+ String wid = getWorkflowId();
+ String tid = that.getWorkflowId();
+ // might have different contexts with same ID; but if ID not set for some reason then use context
+ boolean checkIdNotContext = wid!=null && tid!=null;
+ if (checkIdNotContext && tid!=null && !Objects.equals(wid, tid)) return false;
+ if (!checkIdNotContext && !Objects.equals(getWorkflowExecutionContext(), that.getWorkflowExecutionContext())) return false;
if (stage != that.stage) return false;
- if (object != null ? !object.equals(that.object) : that.object != null) return false;
+ if (!Objects.equals(callPointUid, that.callPointUid)) return false;
+ if (expression != null ? !expression.equals(that.expression) : that.expression != null) return false;
if (settingVariable != null ? !settingVariable.equals(that.settingVariable) : that.settingVariable != null) return false;
return true;
}
@Override
public int hashCode() {
- int result = context != null && context.getWorkflowId()!=null ? context.getWorkflowId().hashCode() : 0;
+ int result = getWorkflowId() != null ? getWorkflowId().hashCode() : 0;
result = 31 * result + (stage != null ? stage.hashCode() : 0);
- result = 31 * result + (object != null ? object.hashCode() : 0);
+ result = 31 * result + (callPointUid != null ? callPointUid.hashCode() : 0);
+ result = 31 * result + (expression != null ? expression.hashCode() : 0);
result = 31 * result + (settingVariable != null ? settingVariable.hashCode() : 0);
return result;
}
@@ -497,27 +524,39 @@ public class WorkflowExpressionResolution {
/** method which can be used to indicate that a reference to the variable, if it is recursive, is recoverable, because we are in the process of setting that variable.
* see discussion on usages of WorkflowResolutionStackEntry.isStackForSettingVariable */
public static <T> T allowingRecursionWhenSetting(WorkflowExecutionContext context, WorkflowExpressionStage stage, String variable, Supplier<T> callable) {
- WorkflowVariableResolutionStackEntry entry = null;
- try {
- entry = WorkflowVariableResolutionStackEntry.setting(context, stage, variable);
- if (!RESOLVE_STACK.push(entry)) {
- entry = null;
- throw new WorkflowVariableRecursiveReference("Recursive or missing reference setting "+variable+": "+RESOLVE_STACK.getAll(false).stream().map(p -> p.object!=null ? p.object.toString() : p.settingVariable).collect(Collectors.joining("->")));
- }
+ return inResolveStackEntry(WorkflowResolutionStackEntry.settingVariable(context, stage, variable), () -> {
+ throw new WorkflowVariableRecursiveReference("Recursive or missing reference setting "+variable+": "+RESOLVE_STACK.stream().map(p -> p.expression !=null ? p.expression.toString() : p.settingVariable).filter(x -> x!=null).collect(Collectors.joining("->")));
+ },
+ callable);
+ }
- return callable.get();
+ static ThreadLocalStack<WorkflowResolutionStackEntry> RESOLVE_STACK = new ThreadLocalStack<>(false);
+ <T> T inResolveStackEntry(String callPointUid, Object expression, Supplier<T> code) {
+ return inResolveStackEntry(WorkflowResolutionStackEntry.of(this, callPointUid, expression), null, code);
+ }
+ static <T> T inResolveStackEntry(WorkflowResolutionStackEntry entry, Runnable errorIfDuplicate, Supplier<T> code) {
+ boolean added = RESOLVE_STACK.push(entry);
+ if (!added && errorIfDuplicate!=null) errorIfDuplicate.run();
+ try {
+ return code.get();
+ } catch (Exception e) {
+ throw Exceptions.propagate(e);
} finally {
- if (entry!=null) {
- RESOLVE_STACK.pop(entry);
- }
+ if (added) RESOLVE_STACK.pop(entry);
}
}
- static ThreadLocalStack<WorkflowVariableResolutionStackEntry> RESOLVE_STACK = new ThreadLocalStack<>(false);
-
WorkflowExpressionStage previousStage() {
- return RESOLVE_STACK.peekPenultimate().map(s -> s.stage).orNull();
+ return RESOLVE_STACK.stream().skip(1).map(s -> s.stage).filter(s -> s!=null).findFirst().orElse(null);
+ }
+
+ public static boolean isSettingVariable(String key) {
+ return WorkflowResolutionStackEntry.isStackForSettingVariable(RESOLVE_STACK.stream(), key);
+ }
+
+ public static WorkflowExpressionResolution getCurrentWorkflowExpressionResolution() {
+ return RESOLVE_STACK.stream().map(WorkflowResolutionStackEntry::getWorkflowExpressionResolution).filter(x -> x!=null).findFirst().orElse(null);
}
public static class WorkflowVariableRecursiveReference extends IllegalArgumentException {
@@ -547,50 +586,48 @@ public class WorkflowExpressionResolution {
}
public Object processTemplateExpression(Object expression, AllowBrooklynDslMode allowBrooklynDsl) {
- WorkflowVariableResolutionStackEntry entry = null;
- try {
- entry = WorkflowVariableResolutionStackEntry.of(context, stage, expression);
- if (!RESOLVE_STACK.push(entry)) {
- entry = null;
- throw new WorkflowVariableRecursiveReference("Recursive reference: " + RESOLVE_STACK.getAll(false).stream().map(p -> "" + p.object).collect(Collectors.joining("->")));
- }
- if (RESOLVE_STACK.size() > 100) {
- throw new WorkflowVariableRecursiveReference("Reference exceeded max depth 100: " + RESOLVE_STACK.getAll(false).stream().map(p -> "" + p.object).collect(Collectors.joining("->")));
- }
+ return inResolveStackEntry(WorkflowResolutionStackEntry.of(this, "process-template-expression", expression), () -> {
+ throw new WorkflowVariableRecursiveReference("Recursive reference: " + RESOLVE_STACK.getAll(false).stream().map(p -> "" + p.expression).collect(Collectors.joining("->")));
+ }, () -> {
+ try {
+ if (RESOLVE_STACK.size() > 100) {
+ throw new WorkflowVariableRecursiveReference("Reference exceeded max depth 100: " + RESOLVE_STACK.getAll(false).stream().map(p -> "" + p.expression).collect(Collectors.joining("->")));
+ }
- Object result;
- if (expression instanceof String) result = processTemplateExpressionString((String) expression, allowBrooklynDsl);
- else if (expression instanceof Map) result = processTemplateExpressionMap((Map) expression, allowBrooklynDsl);
- else if (expression instanceof Collection)
- result = processTemplateExpressionCollection((Collection) expression, allowBrooklynDsl);
- else if (expression == null || Boxing.isPrimitiveOrBoxedObject(expression)) result = expression;
- else {
- // otherwise resolve DSL
- result = allowBrooklynDsl.isAllowedHere() ? resolveDsl(expression) : expression;
- if (wrappingMode.wrapResolvedValues && !Objects.equals(result, expression) && !(result instanceof DeferredSupplier)) {
- result = WrappedResolvedExpression.ifNonDeferred(expression, result);
+ Object result;
+ if (expression instanceof String)
+ result = processTemplateExpressionString((String) expression, allowBrooklynDsl);
+ else if (expression instanceof Map)
+ result = processTemplateExpressionMap((Map) expression, allowBrooklynDsl);
+ else if (expression instanceof Collection)
+ result = processTemplateExpressionCollection((Collection) expression, allowBrooklynDsl);
+ else if (expression == null || Boxing.isPrimitiveOrBoxedObject(expression)) result = expression;
+ else {
+ // otherwise resolve DSL
+ result = allowBrooklynDsl.isAllowedHere() ? resolveDsl(expression) : expression;
+ if (wrappingMode.wrapResolvedValues && !Objects.equals(result, expression) && !(result instanceof DeferredSupplier)) {
+ result = WrappedResolvedExpression.ifNonDeferred(expression, result);
+ }
}
- }
- return result;
+ return result;
- } catch (Exception e) {
- Exception e2 = e;
- if (wrappingMode.deferAndRetryErroneousExpressions) {
- return WrappedUnresolvedExpression.ofExpression(expression, this, allowBrooklynDsl);
- }
- if (!allowWaiting && Exceptions.isCausedByInterruptInAnyThread(e)) {
- e2 = new IllegalArgumentException("Expression value '"+expression+"' unavailable and not permitted to wait: "+ Exceptions.collapseText(e), e);
- }
- if (wrappingMode.deferThrowingError) {
- // in wrapped value mode, errors don't throw until accessed, and when used in conditions they can be tested as absent
- return WrappedResolvedExpression.ofError(expression, new ResolutionFailureTreatedAsAbsent.ResolutionFailureTreatedAsAbsentDefaultException(e2));
- } else {
- throw Exceptions.propagate(e2);
+ } catch (Exception e) {
+ Exception e2 = e;
+ if (wrappingMode.deferAndRetryErroneousExpressions) {
+ return WrappedUnresolvedExpression.ofExpression(expression, this, allowBrooklynDsl);
+ }
+ if (!allowWaiting && Exceptions.isCausedByInterruptInAnyThread(e)) {
+ e2 = new IllegalArgumentException("Expression value '" + expression + "' unavailable and not permitted to wait: " + Exceptions.collapseText(e), e);
+ }
+ if (wrappingMode.deferThrowingError) {
+ // in wrapped value mode, errors don't throw until accessed, and when used in conditions they can be tested as absent
+ return WrappedResolvedExpression.ofError(expression, new ResolutionFailureTreatedAsAbsent.ResolutionFailureTreatedAsAbsentDefaultException(e2));
+ } else {
+ throw Exceptions.propagate(e2);
+ }
}
- } finally {
- if (entry != null) RESOLVE_STACK.pop(entry);
- }
+ });
}
private Object resolveDsl(Object expression) {
@@ -631,7 +668,7 @@ public class WorkflowExpressionResolution {
return errorMode;
}
- public Object processTemplateExpressionString(String expression, AllowBrooklynDslMode allowBrooklynDsl) {
+ protected Object processTemplateExpressionString(String expression, AllowBrooklynDslMode allowBrooklynDsl) {
Object result;
boolean ourWait = false;
try {
@@ -698,7 +735,7 @@ public class WorkflowExpressionResolution {
interruptSetIfNeededToPreventWaiting.remove();
}
- public Object processTemplateExpressionMap(Map<?,?> object, AllowBrooklynDslMode allowBrooklynDsl) {
+ protected Object processTemplateExpressionMap(Map<?,?> object, AllowBrooklynDslMode allowBrooklynDsl) {
if (allowBrooklynDsl.isAllowedHere() && object.size()==1) {
Object key = object.keySet().iterator().next();
if (key instanceof String && ((String)key).startsWith("$brooklyn:")) {
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/text/TemplateProcessor.java b/core/src/main/java/org/apache/brooklyn/util/core/text/TemplateProcessor.java
index 8a51d7c1b4..af00cbce16 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/text/TemplateProcessor.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/text/TemplateProcessor.java
@@ -86,7 +86,7 @@ public class TemplateProcessor {
public static TemplateModel wrapAsTemplateModel(Object o) throws TemplateModelException { return BROOKLYN_WRAPPER.wrap(o); }
public static Maybe<Object> unwrapTemplateModelMaybe(TemplateModel templateModel) { return BROOKLYN_WRAPPER.unwrapMaybe(templateModel); }
- static ThreadLocalStack<Map<TemplateModel,Object>> TEMPLATE_MODEL_UNWRAP_CACHE = new ThreadLocalStack<>(true);
+ static ThreadLocalStack<Map<TemplateModel,Object>> TEMPLATE_MODEL_UNWRAP_CACHE = new ThreadLocalStack<>();
/** A cache is used to be able to retrieve the object from which a TemplateModel was created, if needed,
* because Freemarker doesn't support that except on selected UnwrappableTemplateModel subclasses.
* Use wrap and unwrap methods above to access.
@@ -94,8 +94,8 @@ public class TemplateProcessor {
public static void openLocalTemplateModelCache() { TEMPLATE_MODEL_UNWRAP_CACHE.push(MutableMap.of()); }
public static void closeLocalTemplateModelCache() { TEMPLATE_MODEL_UNWRAP_CACHE.pop(); }
- static ThreadLocalStack<String> TEMPLATE_FILE_WANTING_LEGACY_SYNTAX = new ThreadLocalStack<>(true);
- static ThreadLocalStack<Boolean> IS_FOR_WORKFLOW = new ThreadLocalStack<>(true);
+ static ThreadLocalStack<String> TEMPLATE_FILE_WANTING_LEGACY_SYNTAX = new ThreadLocalStack<>();
+ static ThreadLocalStack<Boolean> IS_FOR_WORKFLOW = new ThreadLocalStack<>();
public interface UnwrappableTemplateModel {
Maybe<Object> unwrap();
diff --git a/utils/common/src/main/java/org/apache/brooklyn/util/collections/ThreadLocalStack.java b/utils/common/src/main/java/org/apache/brooklyn/util/collections/ThreadLocalStack.java
index d225e1770f..f9a3f90479 100644
--- a/utils/common/src/main/java/org/apache/brooklyn/util/collections/ThreadLocalStack.java
+++ b/utils/common/src/main/java/org/apache/brooklyn/util/collections/ThreadLocalStack.java
@@ -22,23 +22,27 @@ import com.google.common.collect.Iterables;
import org.apache.brooklyn.util.guava.Maybe;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
+import java.util.stream.Stream;
public class ThreadLocalStack<T> implements Iterable<T> {
- private final boolean allowDuplicates;
+ private final boolean acceptDuplicates;
- public ThreadLocalStack(boolean allowsDuplicates) {
- this.allowDuplicates = allowsDuplicates;
+ /** if duplicates are not accepted, a call to push returns false when the object supplied is equal to one already present */
+ public ThreadLocalStack(boolean acceptDuplicates) {
+ this.acceptDuplicates = acceptDuplicates;
}
+ public ThreadLocalStack() { this.acceptDuplicates = true; }
final ThreadLocal<Collection<T>> set = new ThreadLocal<>();
public Collection<T> getAll(boolean forceInitialized) {
Collection<T> result = set.get();
if (forceInitialized && result==null) {
- result = allowDuplicates ? MutableList.of() : MutableSet.of();
+ result = acceptDuplicates ? MutableList.of() : MutableSet.of();
set.set(result);
}
return result;
@@ -52,13 +56,22 @@ public class ThreadLocalStack<T> implements Iterable<T> {
return last;
}
+ /** returns true unless duplicates are not accepted, in which case it returns false iff the object supplied is equal to one already present */
public boolean push(T object) {
return getAll(true).add(object);
}
+ /** top of stack first */
@Override
public Iterator<T> iterator() {
- return null;
+ return stream().iterator();
+ }
+
+ /** top of stack first */
+ public Stream<T> stream() {
+ MutableList<T> l = MutableList.copyOf(getAll(false));
+ Collections.reverse(l);
+ return l.stream();
}
public Maybe<T> peek() {