Posted to commits@brooklyn.apache.org by he...@apache.org on 2022/08/16 15:53:05 UTC

[brooklyn-server] 02/04: run container tasks in background and as transient

This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit 19fac6b1eb8ebcab4d319235158888fbb16f6f24
Author: Alex Heneveld <al...@cloudsoft.io>
AuthorDate: Tue Aug 16 11:51:53 2022 +0100

    run container tasks in background and as transient
    
    avoid polluting tasks view with boring details of how container tasks need to be run
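    
    In essence, callers that previously used DynamicTasks.queue(...) now submit each kubectl
    sub-task directly against the entity via a small runTask helper, marking it transient and
    blocking only where the result is needed immediately. A minimal sketch of that calling
    pattern, using the APIs exercised in the patch below (newSimpleTaskFactory is the class's
    own private helper, and the kubectl command string here is illustrative only):
    
        // illustrative sketch of the submission pattern this commit introduces
        ProcessTaskWrapper<String> logsTask =
                newSimpleTaskFactory("kubectl logs job/my-job -n my-namespace")
                        .summary("Retrieve output").newTask();
        BrooklynTaskTags.setTransient(logsTask.asTask());   // keep it out of the kilt view so it is GC'd sooner
        Entities.submit(entity, logsTask);                   // submit against the entity instead of queueing a sub-task
        logsTask.asTask().blockUntilEnded(Duration.PRACTICALLY_FOREVER);  // block only where the result is needed
        String output = logsTask.get();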
---
 .../tasks/kubectl/ContainerTaskFactory.java        | 96 +++++++++++++---------
 1 file changed, 58 insertions(+), 38 deletions(-)

diff --git a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
index f1b8caf255..84cb436b4b 100644
--- a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
+++ b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
 import com.google.gson.Gson;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.mgmt.TaskAdaptable;
 import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.entity.EntityInitializers;
@@ -79,6 +80,17 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
     private Boolean deleteNamespace;
     Function<ContainerTaskResult,RET> returnConversion;
 
+    private <T extends TaskAdaptable<?>> T runTask(Entity entity, T t, boolean block, boolean markTransient) {
+        // previously we queued all the callers of this as sub-tasks, but that bloats the kilt diagram, so use entity.submit instead, optionally with blocking.
+        // most will be transient, apart from the main flow, so that they get GC'd quicker and don't clutter the kilt
+        //DynamicTasks.queue(t);
+
+        if (markTransient) BrooklynTaskTags.setTransient(t.asTask());
+        Entities.submit(entity, t);
+        if (block) { t.asTask().blockUntilEnded(Duration.PRACTICALLY_FOREVER); }
+        return t;
+    }
+
     @Override
     public Task<RET> newTask() {
         final ByteArrayOutputStream stdout = new ByteArrayOutputStream();
@@ -152,7 +164,7 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
                             if (createNamespace==null) {
                                 createNsJobF.allowingNonZeroExitCode();
                             }
-                            createNsJob = DynamicTasks.queue(createNsJobF.newTask());
+                            createNsJob = runTask(entity, createNsJobF.newTask(), true, true);
                         }
 
                         // only delete if told to always, unless we successfully create it
@@ -177,13 +189,13 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
                                 }
                             }
 
-                            DynamicTasks.queue(
-                                    newSimpleTaskFactory(String.format(JOBS_CREATE_CMD, jobYaml.getFile().getAbsolutePath(), namespace)).summary("Submit job").newTask());
+                            runTask(entity,
+                                    newSimpleTaskFactory(String.format(JOBS_CREATE_CMD, jobYaml.getFile().getAbsolutePath(), namespace)).summary("Submit job").newTask(), true, true);
 
                             final CountdownTimer timer = CountdownTimer.newInstanceStarted(timeout);
 
                             // wait for it to be running (or failed / succeeded) -
-                            PodPhases phaseOnceActive = waitForContainerAvailable(kubeJobName, result, timer);
+                            PodPhases phaseOnceActive = waitForContainerAvailable(entity, kubeJobName, result, timer);
 //                            waitForContainerPodContainerState(kubeJobName, result, timer);
 
                             // notify once pod is available
@@ -198,14 +210,14 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
 
                             LOG.debug("Container job "+kubeJobName+" completed, success "+succeeded);
 
-                            ProcessTaskWrapper<String> retrieveOutput = DynamicTasks.queue(newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, kubeJobName, namespace)).summary("Retrieve output").newTask());
-                            ProcessTaskWrapper<String> retrieveExitCode = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_EXIT_CODE_CMD, namespace, kubeJobName)).summary("Retrieve exit code").newTask());
+                            ProcessTaskWrapper<String> retrieveOutput = runTask(entity, newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, kubeJobName, namespace)).summary("Retrieve output").newTask(), false, true);
+                            ProcessTaskWrapper<String> retrieveExitCode = runTask(entity, newSimpleTaskFactory(String.format(PODS_EXIT_CODE_CMD, namespace, kubeJobName)).summary("Retrieve exit code").newTask(), false, true);
 
-                            DynamicTasks.waitForLast();
                             result.mainStdout = retrieveOutput.get();
 
                             updateStdoutWithNewData(stdout, result.mainStdout);
 
+                            retrieveExitCode.get();
                             String exitCodeS = retrieveExitCode.getStdout();
                             if (Strings.isNonBlank(exitCodeS)) result.mainExitCode = Integer.parseInt(exitCodeS.trim());
                             else result.mainExitCode = -1;
@@ -223,10 +235,12 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
                             } else {
                                 Boolean devMode = EntityInitializers.resolve(config, KEEP_CONTAINER_FOR_DEBUGGING);
                                 if (!Boolean.TRUE.equals(devMode)) {
-                                    Entities.submit(entity, newDeleteJobTask(kubeJobName)
-                                                    // namespace might have been deleted in parallel so okay if we don't delete the job
-                                                    .allowingNonZeroExitCode()
-                                                    .newTask()).get();
+                                    Task<String> deletion = Entities.submit(entity, BrooklynTaskTags.setTransient(newDeleteJobTask(kubeJobName)
+                                            // namespace might have been deleted in parallel so okay if we don't delete the job;
+                                            .allowingNonZeroExitCode()
+                                            .newTask().asTask()));
+                                    // no big deal if not deleted, job ID will always be unique, so allow to delete in background and not block subsequent tasks
+                                    //deletion.get();
                                 }
                             }
                             DynamicTasks.waitForLast();
@@ -242,7 +256,7 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
     }
 
     private Boolean waitForContainerCompletedUsingK8sWaitFor(ByteArrayOutputStream stdout, String kubeJobName, Entity entity, CountdownTimer timer) {
-        return DynamicTasks.queue(Tasks.<Boolean>builder().dynamic(true).displayName("Wait for success or failure").body(() -> {
+        return runTask(entity, Tasks.<Boolean>builder().dynamic(true).displayName("Wait for success or failure").body(() -> {
             while (true) {
                 LOG.debug("Container job " + kubeJobName + " submitted, now waiting on success or failure");
 
@@ -255,22 +269,22 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
                 // other one-off checks for job error, we could do here
                 // e.g. if image can't be pulled, for instance
 
-                refreshStdout(stdout, kubeJobName, timer);
+                refreshStdout(entity, stdout, kubeJobName, timer);
 
                 // probably timed out or job not yet available; short wait then retry
                 Time.sleep(Duration.millis(50));
             }
 
-        }).build()).getUnchecked();
+        }).build(), false, true).getUnchecked();
     }
 
     private Boolean waitForContainerCompletedUsingPodState(ByteArrayOutputStream stdout, String kubeJobName, Entity entity, CountdownTimer timer) {
-        return DynamicTasks.queue(Tasks.<Boolean>builder().dynamic(true).displayName("Wait for success or failure").body(() -> {
+        return runTask(entity, Tasks.<Boolean>builder().dynamic(true).displayName("Wait for success or failure").body(() -> {
             long retryDelay = 10;
             while (true) {
                 LOG.debug("Container job " + kubeJobName + " submitted, now waiting on success or failure");
 
-                PodPhases phase = checkPodPhase(kubeJobName);
+                PodPhases phase = checkPodPhase(entity, kubeJobName);
                 if (phase.equals(PodPhases.Succeeded)) return true;
                 if (phase.equals(PodPhases.Failed)) return false;
 
@@ -279,7 +293,7 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
                 // other one-off checks for job error, we could do here
                 // e.g. if image can't be pulled, for instance
 
-                refreshStdout(stdout, kubeJobName, timer);
+                refreshStdout(entity, stdout, kubeJobName, timer);
 
                 // probably timed out or job not yet available; short wait then retry
                 Time.sleep(Duration.millis(retryDelay));
@@ -290,14 +304,13 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
                 }
             }
 
-        }).build()).getUnchecked();
+        }).build(), false, true).getUnchecked();
     }
 
-    private void refreshStdout(ByteArrayOutputStream stdout, String kubeJobName, CountdownTimer timer) throws IOException {
+    private void refreshStdout(Entity entity, ByteArrayOutputStream stdout, String kubeJobName, CountdownTimer timer) throws IOException {
         // finally get the partial log for reporting
-        ProcessTaskWrapper<String> outputSoFarCmd = DynamicTasks.queue(newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, kubeJobName, namespace)).summary("Retrieve output so far").allowingNonZeroExitCode().newTask());
-        BrooklynTaskTags.setTransient(outputSoFarCmd.asTask());
-        outputSoFarCmd.block();
+        ProcessTaskWrapper<String> outputSoFarCmd = runTask(entity,
+                newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, kubeJobName, namespace)).summary("Retrieve output so far").allowingNonZeroExitCode().newTask(), true, true);
         if (outputSoFarCmd.getExitCode() != 0) {
             throw new IllegalStateException("Error detected with container job while reading logs (exit code " + outputSoFarCmd.getExitCode() + "): " + outputSoFarCmd.getStdout() + " / " + outputSoFarCmd.getStderr());
         }
@@ -415,15 +428,15 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
         return null;
     }
 
-    private PodPhases waitForContainerAvailable(String kubeJobName, ContainerTaskResult result, CountdownTimer timer) {
-        return DynamicTasks.queue(Tasks.<PodPhases>builder().dynamic(true).displayName("Wait for container to be running (or fail)").body(() -> {
+    private PodPhases waitForContainerAvailable(Entity entity, String kubeJobName, ContainerTaskResult result, CountdownTimer timer) {
+        return runTask(entity, Tasks.<PodPhases>builder().dynamic(true).displayName("Wait for container to be running (or fail)").body(() -> {
             long first = System.currentTimeMillis();
             long last = first;
             long backoffMillis = 10;
             PodPhases phase = PodPhases.Unknown;
             long startupReportDelay = 1000;  // report any start longer than 1s
             while (timer.isNotExpired()) {
-                phase = checkPodPhase(kubeJobName);
+                phase = checkPodPhase(entity, kubeJobName);
                 if (phase == PodPhases.Failed || phase == PodPhases.Succeeded || phase == PodPhases.Running) {
                     if (startupReportDelay>5000) LOG.info("Container detected in state "+phase+" after "+Duration.millis(System.currentTimeMillis()-first));
                     else LOG.debug("Container detected in state "+phase+" after "+Duration.millis(System.currentTimeMillis()-first));
@@ -431,13 +444,15 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
                 }
 
                 if (phase!=PodPhases.Unknown && Strings.isBlank(result.kubePodName)) {
-                    result.kubePodName = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_NAME_CMD, namespace, kubeJobName)).summary("Get pod name").allowingNonZeroExitCode().newTask()).get().trim();
+                    result.kubePodName = runTask(entity, newSimpleTaskFactory(String.format(PODS_NAME_CMD, namespace, kubeJobName)).summary("Get pod name").allowingNonZeroExitCode().newTask(), false, true).get().trim();
                 }
                 if (phase == PodPhases.Pending && Strings.isNonBlank(result.kubePodName)) {
                     // if pending, need to look for errors
-                    String failedEvents = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_FAILED_JSON_CMD, namespace, result.kubePodName)).summary("Check pod failed events").allowingNonZeroExitCode().newTask()).get().trim();
+                    String failedEvents = runTask(entity, newSimpleTaskFactory(String.format(SCOPED_EVENTS_FAILED_JSON_CMD, namespace, result.kubePodName)).summary("Check pod failed events").allowingNonZeroExitCode().newTask(),
+                            false, true).get().trim();
                     if (!"[]".equals(failedEvents)) {
-                        String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, result.kubePodName)).summary("Get pod events on failure").allowingNonZeroExitCode().newTask()).get().trim();
+                        String events = runTask(entity, newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, result.kubePodName)).summary("Get pod events on failure").allowingNonZeroExitCode().newTask(),
+                                false, false).get().trim();
                         throw new IllegalStateException("Job pod failed: "+failedEvents+"\n"+events);
                     }
                 }
@@ -450,15 +465,18 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
                     Consumer<String> log = startupReportDelay<3*1000 ? LOG::debug : LOG::info;
 
                     log.accept("Container taking a while to start ("+Duration.millis(last-first)+"): "+namespace+" "+ kubeJobName +" "+ result.kubePodName+" / phase '"+phase+"'");
-                    String stateJsonS = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_STATUS_STATE_CMD, namespace, kubeJobName)).summary("Get pod state").allowingNonZeroExitCode().newTask()).get().trim();
+                    String stateJsonS = runTask(entity, newSimpleTaskFactory(String.format(PODS_STATUS_STATE_CMD, namespace, kubeJobName)).summary("Get pod state").allowingNonZeroExitCode().newTask(),
+                            false, true).get().trim();
                     if (Strings.isNonBlank(stateJsonS)) {
                         log.accept("Pod state: "+stateJsonS);
                     }
                     if (Strings.isNonBlank(result.kubePodName)) {
-                        String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, result.kubePodName)).summary("Get pod events").allowingNonZeroExitCode().newTask()).get().trim();
+                        String events = runTask(entity, newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, result.kubePodName)).summary("Get pod events").allowingNonZeroExitCode().newTask(),
+                                false, true).get().trim();
                         log.accept("Pod events: \n"+events);
                     } else {
-                        String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, kubeJobName)).summary("Get job events").allowingNonZeroExitCode().newTask()).get().trim();
+                        String events = runTask(entity, newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, kubeJobName)).summary("Get job events").allowingNonZeroExitCode().newTask(),
+                                false, true).get().trim();
                         log.accept("Job events: \n"+events);
                     }
 
@@ -474,24 +492,25 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
                 if (backoffMillis<80) backoffMillis*=2;
             }
             throw new IllegalStateException("Timeout waiting for pod to be available; current status is '" + phase + "'");
-        }).build()).getUnchecked();
+        }).build(), false, true).getUnchecked();
     }
 
-    private PodPhases checkPodPhase(String kubeJobName) {
-        PodPhases succeeded = getPodPhaseFromContainerState(kubeJobName);
+    private PodPhases checkPodPhase(Entity entity, String kubeJobName) {
+        PodPhases succeeded = getPodPhaseFromContainerState(entity, kubeJobName);
         if (succeeded != null) return succeeded;
 
         // this is the more official way, fall back to it if above is not recognised (eg waiting)
-        String phase = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_STATUS_PHASE_CMD, namespace, kubeJobName)).summary("Get pod phase").allowingNonZeroExitCode().newTask()).get().trim();
+        String phase = runTask(entity, newSimpleTaskFactory(String.format(PODS_STATUS_PHASE_CMD, namespace, kubeJobName)).summary("Get pod phase").allowingNonZeroExitCode().newTask(), false, true).get().trim();
         for (PodPhases candidate: PodPhases.values()) {
             if (candidate.name().equalsIgnoreCase(phase)) return candidate;
         }
         return PodPhases.Unknown;
     }
 
-    private PodPhases getPodPhaseFromContainerState(String kubeJobName) {
+    private PodPhases getPodPhaseFromContainerState(Entity entity, String kubeJobName) {
         // pod container state is populated much sooner than the pod status and job fields and wait, so prefer it
-        String stateJsonS = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_STATUS_STATE_CMD, namespace, kubeJobName)).summary("Get pod state").allowingNonZeroExitCode().newTask()).get().trim();
+        String stateJsonS = runTask(entity, newSimpleTaskFactory(String.format(PODS_STATUS_STATE_CMD, namespace, kubeJobName)).summary("Get pod state").allowingNonZeroExitCode().newTask(),
+                false, true).get().trim();
         if (Strings.isNonBlank(stateJsonS)) {
             Object stateO = new Gson().fromJson(stateJsonS, Object.class);
             if (stateO instanceof Map) {
@@ -564,7 +583,8 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
         ProcessTaskFactory<String> tf = newSimpleTaskFactory(String.format(NAMESPACE_DELETE_CMD, namespace)).summary("Tear down containers").allowingNonZeroExitCode();
         if (!requireSuccess) tf = tf.allowingNonZeroExitCode();
         else tf = tf.requiringExitCodeZero();
-        ProcessTaskWrapper<String> task = Entities.submit(entity, tf.newTask());
+        ProcessTaskWrapper<String> task = tf.newTask();
+        Entities.submit(entity, BrooklynTaskTags.setTransient(task.asTask()));
         if (wait) {
             task.get();
             LOG.info("Deleted namespace " + namespace);