Posted to commits@brooklyn.apache.org by he...@apache.org on 2022/08/11 23:55:07 UTC
[brooklyn-server] 08/09: faster strategies for determining container readiness and completion
This is an automated email from the ASF dual-hosted git repository.
heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 613bd023bd65d5e74fc0fc33ea82b8e3263ae5f3
Author: Alex Heneveld <al...@cloudsoft.io>
AuthorDate: Thu Aug 11 22:23:00 2022 +0100
faster strategies for determining container readiness and completion
and namespace destruction can now be done asynchronously,
because job/pod status updates arrive only several seconds after the container has finished (observed with Docker Desktop)
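For illustration (job and namespace names below are hypothetical), the slow path blocks on the job conditions, which lag the container by those several seconds:
    kubectl wait --timeout=5s --for=condition=complete job/my-job --namespace=my-ns
while the new fast path reads the container state straight off the pod, which is populated much sooner:
    kubectl get pods --namespace=my-ns --selector=job-name=my-job -ojsonpath='{.items[0].status.containerStatuses[0].state}'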
---
.../brooklyn/tasks/kubectl/ContainerCommons.java | 6 +-
.../tasks/kubectl/ContainerTaskFactory.java | 456 ++++++++++++++-------
.../brooklyn/tasks/kubectl/ContainerTaskTest.java | 9 +-
3 files changed, 328 insertions(+), 143 deletions(-)
diff --git a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java
index d426724e38..130c25b963 100644
--- a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java
+++ b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java
@@ -56,10 +56,12 @@ public interface ContainerCommons {
String NAMESPACE_CREATE_CMD = "kubectl create namespace %s";
String NAMESPACE_SET_CMD = "kubectl config set-context --current --namespace=%s";
String JOBS_CREATE_CMD = "kubectl apply -f %s --namespace=%s";
- String JOBS_FEED_CMD = "kubectl wait --timeout=%ds --for=condition=complete job/%s --namespace=%s";
- String JOBS_FEED_FAILED_CMD = "kubectl wait --timeout=%ds --for=condition=failed job/%s --namespace=%s";
+ String JOBS_WAIT_COMPLETE_CMD = "kubectl wait --timeout=%ds --for=condition=complete job/%s --namespace=%s";
+ String JOBS_WAIT_FAILED_CMD = "kubectl wait --timeout=%ds --for=condition=failed job/%s --namespace=%s";
String JOBS_LOGS_CMD = "kubectl logs jobs/%s --namespace=%s";
+ String JOBS_DELETE_CMD = "kubectl delete job %s --namespace=%s";
String PODS_CMD_PREFIX = "kubectl get pods --namespace=%s --selector=job-name=%s ";
+ String PODS_STATUS_STATE_CMD = PODS_CMD_PREFIX + "-ojsonpath='{.items[0].status.containerStatuses[0].state}'";
String PODS_STATUS_PHASE_CMD = PODS_CMD_PREFIX + "-ojsonpath='{.items[0].status.phase}'";
String PODS_NAME_CMD = PODS_CMD_PREFIX + "-ojsonpath='{.items[0].metadata.name}'";
String PODS_EXIT_CODE_CMD = PODS_CMD_PREFIX + "-ojsonpath='{.items[0].status.containerStatuses[0].state.terminated.exitCode}'";
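For context, a sketch of what the jsonpath in the new PODS_STATUS_STATE_CMD returns: the standard Kubernetes container-state object, which carries at most one of three keys (field values below are illustrative):
    {"waiting":{"reason":"ContainerCreating"}}
    {"running":{"startedAt":"2022-08-11T22:23:00Z"}}
    {"terminated":{"exitCode":0,"reason":"Completed"}}
The parsing added to ContainerTaskFactory below keys off "running" and "terminated".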
diff --git a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
index a78c1cb6de..f1b8caf255 100644
--- a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
+++ b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
@@ -19,6 +19,7 @@
package org.apache.brooklyn.tasks.kubectl;
import com.google.common.collect.Lists;
+import com.google.gson.Gson;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
@@ -52,12 +53,14 @@ import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -71,6 +74,7 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
protected String jobIdentifier = "";
protected final ConfigBag config = ConfigBag.newInstance();
private String namespace;
+ private boolean namespaceRandom = false;
private Boolean createNamespace;
private Boolean deleteNamespace;
Function<ContainerTaskResult,RET> returnConversion;
@@ -179,146 +183,28 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
final CountdownTimer timer = CountdownTimer.newInstanceStarted(timeout);
// wait for it to be running (or failed / succeeded) -
- PodPhases podPhase = DynamicTasks.queue(Tasks.<PodPhases>builder().dynamic(true).displayName("Wait for container to be running (or fail)").body(() -> {
- String phase = null;
- long first = System.currentTimeMillis();
- long last = first;
- long backoffMillis = 10;
- while (timer.isNotExpired()) {
- phase = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_STATUS_PHASE_CMD, namespace, kubeJobName)).summary("Get pod phase").allowingNonZeroExitCode().newTask()).get().trim();
- if (PodPhases.Running.name().equalsIgnoreCase(phase)) return PodPhases.Running;
- if (PodPhases.Failed.name().equalsIgnoreCase(phase)) return PodPhases.Failed;
- if (PodPhases.Succeeded.name().equalsIgnoreCase(phase)) return PodPhases.Succeeded;
-
- if (Strings.isNonBlank(phase) && Strings.isBlank(result.kubePodName)) {
- result.kubePodName = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_NAME_CMD, namespace, kubeJobName)).summary("Get pod name").allowingNonZeroExitCode().newTask()).get().trim();
- }
- if (PodPhases.Pending.name().equals(phase) && Strings.isNonBlank(result.kubePodName)) {
- // if pending, look for errors
- String failedEvents = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_FAILED_JSON_CMD, namespace, result.kubePodName)).summary("Get pod failed events").allowingNonZeroExitCode().newTask()).get().trim();
- if (!"[]".equals(failedEvents)) {
- String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, result.kubePodName)).summary("Get pod events").allowingNonZeroExitCode().newTask()).get().trim();
- throw new IllegalStateException("Job pod failed: "+failedEvents+"\n"+events);
- }
- }
-
- if (System.currentTimeMillis() - last > 10*1000) {
- last = System.currentTimeMillis();
- // every 10s log info
- LOG.info("Container taking long time to start ("+Duration.millis(last-first)+"): "+namespace+" "+kubeJobName+" "+result.kubePodName+" / phase '"+phase+"'");
- if (Strings.isNonBlank(result.kubePodName)) {
- String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, result.kubePodName)).summary("Get pod events").allowingNonZeroExitCode().newTask()).get().trim();
- LOG.info("Pod events: \n"+events);
- } else {
- String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, kubeJobName)).summary("Get job events").allowingNonZeroExitCode().newTask()).get().trim();
- LOG.info("Job events: \n"+events);
- }
- }
- long backoffMillis2 = backoffMillis;
- Tasks.withBlockingDetails("waiting "+backoffMillis2+"ms for pod to be available (current status '" + phase + "')", () -> {
- Time.sleep(backoffMillis2);
- return null;
- });
- if (backoffMillis<80) backoffMillis*=2;
- }
- throw new IllegalStateException("Timeout waiting for pod to be available; current status is '" + phase + "'");
- }).build()).getUnchecked();
+ PodPhases phaseOnceActive = waitForContainerAvailable(kubeJobName, result, timer);
+// waitForContainerPodContainerState(kubeJobName, result, timer);
// notify once pod is available
- synchronized (result) {
- result.notifyAll();
- }
-
- // use `wait --for` api, but in a 5s loop in case there are other issues
- boolean succeeded = podPhase == PodPhases.Succeeded ? true : podPhase == PodPhases.Failed ? false : DynamicTasks.queue(Tasks.<Boolean>builder().dynamic(true).displayName("Wait for success or failure").body(() -> {
- while (true) {
- LOG.debug("Container job submitted, now waiting on success or failure");
-
- long secondsLeft = Math.min(Math.max(1, timer.getDurationRemaining().toSeconds()), 5);
- final AtomicInteger finishCount = new AtomicInteger(0);
-
- ProcessTaskWrapper<String> waitForSuccess = Entities.submit(entity, newSimpleTaskFactory(String.format(JOBS_FEED_CMD, secondsLeft, kubeJobName, namespace))
- .summary("Wait for success ('complete')").allowingNonZeroExitCode().newTask());
- Entities.submit(entity, Tasks.create("Wait for success then notify", () -> {
- try {
- if (waitForSuccess.get().contains("condition met"))
- LOG.debug("Container job " + namespace + " detected as completed (succeeded) in kubernetes");
- } finally {
- synchronized (finishCount) {
- finishCount.incrementAndGet();
- finishCount.notifyAll();
- }
- }
- }));
-
- ProcessTaskWrapper<String> waitForFailed = Entities.submit(entity, newSimpleTaskFactory(String.format(JOBS_FEED_FAILED_CMD, secondsLeft, kubeJobName, namespace))
- .summary("Wait for failed").allowingNonZeroExitCode().newTask());
- Entities.submit(entity, Tasks.create("Wait for failed then notify", () -> {
- try {
- if (waitForFailed.get().contains("condition met"))
- LOG.debug("Container job " + namespace + " detected as failed in kubernetes (may be valid non-zero exit)");
- } finally {
- synchronized (finishCount) {
- finishCount.incrementAndGet();
- finishCount.notifyAll();
- }
- }
- }));
-
- while (finishCount.get() == 0) {
- LOG.debug("Container job " + namespace + " waiting on complete or failed");
- try {
- synchronized (finishCount) {
- finishCount.wait(Duration.TEN_SECONDS.toMilliseconds());
- }
- } catch (InterruptedException e) {
- throw Exceptions.propagate(e);
- }
- }
-
- if (waitForSuccess.isDone() && waitForSuccess.getExitCode() == 0) return true;
- if (waitForFailed.isDone() && waitForFailed.getExitCode() == 0) return false;
- LOG.debug("Container job " + namespace + " not yet complete, will retry");
+ synchronized (result) { result.notifyAll(); }
- // other one-off checks for job error, we could do here
- // e.g. if image can't be pulled, for instance
+ boolean succeeded = PodPhases.Succeeded == phaseOnceActive ||
+ (PodPhases.Failed != phaseOnceActive &&
+ // the `kubectl wait --for` api could be used instead (in a 5s loop, in case there are other issues), as commented below:
+// waitForContainerCompletedUsingK8sWaitFor(stdout, kubeJobName, entity, timer)
+ waitForContainerCompletedUsingPodState(stdout, kubeJobName, entity, timer)
+ );
- // finally get the partial log for reporting
- ProcessTaskWrapper<String> outputSoFarCmd = DynamicTasks.queue(newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, kubeJobName, namespace)).summary("Retrieve output so far").allowingNonZeroExitCode().newTask());
- BrooklynTaskTags.setTransient(outputSoFarCmd.asTask());
- outputSoFarCmd.block();
- if (outputSoFarCmd.getExitCode()!=0) {
- throw new IllegalStateException("Error detected with container job while reading logs (exit code "+outputSoFarCmd.getExitCode()+"): "+outputSoFarCmd.getStdout() + " / "+outputSoFarCmd.getStderr());
- }
- String outputSoFar = outputSoFarCmd.get();
- int bytesAlreadyRead = stdout.size();
- if (bytesAlreadyRead <= outputSoFar.length()) {
- String newOutput = outputSoFar.substring(stdout.size());
- LOG.debug("Container job " + namespace + " output: " + newOutput);
- stdout.write(newOutput.getBytes(StandardCharsets.UTF_8));
- } else {
- // not sure why this happens, but it does sometimes; for now just reset
- LOG.debug("Container job " + namespace + " output reset, length "+outputSoFar.length()+" less than "+bytesAlreadyRead+"; ignoring new output:\n" + outputSoFar +"\n"+new String(stdout.toByteArray()));
- stdout.reset();
- stdout.write(outputSoFar.getBytes(StandardCharsets.UTF_8));
- }
-
- if (timer.isExpired())
- throw new IllegalStateException("Timeout waiting for success or failure");
-
- // probably timed out or job not yet available; short wait then retry
- Time.sleep(Duration.millis(50));
- }
-
- }).build()).getUnchecked();
- LOG.debug("Container job "+namespace+" completed, success "+succeeded);
+ LOG.debug("Container job "+kubeJobName+" completed, success "+succeeded);
ProcessTaskWrapper<String> retrieveOutput = DynamicTasks.queue(newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, kubeJobName, namespace)).summary("Retrieve output").newTask());
ProcessTaskWrapper<String> retrieveExitCode = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_EXIT_CODE_CMD, namespace, kubeJobName)).summary("Retrieve exit code").newTask());
DynamicTasks.waitForLast();
result.mainStdout = retrieveOutput.get();
- stdout.write(result.mainStdout.substring(stdout.size()).getBytes(StandardCharsets.UTF_8));
+
+ updateStdoutWithNewData(stdout, result.mainStdout);
String exitCodeS = retrieveExitCode.getStdout();
if (Strings.isNonBlank(exitCodeS)) result.mainExitCode = Integer.parseInt(exitCodeS.trim());
@@ -333,8 +219,17 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
} finally {
if (deleteNamespaceHere) {
- doDeleteNamespace();
+ doDeleteNamespace(!namespaceRandom, true); // if a one-off job, namespace has random id in it so can safely be deleted in background (no one else risks reusing it)
+ } else {
+ Boolean devMode = EntityInitializers.resolve(config, KEEP_CONTAINER_FOR_DEBUGGING);
+ if (!Boolean.TRUE.equals(devMode)) {
+ Entities.submit(entity, newDeleteJobTask(kubeJobName)
+ // namespace might have been deleted in parallel so okay if we don't delete the job
+ .allowingNonZeroExitCode()
+ .newTask()).get();
+ }
}
+ DynamicTasks.waitForLast();
}
} catch (Exception e) {
throw Exceptions.propagate(e);
@@ -346,6 +241,277 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
return taskBuilder.build();
}
+ private Boolean waitForContainerCompletedUsingK8sWaitFor(ByteArrayOutputStream stdout, String kubeJobName, Entity entity, CountdownTimer timer) {
+ return DynamicTasks.queue(Tasks.<Boolean>builder().dynamic(true).displayName("Wait for success or failure").body(() -> {
+ while (true) {
+ LOG.debug("Container job " + kubeJobName + " submitted, now waiting on success or failure");
+
+ long secondsLeft = Math.min(Math.max(1, timer.getDurationRemaining().toSeconds()), 5);
+ Boolean x = checkForContainerCompletedUsingK8sWaitFor(kubeJobName, entity, secondsLeft);
+
+ if (x != null) return x;
+ LOG.debug("Container job " + namespace + " not yet complete, will retry");
+
+ // other one-off checks for job error could be done here,
+ // e.g. if the image can't be pulled
+
+ refreshStdout(stdout, kubeJobName, timer);
+
+ // probably timed out or job not yet available; short wait then retry
+ Time.sleep(Duration.millis(50));
+ }
+
+ }).build()).getUnchecked();
+ }
+
+ private Boolean waitForContainerCompletedUsingPodState(ByteArrayOutputStream stdout, String kubeJobName, Entity entity, CountdownTimer timer) {
+ return DynamicTasks.queue(Tasks.<Boolean>builder().dynamic(true).displayName("Wait for success or failure").body(() -> {
+ long retryDelay = 10;
+ while (true) {
+ LOG.debug("Container job " + kubeJobName + " submitted, now waiting on success or failure");
+
+ PodPhases phase = checkPodPhase(kubeJobName);
+ if (phase.equals(PodPhases.Succeeded)) return true;
+ if (phase.equals(PodPhases.Failed)) return false;
+
+ LOG.debug("Container job " + namespace + " not yet complete, will sleep then retry");
+
+ // other one-off checks for job error could be done here,
+ // e.g. if the image can't be pulled
+
+ refreshStdout(stdout, kubeJobName, timer);
+
+ // probably timed out or job not yet available; short wait then retry
+ Time.sleep(Duration.millis(retryDelay));
+ retryDelay *= 1.5;
+ if (retryDelay > 250) {
+ // once past 250ms, pin the backoff at its 500ms max
+ retryDelay = 500;
+ }
+ }
+
+ }).build()).getUnchecked();
+ }
+
+ private void refreshStdout(ByteArrayOutputStream stdout, String kubeJobName, CountdownTimer timer) throws IOException {
+ // finally get the partial log for reporting
+ ProcessTaskWrapper<String> outputSoFarCmd = DynamicTasks.queue(newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, kubeJobName, namespace)).summary("Retrieve output so far").allowingNonZeroExitCode().newTask());
+ BrooklynTaskTags.setTransient(outputSoFarCmd.asTask());
+ outputSoFarCmd.block();
+ if (outputSoFarCmd.getExitCode() != 0) {
+ throw new IllegalStateException("Error detected with container job while reading logs (exit code " + outputSoFarCmd.getExitCode() + "): " + outputSoFarCmd.getStdout() + " / " + outputSoFarCmd.getStderr());
+ }
+ updateStdoutWithNewData(stdout, outputSoFarCmd.get());
+
+ if (timer.isExpired())
+ throw new IllegalStateException("Timeout waiting for success or failure");
+ }
+
+ private void updateStdoutWithNewData(ByteArrayOutputStream receiverStream, String outputFound) throws IOException {
+ int bytesAlreadyRead = receiverStream.size();
+ if (bytesAlreadyRead <= outputFound.length()) {
+ String newOutput = outputFound.substring(bytesAlreadyRead);
+ LOG.debug("Container job " + namespace + " output: " + newOutput);
+ receiverStream.write(newOutput.getBytes(StandardCharsets.UTF_8));
+ } else {
+ // not sure why this happens, but it does sometimes; for now just reset
+ LOG.debug("Container job " + namespace + " output reset, length " + outputFound.length() + " less than " + bytesAlreadyRead + "; ignoring new output:\n" + outputFound + "\n" + new String(receiverStream.toByteArray()));
+ receiverStream.reset();
+ receiverStream.write(outputFound.getBytes(StandardCharsets.UTF_8));
+ }
+ }
+
+ private Boolean checkForContainerCompletedUsingK8sWaitFor(String kubeJobName, Entity entity, long timeoutSeconds) {
+ final AtomicInteger finishCount = new AtomicInteger(0);
+
+ ProcessTaskWrapper<String> waitForSuccess = Entities.submit(entity, newSimpleTaskFactory(String.format(JOBS_WAIT_COMPLETE_CMD, timeoutSeconds, kubeJobName, namespace))
+ .summary("Wait for success ('complete')").allowingNonZeroExitCode().newTask());
+ Entities.submit(entity, Tasks.create("Wait for success then notify", () -> {
+ try {
+ if (waitForSuccess.get().contains("condition met"))
+ LOG.debug("Container job " + kubeJobName + " detected as completed (succeeded) in kubernetes");
+ } finally {
+ synchronized (finishCount) {
+ finishCount.incrementAndGet();
+ finishCount.notifyAll();
+ }
+ }
+ }));
+
+ ProcessTaskWrapper<String> waitForFailed = Entities.submit(entity, newSimpleTaskFactory(String.format(JOBS_WAIT_FAILED_CMD, timeoutSeconds, kubeJobName, namespace))
+ .summary("Wait for failed").allowingNonZeroExitCode().newTask());
+ Entities.submit(entity, Tasks.create("Wait for failed then notify", () -> {
+ try {
+ if (waitForFailed.get().contains("condition met"))
+ LOG.debug("Container job " + kubeJobName + " detected as failed in kubernetes (may be valid non-zero exit)");
+ } finally {
+ synchronized (finishCount) {
+ finishCount.incrementAndGet();
+ finishCount.notifyAll();
+ }
+ }
+ }));
+
+ while (finishCount.get() == 0) {
+ LOG.debug("Container job " + kubeJobName + " waiting on complete or failed");
+ try {
+ synchronized (finishCount) {
+ finishCount.wait(Duration.TEN_SECONDS.toMilliseconds());
+ }
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ }
+ }
+
+ if (waitForSuccess.isDone() && waitForSuccess.getExitCode() == 0) return true;
+ if (waitForFailed.isDone() && waitForFailed.getExitCode() == 0) return false;
+ return null;
+ }
+
+ private Boolean checkForContainerCompletedUsingPodState(String kubeJobName, Entity entity, long timeoutSeconds) { // currently identical to checkForContainerCompletedUsingK8sWaitFor, still shelling out to `kubectl wait`
+ final AtomicInteger finishCount = new AtomicInteger(0);
+
+ ProcessTaskWrapper<String> waitForSuccess = Entities.submit(entity, newSimpleTaskFactory(String.format(JOBS_WAIT_COMPLETE_CMD, timeoutSeconds, kubeJobName, namespace))
+ .summary("Wait for success ('complete')").allowingNonZeroExitCode().newTask());
+ Entities.submit(entity, Tasks.create("Wait for success then notify", () -> {
+ try {
+ if (waitForSuccess.get().contains("condition met"))
+ LOG.debug("Container job " + kubeJobName + " detected as completed (succeeded) in kubernetes");
+ } finally {
+ synchronized (finishCount) {
+ finishCount.incrementAndGet();
+ finishCount.notifyAll();
+ }
+ }
+ }));
+
+ ProcessTaskWrapper<String> waitForFailed = Entities.submit(entity, newSimpleTaskFactory(String.format(JOBS_WAIT_FAILED_CMD, timeoutSeconds, kubeJobName, namespace))
+ .summary("Wait for failed").allowingNonZeroExitCode().newTask());
+ Entities.submit(entity, Tasks.create("Wait for failed then notify", () -> {
+ try {
+ if (waitForFailed.get().contains("condition met"))
+ LOG.debug("Container job " + kubeJobName + " detected as failed in kubernetes (may be valid non-zero exit)");
+ } finally {
+ synchronized (finishCount) {
+ finishCount.incrementAndGet();
+ finishCount.notifyAll();
+ }
+ }
+ }));
+
+ while (finishCount.get() == 0) {
+ LOG.debug("Container job " + kubeJobName + " waiting on complete or failed");
+ try {
+ synchronized (finishCount) {
+ finishCount.wait(Duration.TEN_SECONDS.toMilliseconds());
+ }
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ }
+ }
+
+ if (waitForSuccess.isDone() && waitForSuccess.getExitCode() == 0) return true;
+ if (waitForFailed.isDone() && waitForFailed.getExitCode() == 0) return false;
+ return null;
+ }
+
+ private PodPhases waitForContainerAvailable(String kubeJobName, ContainerTaskResult result, CountdownTimer timer) {
+ return DynamicTasks.queue(Tasks.<PodPhases>builder().dynamic(true).displayName("Wait for container to be running (or fail)").body(() -> {
+ long first = System.currentTimeMillis();
+ long last = first;
+ long backoffMillis = 10;
+ PodPhases phase = PodPhases.Unknown;
+ long startupReportDelay = 1000; // report any start longer than 1s
+ while (timer.isNotExpired()) {
+ phase = checkPodPhase(kubeJobName);
+ if (phase == PodPhases.Failed || phase == PodPhases.Succeeded || phase == PodPhases.Running) {
+ if (startupReportDelay>5000) LOG.info("Container detected in state "+phase+" after "+Duration.millis(System.currentTimeMillis()-first));
+ else LOG.debug("Container detected in state "+phase+" after "+Duration.millis(System.currentTimeMillis()-first));
+ return phase;
+ }
+
+ if (phase!=PodPhases.Unknown && Strings.isBlank(result.kubePodName)) {
+ result.kubePodName = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_NAME_CMD, namespace, kubeJobName)).summary("Get pod name").allowingNonZeroExitCode().newTask()).get().trim();
+ }
+ if (phase == PodPhases.Pending && Strings.isNonBlank(result.kubePodName)) {
+ // if pending, need to look for errors
+ String failedEvents = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_FAILED_JSON_CMD, namespace, result.kubePodName)).summary("Check pod failed events").allowingNonZeroExitCode().newTask()).get().trim();
+ if (!"[]".equals(failedEvents)) {
+ String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, result.kubePodName)).summary("Get pod events on failure").allowingNonZeroExitCode().newTask()).get().trim();
+ throw new IllegalStateException("Job pod failed: "+failedEvents+"\n"+events);
+ }
+ }
+
+ if (System.currentTimeMillis() - last > startupReportDelay) {
+ last = System.currentTimeMillis();
+
+ // log debug after 1s, then info after 5s, 20s, etc
+ // seems bad that it often takes 1s+ just to start the container :/
+ Consumer<String> log = startupReportDelay<3*1000 ? LOG::debug : LOG::info;
+
+ log.accept("Container taking a while to start ("+Duration.millis(last-first)+"): "+namespace+" "+ kubeJobName +" "+ result.kubePodName+" / phase '"+phase+"'");
+ String stateJsonS = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_STATUS_STATE_CMD, namespace, kubeJobName)).summary("Get pod state").allowingNonZeroExitCode().newTask()).get().trim();
+ if (Strings.isNonBlank(stateJsonS)) {
+ log.accept("Pod state: "+stateJsonS);
+ }
+ if (Strings.isNonBlank(result.kubePodName)) {
+ String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, result.kubePodName)).summary("Get pod events").allowingNonZeroExitCode().newTask()).get().trim();
+ log.accept("Pod events: \n"+events);
+ } else {
+ String events = DynamicTasks.queue(newSimpleTaskFactory(String.format(SCOPED_EVENTS_CMD, namespace, kubeJobName)).summary("Get job events").allowingNonZeroExitCode().newTask()).get().trim();
+ log.accept("Job events: \n"+events);
+ }
+
+ // first 1s, then 5s, then every 20s
+ startupReportDelay *= 5;
+ if (startupReportDelay > 20*1000) startupReportDelay = 20*1000;
+ }
+ long backoffMillis2 = backoffMillis;
+ Tasks.withBlockingDetails("waiting "+backoffMillis2+"ms for pod to be available (current status '" + phase + "')", () -> {
+ Time.sleep(backoffMillis2);
+ return null;
+ });
+ if (backoffMillis<80) backoffMillis*=2;
+ }
+ throw new IllegalStateException("Timeout waiting for pod to be available; current status is '" + phase + "'");
+ }).build()).getUnchecked();
+ }
+
+ private PodPhases checkPodPhase(String kubeJobName) {
+ PodPhases fromContainerState = getPodPhaseFromContainerState(kubeJobName);
+ if (fromContainerState != null) return fromContainerState;
+
+ // this is the more official way; fall back to it if the state above is not recognised (e.g. 'waiting')
+ String phase = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_STATUS_PHASE_CMD, namespace, kubeJobName)).summary("Get pod phase").allowingNonZeroExitCode().newTask()).get().trim();
+ for (PodPhases candidate: PodPhases.values()) {
+ if (candidate.name().equalsIgnoreCase(phase)) return candidate;
+ }
+ return PodPhases.Unknown;
+ }
+
+ private PodPhases getPodPhaseFromContainerState(String kubeJobName) {
+ // pod container state is populated much sooner than the pod status/phase, the job conditions, and `kubectl wait`, so prefer it
+ String stateJsonS = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_STATUS_STATE_CMD, namespace, kubeJobName)).summary("Get pod state").allowingNonZeroExitCode().newTask()).get().trim();
+ if (Strings.isNonBlank(stateJsonS)) {
+ Object stateO = new Gson().fromJson(stateJsonS, Object.class);
+ if (stateO instanceof Map) {
+ if (!((Map<?, ?>) stateO).keySet().isEmpty()) {
+ Object stateK = (((Map<?, ?>) stateO).keySet().iterator().next());
+ if (stateK instanceof String) {
+ String stateS = (String) stateK;
+ if ("terminated".equalsIgnoreCase(stateS)) return PodPhases.Succeeded;
+ if ("running".equalsIgnoreCase(stateS)) return PodPhases.Running;
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ public ProcessTaskFactory<String> newDeleteJobTask(String kubeJobName) {
+ return newSimpleTaskFactory(String.format(JOBS_DELETE_CMD, kubeJobName, namespace)).summary("Delete job");
+ }
+
private String initNamespaceAndGetNewJobName() {
Entity entity = BrooklynTaskTags.getContextEntity(Tasks.current());
if (entity == null) {
@@ -374,6 +540,7 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
.toLowerCase();
if (namespace==null) {
namespace = kubeJobName;
+ namespaceRandom = true;
}
return kubeJobName;
}
@@ -382,21 +549,28 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
return namespace;
}
- public boolean doDeleteNamespace() {
- if (namespace==null) return false;
+ public ProcessTaskWrapper<String> doDeleteNamespace(boolean wait, boolean requireSuccess) {
+ if (namespace==null) return null;
Entity entity = BrooklynTaskTags.getContextEntity(Tasks.current());
- if (entity==null) return false;
+ if (entity==null) return null;
// clean up - delete namespace
Boolean devMode = EntityInitializers.resolve(config, KEEP_CONTAINER_FOR_DEBUGGING);
if (Boolean.TRUE.equals(devMode)) {
- return false;
+ return null;
}
- LOG.debug("Deleting namespace " + namespace);
+ LOG.info("Deleting namespace " + namespace);
// do this not as a subtask so we can run even if the main queue fails
- Entities.submit(entity, newSimpleTaskFactory(String.format(NAMESPACE_DELETE_CMD, namespace)).summary("Tear down containers").newTask()).block();
- System.runFinalization();
- return true;
+ ProcessTaskFactory<String> tf = newSimpleTaskFactory(String.format(NAMESPACE_DELETE_CMD, namespace)).summary("Tear down containers");
+ if (!requireSuccess) tf = tf.allowingNonZeroExitCode();
+ else tf = tf.requiringExitCodeZero();
+ ProcessTaskWrapper<String> task = Entities.submit(entity, tf.newTask());
+ if (wait) {
+ task.get();
+ LOG.info("Deleted namespace " + namespace);
+ System.runFinalization();
+ }
+ return task;
}
public T summary(String summary) { this.summary = summary; return self(); }
@@ -458,7 +632,9 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp
return self();
}
- public T deleteNamespace(Boolean delete) { this.deleteNamespace = delete; return self(); }
+ public T setDeleteNamespaceAfter(Boolean delete) { this.deleteNamespace = delete; return self(); }
+ /** @deprecated since 1.1 when introduced */ @Deprecated
+ public T deleteNamespace(Boolean delete) { return setDeleteNamespaceAfter(delete); }
/** visible in the container environment */
public T jobIdentifier(String jobIdentifier) {
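A minimal usage sketch (hypothetical caller, not part of this commit) of the reworked cleanup above; doDeleteNamespace must run inside a Brooklyn task with a context entity, otherwise it returns null:
    // async fire-and-forget: don't block, and tolerate a non-zero exit
    // (the namespace may already be gone); 'factory' is assumed to be a
    // configured ContainerTaskFactory whose namespace has been initialised
    ProcessTaskWrapper<String> cleanup = factory.doDeleteNamespace(false, false);
    // or block and require success, e.g. in tests:
    // factory.doDeleteNamespace(true, true).get();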
diff --git a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskTest.java b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskTest.java
index a688ae21b3..0b85dcfb1e 100644
--- a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskTest.java
+++ b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskTest.java
@@ -30,6 +30,8 @@ import org.apache.brooklyn.test.Asserts;
import org.apache.brooklyn.util.core.task.DynamicTasks;
import org.apache.brooklyn.util.text.Identifiers;
import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
import java.util.HashMap;
@@ -43,10 +45,14 @@ import static org.testng.AssertJUnit.assertTrue;
@Test(groups = {"Live"})
public class ContainerTaskTest extends BrooklynAppUnitTestSupport {
+ private static final Logger LOG = LoggerFactory.getLogger(ContainerTaskTest.class);
+
@Test
public void testSuccessfulContainerTask() {
+ LOG.info("Starting container test");
TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+ LOG.info("Starting dedicated container run");
Task<ContainerTaskResult> containerTask = ContainerTaskFactory.newInstance()
.summary("Running container task")
.jobIdentifier("test-container-task")
@@ -58,6 +64,7 @@ public class ContainerTaskTest extends BrooklynAppUnitTestSupport {
DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
ContainerTaskResult result = containerTask.getUnchecked(Duration.ONE_MINUTE);
+ LOG.info("Result: "+result + " / "+result.getMainStdout().trim());
Asserts.assertEquals(result.getMainStdout().trim(), "hello test");
}
@@ -220,7 +227,7 @@ public class ContainerTaskTest extends BrooklynAppUnitTestSupport {
Asserts.assertEquals(result.getMainStdout().trim(), "hello " + uid);
} finally {
- DynamicTasks.queueIfPossible( baseFactory.summary("cleaning up").deleteNamespace(true).bashScriptCommands("rm hello-"+uid+".sh") ).orSubmitAsync(entity);
+ DynamicTasks.queueIfPossible( baseFactory.summary("cleaning up").setDeleteNamespaceAfter(true).bashScriptCommands("rm hello-"+uid+".sh") ).orSubmitAsync(entity);
}
}