Posted to commits@brooklyn.apache.org by he...@apache.org on 2022/08/11 23:55:07 UTC

[brooklyn-server] 08/09: faster strategies for determining container readiness and completion

This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit 613bd023bd65d5e74fc0fc33ea82b8e3263ae5f3
Author: Alex Heneveld <al...@cloudsoft.io>
AuthorDate: Thu Aug 11 22:23:00 2022 +0100

    faster strategies for determining container readiness and completion
    
    and destroying the namespace can now be done asynchronously
    
    because job/pod status updates only arrive several seconds after the container has finished (observed with Docker Desktop)
---
 .../brooklyn/tasks/kubectl/ContainerCommons.java   |   6 +-
 .../tasks/kubectl/ContainerTaskFactory.java        | 456 ++++++++++++++-------
 .../brooklyn/tasks/kubectl/ContainerTaskTest.java  |   9 +-
 3 files changed, 328 insertions(+), 143 deletions(-)

diff --git a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java
index d426724e38..130c25b963 100644
--- a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java
+++ b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java
@@ -56,10 +56,12 @@ public interface ContainerCommons {
     String NAMESPACE_CREATE_CMD = "kubectl create namespace %s";
     String NAMESPACE_SET_CMD = "kubectl config set-context --current --namespace=%s";
     String JOBS_CREATE_CMD = "kubectl apply -f %s --namespace=%s";
-    String JOBS_FEED_CMD = "kubectl wait --timeout=%ds --for=condition=complete job/%s --namespace=%s";
-    String JOBS_FEED_FAILED_CMD = "kubectl wait --timeout=%ds --for=condition=failed job/%s --namespace=%s";
+    String JOBS_WAIT_COMPLETE_CMD = "kubectl wait --timeout=%ds --for=condition=complete job/%s --namespace=%s";
+    String JOBS_WAIT_FAILED_CMD = "kubectl wait --timeout=%ds --for=condition=failed job/%s --namespace=%s";
     String JOBS_LOGS_CMD = "kubectl logs jobs/%s --namespace=%s";
+    String JOBS_DELETE_CMD = "kubectl delete job %s --namespace=%s";
     String PODS_CMD_PREFIX = "kubectl get pods --namespace=%s --selector=job-name=%s ";
+    String PODS_STATUS_STATE_CMD = PODS_CMD_PREFIX + "-ojsonpath='{.items[0].status.containerStatuses[0].state}'";
     String PODS_STATUS_PHASE_CMD = PODS_CMD_PREFIX + "-ojsonpath='{.items[0].status.phase}'";
     String PODS_NAME_CMD = PODS_CMD_PREFIX + "-ojsonpath='{.items[0].metadata.name}'";
     String PODS_EXIT_CODE_CMD = PODS_CMD_PREFIX + "-ojsonpath='{.items[0].status.containerStatuses[0].state.terminated.exitCode}'";
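
For orientation, a minimal sketch of how the new command templates above are filled in. The namespace and job name values are invented for the example, and only constants visible in this patch are referenced; note the argument order differs between the pod commands (namespace first) and the job delete command (job name first).

    import org.apache.brooklyn.tasks.kubectl.ContainerCommons;

    public class CommandFormatSketch {
        public static void main(String[] args) {
            String namespace = "brooklyn-example-ns";   // invented for illustration
            String kubeJobName = "example-job-1a2b3c";  // invented for illustration

            // JSON of the first pod's containerStatuses[0].state, e.g. {"running":{...}} or {"terminated":{...}}
            String stateCmd = String.format(ContainerCommons.PODS_STATUS_STATE_CMD, namespace, kubeJobName);

            // deletes the job once its logs and exit code have been collected
            String deleteCmd = String.format(ContainerCommons.JOBS_DELETE_CMD, kubeJobName, namespace);

            System.out.println(stateCmd);
            System.out.println(deleteCmd);
        }
    }
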
diff --git a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
index a78c1cb6de..f1b8caf255 100644
--- a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
+++ b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
@@ -19,6 +19,7 @@
 package org.apache.brooklyn.tasks.kubectl;
 
 import com.google.common.collect.Lists;
+import com.google.gson.Gson;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
@@ -52,12 +53,14 @@ import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -71,6 +74,7 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
     protected String jobIdentifier = "";
     protected final ConfigBag config = ConfigBag.newInstance();
     private String namespace;
+    private boolean namespaceRandom = false;
     private Boolean createNamespace;
     private Boolean deleteNamespace;
     Function<ContainerTaskResult,RET> returnConversion;
@@ -179,146 +183,28 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
                             final CountdownTimer timer = CountdownTimer.newInstanceStarted(timeout);
 
                             // wait for it to be running (or failed / succeeded) -
-                            PodPhases podPhase = DynamicTasks.queue(Tasks.<PodPhases>builder().dynamic(true).displayName("Wait for container to be running (or fail)").body(() -> {
-                                String phase = null;
-                                long first = System.currentTimeMillis();
-                                long last = first;
-                                long backoffMillis = 10;
-                                while (timer.isNotExpired()) {
-                                    phase = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_STATUS_PHASE_CMD, namespace, kubeJobName)).summary("Get pod phase").allowingNonZeroExitCode().newTask()).get().trim();
-                                    if (PodPhases.Running.name().equalsIgnoreCase(phase)) return PodPhases.Running;
-                                    if (PodPhases.Failed.name().equalsIgnoreCase(phase)) return PodPhases.Failed;
-                                    if (PodPhases.Succeeded.name().equalsIgnoreCase(phase)) return PodPhases.Succeeded;
-
-                                    if (Strings.isNonBlank(phase) && Strings.isBlank(result.kubePodName)) {
-                                        result.kubePodName = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_NAME_CMD, namespace, kubeJobName)).summary("Get pod name").allowingNonZeroExitCode().newTask()).get().trim();
-                                    }
-                                    if (PodPhases.Pending.name().equals(phase) && Strings.isNonBlank(result.kubePodName)) {
-                                        // if pending, look for errors
-                                        String failedEvents = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_FAILED_JSON_CMD, namespace, result.kubePodName)).summary("Get pod failed events").allowingNonZeroExitCode().newTask()).get().trim();
-                                        if (!"[]".equals(failedEvents)) {
-                                            String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, result.kubePodName)).summary("Get pod events").allowingNonZeroExitCode().newTask()).get().trim();
-                                            throw new IllegalStateException("Job pod failed: "+failedEvents+"\n"+events);
-                                        }
-                                    }
-
-                                    if (System.currentTimeMillis() - last > 10*1000) {
-                                        last = System.currentTimeMillis();
-                                        // every 10s log info
-                                        LOG.info("Container taking long time to start ("+Duration.millis(last-first)+"): "+namespace+" "+kubeJobName+" "+result.kubePodName+" / phase '"+phase+"'");
-                                        if (Strings.isNonBlank(result.kubePodName)) {
-                                            String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, result.kubePodName)).summary("Get pod events").allowingNonZeroExitCode().newTask()).get().trim();
-                                            LOG.info("Pod events: \n"+events);
-                                        } else {
-                                            String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, kubeJobName)).summary("Get job events").allowingNonZeroExitCode().newTask()).get().trim();
-                                            LOG.info("Job events: \n"+events);
-                                        }
-                                    }
-                                    long backoffMillis2 = backoffMillis;
-                                    Tasks.withBlockingDetails("waiting "+backoffMillis2+"ms for pod to be available (current status '" + phase + "')", () -> {
-                                        Time.sleep(backoffMillis2);
-                                        return null;
-                                    });
-                                    if (backoffMillis<80) backoffMillis*=2;
-                                }
-                                throw new IllegalStateException("Timeout waiting for pod to be available; current status is '" + phase + "'");
-                            }).build()).getUnchecked();
+                            PodPhases phaseOnceActive = waitForContainerAvailable(kubeJobName, result, timer);
+//                            waitForContainerPodContainerState(kubeJobName, result, timer);
 
                             // notify once pod is available
-                            synchronized (result) {
-                                result.notifyAll();
-                            }
-
-                            // use `wait --for` api, but in a 5s loop in case there are other issues
-                            boolean succeeded = podPhase == PodPhases.Succeeded ? true : podPhase == PodPhases.Failed ? false : DynamicTasks.queue(Tasks.<Boolean>builder().dynamic(true).displayName("Wait for success or failure").body(() -> {
-                                while (true) {
-                                    LOG.debug("Container job submitted, now waiting on success or failure");
-
-                                    long secondsLeft = Math.min(Math.max(1, timer.getDurationRemaining().toSeconds()), 5);
-                                    final AtomicInteger finishCount = new AtomicInteger(0);
-
-                                    ProcessTaskWrapper<String> waitForSuccess = Entities.submit(entity, newSimpleTaskFactory(String.format(JOBS_FEED_CMD, secondsLeft, kubeJobName, namespace))
-                                            .summary("Wait for success ('complete')").allowingNonZeroExitCode().newTask());
-                                    Entities.submit(entity, Tasks.create("Wait for success then notify", () -> {
-                                        try {
-                                            if (waitForSuccess.get().contains("condition met"))
-                                                LOG.debug("Container job " + namespace + " detected as completed (succeeded) in kubernetes");
-                                        } finally {
-                                            synchronized (finishCount) {
-                                                finishCount.incrementAndGet();
-                                                finishCount.notifyAll();
-                                            }
-                                        }
-                                    }));
-
-                                    ProcessTaskWrapper<String> waitForFailed = Entities.submit(entity, newSimpleTaskFactory(String.format(JOBS_FEED_FAILED_CMD, secondsLeft, kubeJobName, namespace))
-                                            .summary("Wait for failed").allowingNonZeroExitCode().newTask());
-                                    Entities.submit(entity, Tasks.create("Wait for failed then notify", () -> {
-                                        try {
-                                            if (waitForFailed.get().contains("condition met"))
-                                                LOG.debug("Container job " + namespace + " detected as failed in kubernetes (may be valid non-zero exit)");
-                                        } finally {
-                                            synchronized (finishCount) {
-                                                finishCount.incrementAndGet();
-                                                finishCount.notifyAll();
-                                            }
-                                        }
-                                    }));
-
-                                    while (finishCount.get() == 0) {
-                                        LOG.debug("Container job " + namespace + " waiting on complete or failed");
-                                        try {
-                                            synchronized (finishCount) {
-                                                finishCount.wait(Duration.TEN_SECONDS.toMilliseconds());
-                                            }
-                                        } catch (InterruptedException e) {
-                                            throw Exceptions.propagate(e);
-                                        }
-                                    }
-
-                                    if (waitForSuccess.isDone() && waitForSuccess.getExitCode() == 0) return true;
-                                    if (waitForFailed.isDone() && waitForFailed.getExitCode() == 0) return false;
-                                    LOG.debug("Container job " + namespace + " not yet complete, will retry");
+                            synchronized (result) { result.notifyAll(); }
 
-                                    // other one-off checks for job error, we could do here
-                                    // e.g. if image can't be pulled, for instance
+                            boolean succeeded = PodPhases.Succeeded == phaseOnceActive ||
+                                    (PodPhases.Failed != phaseOnceActive &&
+                                            //use `wait --for` api, but in a 5s loop in case there are other issues
+//                                            waitForContainerCompletedUsingK8sWaitFor(stdout, kubeJobName, entity, timer)
+                                            waitForContainerCompletedUsingPodState(stdout, kubeJobName, entity, timer)
+                                    );
 
-                                    // finally get the partial log for reporting
-                                    ProcessTaskWrapper<String> outputSoFarCmd = DynamicTasks.queue(newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, kubeJobName, namespace)).summary("Retrieve output so far").allowingNonZeroExitCode().newTask());
-                                    BrooklynTaskTags.setTransient(outputSoFarCmd.asTask());
-                                    outputSoFarCmd.block();
-                                    if (outputSoFarCmd.getExitCode()!=0) {
-                                        throw new IllegalStateException("Error detected with container job while reading logs (exit code "+outputSoFarCmd.getExitCode()+"): "+outputSoFarCmd.getStdout() + " / "+outputSoFarCmd.getStderr());
-                                    }
-                                    String outputSoFar = outputSoFarCmd.get();
-                                    int bytesAlreadyRead = stdout.size();
-                                    if (bytesAlreadyRead <= outputSoFar.length()) {
-                                        String newOutput = outputSoFar.substring(stdout.size());
-                                        LOG.debug("Container job " + namespace + " output: " + newOutput);
-                                        stdout.write(newOutput.getBytes(StandardCharsets.UTF_8));
-                                    } else {
-                                        // not sure why this happens, but it does sometimes; for now just reset
-                                        LOG.debug("Container job " + namespace + " output reset, length "+outputSoFar.length()+" less than "+bytesAlreadyRead+"; ignoring new output:\n" + outputSoFar +"\n"+new String(stdout.toByteArray()));
-                                        stdout.reset();
-                                        stdout.write(outputSoFar.getBytes(StandardCharsets.UTF_8));
-                                    }
-
-                                    if (timer.isExpired())
-                                        throw new IllegalStateException("Timeout waiting for success or failure");
-
-                                    // probably timed out or job not yet available; short wait then retry
-                                    Time.sleep(Duration.millis(50));
-                                }
-
-                            }).build()).getUnchecked();
-                            LOG.debug("Container job "+namespace+" completed, success "+succeeded);
+                            LOG.debug("Container job "+kubeJobName+" completed, success "+succeeded);
 
                             ProcessTaskWrapper<String> retrieveOutput = DynamicTasks.queue(newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, kubeJobName, namespace)).summary("Retrieve output").newTask());
                             ProcessTaskWrapper<String> retrieveExitCode = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_EXIT_CODE_CMD, namespace, kubeJobName)).summary("Retrieve exit code").newTask());
 
                             DynamicTasks.waitForLast();
                             result.mainStdout = retrieveOutput.get();
-                            stdout.write(result.mainStdout.substring(stdout.size()).getBytes(StandardCharsets.UTF_8));
+
+                            updateStdoutWithNewData(stdout, result.mainStdout);
 
                             String exitCodeS = retrieveExitCode.getStdout();
                             if (Strings.isNonBlank(exitCodeS)) result.mainExitCode = Integer.parseInt(exitCodeS.trim());
@@ -333,8 +219,17 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
 
                         } finally {
                             if (deleteNamespaceHere) {
-                                doDeleteNamespace();
+                                doDeleteNamespace(!namespaceRandom, true);  // if a one-off job, namespace has random id in it so can safely be deleted in background (no one else risks reusing it)
+                            } else {
+                                Boolean devMode = EntityInitializers.resolve(config, KEEP_CONTAINER_FOR_DEBUGGING);
+                                if (!Boolean.TRUE.equals(devMode)) {
+                                    Entities.submit(entity, newDeleteJobTask(kubeJobName)
+                                                    // namespace might have been deleted in parallel so okay if we don't delete the job
+                                                    .allowingNonZeroExitCode()
+                                                    .newTask()).get();
+                                }
                             }
+                            DynamicTasks.waitForLast();
                         }
                     } catch (Exception e) {
                         throw Exceptions.propagate(e);
@@ -346,6 +241,277 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
         return taskBuilder.build();
     }
 
+    private Boolean waitForContainerCompletedUsingK8sWaitFor(ByteArrayOutputStream stdout, String kubeJobName, Entity entity, CountdownTimer timer) {
+        return DynamicTasks.queue(Tasks.<Boolean>builder().dynamic(true).displayName("Wait for success or failure").body(() -> {
+            while (true) {
+                LOG.debug("Container job " + kubeJobName + " submitted, now waiting on success or failure");
+
+                long secondsLeft = Math.min(Math.max(1, timer.getDurationRemaining().toSeconds()), 5);
+                Boolean x = checkForContainerCompletedUsingK8sWaitFor(kubeJobName, entity, secondsLeft);
+
+                if (x != null) return x;
+                LOG.debug("Container job " + namespace + " not yet complete, will retry");
+
+                // other one-off checks for job error, we could do here
+                // e.g. if image can't be pulled, for instance
+
+                refreshStdout(stdout, kubeJobName, timer);
+
+                // probably timed out or job not yet available; short wait then retry
+                Time.sleep(Duration.millis(50));
+            }
+
+        }).build()).getUnchecked();
+    }
+
+    private Boolean waitForContainerCompletedUsingPodState(ByteArrayOutputStream stdout, String kubeJobName, Entity entity, CountdownTimer timer) {
+        return DynamicTasks.queue(Tasks.<Boolean>builder().dynamic(true).displayName("Wait for success or failure").body(() -> {
+            long retryDelay = 10;
+            while (true) {
+                LOG.debug("Container job " + kubeJobName + " submitted, now waiting on success or failure");
+
+                PodPhases phase = checkPodPhase(kubeJobName);
+                if (phase.equals(PodPhases.Succeeded)) return true;
+                if (phase.equals(PodPhases.Failed)) return false;
+
+                LOG.debug("Container job " + namespace + " not yet complete, will sleep then retry");
+
+                // other one-off checks for job error, we could do here
+                // e.g. if image can't be pulled, for instance
+
+                refreshStdout(stdout, kubeJobName, timer);
+
+                // probably timed out or job not yet available; short wait then retry
+                Time.sleep(Duration.millis(retryDelay));
+                retryDelay *= 1.5;
+                if (retryDelay > 250) {
+                    // max out at 500ms
+                    retryDelay = 500;
+                }
+            }
+
+        }).build()).getUnchecked();
+    }
+
+    private void refreshStdout(ByteArrayOutputStream stdout, String kubeJobName, CountdownTimer timer) throws IOException {
+        // finally get the partial log for reporting
+        ProcessTaskWrapper<String> outputSoFarCmd = DynamicTasks.queue(newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, kubeJobName, namespace)).summary("Retrieve output so far").allowingNonZeroExitCode().newTask());
+        BrooklynTaskTags.setTransient(outputSoFarCmd.asTask());
+        outputSoFarCmd.block();
+        if (outputSoFarCmd.getExitCode() != 0) {
+            throw new IllegalStateException("Error detected with container job while reading logs (exit code " + outputSoFarCmd.getExitCode() + "): " + outputSoFarCmd.getStdout() + " / " + outputSoFarCmd.getStderr());
+        }
+        updateStdoutWithNewData(stdout, outputSoFarCmd.get());
+
+        if (timer.isExpired())
+            throw new IllegalStateException("Timeout waiting for success or failure");
+    }
+
+    private void updateStdoutWithNewData(ByteArrayOutputStream receiverStream, String outputFound) throws IOException {
+        int bytesAlreadyRead = receiverStream.size();
+        if (bytesAlreadyRead <= outputFound.length()) {
+            String newOutput = outputFound.substring(receiverStream.size());
+            LOG.debug("Container job " + namespace + " output: " + newOutput);
+            receiverStream.write(newOutput.getBytes(StandardCharsets.UTF_8));
+        } else {
+            // not sure why this happens, but it does sometimes; for now just reset
+            LOG.debug("Container job " + namespace + " output reset, length " + outputFound.length() + " less than " + bytesAlreadyRead + "; ignoring new output:\n" + outputFound + "\n" + new String(receiverStream.toByteArray()));
+            receiverStream.reset();
+            receiverStream.write(outputFound.getBytes(StandardCharsets.UTF_8));
+        }
+    }
+
+    private Boolean checkForContainerCompletedUsingK8sWaitFor(String kubeJobName, Entity entity, long timeoutSeconds) {
+        final AtomicInteger finishCount = new AtomicInteger(0);
+
+        ProcessTaskWrapper<String> waitForSuccess = Entities.submit(entity, newSimpleTaskFactory(String.format(JOBS_WAIT_COMPLETE_CMD, timeoutSeconds, kubeJobName, namespace))
+                .summary("Wait for success ('complete')").allowingNonZeroExitCode().newTask());
+        Entities.submit(entity, Tasks.create("Wait for success then notify", () -> {
+            try {
+                if (waitForSuccess.get().contains("condition met"))
+                    LOG.debug("Container job " + kubeJobName + " detected as completed (succeeded) in kubernetes");
+            } finally {
+                synchronized (finishCount) {
+                    finishCount.incrementAndGet();
+                    finishCount.notifyAll();
+                }
+            }
+        }));
+
+        ProcessTaskWrapper<String> waitForFailed = Entities.submit(entity, newSimpleTaskFactory(String.format(JOBS_WAIT_FAILED_CMD, timeoutSeconds, kubeJobName, namespace))
+                .summary("Wait for failed").allowingNonZeroExitCode().newTask());
+        Entities.submit(entity, Tasks.create("Wait for failed then notify", () -> {
+            try {
+                if (waitForFailed.get().contains("condition met"))
+                    LOG.debug("Container job " + kubeJobName + " detected as failed in kubernetes (may be valid non-zero exit)");
+            } finally {
+                synchronized (finishCount) {
+                    finishCount.incrementAndGet();
+                    finishCount.notifyAll();
+                }
+            }
+        }));
+
+        while (finishCount.get() == 0) {
+            LOG.debug("Container job " + kubeJobName + " waiting on complete or failed");
+            try {
+                synchronized (finishCount) {
+                    finishCount.wait(Duration.TEN_SECONDS.toMilliseconds());
+                }
+            } catch (InterruptedException e) {
+                throw Exceptions.propagate(e);
+            }
+        }
+
+        if (waitForSuccess.isDone() && waitForSuccess.getExitCode() == 0) return true;
+        if (waitForFailed.isDone() && waitForFailed.getExitCode() == 0) return false;
+        return null;
+    }
+
+    private Boolean checkForContainerCompletedUsingPodState(String kubeJobName, Entity entity, long timeoutSeconds) {
+        final AtomicInteger finishCount = new AtomicInteger(0);
+
+        ProcessTaskWrapper<String> waitForSuccess = Entities.submit(entity, newSimpleTaskFactory(String.format(JOBS_WAIT_COMPLETE_CMD, timeoutSeconds, kubeJobName, namespace))
+                .summary("Wait for success ('complete')").allowingNonZeroExitCode().newTask());
+        Entities.submit(entity, Tasks.create("Wait for success then notify", () -> {
+            try {
+                if (waitForSuccess.get().contains("condition met"))
+                    LOG.debug("Container job " + kubeJobName + " detected as completed (succeeded) in kubernetes");
+            } finally {
+                synchronized (finishCount) {
+                    finishCount.incrementAndGet();
+                    finishCount.notifyAll();
+                }
+            }
+        }));
+
+        ProcessTaskWrapper<String> waitForFailed = Entities.submit(entity, newSimpleTaskFactory(String.format(JOBS_WAIT_FAILED_CMD, timeoutSeconds, kubeJobName, namespace))
+                .summary("Wait for failed").allowingNonZeroExitCode().newTask());
+        Entities.submit(entity, Tasks.create("Wait for failed then notify", () -> {
+            try {
+                if (waitForFailed.get().contains("condition met"))
+                    LOG.debug("Container job " + kubeJobName + " detected as failed in kubernetes (may be valid non-zero exit)");
+            } finally {
+                synchronized (finishCount) {
+                    finishCount.incrementAndGet();
+                    finishCount.notifyAll();
+                }
+            }
+        }));
+
+        while (finishCount.get() == 0) {
+            LOG.debug("Container job " + kubeJobName + " waiting on complete or failed");
+            try {
+                synchronized (finishCount) {
+                    finishCount.wait(Duration.TEN_SECONDS.toMilliseconds());
+                }
+            } catch (InterruptedException e) {
+                throw Exceptions.propagate(e);
+            }
+        }
+
+        if (waitForSuccess.isDone() && waitForSuccess.getExitCode() == 0) return true;
+        if (waitForFailed.isDone() && waitForFailed.getExitCode() == 0) return false;
+        return null;
+    }
+
+    private PodPhases waitForContainerAvailable(String kubeJobName, ContainerTaskResult result, CountdownTimer timer) {
+        return DynamicTasks.queue(Tasks.<PodPhases>builder().dynamic(true).displayName("Wait for container to be running (or fail)").body(() -> {
+            long first = System.currentTimeMillis();
+            long last = first;
+            long backoffMillis = 10;
+            PodPhases phase = PodPhases.Unknown;
+            long startupReportDelay = 1000;  // report any start longer than 1s
+            while (timer.isNotExpired()) {
+                phase = checkPodPhase(kubeJobName);
+                if (phase == PodPhases.Failed || phase == PodPhases.Succeeded || phase == PodPhases.Running) {
+                    if (startupReportDelay>5000) LOG.info("Container detected in state "+phase+" after "+Duration.millis(System.currentTimeMillis()-first));
+                    else LOG.debug("Container detected in state "+phase+" after "+Duration.millis(System.currentTimeMillis()-first));
+                    return phase;
+                }
+
+                if (phase!=PodPhases.Unknown && Strings.isBlank(result.kubePodName)) {
+                    result.kubePodName = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_NAME_CMD, namespace, kubeJobName)).summary("Get pod name").allowingNonZeroExitCode().newTask()).get().trim();
+                }
+                if (phase == PodPhases.Pending && Strings.isNonBlank(result.kubePodName)) {
+                    // if pending, need to look for errors
+                    String failedEvents = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_FAILED_JSON_CMD, namespace, result.kubePodName)).summary("Check pod failed events").allowingNonZeroExitCode().newTask()).get().trim();
+                    if (!"[]".equals(failedEvents)) {
+                        String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, result.kubePodName)).summary("Get pod events on failure").allowingNonZeroExitCode().newTask()).get().trim();
+                        throw new IllegalStateException("Job pod failed: "+failedEvents+"\n"+events);
+                    }
+                }
+
+                if (System.currentTimeMillis() - last > startupReportDelay) {
+                    last = System.currentTimeMillis();
+
+                    // log debug after 1s, then info after 5s, 20s, etc
+                    // seems bad that it often takes 1s+ just to start the container :/
+                    Consumer<String> log = startupReportDelay<3*1000 ? LOG::debug : LOG::info;
+
+                    log.accept("Container taking a while to start ("+Duration.millis(last-first)+"): "+namespace+" "+ kubeJobName +" "+ result.kubePodName+" / phase '"+phase+"'");
+                    String stateJsonS = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_STATUS_STATE_CMD, namespace, kubeJobName)).summary("Get pod state").allowingNonZeroExitCode().newTask()).get().trim();
+                    if (Strings.isNonBlank(stateJsonS)) {
+                        log.accept("Pod state: "+stateJsonS);
+                    }
+                    if (Strings.isNonBlank(result.kubePodName)) {
+                        String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, result.kubePodName)).summary("Get pod events").allowingNonZeroExitCode().newTask()).get().trim();
+                        log.accept("Pod events: \n"+events);
+                    } else {
+                        String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, kubeJobName)).summary("Get job events").allowingNonZeroExitCode().newTask()).get().trim();
+                        log.accept("Job events: \n"+events);
+                    }
+
+                    // first 1s, then 5s, then every 20s
+                    startupReportDelay *= 5;
+                    if (startupReportDelay > 20*1000) startupReportDelay = 20*1000;
+                }
+                long backoffMillis2 = backoffMillis;
+                Tasks.withBlockingDetails("waiting "+backoffMillis2+"ms for pod to be available (current status '" + phase + "')", () -> {
+                    Time.sleep(backoffMillis2);
+                    return null;
+                });
+                if (backoffMillis<80) backoffMillis*=2;
+            }
+            throw new IllegalStateException("Timeout waiting for pod to be available; current status is '" + phase + "'");
+        }).build()).getUnchecked();
+    }
+
+    private PodPhases checkPodPhase(String kubeJobName) {
+        PodPhases succeeded = getPodPhaseFromContainerState(kubeJobName);
+        if (succeeded != null) return succeeded;
+
+        // this is the more official way, fall back to it if above is not recognised (eg waiting)
+        String phase = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_STATUS_PHASE_CMD, namespace, kubeJobName)).summary("Get pod phase").allowingNonZeroExitCode().newTask()).get().trim();
+        for (PodPhases candidate: PodPhases.values()) {
+            if (candidate.name().equalsIgnoreCase(phase)) return candidate;
+        }
+        return PodPhases.Unknown;
+    }
+
+    private PodPhases getPodPhaseFromContainerState(String kubeJobName) {
+        // pod container state is populated much sooner than the pod status and job fields and wait, so prefer it
+        String stateJsonS = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_STATUS_STATE_CMD, namespace, kubeJobName)).summary("Get pod state").allowingNonZeroExitCode().newTask()).get().trim();
+        if (Strings.isNonBlank(stateJsonS)) {
+            Object stateO = new Gson().fromJson(stateJsonS, Object.class);
+            if (stateO instanceof Map) {
+                if (!((Map<?, ?>) stateO).keySet().isEmpty()) {
+                    Object stateK = (((Map<?, ?>) stateO).keySet().iterator().next());
+                    if (stateK instanceof String) {
+                        String stateS = (String) stateK;
+                        if ("terminated".equalsIgnoreCase(stateS)) return PodPhases.Succeeded;
+                        if ("running".equalsIgnoreCase(stateS)) return PodPhases.Running;
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
+    public ProcessTaskFactory<String> newDeleteJobTask(String kubeJobName) {
+        return newSimpleTaskFactory(String.format(JOBS_DELETE_CMD, kubeJobName, namespace)).summary("Delete job");
+    }
+
     private String initNamespaceAndGetNewJobName() {
         Entity entity = BrooklynTaskTags.getContextEntity(Tasks.current());
         if (entity == null) {
@@ -374,6 +540,7 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
                 .toLowerCase();
         if (namespace==null) {
             namespace = kubeJobName;
+            namespaceRandom = true;
         }
         return kubeJobName;
     }
@@ -382,21 +549,28 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
         return namespace;
     }
 
-    public boolean doDeleteNamespace() {
-        if (namespace==null) return false;
+    public ProcessTaskWrapper<String> doDeleteNamespace(boolean wait, boolean requireSuccess) {
+        if (namespace==null) return null;
         Entity entity = BrooklynTaskTags.getContextEntity(Tasks.current());
-        if (entity==null) return false;
+        if (entity==null) return null;
         // clean up - delete namespace
         Boolean devMode = EntityInitializers.resolve(config, KEEP_CONTAINER_FOR_DEBUGGING);
         if (Boolean.TRUE.equals(devMode)) {
-            return false;
+            return null;
         }
 
-        LOG.debug("Deleting namespace " + namespace);
+        LOG.info("Deleting namespace " + namespace);
         // do this not as a subtask so we can run even if the main queue fails
-        Entities.submit(entity, newSimpleTaskFactory(String.format(NAMESPACE_DELETE_CMD, namespace)).summary("Tear down containers").newTask()).block();
-        System.runFinalization();
-        return true;
+        ProcessTaskFactory<String> tf = newSimpleTaskFactory(String.format(NAMESPACE_DELETE_CMD, namespace)).summary("Tear down containers").allowingNonZeroExitCode();
+        if (!requireSuccess) tf = tf.allowingNonZeroExitCode();
+        else tf = tf.requiringExitCodeZero();
+        ProcessTaskWrapper<String> task = Entities.submit(entity, tf.newTask());
+        if (wait) {
+            task.get();
+            LOG.info("Deleted namespace " + namespace);
+            System.runFinalization();
+        }
+        return task;
     }
 
     public T summary(String summary) { this.summary = summary; return self(); }
@@ -458,7 +632,9 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
         return self();
     }
 
-    public T deleteNamespace(Boolean delete) { this.deleteNamespace = delete; return self(); }
+    public T setDeleteNamespaceAfter(Boolean delete) { this.deleteNamespace = delete; return self(); }
+    @Deprecated /** @deprecated since 1.1 when introduced */
+    public T deleteNamespace(Boolean delete) { return setDeleteNamespaceAfter(delete); }
 
     /** visible in the container environment */
     public T jobIdentifier(String jobIdentifier) {
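
The key speed-up in this patch is reading the pod's containerStatuses state, which is populated almost immediately, rather than waiting on the slower job-level conditions. A standalone sketch of that parsing step, using a made-up JSON string in place of the kubectl jsonpath output:

    import com.google.gson.Gson;
    import java.util.Map;

    public class PodStateParseSketch {
        public static void main(String[] args) {
            // sample output of PODS_STATUS_STATE_CMD; invented for illustration
            String stateJson = "{\"terminated\":{\"exitCode\":0,\"reason\":\"Completed\"}}";

            Map<?, ?> state = new Gson().fromJson(stateJson, Map.class);
            if (state != null && !state.isEmpty()) {
                String key = (String) state.keySet().iterator().next();
                if ("terminated".equalsIgnoreCase(key)) {
                    // the patch treats this as Succeeded; the real exit code is fetched separately
                    System.out.println("container finished");
                } else if ("running".equalsIgnoreCase(key)) {
                    System.out.println("container running");
                } else {
                    System.out.println("still waiting: " + key);  // e.g. "waiting" while the image pulls
                }
            }
        }
    }
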
diff --git a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskTest.java b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskTest.java
index a688ae21b3..0b85dcfb1e 100644
--- a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskTest.java
+++ b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskTest.java
@@ -30,6 +30,8 @@ import org.apache.brooklyn.test.Asserts;
 import org.apache.brooklyn.util.core.task.DynamicTasks;
 import org.apache.brooklyn.util.text.Identifiers;
 import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.annotations.Test;
 
 import java.util.HashMap;
@@ -43,10 +45,14 @@ import static org.testng.AssertJUnit.assertTrue;
 @Test(groups = {"Live"})
 public class ContainerTaskTest extends BrooklynAppUnitTestSupport {
 
+    private static final Logger LOG = LoggerFactory.getLogger(ContainerTaskTest.class);
+
     @Test
     public void testSuccessfulContainerTask() {
+        LOG.info("Starting container test");
         TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
 
+        LOG.info("Starting dedicated container run");
         Task<ContainerTaskResult> containerTask =  ContainerTaskFactory.newInstance()
                 .summary("Running container task")
                 .jobIdentifier("test-container-task")
@@ -58,6 +64,7 @@ public class ContainerTaskTest extends BrooklynAppUnitTestSupport {
 
         DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
         ContainerTaskResult result = containerTask.getUnchecked(Duration.ONE_MINUTE);
+        LOG.info("Result: "+result + " / "+result.getMainStdout().trim());
         Asserts.assertEquals(result.getMainStdout().trim(), "hello test");
     }
 
@@ -220,7 +227,7 @@ public class ContainerTaskTest extends BrooklynAppUnitTestSupport {
             Asserts.assertEquals(result.getMainStdout().trim(), "hello " + uid);
 
         } finally {
-            DynamicTasks.queueIfPossible( baseFactory.summary("cleaning up").deleteNamespace(true).bashScriptCommands("rm hello-"+uid+".sh") ).orSubmitAsync(entity);
+            DynamicTasks.queueIfPossible( baseFactory.summary("cleaning up").setDeleteNamespaceAfter(true).bashScriptCommands("rm hello-"+uid+".sh") ).orSubmitAsync(entity);
         }
     }
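
For callers of the factory, the rename in this commit is mechanical. A hypothetical migration fragment, using only methods visible in this patch (the job identifier and script are invented):

    // hypothetical caller; mirrors the test cleanup above
    ContainerTaskFactory<?, ?> cleanup = ContainerTaskFactory.newInstance()
            .summary("cleaning up")
            .jobIdentifier("example-cleanup")             // invented identifier
            .bashScriptCommands("rm hello-example.sh")    // invented script
            .setDeleteNamespaceAfter(true);               // was .deleteNamespace(true), now @Deprecated
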