You are viewing a plain text version of this content. The canonical link to it (originally a hyperlink on the word "here") was not preserved in this plain-text copy.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2022/07/19 17:01:58 UTC

[brooklyn-server] branch master updated: rejig container task

This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ea3d2d981 rejig container task
9ea3d2d981 is described below

commit 9ea3d2d981dcc03e173fd4f61fbc20d5a65e5b1b
Author: Alex Heneveld <al...@cloudsoft.io>
AuthorDate: Tue Jul 19 17:59:51 2022 +0100

    rejig container task
    
    * to track output, gather the exit code, and give clearer semantics
    * also fix dirty detection across all processes
---
 .../core/effector/ssh/SshEffectorTasks.java        |   1 +
 .../system/internal/SystemProcessTaskFactory.java  |   1 +
 .../brooklyn/tasks/kubectl/ContainerCommons.java   |  25 +-
 .../brooklyn/tasks/kubectl/ContainerEffector.java  |  10 +-
 .../brooklyn/tasks/kubectl/ContainerSensor.java    |  14 +-
 .../tasks/kubectl/ContainerTaskFactory.java        | 373 ++++++++++++++++-----
 .../{JobBuilder.java => KubeJobFileCreator.java}   |  49 +--
 .../tasks/kubectl/ContainerEffectorTest.java       |   5 +-
 .../tasks/kubectl/ContainerSensorTest.java         |   6 +-
 .../brooklyn/tasks/kubectl/ContainerTaskTest.java  | 224 ++++++++-----
 ...uilderTest.java => KubeJobSpecCreatorTest.java} |  58 ++--
 11 files changed, 527 insertions(+), 239 deletions(-)

diff --git a/core/src/main/java/org/apache/brooklyn/core/effector/ssh/SshEffectorTasks.java b/core/src/main/java/org/apache/brooklyn/core/effector/ssh/SshEffectorTasks.java
index c29563681d..d101b5c4a0 100644
--- a/core/src/main/java/org/apache/brooklyn/core/effector/ssh/SshEffectorTasks.java
+++ b/core/src/main/java/org/apache/brooklyn/core/effector/ssh/SshEffectorTasks.java
@@ -122,6 +122,7 @@ public class SshEffectorTasks {
         }
         @Override
         public synchronized ProcessTaskWrapper<RET> newTask() {
+            dirty = false;
             Entity entity = BrooklynTaskTags.getTargetOrContextEntity(Tasks.current());
             if (machine==null) {
                 if (log.isDebugEnabled())
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/system/internal/SystemProcessTaskFactory.java b/core/src/main/java/org/apache/brooklyn/util/core/task/system/internal/SystemProcessTaskFactory.java
index afe5d3346b..56be6cfc6f 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/system/internal/SystemProcessTaskFactory.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/system/internal/SystemProcessTaskFactory.java
@@ -68,6 +68,7 @@ public class SystemProcessTaskFactory<T extends SystemProcessTaskFactory<T,RET>,
 
     @Override
     public ProcessTaskWrapper<RET> newTask() {
+        dirty = false;
         return new SystemProcessTaskWrapper();
     }
 
diff --git a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java
index 4601a9ce02..5dbdf41af5 100644
--- a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java
+++ b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java
@@ -21,7 +21,6 @@ package org.apache.brooklyn.tasks.kubectl;
 import com.google.common.collect.Lists;
 import com.google.common.reflect.TypeToken;
 import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.config.BasicConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.config.SetConfigKey;
 import org.apache.brooklyn.util.time.Duration;
@@ -36,14 +35,16 @@ public interface ContainerCommons {
     ConfigKey<PullPolicy> CONTAINER_IMAGE_PULL_POLICY = ConfigKeys.newConfigKey(new TypeToken<PullPolicy>() {} ,
             "imagePullPolicy", "Container image pull policy. Allowed values: {IfNotPresent, Always, Never}. ", PullPolicy.ALWAYS);
 
-    ConfigKey<String> CONTAINER_NAME = ConfigKeys.newStringConfigKey("containerName", "Container name");
     ConfigKey<Boolean> KEEP_CONTAINER_FOR_DEBUGGING = ConfigKeys.newBooleanConfigKey("keepContainerForDebugging", "When set to true, the namespace" +
             " and associated resources and services are not destroyed after execution. Defaults value is 'false'.", Boolean.FALSE);
 
-    ConfigKey<List> COMMANDS = ConfigKeys.newConfigKey(List.class,"commands", "Commands to execute for container", Lists.newArrayList());
-    ConfigKey<List> ARGUMENTS = ConfigKeys.newConfigKey(List.class,"args", "Arguments to execute for container", Lists.newArrayList());
+    ConfigKey<Object> BASH_SCRIPT = ConfigKeys.newConfigKey(Object.class,"bashScript", "A bash script (as string or list of strings) to run, implies command '/bin/bash' '-c' and replaces arguments");
+    ConfigKey<List> COMMAND = ConfigKeys.newConfigKey(List.class,"command", "Single command and optional arguments to execute for the container (overrides image EntryPoint and Cmd)", Lists.newArrayList());
+    ConfigKey<List> ARGUMENTS = ConfigKeys.newConfigKey(List.class,"args", "Additional arguments to pass to the command at the container (in addition to the command supplied here or the default in the image)", Lists.newArrayList());
 
-    ConfigKey<Duration> TIMEOUT = ConfigKeys.newConfigKey(Duration.class, "timeout", "Container wait timeout", Duration.minutes(1));
+    ConfigKey<Duration> TIMEOUT = ConfigKeys.newConfigKey(Duration.class, "timeout", "Container execution timeout (default 5 minutes)", Duration.minutes(5));
+
+    ConfigKey<Boolean> REQUIRE_EXIT_CODE_ZERO = ConfigKeys.newConfigKey(Boolean.class, "requireExitCodeZero", "Whether task should fail if container returns non-zero exit code (default true)", true);
 
     ConfigKey<String> WORKING_DIR = ConfigKeys.newStringConfigKey("workingDir", "Location where the container commands are executed");
     ConfigKey<Set<Map<String,String>>> VOLUME_MOUNTS =  new SetConfigKey.Builder<>(new TypeToken<Map<String,String>>()  {}, "volumeMounts")
@@ -52,11 +53,13 @@ public interface ContainerCommons {
     ConfigKey<Set<Map<String,Object>>> VOLUMES = new SetConfigKey.Builder(new TypeToken<Map<String,Object>>()  {}, "volumes")
             .description("List of directories with data that is accessible across multiple containers").defaultValue(null).build();
 
-    String NAMESPACE_CREATE_CMD = "kubectl create namespace brooklyn-%s"; // namespace name
-    String NAMESPACE_SET_CMD = "kubectl config set-context --current --namespace=brooklyn-%s"; // namespace name
-    String JOBS_CREATE_CMD = "kubectl apply -f %s"; // deployment.yaml absolute path
-    String JOBS_FEED_CMD = "kubectl wait --timeout=%ds --for=condition=complete job/%s"; // timeout, containerName
-    String JOBS_LOGS_CMD = "kubectl logs jobs/%s"; // containerName
-    String NAMESPACE_DELETE_CMD = "kubectl delete namespace brooklyn-%s"; // namespace name
+    String NAMESPACE_CREATE_CMD = "kubectl create namespace %s";
+    String NAMESPACE_SET_CMD = "kubectl config set-context --current --namespace=%s";
+    String JOBS_CREATE_CMD = "kubectl apply -f %s --namespace=%s";
+    String JOBS_FEED_CMD = "kubectl wait --timeout=%ds --for=condition=complete job/%s --namespace=%s";
+    String JOBS_FEED_FAILED_CMD = "kubectl wait --timeout=%ds --for=condition=failed job/%s --namespace=%s";
+    String JOBS_LOGS_CMD = "kubectl logs jobs/%s --namespace=%s";
+    String PODS_EXIT_CODE_CMD = "kubectl get pods --namespace=%s -ojsonpath='{.items[0].status.containerStatuses[0].state.terminated.exitCode}'";
+    String NAMESPACE_DELETE_CMD = "kubectl delete namespace %s";
 
 }
diff --git a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerEffector.java b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerEffector.java
index 5b34ae604e..0c41de1a24 100644
--- a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerEffector.java
+++ b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerEffector.java
@@ -26,12 +26,9 @@ import org.apache.brooklyn.core.effector.Effectors;
 import org.apache.brooklyn.core.entity.EntityInitializers;
 import org.apache.brooklyn.util.core.config.ConfigBag;
 import org.apache.brooklyn.util.core.task.DynamicTasks;
-import org.apache.brooklyn.util.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.TimeUnit;
-
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.brooklyn.core.mgmt.BrooklynTaskTags.EFFECTOR_TAG;
 
@@ -69,14 +66,13 @@ public class ContainerEffector extends AddEffectorInitializerAbstract implements
         @Override
         public String call(ConfigBag parameters) {
             ConfigBag configBag = ConfigBag.newInstanceCopying(this.params).putAll(parameters);
-            Task<String> containerTask = new ContainerTaskFactory.ConcreteContainerTaskFactory<String>()
+            Task<ContainerTaskFactory.ContainerTaskResult> containerTask = new ContainerTaskFactory.ConcreteContainerTaskFactory()
                     .summary("Executing Container Image: " + EntityInitializers.resolve(configBag, CONTAINER_IMAGE))
-                    .tag(entity().getId() + "-" + EFFECTOR_TAG)
+                    .jobIdentifier(entity().getId() + "-" + EFFECTOR_TAG)
                     .configure(configBag.getAllConfig())
                     .newTask();
             DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity());
-            Object result = containerTask.getUnchecked(Duration.of(5, TimeUnit.MINUTES));
-            return result.toString();
+            return containerTask.getUnchecked().getMainStdout();
         }
     }
 }
diff --git a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerSensor.java b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerSensor.java
index 0d54e40680..7bb8dccc4e 100644
--- a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerSensor.java
+++ b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerSensor.java
@@ -18,7 +18,6 @@
  */
 package org.apache.brooklyn.tasks.kubectl;
 
-import com.google.common.collect.Iterables;
 import org.apache.brooklyn.api.entity.EntityLocal;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.sensor.AttributeSensor;
@@ -35,7 +34,6 @@ import org.apache.brooklyn.util.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
@@ -77,18 +75,14 @@ public class ContainerSensor<T> extends AbstractAddSensorFeed<T> implements Cont
                         .callable(new Callable<Object>() {
                             @Override
                             public Object call() throws Exception {
-                                Task<String> containerTask = new ContainerTaskFactory.ConcreteContainerTaskFactory<String>()
+                                Task<ContainerTaskFactory.ContainerTaskResult> containerTask = new ContainerTaskFactory.ConcreteContainerTaskFactory()
                                         .summary("Running " + EntityInitializers.resolve(configBag, SENSOR_NAME))
-                                        .tag(entity.getId() + "-" + SENSOR_TAG)
+                                        .jobIdentifier(entity.getId() + "-" + SENSOR_TAG)
                                         .configure(configBag.getAllConfig())
                                         .newTask();
                                 DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
-                                Object result = containerTask.getUnchecked(Duration.of(5, TimeUnit.MINUTES));
-                                List<String> res = (List<String>) result;
-                                while(!res.isEmpty() && Iterables.getLast(res).matches("namespace .* deleted\\s*")) res = res.subList(0, res.size()-1);
-
-                                String res2 = res.isEmpty() ? null : Iterables.getLast(res);
-                                return (new SshCommandSensor.CoerceOutputFunction<>(sensor.getTypeToken(), initParam(FORMAT), initParam(LAST_YAML_DOCUMENT))).apply(res2);
+                                String mainStdout = containerTask.getUnchecked(Duration.of(5, TimeUnit.MINUTES)).getMainStdout();
+                                return (new SshCommandSensor.CoerceOutputFunction<>(sensor.getTypeToken(), initParam(FORMAT), initParam(LAST_YAML_DOCUMENT))).apply(mainStdout);
                             }
                         })
                         .suppressDuplicates(Boolean.TRUE.equals(suppressDuplicates))
diff --git a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
index 7e9a5910df..a03f6f2b1a 100644
--- a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
+++ b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
@@ -19,103 +19,313 @@
 package org.apache.brooklyn.tasks.kubectl;
 
 import com.google.common.collect.Lists;
+import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.mgmt.TaskFactory;
 import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
+import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.entity.EntityInitializers;
+import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
+import org.apache.brooklyn.core.mgmt.ha.BrooklynBomOsgiArchiveInstaller;
+import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.core.json.ShellEnvironmentSerializer;
+import org.apache.brooklyn.util.core.task.DynamicTasks;
 import org.apache.brooklyn.util.core.task.TaskBuilder;
 import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.core.task.system.ProcessTaskFactory;
 import org.apache.brooklyn.util.core.task.system.ProcessTaskStub;
 import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper;
 import org.apache.brooklyn.util.core.task.system.internal.SystemProcessTaskFactory;
+import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.text.Strings;
+import org.apache.brooklyn.util.time.CountdownTimer;
+import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static org.apache.brooklyn.tasks.kubectl.ContainerCommons.*;
 
-public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET>  implements TaskFactory<Task<RET>> {
+public class ContainerTaskFactory<T extends ContainerTaskFactory<T>> implements TaskFactory<Task<ContainerTaskFactory.ContainerTaskResult>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ContainerTaskFactory.class);
 
     protected String summary;
-    protected String tag = "";
+    protected String jobIdentifier = "";
     protected final ConfigBag config = ConfigBag.newInstance();
+    private String namespace;
+    private Boolean createNamespace;
+    private Boolean deleteNamespace;
 
     @Override
-    public Task<RET> newTask() {
-        ConfigBag configBag = this.config;
-
-        List<String> commandsCfg =  EntityInitializers.resolve(configBag, COMMANDS);
-        List<String> argumentsCfg =  EntityInitializers.resolve(configBag, ARGUMENTS);
-        String containerImage = EntityInitializers.resolve(configBag, CONTAINER_IMAGE);
-        PullPolicy containerImagePullPolicy = EntityInitializers.resolve(configBag, CONTAINER_IMAGE_PULL_POLICY);
-        String containerNameFromCfg = EntityInitializers.resolve(configBag, CONTAINER_NAME);
-        Boolean devMode = EntityInitializers.resolve(configBag, KEEP_CONTAINER_FOR_DEBUGGING);
-
-        String workingDir = EntityInitializers.resolve(configBag, WORKING_DIR);
-        Set<Map<String,String>> volumeMounts = (Set<Map<String,String>>) EntityInitializers.resolve(configBag, VOLUME_MOUNTS);
-        Set<Map<String, Object>> volumes = (Set<Map<String, Object>>) EntityInitializers.resolve(configBag, VOLUMES);
-
-        if(Strings.isBlank(containerImage)) {
-            throw new IllegalStateException("You must specify containerImage when using " + this.getClass().getSimpleName());
-        }
+    public Task<ContainerTaskResult> newTask() {
+        final ByteArrayOutputStream stdout = new ByteArrayOutputStream();
+        TaskBuilder<ContainerTaskResult> taskBuilder = Tasks.<ContainerTaskResult>builder().dynamic(true)
+                .displayName(this.summary)
+                .tag(BrooklynTaskTags.tagForStream(BrooklynTaskTags.STREAM_STDOUT, stdout))
+                .body(()-> {
+                    List<String> commandsCfg =  EntityInitializers.resolve(config, COMMAND);
+                    List<String> argumentsCfg =  EntityInitializers.resolve(config, ARGUMENTS);
 
-        final String cleanImageName = containerImage.contains(":") ?  containerImage.substring(0, containerImage.indexOf(":")) : containerImage;
-
-        final String containerName = (Strings.isBlank(containerNameFromCfg)
-                ? ( (Strings.isNonBlank(this.tag) ? this.tag + "-" : "").concat(cleanImageName).concat("-").concat(Strings.makeRandomId(10)))
-                : containerNameFromCfg)
-                .replaceAll("[^A-Za-z0-9-]", "") // remove all symbols
-                .toLowerCase();
-
-        final String jobYamlLocation =  new JobBuilder()
-                .withImage(containerImage)
-                .withImagePullPolicy(containerImagePullPolicy)
-                .withName(containerName)
-                .withArgs(argumentsCfg)
-                .withEnv(EntityInitializers.resolve(configBag, BrooklynConfigKeys.SHELL_ENVIRONMENT))
-                .withCommands(Lists.newArrayList(commandsCfg))
-                .withVolumeMounts(volumeMounts)
-                .withVolumes(volumes)
-                .withWorkingDir(workingDir)
-                .build();
-
-
-        final long timeoutInSeconds = EntityInitializers.resolve(configBag, TIMEOUT).toSeconds();
-        Task<String> runCommandsTask = buildKubeTask(configBag, "Submit job", String.format(JOBS_CREATE_CMD,jobYamlLocation)).asTask();
-        Task<String> waitTask =  buildKubeTask(configBag, "Wait For Completion", String.format(JOBS_FEED_CMD,timeoutInSeconds,containerName)).asTask();
-        if(!devMode) {
-            // making these two inessential to insure proper namespace cleanup
-            BrooklynTaskTags.addTagDynamically(runCommandsTask, BrooklynTaskTags.INESSENTIAL_TASK);
-            BrooklynTaskTags.addTagDynamically(waitTask, BrooklynTaskTags.INESSENTIAL_TASK);
-        }
+                    Object bashScript = EntityInitializers.resolve(config, BASH_SCRIPT);
+                    if (bashScript!=null) {
+                        if (!commandsCfg.isEmpty()) LOG.warn("Ignoring 'command' "+commandsCfg+" because bashScript is set");
+                        if (!argumentsCfg.isEmpty()) LOG.warn("Ignoring 'args' "+argumentsCfg+" because bashScript is set");
+
+                        commandsCfg = MutableList.of("/bin/bash", "-c");
+                        List<Object> argumentsCfgO = bashScript instanceof Iterable ? MutableList.copyOf((Iterable) commandsCfg) : MutableList.of(bashScript);
+                        argumentsCfg = MutableList.of(argumentsCfgO.stream().map(x -> ""+x).collect(Collectors.joining("\n")));
+                    }
+
+                    String containerImage = EntityInitializers.resolve(config, CONTAINER_IMAGE);
+                    PullPolicy containerImagePullPolicy = EntityInitializers.resolve(config, CONTAINER_IMAGE_PULL_POLICY);
+                    Boolean devMode = EntityInitializers.resolve(config, KEEP_CONTAINER_FOR_DEBUGGING);
+
+                    String workingDir = EntityInitializers.resolve(config, WORKING_DIR);
+                    Set<Map<String,String>> volumeMounts = (Set<Map<String,String>>) EntityInitializers.resolve(config, VOLUME_MOUNTS);
+                    Set<Map<String, Object>> volumes = (Set<Map<String, Object>>) EntityInitializers.resolve(config, VOLUMES);
+
+                    if(Strings.isBlank(containerImage)) {
+                        throw new IllegalStateException("You must specify containerImage when using " + this.getClass().getSimpleName());
+                    }
+
+                    Entity entity = BrooklynTaskTags.getContextEntity(Tasks.current());
+                    if (entity == null) {
+                        throw new IllegalStateException("Task must run in context of entity to background jobs");
+                    }
+
+                    final String cleanImageName = containerImage.contains(":") ? containerImage.substring(0, containerImage.indexOf(":")) : containerImage;
+
+                    final String containerName = (Strings.isNonBlank(this.jobIdentifier) ? this.jobIdentifier + "-" : "")
+                            .concat(cleanImageName).concat("-").concat(Strings.makeRandomId(10))
+                            .replaceAll("[^A-Za-z0-9-]", "") // remove all symbols
+                            .toLowerCase();
+                    if (namespace==null) {
+                        namespace = "brooklyn-" + containerName;
+                    }
+
+                    LOG.debug("Submitting container job in namespace "+namespace);
+
+                    Map<String, String> env = new ShellEnvironmentSerializer(((EntityInternal)entity).getManagementContext()).serialize(EntityInitializers.resolve(config, BrooklynConfigKeys.SHELL_ENVIRONMENT));
+                    final BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<File> jobYaml =  new KubeJobFileCreator()
+                            .withImage(containerImage)
+                            .withImagePullPolicy(containerImagePullPolicy)
+                            .withName(containerName)
+                            .withArgs(argumentsCfg)
+                            .withEnv(env)
+                            .withCommand(Lists.newArrayList(commandsCfg))
+                            .withVolumeMounts(volumeMounts)
+                            .withVolumes(volumes)
+                            .withWorkingDir(workingDir)
+                            .createFile();
+                    Tasks.addTagDynamically(BrooklynTaskTags.tagForEnvStream(BrooklynTaskTags.STREAM_ENV, env));
+
+                    try {
+
+                        Duration timeout = EntityInitializers.resolve(config, TIMEOUT);
+
+                        ContainerTaskResult result = new ContainerTaskResult();
+                        result.interestingJobs = MutableList.of();
+
+                        ProcessTaskWrapper<ProcessTaskWrapper<?>> createNsJob = null;
+                        if (!Boolean.FALSE.equals(createNamespace)) {
+                            ProcessTaskFactory<ProcessTaskWrapper<?>> createNsJobF = newSimpleTaskFactory(
+                                    String.format(NAMESPACE_CREATE_CMD, namespace)
+                                    //, String.format(NAMESPACE_SET_CMD, namespace)
+                            ).summary("Set up namespace").returning(x -> x);
+                            if (createNamespace==null) {
+                                createNsJobF.allowingNonZeroExitCode();
+                            }
+                            createNsJob = DynamicTasks.queue(createNsJobF.newTask());
+                        }
+
+                        // only delete if told to always, unless we successfully create it
+                        boolean deleteNamespaceHere = Boolean.TRUE.equals(deleteNamespace);
+                        try {
+                            if (createNsJob!=null) {
+                                ProcessTaskWrapper<?> nsDetails = createNsJob.get();
+                                if (nsDetails.getExitCode()==0) {
+                                    LOG.debug("Namespace created");
+                                    if (deleteNamespace==null) deleteNamespaceHere = true;
+                                } else if (nsDetails.getExitCode()==1 && nsDetails.getStderr().contains("AlreadyExists")) {
+                                    if (Boolean.TRUE.equals(createNamespace)) {
+                                        LOG.warn("Namespace "+namespace+" already exists; failing");
+                                        throw new IllegalStateException("Namespace "+namespace+" exists when creating a job that expects to create this namespace");
+                                    } else {
+                                        LOG.debug("Namespace exists already; reusing it");
+                                    }
+                                } else {
+                                    LOG.warn("Unexpected namespace creation problem: "+nsDetails.getStderr()+ "(code "+nsDetails.getExitCode()+")");
+                                    if (deleteNamespace==null) deleteNamespaceHere = true;
+                                    throw new IllegalStateException("Unexpected namespace creation problem ("+namespace+"); see log for more details");
+                                }
+                            }
+
+                            result.interestingJobs.add(DynamicTasks.queue(
+                                    newSimpleTaskFactory(String.format(JOBS_CREATE_CMD, jobYaml.getFile().getAbsolutePath(), namespace)).summary("Submit job").newTask()));
+
+                            final CountdownTimer timer = CountdownTimer.newInstanceStarted(timeout);
+
+                            // use `wait --for` api, but in a 5s loop in case there are other issues
+                            boolean succeeded = DynamicTasks.queue(Tasks.<Boolean>builder().dynamic(true).displayName("Wait for success or failure").body(() -> {
+                                while (true) {
+                                    LOG.debug("Container job submitted, now waiting on success or failure");
+
+                                    long secondsLeft = Math.min(Math.max(1, timer.getDurationRemaining().toSeconds()), 5);
+                                    final AtomicInteger finishCount = new AtomicInteger(0);
+
+                                    ProcessTaskWrapper<String> waitForSuccess = Entities.submit(entity, newSimpleTaskFactory(String.format(JOBS_FEED_CMD, secondsLeft, containerName, namespace))
+                                            .summary("Wait for success ('complete')").allowingNonZeroExitCode().newTask());
+                                    Entities.submit(entity, Tasks.create("Wait for success then notify", () -> {
+                                        try {
+                                            if (waitForSuccess.get().contains("condition met")) LOG.debug("Container job "+namespace+" detected as completed (succeeded) in kubernetes");
+                                        } finally {
+                                            synchronized (finishCount) {
+                                                finishCount.incrementAndGet();
+                                                finishCount.notifyAll();
+                                            }
+                                        }
+                                    }));
+
+                                    ProcessTaskWrapper<String> waitForFailed = Entities.submit(entity, newSimpleTaskFactory(String.format(JOBS_FEED_FAILED_CMD, secondsLeft, containerName, namespace))
+                                            .summary("Wait for failed").allowingNonZeroExitCode().newTask());
+                                    Entities.submit(entity, Tasks.create("Wait for failed then notify", () -> {
+                                        try {
+                                            if (waitForFailed.get().contains("condition met")) LOG.debug("Container job "+namespace+" detected as failed in kubernetes (may be valid non-zero exit)");
+                                        } finally {
+                                            synchronized (finishCount) {
+                                                finishCount.incrementAndGet();
+                                                finishCount.notifyAll();
+                                            }
+                                        }
+                                    }));
+
+                                    while (finishCount.get() == 0) {
+                                        LOG.debug("Container job "+namespace+" waiting on complete or failed");
+                                        try {
+                                            synchronized (finishCount) {
+                                                finishCount.wait(Duration.TEN_SECONDS.toMilliseconds());
+                                            }
+                                        } catch (InterruptedException e) {
+                                            throw Exceptions.propagate(e);
+                                        }
+                                    }
+
+                                    if (waitForSuccess.isDone() && waitForSuccess.getExitCode() == 0) return true;
+                                    if (waitForFailed.isDone() && waitForFailed.getExitCode() == 0) return false;
+                                    LOG.debug("Container job "+namespace+" not yet complete, will retry");
+                                    // probably timed out or job not yet available; short wait then retry
+                                    Time.sleep(Duration.millis(50));
+                                    if (timer.isExpired())
+                                        throw new IllegalStateException("Timeout waiting for success or failure");
+
+                                    // any other one-off checks for job error, we could do here
+
+                                    // finally get the partial log for reporting
+                                    ProcessTaskWrapper<String> outputSoFarCmd = DynamicTasks.queue(newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, containerName, namespace)).summary("Retrieve output so far").newTask());
+                                    BrooklynTaskTags.setTransient(outputSoFarCmd.asTask());
+                                    String outputSoFar = outputSoFarCmd.get();
+                                    String newOutput = outputSoFar.substring(stdout.size());
+                                    LOG.debug("Container job "+namespace+" output: "+newOutput);
+                                    stdout.write(newOutput.getBytes(StandardCharsets.UTF_8));
+                                }
+
+                            }).build()).getUnchecked();
+                            LOG.debug("Container job "+namespace+" completed, success "+succeeded);
+
+                            ProcessTaskWrapper<String> retrieveOutput = DynamicTasks.queue(newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, containerName, namespace)).summary("Retrieve output").newTask());
+                            result.interestingJobs.add(retrieveOutput);
+
+                            ProcessTaskWrapper<String> retrieveExitCode = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_EXIT_CODE_CMD, namespace)).summary("Retrieve exit code").newTask());
+                            result.interestingJobs.add(retrieveExitCode);
+
+                            DynamicTasks.waitForLast();
+                            result.mainStdout = retrieveOutput.get();
+                            stdout.write(result.mainStdout.substring(stdout.size()).getBytes(StandardCharsets.UTF_8));
+
+                            String exitCodeS = retrieveExitCode.getStdout();
+                            if (Strings.isNonBlank(exitCodeS)) result.mainExitCode = Integer.parseInt(exitCodeS.trim());
+                            else result.mainExitCode = -1;
+
+                            if (result.mainExitCode!=0 && config.get(REQUIRE_EXIT_CODE_ZERO)) {
+                                LOG.info("Failed container job "+namespace+" (exit code "+result.mainExitCode+") output: "+result.mainExitCode);
+                                throw new IllegalStateException("Non-zero exit code (" + result.mainExitCode + ") disallowed");
+                            }
+
+                            return result;
+
+                        } finally {
+                            // clean up - delete namespace
+                            if (!devMode && deleteNamespaceHere) {
+                                LOG.debug("Deleting namespace "+namespace);
+                                // do this not as a subtask so we can run even if the main queue fails
+                                Entities.submit(entity, newSimpleTaskFactory(String.format(NAMESPACE_DELETE_CMD, namespace)).summary("Tear down containers").newTask()).block();
+                            }
+                            System.runFinalization();
+                        }
+                    } catch (Exception e) {
+                        throw Exceptions.propagate(e);
+                    } finally {
+                        jobYaml.deleteIfTemp();
+                    }
+                });
 
-        TaskBuilder<RET> taskBuilder = Tasks.<RET>builder()
-                .displayName(this.summary)
-                .add(buildKubeTask(configBag, "Set Up and Run",
-                        String.format(NAMESPACE_CREATE_CMD,containerName),
-                        String.format(NAMESPACE_SET_CMD,containerName)))
-                .add(runCommandsTask)
-                .add(waitTask)
-                .add(buildKubeTask(configBag, "Retrieve Output", String.format(JOBS_LOGS_CMD,containerName)));
-
-        if(!devMode) {
-            taskBuilder.add(buildKubeTask(configBag, "Tear Down", String.format(NAMESPACE_DELETE_CMD,containerName)));
-        }
         return taskBuilder.build();
     }
 
-    public T summary(String summary) {
-        this.summary = summary;
+    public T summary(String summary) { this.summary = summary; return self(); }
+    public T timeout(Duration timeout) { config.put(TIMEOUT, timeout); return self(); }
+    public T command(List<String> commands) { config.put(COMMAND, commands); return self(); }
+    public T command(String baseCommandWithNoArgs, String ...extraCommandArguments) { config.put(COMMAND, MutableList.of(baseCommandWithNoArgs).appendAll(Arrays.asList(extraCommandArguments))); return self(); }
+    public T bashScriptCommands(List<String> commands) {
+        config.put(BASH_SCRIPT, commands);
+        return self();
+    }
+    public T bashScriptCommands(String firstCommandAndArgs, String ...otherCommandAndArgs) { return bashScriptCommands(MutableList.of(firstCommandAndArgs).appendAll(Arrays.asList(otherCommandAndArgs))); }
+    public T image(String image) { config.put(CONTAINER_IMAGE, image); return self(); }
+    public T allowNonZeroExitCode() { return allowNonZeroExitCode(true); }
+    public T allowNonZeroExitCode(boolean allowNonZero) { config.put(REQUIRE_EXIT_CODE_ZERO, !allowNonZero); return self(); } // stored inverted, as REQUIRE_EXIT_CODE_ZERO
+    public T imagePullPolicy(PullPolicy policy) { config.put(CONTAINER_IMAGE_PULL_POLICY, policy); return self(); }
+    public T env(Map<String,?> map) { // replaces any previously set environment with a copy of the map
+        config.put(BrooklynConfigKeys.SHELL_ENVIRONMENT, MutableMap.copyOf( map ) );
+        return self();
+    }
+    public T env(String key, Object val) { // adds one entry to a copy of the current environment
+        return env(MutableMap.copyOf( config.get(BrooklynConfigKeys.SHELL_ENVIRONMENT) ).add(key, val));
+    }
+
+    /** Specifies the namespace to use, and whether to create or delete it. By default a randomly generated namespace is used and always cleaned up (unless in dev mode);
+     * by using this, a caller can ensure the namespace persists. If using this, it is the caller's responsibility to avoid collisions.
+     *
+     * @param namespace namespace to use
+     * @param create whether to create; null (default) is to auto-detect, create it if it doesn't exist
+     * @param delete whether to delete; null (default) means to delete it if and only if we created it (and not dev mode)
+     */
+    public T useNamespace(String namespace, Boolean create, Boolean delete) {
+        this.namespace = namespace;
+        this.createNamespace = create;
+        this.deleteNamespace = delete;
         return self();
     }
 
-    public T tag(String tag) {
-        this.tag = tag;
+    public T deleteNamespace(Boolean delete) { this.deleteNamespace = delete; return self(); } // null (default): delete iff we created it, and not dev mode
+
+    /** Sets an identifier for the job; visible in the container environment. */
+    public T jobIdentifier(String jobIdentifier) {
+        this.jobIdentifier = jobIdentifier;
         return self();
     }
 
@@ -127,23 +337,36 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET>  im
         return self();
     }
 
-    public static ProcessTaskWrapper<String> buildKubeTask(final ConfigBag configBag, final String taskSummary, final String... kubeCommands) {
-        Map<String, Object> env = EntityInitializers.resolve(configBag, BrooklynConfigKeys.SHELL_ENVIRONMENT);
-        Map<String, String> envVars = MutableMap.of();
-        if(env != null && env.size() > 0) {
-            env.forEach((k,v) -> envVars.put(k, v.toString()));
-        }
+    private ProcessTaskFactory<String> newSimpleTaskFactory(final String... kubeCommands) { // system-process task factory running the given commands
         return new SystemProcessTaskFactory.ConcreteSystemProcessTaskFactory<String>(kubeCommands)
-                .summary(taskSummary)
-                .configure(configBag.getAllConfig())
-                .environmentVariables(envVars) // needed to be shown in the UI ;)
+                // config and env vars are deliberately not passed: presumably not needed here, and most config keys would only cause "ignored" debug messages
+                //.configure(config.getAllConfig())
+                //.environmentVariables(envVars) // needed to be shown in the UI ;)
+                // the task returns stdout as a String, and fails if the exit code is non-zero
                 .<String>returning(ProcessTaskStub.ScriptReturnType.STDOUT_STRING)
-                .requiringExitCodeZero().newTask();
+                .requiringExitCodeZero();
     }
 
-    public static class ConcreteContainerTaskFactory<RET> extends ContainerTaskFactory<ConcreteContainerTaskFactory<RET>, RET> {
+    public static class ConcreteContainerTaskFactory extends ContainerTaskFactory<ConcreteContainerTaskFactory> {
         public ConcreteContainerTaskFactory() {
             super();
         }
     }
+    /** Result of a container task: the job's log output, its exit code, and the tasks used to retrieve them. */
+    public static class ContainerTaskResult {
+        private List<ProcessTaskWrapper<?>> interestingJobs; // e.g. the output and exit-code retrieval tasks
+        private String mainStdout; // output retrieved from the job's logs
+        private Integer mainExitCode; // -1 if the exit code could not be retrieved
+
+        /** This will be 0 unless allowNonZeroExitCode was specified (in which case it may be non-zero, or -1 if unknown) */
+        public Integer getMainExitCode() {
+            return mainExitCode;
+        }
+        public String getMainStdout() {
+            return mainStdout;
+        }
+        public List<ProcessTaskWrapper<?>> getInterestingJobs() {
+            return interestingJobs;
+        }
+    }
 }
diff --git a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/JobBuilder.java b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/KubeJobFileCreator.java
similarity index 88%
rename from software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/JobBuilder.java
rename to software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/KubeJobFileCreator.java
index 9249393f44..5b090d7442 100644
--- a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/JobBuilder.java
+++ b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/KubeJobFileCreator.java
@@ -20,6 +20,7 @@ package org.apache.brooklyn.tasks.kubectl;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.brooklyn.core.mgmt.ha.BrooklynBomOsgiArchiveInstaller;
 import org.apache.brooklyn.util.text.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,8 +40,8 @@ import java.util.stream.Collectors;
 /**
  * This was needed to ensure our Kubernetes Yaml Job configurations are valid.
  */
-public class JobBuilder {
-    private static final Logger LOG = LoggerFactory.getLogger(JobBuilder.class);
+public class KubeJobFileCreator {
+    private static final Logger LOG = LoggerFactory.getLogger(KubeJobFileCreator.class);
 
     String jobName;
     String imageName;
@@ -51,21 +52,21 @@ public class JobBuilder {
 
     String prefix = "brooklyn-job";
 
-    List<String> commands = Lists.newArrayList();
+    List<String> command = Lists.newArrayList();
     List<String> args = Lists.newArrayList();
 
-    Map<String, Object> env = Maps.newHashMap();
+    Map<String, String> env = Maps.newHashMap();
 
     List<Map<String,String>> volumeMounts = Lists.newArrayList();
 
     List<Map<String, Object>> volumes = Lists.newArrayList();
 
-    public JobBuilder withName(final String name) {
+    public KubeJobFileCreator withName(final String name) {
         this.jobName = name;
         return this;
     }
 
-    public JobBuilder withImage(final String image){
+    public KubeJobFileCreator withImage(final String image){
         this.imageName = image;
         return this;
     }
@@ -75,59 +76,59 @@ public class JobBuilder {
      * @param eimagePullPolicy
      * @return
      */
-    public JobBuilder withImagePullPolicy(final PullPolicy eimagePullPolicy){
+    public KubeJobFileCreator withImagePullPolicy(final PullPolicy eimagePullPolicy){ // null is ignored
         if (eimagePullPolicy != null) {
             this.imagePullPolicy = eimagePullPolicy.val();
         }
         return this;
     }
 
-    public JobBuilder withCommands(final List<String> commandsArg){
-        if (commandsArg != null) {
-            this.commands.addAll(commandsArg);
+    public KubeJobFileCreator withCommand(final List<String> commandAndEntryPointArgs){ // appended to the container command; null is ignored
+        if (commandAndEntryPointArgs != null) {
+            this.command.addAll(commandAndEntryPointArgs);
         }
         return this;
     }
 
-    public JobBuilder withArgs(final List<String> args){
+    public KubeJobFileCreator withArgs(final List<String> args){ // appended to the container args; null is ignored
         if (args != null) {
             this.args.addAll(args);
         }
         return this;
     }
 
-    public JobBuilder withVolumeMounts(final Set<Map<String,String>> volumeMounts) {
+    public KubeJobFileCreator withVolumeMounts(final Set<Map<String,String>> volumeMounts) { // null is ignored
         if (volumeMounts != null) {
             this.volumeMounts.addAll(volumeMounts);
         }
         return this;
     }
 
-    public JobBuilder withVolumes(final Set<Map<String, Object>> volumes) {
+    public KubeJobFileCreator withVolumes(final Set<Map<String, Object>> volumes) { // null is ignored
         if (volumes != null) {
             this.volumes.addAll(volumes);
         }
         return this;
     }
 
-    public JobBuilder withWorkingDir(String workingDir) {
+    public KubeJobFileCreator withWorkingDir(String workingDir) {
         this.workingDir = workingDir;
         return this;
     }
 
-    public JobBuilder withPrefix(final String prefixArg){
+    public KubeJobFileCreator withPrefix(final String prefixArg){
         this.prefix = prefixArg;
         return this;
     }
 
-    public JobBuilder withEnv(final Map<String,Object> env){
+    public KubeJobFileCreator withEnv(final Map<String,String> env){ // merged into the container env; null is ignored
         if (env != null) {
             this.env.putAll(env);
         }
         return this;
     }
 
-    public String build(){
+    public BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<File> createFile(){
         JobTemplate jobTemplate = new JobTemplate(jobName);
 
         ContainerSpec containerSpec = jobTemplate.getSpec().getTemplate().getContainerSpec(0);
@@ -142,13 +143,13 @@ public class JobBuilder {
             List<Map<String,String>> envList = env.entrySet().stream().map (e ->  {
                     Map<String,String> envItem = new HashMap<>();
                     envItem.put("name", e.getKey());
-                    envItem.put("value", e.getValue().toString());
+                    envItem.put("value", e.getValue());
                     return envItem;
                 }).collect(Collectors.toList());
             containerSpec.setEnv(envList);
         }
-        if (!commands.isEmpty()) {
-            containerSpec.setCommand(this.commands);
+        if (!command.isEmpty()) {
+            containerSpec.setCommand(this.command);
         }
         if (!args.isEmpty()) {
             containerSpec.setArgs(this.args);
@@ -177,7 +178,7 @@ public class JobBuilder {
         return serializeAndWriteToTempFile(jobTemplate);
     }
 
-    private String serializeAndWriteToTempFile(JobTemplate jobTemplate) {
+    private BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<File> serializeAndWriteToTempFile(JobTemplate jobTemplate) {
         DumperOptions options = new DumperOptions();
         options.setIndent(2);
         options.setPrettyFlow(true);
@@ -203,8 +204,8 @@ public class JobBuilder {
             PrintWriter sw = new PrintWriter(jobBodyPath);
             Yaml yaml = new Yaml(representer, options);
             yaml.dump(jobTemplate, sw);
-            LOG.info("Job body dumped at: {}" , jobBodyPath.getAbsolutePath());
-            return jobBodyPath.getAbsolutePath();
+            LOG.debug("Job body dumped at: {}" , jobBodyPath.getAbsolutePath());
+            return new BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<>(jobBodyPath, true);
         } catch (IOException e) {
             throw new RuntimeException("Failed to create temp file for container", e);
         }
@@ -226,7 +227,7 @@ class TemplateSpec {
     /**
      * To do so, set .spec.backoffLimit to specify the number of retries before considering a Job as failed. The back-off limit is set by default to 6.
      */
-    Integer backoffLimit = 1;
+    Integer backoffLimit = 0;
 
     JobSpec template;
 
diff --git a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerEffectorTest.java b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerEffectorTest.java
index cee55084e3..d882b2fd06 100644
--- a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerEffectorTest.java
+++ b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerEffectorTest.java
@@ -49,7 +49,8 @@ public class ContainerEffectorTest extends BrooklynAppUnitTestSupport {
                 ContainerEffector.EFFECTOR_NAME, "test-container-effector",
                 ContainerCommons.CONTAINER_IMAGE, "perl",
                 ContainerCommons.CONTAINER_IMAGE_PULL_POLICY, PullPolicy.IF_NOT_PRESENT,
-                ContainerCommons.COMMANDS, ImmutableList.of("/bin/bash", "-c", "echo " + message),
+//                ContainerCommons.COMMAND, ImmutableList.of("/bin/bash", "-c", "echo " + message),
+                ContainerCommons.BASH_SCRIPT, "echo "+message+" ${HELLO}",
                 BrooklynConfigKeys.SHELL_ENVIRONMENT, ImmutableMap.<String, Object>of("HELLO", "WORLD")));
 
         ContainerEffector initializer = new ContainerEffector(parameters);
@@ -58,7 +59,7 @@ public class ContainerEffectorTest extends BrooklynAppUnitTestSupport {
 
         EntityAsserts.assertAttributeEqualsEventually(parentEntity, Attributes.SERVICE_UP, true);
         Object output = Entities.invokeEffector(app, parentEntity, parentEntity.getEffector("test-container-effector")).getUnchecked(Duration.ONE_MINUTE);
-        assertTrue(output.toString().contains(message));
+        assertTrue(output.toString().contains(message+" WORLD"));
     }
 
     @Test
diff --git a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerSensorTest.java b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerSensorTest.java
index 64a2f5f1c7..5de54807d3 100644
--- a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerSensorTest.java
+++ b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerSensorTest.java
@@ -41,7 +41,7 @@ public class ContainerSensorTest extends BrooklynAppUnitTestSupport {
         ConfigBag parameters = ConfigBag.newInstance(ImmutableMap.of(
                 ContainerCommons.CONTAINER_IMAGE, "perl",
                 ContainerCommons.CONTAINER_IMAGE_PULL_POLICY, PullPolicy.IF_NOT_PRESENT,
-                ContainerCommons.COMMANDS, ImmutableList.of("/bin/bash", "-c","echo " + message) ,
+                ContainerCommons.COMMAND, ImmutableList.of("/bin/bash", "-c","echo " + message) ,
                 ContainerSensor.SENSOR_PERIOD, "1s",
                 ContainerSensor.SENSOR_NAME, "test-echo-sensor"));
 
@@ -59,7 +59,7 @@ public class ContainerSensorTest extends BrooklynAppUnitTestSupport {
 
         ConfigBag parameters = ConfigBag.newInstance(ImmutableMap.of(
                 ContainerCommons.CONTAINER_IMAGE, "perl",
-                ContainerCommons.COMMANDS, ImmutableList.of("/bin/bash") ,
+                ContainerCommons.COMMAND, ImmutableList.of("/bin/bash") ,
                 ContainerCommons.ARGUMENTS, ImmutableList.of("-c", "echo " + message) ,
                 ContainerSensor.SENSOR_PERIOD, "1s",
                 ContainerSensor.SENSOR_NAME, "test-echo-sensor"));
@@ -112,7 +112,7 @@ public class ContainerSensorTest extends BrooklynAppUnitTestSupport {
     public void testTfVersionSensor() {
         ConfigBag parameters = ConfigBag.newInstance(ImmutableMap.of(
                 ContainerCommons.CONTAINER_IMAGE, "hashicorp/terraform:1.3.0-alpha20220622",
-                ContainerCommons.COMMANDS, ImmutableList.of("terraform", "version" ),
+                ContainerCommons.COMMAND, ImmutableList.of("terraform", "version" ),
                 ContainerSensor.SENSOR_PERIOD, "1s",
                 ContainerSensor.SENSOR_NAME, "tf-version-sensor"));
 
diff --git a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskTest.java b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskTest.java
index 36fe0acc08..4dda230118 100644
--- a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskTest.java
+++ b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskTest.java
@@ -19,27 +19,21 @@
 package org.apache.brooklyn.tasks.kubectl;
 
 import com.beust.jcommander.internal.Maps;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.Sets;
 import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.mgmt.HasTaskChildren;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
 import org.apache.brooklyn.core.test.entity.TestEntity;
 import org.apache.brooklyn.test.Asserts;
-import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.text.Identifiers;
 import org.apache.brooklyn.util.time.Duration;
 import org.testng.annotations.Test;
 
 import java.util.HashMap;
-import java.util.List;
-
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 import static org.testng.AssertJUnit.assertTrue;
 
@@ -53,107 +47,181 @@ public class ContainerTaskTest extends BrooklynAppUnitTestSupport {
     public void testSuccessfulContainerTask() {
         TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
 
-        Map<String,Object> configBag = new HashMap<>();
-        configBag.put("name", "test-container-task");
-        configBag.put("image", "perl");
-        configBag.put("commands", Lists.newArrayList("/bin/bash", "-c","echo 'hello test'"));
-        configBag.put("imagePullPolicy", PullPolicy.IF_NOT_PRESENT);
-
-        Task<String> containerTask =  new ContainerTaskFactory.ConcreteContainerTaskFactory<String>()
+        Task<ContainerTaskFactory.ContainerTaskResult> containerTask =  new ContainerTaskFactory.ConcreteContainerTaskFactory()
                 .summary("Running container task")
-                .configure(configBag)
+                .jobIdentifier("test-container-task")
+                .imagePullPolicy(PullPolicy.IF_NOT_PRESENT)
+                .timeout(Duration.TWO_MINUTES)
+                .image("perl")
+                .command( "/bin/bash", "-c","echo 'hello test'" )
                 .newTask();
+
         DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
-        Object result = containerTask.getUnchecked(Duration.of(5, TimeUnit.MINUTES));
-        List<String> res = (List<String>) result;
-        while(!res.isEmpty() && Iterables.getLast(res).matches("namespace .* deleted\\s*")) res = res.subList(0, res.size()-1);
+        ContainerTaskFactory.ContainerTaskResult result = containerTask.getUnchecked(Duration.ONE_MINUTE);
+        Asserts.assertEquals(result.getMainStdout().trim(), "hello test");
+    }
+
+    @Test
+    public void testContainerTaskWithVar() {
+        TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+
+        Task<ContainerTaskFactory.ContainerTaskResult> containerTask =  new ContainerTaskFactory.ConcreteContainerTaskFactory()
+                .summary("Running container task")
+                .jobIdentifier("test-container-task")
+                .imagePullPolicy(PullPolicy.IF_NOT_PRESENT)
+                .timeout(Duration.TWO_MINUTES)
+                .image("perl")
+                .env("test_name", "EnvTest")
+                .bashScriptCommands("echo hello ${test_name}" )
+                .newTask();
 
-        String res2 = res.isEmpty() ? null : Iterables.getLast(res);
-        assertTrue(res2.startsWith("hello test"));
+        DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
+        ContainerTaskFactory.ContainerTaskResult result = containerTask.getUnchecked(Duration.ONE_MINUTE);
+        Asserts.assertEquals(result.getMainStdout().trim(), "hello EnvTest");
+        Asserts.assertEquals(BrooklynTaskTags.stream(containerTask, BrooklynTaskTags.STREAM_ENV).streamContents.get().trim(), "test_name=\"EnvTest\"");
     }
 
     @Test
     public void testSuccessfulContainerTerraformTask() {
         TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
 
-        Map<String,Object> configBag = new HashMap<>();
-        configBag.put("name", "test-container-task");
-        configBag.put("image", "hashicorp/terraform:latest");
-        configBag.put("commands",  ImmutableList.of("terraform", "version" ));
-        configBag.put("imagePullPolicy", PullPolicy.IF_NOT_PRESENT);
-
-        Task<String> containerTask =  new ContainerTaskFactory.ConcreteContainerTaskFactory<String>()
+        Task<ContainerTaskFactory.ContainerTaskResult> containerTask = new ContainerTaskFactory.ConcreteContainerTaskFactory()
                 .summary("Running terraform-container task")
-                .configure(configBag)
+                .jobIdentifier("test-container-task")
+                .timeout(Duration.TWO_MINUTES)
+                .image("hashicorp/terraform:latest")
+                .imagePullPolicy(PullPolicy.IF_NOT_PRESENT)
+                .command( "terraform", "version" )
                 .newTask();
         DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
-        Object result = containerTask.getUnchecked(Duration.of(5, TimeUnit.MINUTES));
-        List<String> res = (List<String>) result;
-        while(!res.isEmpty() && Iterables.getLast(res).matches("namespace .* deleted\\s*")) res = res.subList(0, res.size()-1);
 
-        String res2 = res.isEmpty() ? null : Iterables.getLast(res);
-        assertTrue(res2.startsWith("Terraform"));
+        ContainerTaskFactory.ContainerTaskResult result = containerTask.getUnchecked();
+        assertTrue(result.getMainStdout().startsWith("Terraform"));
     }
 
-    @Test// tries to execute local command, wants it to fail, but even so best as integration
-    public void testFailingContainerTask() {
+    @Test // execute local command, assert we get the non-zero exit code, and that it is allowed
+    public void testExpectedFailingContainerTask() {
         TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
 
-        List<String> commands = MutableList.of("/bin/bash", "-c","echo 'hello test' & exit 1");
+        Task<ContainerTaskFactory.ContainerTaskResult> containerTask =  new ContainerTaskFactory.ConcreteContainerTaskFactory()
+                .summary("Running container task")
+                .jobIdentifier("test-container-task")
+                .imagePullPolicy(PullPolicy.IF_NOT_PRESENT)
+                .timeout(Duration.TWO_MINUTES)
+                .image("perl")
+                .command( "/bin/bash", "-c","echo 'hello test' && exit 42" )
+                .allowNonZeroExitCode()
+                .newTask();
+        DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
+
+        Asserts.assertTrue(containerTask.blockUntilEnded(Duration.ONE_MINUTE));  // should complete within 1 minute, i.e. we detect the failure
+        Asserts.assertTrue(containerTask.isDone());
+        Asserts.assertEquals((int)containerTask.getUnchecked().getMainExitCode(), 42);
+        Asserts.assertEquals(containerTask.getUnchecked().getMainStdout().trim(), "hello test");
+    }
 
-        Map<String,Object> configBag = new HashMap<>();
-        configBag.put("name", "test-container-task");
-        configBag.put("image", "perl");
-        configBag.put("commands", commands);
-        configBag.put("timeout", "1m");
-
-        Task<String> containerTask =  new ContainerTaskFactory.ConcreteContainerTaskFactory<String>()
-                .summary("Running docker task")
-                .configure(configBag)
+
+    @Test // script sleeps then exits 42; assert output is streamed while it runs, and the exit is detected promptly
+    public void testSleepingAndExpectedFailingContainerTask() {
+        TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+
+        Task<ContainerTaskFactory.ContainerTaskResult> containerTask =  new ContainerTaskFactory.ConcreteContainerTaskFactory()
+                .summary("Running container task")
+                .jobIdentifier("test-container-task")
+                .imagePullPolicy(PullPolicy.IF_NOT_PRESENT)
+                .timeout(Duration.TWO_MINUTES)
+                .image("perl")
+                .bashScriptCommands("echo starting", "sleep 6", "echo done", "exit 42", "echo ignored")
+                .allowNonZeroExitCode()
                 .newTask();
-        try {
-            DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity).getTask().get();
-            if (containerTask instanceof HasTaskChildren) {
-                for (Task<?> child: ((HasTaskChildren)containerTask).getChildren()) {
-                    if(child.getTags().contains(BrooklynTaskTags.INESSENTIAL_TASK) && child.isError()) {
-                       child.get();
-                    }
-                }
-            }
-        } catch (Exception e) {
-            Asserts.expectedFailureContains(e, "Process task ended with exit code", "when 0 was required");
-        }
+        DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
+
+        Asserts.eventually(() -> {
+                    BrooklynTaskTags.WrappedStream stream = BrooklynTaskTags.stream(containerTask, BrooklynTaskTags.STREAM_STDOUT);
+                    if (stream==null) return null;
+                    return stream.streamContents.get().trim();
+                }, x -> x.equals("starting"));
+        Asserts.assertFalse(containerTask.isDone());
+
+        // there may be occasional timing glitches here, e.g. if namespace cleanup takes > 3s
+        Stopwatch sw = Stopwatch.createStarted();
+        // should complete in under 10s, i.e. we detect the exit within a few seconds of the script's 6s sleep ending
+        Asserts.assertTrue(containerTask.blockUntilEnded(Duration.seconds(15)), "should definitely finish within 15s of starting");
+        Asserts.assertThat(Duration.of(sw.elapsed()), dur -> dur.isShorterThan(Duration.seconds(10)));
+
+        Asserts.assertTrue(containerTask.isDone());
+        Asserts.assertEquals(BrooklynTaskTags.stream(containerTask, BrooklynTaskTags.STREAM_STDOUT).streamContents.get().trim(), "starting\ndone");
+        Asserts.assertEquals((int)containerTask.getUnchecked().getMainExitCode(), 42);
+        Asserts.assertEquals(containerTask.getUnchecked().getMainStdout().trim(), "starting\ndone");
+    }
+
+    @Test // execute local command, assert the task fails when a non-zero exit code is not allowed
+    public void testNotExpectedFailingContainerTask() {
+        TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+
+        Task<ContainerTaskFactory.ContainerTaskResult> containerTask =  new ContainerTaskFactory.ConcreteContainerTaskFactory()
+                .summary("Running container task")
+                .jobIdentifier("test-container-task")
+                .imagePullPolicy(PullPolicy.IF_NOT_PRESENT)
+                .timeout(Duration.TWO_MINUTES)
+                .image("perl")
+                .command( "/bin/bash", "-c","echo 'hello test' && exit 42" )
+//                .allowNonZeroExitCode()
+                .newTask();
+        DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
+
+        Asserts.assertTrue(containerTask.blockUntilEnded(Duration.ONE_MINUTE));  // should complete within 1 minute, i.e. we detect the failure
+        Asserts.assertTrue(containerTask.isDone());
+        Asserts.assertTrue(containerTask.isError());
+        Asserts.assertFailsWith(() -> containerTask.getUnchecked(), error -> Asserts.expectedFailureContainsIgnoreCase(error, "Non-zero", "42"));
     }
 
     @Test
     public void testScriptContainerTask() {
         TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
         Map<String,Object> volumes = Maps.newHashMap();
-        volumes.put("name", "tf-ws");
-        volumes.put("hostPath", Maps.newHashMap("path", "/tfws"));
-
-        List<String> commands = MutableList.of("./hello.sh");
+        String volumeId = "brooklyn-container-task-test-volume";
+        String uid = Identifiers.makeRandomId(8).toLowerCase();
+        volumes.put("name", volumeId);
+        // sometimes this can be mounted from local drive, but does not always need to be
+        volumes.put("hostPath", Maps.newHashMap("path", "/tmp/brooklyn-container-task-test-shared"));
 
         Map<String,Object> configBag = new HashMap<>();
-        configBag.put("name", "test-container-task");
-        configBag.put("image", "hhwang927/ubuntu_base");
-        configBag.put("imagePullPolicy", "never");
-        configBag.put("commands", commands);
-        configBag.put("workingDir", "/tfws/scripts");
+        configBag.put("workingDir", "/brooklyn-mount-dir/scripts");
         configBag.put("volumes", Sets.newHashSet(volumes));
-        configBag.put("volumeMounts", Sets.newHashSet(Maps.newHashMap("name", "tf-ws", "mountPath", "/tfws")));
+        configBag.put("volumeMounts", Sets.newHashSet(Maps.newHashMap("name", volumeId, "mountPath", "/brooklyn-mount-dir")));
 
-        Task<String> containerTask =  new ContainerTaskFactory.ConcreteContainerTaskFactory<String>()
-                .summary("Running docker task")
-                .configure(configBag)
-                .newTask();
-        DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
-        Object result = containerTask.getUnchecked(Duration.of(5, TimeUnit.MINUTES));
-        List<String> res = (List<String>) result;
-        while(!res.isEmpty() && Iterables.getLast(res).matches("namespace .* deleted\\s*")) res = res.subList(0, res.size()-1);
+        ContainerTaskFactory.ConcreteContainerTaskFactory baseFactory = new ContainerTaskFactory.ConcreteContainerTaskFactory()
+                .summary("Running container task")
+                .jobIdentifier("test-container-task")
+                .imagePullPolicy(PullPolicy.IF_NOT_PRESENT)
+                .timeout(Duration.TWO_MINUTES)
+                .image("hhwang927/ubuntu_base")
+                .useNamespace("brooklyn-container-test-"+uid, null, false)
+                .configure(configBag);
 
-        String res2 = res.isEmpty() ? null : Iterables.getLast(res);
-        assertTrue(res2.contains("hello"));
+        try {
+            // first create the script
+            Task<ContainerTaskFactory.ContainerTaskResult> setup = baseFactory.bashScriptCommands(
+                    "mkdir -p /brooklyn-mount-dir/scripts",
+                    "cd /brooklyn-mount-dir/scripts",
+                    "echo echo hello " + uid + " > hello-"+uid+".sh",
+                    "chmod +x hello-"+uid+".sh"
+            ).newTask();
+            DynamicTasks.queueIfPossible(setup).orSubmitAsync(entity).getTask().getUnchecked();
+
+            // now make a new container that should see the same mount point, and try running the command
+            Task<ContainerTaskFactory.ContainerTaskResult> containerTask = baseFactory.bashScriptCommands(
+                            "./hello-"+uid+".sh"
+                    )
+                    .newTask();
+            DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
+            ContainerTaskFactory.ContainerTaskResult result = containerTask.getUnchecked(Duration.ONE_MINUTE);
+            Asserts.assertEquals(result.getMainStdout().trim(), "hello " + uid);
+
+        } finally { // remove the script and the (otherwise kept) namespace; wait, without throwing, so test teardown does not cut cleanup short
+            DynamicTasks.queueIfPossible( baseFactory.summary("cleaning up").deleteNamespace(true).bashScriptCommands("rm -f hello-"+uid+".sh").newTask() ).orSubmitAsync(entity).getTask().blockUntilEnded(Duration.ONE_MINUTE);
+        }
     }
 
 }
diff --git a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/JobBuilderTest.java b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/KubeJobSpecCreatorTest.java
similarity index 74%
rename from software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/JobBuilderTest.java
rename to software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/KubeJobSpecCreatorTest.java
index 1522271150..fca932cd95 100644
--- a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/JobBuilderTest.java
+++ b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/KubeJobSpecCreatorTest.java
@@ -21,35 +21,36 @@ package org.apache.brooklyn.tasks.kubectl;
 import com.beust.jcommander.internal.Maps;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import org.apache.brooklyn.core.mgmt.ha.BrooklynBomOsgiArchiveInstaller;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.Test;
 
+import java.io.File;
 import java.nio.file.Files;
-import java.nio.file.Paths;
 import java.util.Map;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 
-public class JobBuilderTest {
-    private static final Logger LOG = LoggerFactory.getLogger(JobBuilderTest.class);
+public class KubeJobSpecCreatorTest {
+    private static final Logger LOG = LoggerFactory.getLogger(KubeJobSpecCreatorTest.class);
 
     @Test
     public void testPerlWithArgs() throws  Exception{
-        String yamlJobLocation =
-                new JobBuilder().withImage("perl").withName("perl-args-test")
+        BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<File> yamlJobLocation =
+                new KubeJobFileCreator().withImage("perl").withName("perl-args-test")
                         .withArgs(Lists.newArrayList( "echo", "aaa"))
                         .withImagePullPolicy(PullPolicy.ALWAYS) // explicit "Always"
-                        .build();
+                        .createFile();
         assertNotNull(yamlJobLocation);
-        String actual = String.join("\n", Files.readAllLines(Paths.get(yamlJobLocation)));
+        String actual = String.join("\n", Files.readAllLines(yamlJobLocation.getFile().toPath()));
         String expected = "apiVersion: batch/v1\n" +
                 "kind: Job\n" +
                 "metadata:\n" +
                 "  name: perl-args-test\n" +
                 "spec:\n" +
-                "  backoffLimit: 1\n" +
+                "  backoffLimit: 0\n" +
                 "  completions: 1\n" +
                 "  parallelism: 1\n" +
                 "  template:\n" +
@@ -68,20 +69,20 @@ public class JobBuilderTest {
 
     @Test
     public void testPerlWithArgsAndCommand() throws  Exception{
-        String yamlJobLocation =
-                new JobBuilder().withImage("perl").withName("perl-args-and-command-test")
-                        .withCommands(Lists.newArrayList("/bin/bash"))
+        BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<File> yamlJobLocation =
+                new KubeJobFileCreator().withImage("perl").withName("perl-args-and-command-test")
+                        .withCommand(Lists.newArrayList("/bin/bash"))
                         .withArgs(Lists.newArrayList("-c", "echo aaa"))
                         .withImagePullPolicy(PullPolicy.NEVER)
-                        .build();
+                        .createFile();
         assertNotNull(yamlJobLocation);
-        String actual = String.join("\n", Files.readAllLines(Paths.get(yamlJobLocation)));
+        String actual = String.join("\n", Files.readAllLines(yamlJobLocation.getFile().toPath()));
         String expected = "apiVersion: batch/v1\n" +
                 "kind: Job\n" +
                 "metadata:\n" +
                 "  name: perl-args-and-command-test\n" +
                 "spec:\n" +
-                "  backoffLimit: 1\n" +
+                "  backoffLimit: 0\n" +
                 "  completions: 1\n" +
                 "  parallelism: 1\n" +
                 "  template:\n" +
@@ -102,19 +103,19 @@ public class JobBuilderTest {
 
     @Test
     public void testPerlCommand() throws  Exception{
-        String yamlJobLocation =
-                new JobBuilder().withImage("perl").withName("perl-command-test")
-                        .withCommands(Lists.newArrayList("/bin/bash", "-c", "echo aaa"))
+        BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<File> yamlJobLocation =
+                new KubeJobFileCreator().withImage("perl").withName("perl-command-test")
+                        .withCommand(Lists.newArrayList("/bin/bash", "-c", "echo aaa"))
                         .withImagePullPolicy(PullPolicy.IF_NOT_PRESENT)
-                        .build();
+                        .createFile();
         assertNotNull(yamlJobLocation);
-        String actual = String.join("\n", Files.readAllLines(Paths.get(yamlJobLocation)));
+        String actual = String.join("\n", Files.readAllLines(yamlJobLocation.getFile().toPath()));
         String expected = "apiVersion: batch/v1\n" +
                 "kind: Job\n" +
                 "metadata:\n" +
                 "  name: perl-command-test\n" +
                 "spec:\n" +
-                "  backoffLimit: 1\n" +
+                "  backoffLimit: 0\n" +
                 "  completions: 1\n" +
                 "  parallelism: 1\n" +
                 "  template:\n" +
@@ -137,21 +138,20 @@ public class JobBuilderTest {
         Map<String,Object> volumes = Maps.newHashMap();
         volumes.put("name", "tf-ws");
         volumes.put("hostPath", Maps.newHashMap("path", "/tfws"));
-        String yamlJobLocation =
-                new JobBuilder().withImage("hashicorp/terraform").withName("tf-version")
-                        .withVolumes(Sets.newHashSet(volumes))
-                        .withVolumeMounts(Sets.newHashSet(Maps.newHashMap("name", "tf-ws", "mountPath", "/tfws")))
-                        .withCommands(Lists.newArrayList("terraform", "version"))
-                        .withWorkingDir("/tfws/app1")
-                        .build();
+        BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<File> yamlJobLocation = new KubeJobFileCreator().withImage("hashicorp/terraform").withName("tf-version")
+                .withVolumes(Sets.newHashSet(volumes))
+                .withVolumeMounts(Sets.newHashSet(Maps.newHashMap("name", "tf-ws", "mountPath", "/tfws")))
+                .withCommand(Lists.newArrayList("terraform", "version"))
+                .withWorkingDir("/tfws/app1")
+                .createFile();
         assertNotNull(yamlJobLocation);
-        String actual = String.join("\n", Files.readAllLines(Paths.get(yamlJobLocation)));
+        String actual = String.join("\n", Files.readAllLines(yamlJobLocation.getFile().toPath()));
         String expected = "apiVersion: batch/v1\n" +
                 "kind: Job\n" +
                 "metadata:\n" +
                 "  name: tf-version\n" +
                 "spec:\n" +
-                "  backoffLimit: 1\n" +
+                "  backoffLimit: 0\n" +
                 "  completions: 1\n" +
                 "  parallelism: 1\n" +
                 "  template:\n" +