You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2022/08/16 15:53:05 UTC
[brooklyn-server] 02/04: run container tasks in background and as transient
This is an automated email from the ASF dual-hosted git repository.
heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 19fac6b1eb8ebcab4d319235158888fbb16f6f24
Author: Alex Heneveld <al...@cloudsoft.io>
AuthorDate: Tue Aug 16 11:51:53 2022 +0100
run container tasks in background and as transient
Avoid polluting the tasks view with boring details of how container tasks need to be run.
---
.../tasks/kubectl/ContainerTaskFactory.java | 96 +++++++++++++---------
1 file changed, 58 insertions(+), 38 deletions(-)
diff --git a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
index f1b8caf255..84cb436b4b 100644
--- a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
+++ b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
import com.google.gson.Gson;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.mgmt.TaskAdaptable;
import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.EntityInitializers;
@@ -79,6 +80,17 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
private Boolean deleteNamespace;
Function<ContainerTaskResult,RET> returnConversion;
+ /**
+ * Submits a helper task against {@code entity} rather than queueing it as a sub-task of the current task.
+ * <p>
+ * Previously callers used {@code DynamicTasks.queue}, but that bloats the kilt diagram, so tasks are now
+ * submitted via {@link Entities#submit} instead. Most callers mark them transient (apart from the main flow)
+ * so that they get GC'd quicker and don't clutter the kilt.
+ *
+ * @param entity the entity to submit the task against
+ * @param t the task (or task wrapper) to run
+ * @param block if true, wait for the task to end and rethrow any failure; if false, return immediately
+ * (callers then typically call {@code get()} themselves)
+ * @param markTransient if true, tag the task as transient before submitting it
+ * @return {@code t} itself, so callers can chain {@code get()} or read process output from it
+ */
+ private <TA extends TaskAdaptable<?>> TA runTask(Entity entity, TA t, boolean block, boolean markTransient) {
+ // type parameter named TA (not T) so it does not hide the class's own type parameter T
+ if (markTransient) BrooklynTaskTags.setTransient(t.asTask());
+ Entities.submit(entity, t);
+ if (block) {
+ // unlike a queued sub-task, a failure here is no longer propagated to the caller automatically, and
+ // blockUntilEnded only waits without reporting failure; getUnchecked both waits and rethrows, so e.g.
+ // a failed "Submit job" aborts immediately instead of polling for the pod until the timeout expires
+ t.asTask().getUnchecked();
+ }
+ return t;
+ }
+
@Override
public Task<RET> newTask() {
final ByteArrayOutputStream stdout = new ByteArrayOutputStream();
@@ -152,7 +164,7 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
if (createNamespace==null) {
createNsJobF.allowingNonZeroExitCode();
}
- createNsJob = DynamicTasks.queue(createNsJobF.newTask());
+ createNsJob = runTask(entity, createNsJobF.newTask(), true, true);
}
// only delete if told to always, unless we successfully create it
@@ -177,13 +189,13 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
}
}
- DynamicTasks.queue(
- newSimpleTaskFactory(String.format(JOBS_CREATE_CMD, jobYaml.getFile().getAbsolutePath(), namespace)).summary("Submit job").newTask());
+ runTask(entity,
+ newSimpleTaskFactory(String.format(JOBS_CREATE_CMD, jobYaml.getFile().getAbsolutePath(), namespace)).summary("Submit job").newTask(), true, true);
final CountdownTimer timer = CountdownTimer.newInstanceStarted(timeout);
// wait for it to be running (or failed / succeeded) -
- PodPhases phaseOnceActive = waitForContainerAvailable(kubeJobName, result, timer);
+ PodPhases phaseOnceActive = waitForContainerAvailable(entity, kubeJobName, result, timer);
// waitForContainerPodContainerState(kubeJobName, result, timer);
// notify once pod is available
@@ -198,14 +210,14 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
LOG.debug("Container job "+kubeJobName+" completed, success "+succeeded);
- ProcessTaskWrapper<String> retrieveOutput = DynamicTasks.queue(newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, kubeJobName, namespace)).summary("Retrieve output").newTask());
- ProcessTaskWrapper<String> retrieveExitCode = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_EXIT_CODE_CMD, namespace, kubeJobName)).summary("Retrieve exit code").newTask());
+ ProcessTaskWrapper<String> retrieveOutput = runTask(entity, newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, kubeJobName, namespace)).summary("Retrieve output").newTask(), false, true);
+ ProcessTaskWrapper<String> retrieveExitCode = runTask(entity, newSimpleTaskFactory(String.format(PODS_EXIT_CODE_CMD, namespace, kubeJobName)).summary("Retrieve exit code").newTask(), false, true);
- DynamicTasks.waitForLast();
result.mainStdout = retrieveOutput.get();
updateStdoutWithNewData(stdout, result.mainStdout);
+ retrieveExitCode.get();
String exitCodeS = retrieveExitCode.getStdout();
if (Strings.isNonBlank(exitCodeS)) result.mainExitCode = Integer.parseInt(exitCodeS.trim());
else result.mainExitCode = -1;
@@ -223,10 +235,12 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
} else {
Boolean devMode = EntityInitializers.resolve(config, KEEP_CONTAINER_FOR_DEBUGGING);
if (!Boolean.TRUE.equals(devMode)) {
- Entities.submit(entity, newDeleteJobTask(kubeJobName)
- // namespace might have been deleted in parallel so okay if we don't delete the job
- .allowingNonZeroExitCode()
- .newTask()).get();
+ Task<String> deletion = Entities.submit(entity, BrooklynTaskTags.setTransient(newDeleteJobTask(kubeJobName)
+ // namespace might have been deleted in parallel so okay if we don't delete the job;
+ .allowingNonZeroExitCode()
+ .newTask().asTask()));
+ // no big deal if not deleted, job ID will always be unique, so allow to delete in background and not block subsequent tasks
+ //deletion.get();
}
}
DynamicTasks.waitForLast();
@@ -242,7 +256,7 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
}
private Boolean waitForContainerCompletedUsingK8sWaitFor(ByteArrayOutputStream stdout, String kubeJobName, Entity entity, CountdownTimer timer) {
- return DynamicTasks.queue(Tasks.<Boolean>builder().dynamic(true).displayName("Wait for success or failure").body(() -> {
+ return runTask(entity, Tasks.<Boolean>builder().dynamic(true).displayName("Wait for success or failure").body(() -> {
while (true) {
LOG.debug("Container job " + kubeJobName + " submitted, now waiting on success or failure");
@@ -255,22 +269,22 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
// other one-off checks for job error, we could do here
// e.g. if image can't be pulled, for instance
- refreshStdout(stdout, kubeJobName, timer);
+ refreshStdout(entity, stdout, kubeJobName, timer);
// probably timed out or job not yet available; short wait then retry
Time.sleep(Duration.millis(50));
}
- }).build()).getUnchecked();
+ }).build(), false, true).getUnchecked();
}
private Boolean waitForContainerCompletedUsingPodState(ByteArrayOutputStream stdout, String kubeJobName, Entity entity, CountdownTimer timer) {
- return DynamicTasks.queue(Tasks.<Boolean>builder().dynamic(true).displayName("Wait for success or failure").body(() -> {
+ return runTask(entity, Tasks.<Boolean>builder().dynamic(true).displayName("Wait for success or failure").body(() -> {
long retryDelay = 10;
while (true) {
LOG.debug("Container job " + kubeJobName + " submitted, now waiting on success or failure");
- PodPhases phase = checkPodPhase(kubeJobName);
+ PodPhases phase = checkPodPhase(entity, kubeJobName);
if (phase.equals(PodPhases.Succeeded)) return true;
if (phase.equals(PodPhases.Failed)) return false;
@@ -279,7 +293,7 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
// other one-off checks for job error, we could do here
// e.g. if image can't be pulled, for instance
- refreshStdout(stdout, kubeJobName, timer);
+ refreshStdout(entity, stdout, kubeJobName, timer);
// probably timed out or job not yet available; short wait then retry
Time.sleep(Duration.millis(retryDelay));
@@ -290,14 +304,13 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
}
}
- }).build()).getUnchecked();
+ }).build(), false, true).getUnchecked();
}
- private void refreshStdout(ByteArrayOutputStream stdout, String kubeJobName, CountdownTimer timer) throws IOException {
+ private void refreshStdout(Entity entity, ByteArrayOutputStream stdout, String kubeJobName, CountdownTimer timer) throws IOException {
// finally get the partial log for reporting
- ProcessTaskWrapper<String> outputSoFarCmd = DynamicTasks.queue(newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, kubeJobName, namespace)).summary("Retrieve output so far").allowingNonZeroExitCode().newTask());
- BrooklynTaskTags.setTransient(outputSoFarCmd.asTask());
- outputSoFarCmd.block();
+ ProcessTaskWrapper<String> outputSoFarCmd = runTask(entity,
+ newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, kubeJobName, namespace)).summary("Retrieve output so far").allowingNonZeroExitCode().newTask(), true, true);
if (outputSoFarCmd.getExitCode() != 0) {
throw new IllegalStateException("Error detected with container job while reading logs (exit code " + outputSoFarCmd.getExitCode() + "): " + outputSoFarCmd.getStdout() + " / " + outputSoFarCmd.getStderr());
}
@@ -415,15 +428,15 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
return null;
}
- private PodPhases waitForContainerAvailable(String kubeJobName, ContainerTaskResult result, CountdownTimer timer) {
- return DynamicTasks.queue(Tasks.<PodPhases>builder().dynamic(true).displayName("Wait for container to be running (or fail)").body(() -> {
+ private PodPhases waitForContainerAvailable(Entity entity, String kubeJobName, ContainerTaskResult result, CountdownTimer timer) {
+ return runTask(entity, Tasks.<PodPhases>builder().dynamic(true).displayName("Wait for container to be running (or fail)").body(() -> {
long first = System.currentTimeMillis();
long last = first;
long backoffMillis = 10;
PodPhases phase = PodPhases.Unknown;
long startupReportDelay = 1000; // report any start longer than 1s
while (timer.isNotExpired()) {
- phase = checkPodPhase(kubeJobName);
+ phase = checkPodPhase(entity, kubeJobName);
if (phase == PodPhases.Failed || phase == PodPhases.Succeeded || phase == PodPhases.Running) {
if (startupReportDelay>5000) LOG.info("Container detected in state "+phase+" after "+Duration.millis(System.currentTimeMillis()-first));
else LOG.debug("Container detected in state "+phase+" after "+Duration.millis(System.currentTimeMillis()-first));
@@ -431,13 +444,15 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
}
if (phase!=PodPhases.Unknown && Strings.isBlank(result.kubePodName)) {
- result.kubePodName = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_NAME_CMD, namespace, kubeJobName)).summary("Get pod name").allowingNonZeroExitCode().newTask()).get().trim();
+ result.kubePodName = runTask(entity, newSimpleTaskFactory(String.format(PODS_NAME_CMD, namespace, kubeJobName)).summary("Get pod name").allowingNonZeroExitCode().newTask(), false, true).get().trim();
}
if (phase == PodPhases.Pending && Strings.isNonBlank(result.kubePodName)) {
// if pending, need to look for errors
- String failedEvents = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_FAILED_JSON_CMD, namespace, result.kubePodName)).summary("Check pod failed events").allowingNonZeroExitCode().newTask()).get().trim();
+ String failedEvents = runTask(entity, newSimpleTaskFactory(String.format(SCOPED_EVENTS_FAILED_JSON_CMD, namespace, result.kubePodName)).summary("Check pod failed events").allowingNonZeroExitCode().newTask(),
+ false, true).get().trim();
if (!"[]".equals(failedEvents)) {
- String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, result.kubePodName)).summary("Get pod events on failure").allowingNonZeroExitCode().newTask()).get().trim();
+ String events = runTask(entity, newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, result.kubePodName)).summary("Get pod events on failure").allowingNonZeroExitCode().newTask(),
+ false, false).get().trim();
throw new IllegalStateException("Job pod failed: "+failedEvents+"\n"+events);
}
}
@@ -450,15 +465,18 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
Consumer<String> log = startupReportDelay<3*1000 ? LOG::debug : LOG::info;
log.accept("Container taking a while to start ("+Duration.millis(last-first)+"): "+namespace+" "+ kubeJobName +" "+ result.kubePodName+" / phase '"+phase+"'");
- String stateJsonS = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_STATUS_STATE_CMD, namespace, kubeJobName)).summary("Get pod state").allowingNonZeroExitCode().newTask()).get().trim();
+ String stateJsonS = runTask(entity, newSimpleTaskFactory(String.format(PODS_STATUS_STATE_CMD, namespace, kubeJobName)).summary("Get pod state").allowingNonZeroExitCode().newTask(),
+ false, true).get().trim();
if (Strings.isNonBlank(stateJsonS)) {
log.accept("Pod state: "+stateJsonS);
}
if (Strings.isNonBlank(result.kubePodName)) {
- String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, result.kubePodName)).summary("Get pod events").allowingNonZeroExitCode().newTask()).get().trim();
+ String events = runTask(entity, newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, result.kubePodName)).summary("Get pod events").allowingNonZeroExitCode().newTask(),
+ false, true).get().trim();
log.accept("Pod events: \n"+events);
} else {
- String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, kubeJobName)).summary("Get job events").allowingNonZeroExitCode().newTask()).get().trim();
+ String events = runTask(entity, newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, kubeJobName)).summary("Get job events").allowingNonZeroExitCode().newTask(),
+ false, true).get().trim();
log.accept("Job events: \n"+events);
}
@@ -474,24 +492,25 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
if (backoffMillis<80) backoffMillis*=2;
}
throw new IllegalStateException("Timeout waiting for pod to be available; current status is '" + phase + "'");
- }).build()).getUnchecked();
+ }).build(), false, true).getUnchecked();
}
- private PodPhases checkPodPhase(String kubeJobName) {
- PodPhases succeeded = getPodPhaseFromContainerState(kubeJobName);
+ private PodPhases checkPodPhase(Entity entity, String kubeJobName) {
+ PodPhases succeeded = getPodPhaseFromContainerState(entity, kubeJobName);
if (succeeded != null) return succeeded;
// this is the more official way, fall back to it if above is not recognised (eg waiting)
- String phase = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_STATUS_PHASE_CMD, namespace, kubeJobName)).summary("Get pod phase").allowingNonZeroExitCode().newTask()).get().trim();
+ String phase = runTask(entity, newSimpleTaskFactory(String.format(PODS_STATUS_PHASE_CMD, namespace, kubeJobName)).summary("Get pod phase").allowingNonZeroExitCode().newTask(), false, true).get().trim();
for (PodPhases candidate: PodPhases.values()) {
if (candidate.name().equalsIgnoreCase(phase)) return candidate;
}
return PodPhases.Unknown;
}
- private PodPhases getPodPhaseFromContainerState(String kubeJobName) {
+ private PodPhases getPodPhaseFromContainerState(Entity entity, String kubeJobName) {
// pod container state is populated much sooner than the pod status and job fields and wait, so prefer it
- String stateJsonS = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_STATUS_STATE_CMD, namespace, kubeJobName)).summary("Get pod state").allowingNonZeroExitCode().newTask()).get().trim();
+ String stateJsonS = runTask(entity, newSimpleTaskFactory(String.format(PODS_STATUS_STATE_CMD, namespace, kubeJobName)).summary("Get pod state").allowingNonZeroExitCode().newTask(),
+ false, true).get().trim();
if (Strings.isNonBlank(stateJsonS)) {
Object stateO = new Gson().fromJson(stateJsonS, Object.class);
if (stateO instanceof Map) {
@@ -564,7 +583,8 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
ProcessTaskFactory<String> tf = newSimpleTaskFactory(String.format(NAMESPACE_DELETE_CMD, namespace)).summary("Tear down containers").allowingNonZeroExitCode();
if (!requireSuccess) tf = tf.allowingNonZeroExitCode();
else tf = tf.requiringExitCodeZero();
- ProcessTaskWrapper<String> task = Entities.submit(entity, tf.newTask());
+ ProcessTaskWrapper<String> task = tf.newTask();
+ Entities.submit(entity, BrooklynTaskTags.setTransient(task.asTask()));
if (wait) {
task.get();
LOG.info("Deleted namespace " + namespace);