You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2022/07/19 17:01:58 UTC
[brooklyn-server] branch master updated: rejig container task
This is an automated email from the ASF dual-hosted git repository.
heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
The following commit(s) were added to refs/heads/master by this push:
new 9ea3d2d981 rejig container task
9ea3d2d981 is described below
commit 9ea3d2d981dcc03e173fd4f61fbc20d5a65e5b1b
Author: Alex Heneveld <al...@cloudsoft.io>
AuthorDate: Tue Jul 19 17:59:51 2022 +0100
rejig container task
* to track output, gather exit code, and provide clearer semantics
* also fix dirty detection across all processes
---
.../core/effector/ssh/SshEffectorTasks.java | 1 +
.../system/internal/SystemProcessTaskFactory.java | 1 +
.../brooklyn/tasks/kubectl/ContainerCommons.java | 25 +-
.../brooklyn/tasks/kubectl/ContainerEffector.java | 10 +-
.../brooklyn/tasks/kubectl/ContainerSensor.java | 14 +-
.../tasks/kubectl/ContainerTaskFactory.java | 373 ++++++++++++++++-----
.../{JobBuilder.java => KubeJobFileCreator.java} | 49 +--
.../tasks/kubectl/ContainerEffectorTest.java | 5 +-
.../tasks/kubectl/ContainerSensorTest.java | 6 +-
.../brooklyn/tasks/kubectl/ContainerTaskTest.java | 224 ++++++++-----
...uilderTest.java => KubeJobSpecCreatorTest.java} | 58 ++--
11 files changed, 527 insertions(+), 239 deletions(-)
diff --git a/core/src/main/java/org/apache/brooklyn/core/effector/ssh/SshEffectorTasks.java b/core/src/main/java/org/apache/brooklyn/core/effector/ssh/SshEffectorTasks.java
index c29563681d..d101b5c4a0 100644
--- a/core/src/main/java/org/apache/brooklyn/core/effector/ssh/SshEffectorTasks.java
+++ b/core/src/main/java/org/apache/brooklyn/core/effector/ssh/SshEffectorTasks.java
@@ -122,6 +122,7 @@ public class SshEffectorTasks {
}
@Override
public synchronized ProcessTaskWrapper<RET> newTask() {
+ dirty = false;
Entity entity = BrooklynTaskTags.getTargetOrContextEntity(Tasks.current());
if (machine==null) {
if (log.isDebugEnabled())
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/system/internal/SystemProcessTaskFactory.java b/core/src/main/java/org/apache/brooklyn/util/core/task/system/internal/SystemProcessTaskFactory.java
index afe5d3346b..56be6cfc6f 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/system/internal/SystemProcessTaskFactory.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/system/internal/SystemProcessTaskFactory.java
@@ -68,6 +68,7 @@ public class SystemProcessTaskFactory<T extends SystemProcessTaskFactory<T,RET>,
@Override
public ProcessTaskWrapper<RET> newTask() {
+ dirty = false;
return new SystemProcessTaskWrapper();
}
diff --git a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java
index 4601a9ce02..5dbdf41af5 100644
--- a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java
+++ b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java
@@ -21,7 +21,6 @@ package org.apache.brooklyn.tasks.kubectl;
import com.google.common.collect.Lists;
import com.google.common.reflect.TypeToken;
import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.config.BasicConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.config.SetConfigKey;
import org.apache.brooklyn.util.time.Duration;
@@ -36,14 +35,16 @@ public interface ContainerCommons {
ConfigKey<PullPolicy> CONTAINER_IMAGE_PULL_POLICY = ConfigKeys.newConfigKey(new TypeToken<PullPolicy>() {} ,
"imagePullPolicy", "Container image pull policy. Allowed values: {IfNotPresent, Always, Never}. ", PullPolicy.ALWAYS);
- ConfigKey<String> CONTAINER_NAME = ConfigKeys.newStringConfigKey("containerName", "Container name");
ConfigKey<Boolean> KEEP_CONTAINER_FOR_DEBUGGING = ConfigKeys.newBooleanConfigKey("keepContainerForDebugging", "When set to true, the namespace" +
" and associated resources and services are not destroyed after execution. Defaults value is 'false'.", Boolean.FALSE);
- ConfigKey<List> COMMANDS = ConfigKeys.newConfigKey(List.class,"commands", "Commands to execute for container", Lists.newArrayList());
- ConfigKey<List> ARGUMENTS = ConfigKeys.newConfigKey(List.class,"args", "Arguments to execute for container", Lists.newArrayList());
+ ConfigKey<Object> BASH_SCRIPT = ConfigKeys.newConfigKey(Object.class,"bashScript", "A bash script (as string or list of strings) to run, implies command '/bin/bash' '-c' and replaces arguments");
+ ConfigKey<List> COMMAND = ConfigKeys.newConfigKey(List.class,"command", "Single command and optional arguments to execute for the container (overrides image EntryPoint and Cmd)", Lists.newArrayList());
+ ConfigKey<List> ARGUMENTS = ConfigKeys.newConfigKey(List.class,"args", "Additional arguments to pass to the command at the container (in addition to the command supplied here or the default in the image)", Lists.newArrayList());
- ConfigKey<Duration> TIMEOUT = ConfigKeys.newConfigKey(Duration.class, "timeout", "Container wait timeout", Duration.minutes(1));
+ ConfigKey<Duration> TIMEOUT = ConfigKeys.newConfigKey(Duration.class, "timeout", "Container execution timeout (default 5 minutes)", Duration.minutes(5));
+
+ ConfigKey<Boolean> REQUIRE_EXIT_CODE_ZERO = ConfigKeys.newConfigKey(Boolean.class, "requireExitCodeZero", "Whether task should fail if container returns non-zero exit code (default true)", true);
ConfigKey<String> WORKING_DIR = ConfigKeys.newStringConfigKey("workingDir", "Location where the container commands are executed");
ConfigKey<Set<Map<String,String>>> VOLUME_MOUNTS = new SetConfigKey.Builder<>(new TypeToken<Map<String,String>>() {}, "volumeMounts")
@@ -52,11 +53,13 @@ public interface ContainerCommons {
ConfigKey<Set<Map<String,Object>>> VOLUMES = new SetConfigKey.Builder(new TypeToken<Map<String,Object>>() {}, "volumes")
.description("List of directories with data that is accessible across multiple containers").defaultValue(null).build();
- String NAMESPACE_CREATE_CMD = "kubectl create namespace brooklyn-%s"; // namespace name
- String NAMESPACE_SET_CMD = "kubectl config set-context --current --namespace=brooklyn-%s"; // namespace name
- String JOBS_CREATE_CMD = "kubectl apply -f %s"; // deployment.yaml absolute path
- String JOBS_FEED_CMD = "kubectl wait --timeout=%ds --for=condition=complete job/%s"; // timeout, containerName
- String JOBS_LOGS_CMD = "kubectl logs jobs/%s"; // containerName
- String NAMESPACE_DELETE_CMD = "kubectl delete namespace brooklyn-%s"; // namespace name
+ String NAMESPACE_CREATE_CMD = "kubectl create namespace %s";
+ String NAMESPACE_SET_CMD = "kubectl config set-context --current --namespace=%s";
+ String JOBS_CREATE_CMD = "kubectl apply -f %s --namespace=%s";
+ String JOBS_FEED_CMD = "kubectl wait --timeout=%ds --for=condition=complete job/%s --namespace=%s";
+ String JOBS_FEED_FAILED_CMD = "kubectl wait --timeout=%ds --for=condition=failed job/%s --namespace=%s";
+ String JOBS_LOGS_CMD = "kubectl logs jobs/%s --namespace=%s";
+ String PODS_EXIT_CODE_CMD = "kubectl get pods --namespace=%s -ojsonpath='{.items[0].status.containerStatuses[0].state.terminated.exitCode}'";
+ String NAMESPACE_DELETE_CMD = "kubectl delete namespace %s";
}
diff --git a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerEffector.java b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerEffector.java
index 5b34ae604e..0c41de1a24 100644
--- a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerEffector.java
+++ b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerEffector.java
@@ -26,12 +26,9 @@ import org.apache.brooklyn.core.effector.Effectors;
import org.apache.brooklyn.core.entity.EntityInitializers;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.core.task.DynamicTasks;
-import org.apache.brooklyn.util.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.TimeUnit;
-
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.brooklyn.core.mgmt.BrooklynTaskTags.EFFECTOR_TAG;
@@ -69,14 +66,13 @@ public class ContainerEffector extends AddEffectorInitializerAbstract implements
@Override
public String call(ConfigBag parameters) {
ConfigBag configBag = ConfigBag.newInstanceCopying(this.params).putAll(parameters);
- Task<String> containerTask = new ContainerTaskFactory.ConcreteContainerTaskFactory<String>()
+ Task<ContainerTaskFactory.ContainerTaskResult> containerTask = new ContainerTaskFactory.ConcreteContainerTaskFactory()
.summary("Executing Container Image: " + EntityInitializers.resolve(configBag, CONTAINER_IMAGE))
- .tag(entity().getId() + "-" + EFFECTOR_TAG)
+ .jobIdentifier(entity().getId() + "-" + EFFECTOR_TAG)
.configure(configBag.getAllConfig())
.newTask();
DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity());
- Object result = containerTask.getUnchecked(Duration.of(5, TimeUnit.MINUTES));
- return result.toString();
+ return containerTask.getUnchecked().getMainStdout();
}
}
}
diff --git a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerSensor.java b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerSensor.java
index 0d54e40680..7bb8dccc4e 100644
--- a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerSensor.java
+++ b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerSensor.java
@@ -18,7 +18,6 @@
*/
package org.apache.brooklyn.tasks.kubectl;
-import com.google.common.collect.Iterables;
import org.apache.brooklyn.api.entity.EntityLocal;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.sensor.AttributeSensor;
@@ -35,7 +34,6 @@ import org.apache.brooklyn.util.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
@@ -77,18 +75,14 @@ public class ContainerSensor<T> extends AbstractAddSensorFeed<T> implements Cont
.callable(new Callable<Object>() {
@Override
public Object call() throws Exception {
- Task<String> containerTask = new ContainerTaskFactory.ConcreteContainerTaskFactory<String>()
+ Task<ContainerTaskFactory.ContainerTaskResult> containerTask = new ContainerTaskFactory.ConcreteContainerTaskFactory()
.summary("Running " + EntityInitializers.resolve(configBag, SENSOR_NAME))
- .tag(entity.getId() + "-" + SENSOR_TAG)
+ .jobIdentifier(entity.getId() + "-" + SENSOR_TAG)
.configure(configBag.getAllConfig())
.newTask();
DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
- Object result = containerTask.getUnchecked(Duration.of(5, TimeUnit.MINUTES));
- List<String> res = (List<String>) result;
- while(!res.isEmpty() && Iterables.getLast(res).matches("namespace .* deleted\\s*")) res = res.subList(0, res.size()-1);
-
- String res2 = res.isEmpty() ? null : Iterables.getLast(res);
- return (new SshCommandSensor.CoerceOutputFunction<>(sensor.getTypeToken(), initParam(FORMAT), initParam(LAST_YAML_DOCUMENT))).apply(res2);
+ String mainStdout = containerTask.getUnchecked(Duration.of(5, TimeUnit.MINUTES)).getMainStdout();
+ return (new SshCommandSensor.CoerceOutputFunction<>(sensor.getTypeToken(), initParam(FORMAT), initParam(LAST_YAML_DOCUMENT))).apply(mainStdout);
}
})
.suppressDuplicates(Boolean.TRUE.equals(suppressDuplicates))
diff --git a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
index 7e9a5910df..a03f6f2b1a 100644
--- a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
+++ b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
@@ -19,103 +19,313 @@
package org.apache.brooklyn.tasks.kubectl;
import com.google.common.collect.Lists;
+import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.mgmt.TaskFactory;
import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
+import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.EntityInitializers;
+import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
+import org.apache.brooklyn.core.mgmt.ha.BrooklynBomOsgiArchiveInstaller;
+import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.core.json.ShellEnvironmentSerializer;
+import org.apache.brooklyn.util.core.task.DynamicTasks;
import org.apache.brooklyn.util.core.task.TaskBuilder;
import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.core.task.system.ProcessTaskFactory;
import org.apache.brooklyn.util.core.task.system.ProcessTaskStub;
import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper;
import org.apache.brooklyn.util.core.task.system.internal.SystemProcessTaskFactory;
+import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.text.Strings;
+import org.apache.brooklyn.util.time.CountdownTimer;
+import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import static org.apache.brooklyn.tasks.kubectl.ContainerCommons.*;
-public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> implements TaskFactory<Task<RET>> {
+public class ContainerTaskFactory<T extends ContainerTaskFactory<T>> implements TaskFactory<Task<ContainerTaskFactory.ContainerTaskResult>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ContainerTaskFactory.class);
protected String summary;
- protected String tag = "";
+ protected String jobIdentifier = "";
protected final ConfigBag config = ConfigBag.newInstance();
+ private String namespace;
+ private Boolean createNamespace;
+ private Boolean deleteNamespace;
@Override
- public Task<RET> newTask() {
- ConfigBag configBag = this.config;
-
- List<String> commandsCfg = EntityInitializers.resolve(configBag, COMMANDS);
- List<String> argumentsCfg = EntityInitializers.resolve(configBag, ARGUMENTS);
- String containerImage = EntityInitializers.resolve(configBag, CONTAINER_IMAGE);
- PullPolicy containerImagePullPolicy = EntityInitializers.resolve(configBag, CONTAINER_IMAGE_PULL_POLICY);
- String containerNameFromCfg = EntityInitializers.resolve(configBag, CONTAINER_NAME);
- Boolean devMode = EntityInitializers.resolve(configBag, KEEP_CONTAINER_FOR_DEBUGGING);
-
- String workingDir = EntityInitializers.resolve(configBag, WORKING_DIR);
- Set<Map<String,String>> volumeMounts = (Set<Map<String,String>>) EntityInitializers.resolve(configBag, VOLUME_MOUNTS);
- Set<Map<String, Object>> volumes = (Set<Map<String, Object>>) EntityInitializers.resolve(configBag, VOLUMES);
-
- if(Strings.isBlank(containerImage)) {
- throw new IllegalStateException("You must specify containerImage when using " + this.getClass().getSimpleName());
- }
+ public Task<ContainerTaskResult> newTask() {
+ final ByteArrayOutputStream stdout = new ByteArrayOutputStream();
+ TaskBuilder<ContainerTaskResult> taskBuilder = Tasks.<ContainerTaskResult>builder().dynamic(true)
+ .displayName(this.summary)
+ .tag(BrooklynTaskTags.tagForStream(BrooklynTaskTags.STREAM_STDOUT, stdout))
+ .body(()-> {
+ List<String> commandsCfg = EntityInitializers.resolve(config, COMMAND);
+ List<String> argumentsCfg = EntityInitializers.resolve(config, ARGUMENTS);
- final String cleanImageName = containerImage.contains(":") ? containerImage.substring(0, containerImage.indexOf(":")) : containerImage;
-
- final String containerName = (Strings.isBlank(containerNameFromCfg)
- ? ( (Strings.isNonBlank(this.tag) ? this.tag + "-" : "").concat(cleanImageName).concat("-").concat(Strings.makeRandomId(10)))
- : containerNameFromCfg)
- .replaceAll("[^A-Za-z0-9-]", "") // remove all symbols
- .toLowerCase();
-
- final String jobYamlLocation = new JobBuilder()
- .withImage(containerImage)
- .withImagePullPolicy(containerImagePullPolicy)
- .withName(containerName)
- .withArgs(argumentsCfg)
- .withEnv(EntityInitializers.resolve(configBag, BrooklynConfigKeys.SHELL_ENVIRONMENT))
- .withCommands(Lists.newArrayList(commandsCfg))
- .withVolumeMounts(volumeMounts)
- .withVolumes(volumes)
- .withWorkingDir(workingDir)
- .build();
-
-
- final long timeoutInSeconds = EntityInitializers.resolve(configBag, TIMEOUT).toSeconds();
- Task<String> runCommandsTask = buildKubeTask(configBag, "Submit job", String.format(JOBS_CREATE_CMD,jobYamlLocation)).asTask();
- Task<String> waitTask = buildKubeTask(configBag, "Wait For Completion", String.format(JOBS_FEED_CMD,timeoutInSeconds,containerName)).asTask();
- if(!devMode) {
- // making these two inessential to insure proper namespace cleanup
- BrooklynTaskTags.addTagDynamically(runCommandsTask, BrooklynTaskTags.INESSENTIAL_TASK);
- BrooklynTaskTags.addTagDynamically(waitTask, BrooklynTaskTags.INESSENTIAL_TASK);
- }
+ Object bashScript = EntityInitializers.resolve(config, BASH_SCRIPT);
+ if (bashScript!=null) {
+ if (!commandsCfg.isEmpty()) LOG.warn("Ignoring 'command' "+commandsCfg+" because bashScript is set");
+ if (!argumentsCfg.isEmpty()) LOG.warn("Ignoring 'args' "+argumentsCfg+" because bashScript is set");
+
+ commandsCfg = MutableList.of("/bin/bash", "-c");
+ List<Object> argumentsCfgO = bashScript instanceof Iterable ? MutableList.copyOf((Iterable) commandsCfg) : MutableList.of(bashScript);
+ argumentsCfg = MutableList.of(argumentsCfgO.stream().map(x -> ""+x).collect(Collectors.joining("\n")));
+ }
+
+ String containerImage = EntityInitializers.resolve(config, CONTAINER_IMAGE);
+ PullPolicy containerImagePullPolicy = EntityInitializers.resolve(config, CONTAINER_IMAGE_PULL_POLICY);
+ Boolean devMode = EntityInitializers.resolve(config, KEEP_CONTAINER_FOR_DEBUGGING);
+
+ String workingDir = EntityInitializers.resolve(config, WORKING_DIR);
+ Set<Map<String,String>> volumeMounts = (Set<Map<String,String>>) EntityInitializers.resolve(config, VOLUME_MOUNTS);
+ Set<Map<String, Object>> volumes = (Set<Map<String, Object>>) EntityInitializers.resolve(config, VOLUMES);
+
+ if(Strings.isBlank(containerImage)) {
+ throw new IllegalStateException("You must specify containerImage when using " + this.getClass().getSimpleName());
+ }
+
+ Entity entity = BrooklynTaskTags.getContextEntity(Tasks.current());
+ if (entity == null) {
+ throw new IllegalStateException("Task must run in context of entity to background jobs");
+ }
+
+ final String cleanImageName = containerImage.contains(":") ? containerImage.substring(0, containerImage.indexOf(":")) : containerImage;
+
+ final String containerName = (Strings.isNonBlank(this.jobIdentifier) ? this.jobIdentifier + "-" : "")
+ .concat(cleanImageName).concat("-").concat(Strings.makeRandomId(10))
+ .replaceAll("[^A-Za-z0-9-]", "") // remove all symbols
+ .toLowerCase();
+ if (namespace==null) {
+ namespace = "brooklyn-" + containerName;
+ }
+
+ LOG.debug("Submitting container job in namespace "+namespace);
+
+ Map<String, String> env = new ShellEnvironmentSerializer(((EntityInternal)entity).getManagementContext()).serialize(EntityInitializers.resolve(config, BrooklynConfigKeys.SHELL_ENVIRONMENT));
+ final BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<File> jobYaml = new KubeJobFileCreator()
+ .withImage(containerImage)
+ .withImagePullPolicy(containerImagePullPolicy)
+ .withName(containerName)
+ .withArgs(argumentsCfg)
+ .withEnv(env)
+ .withCommand(Lists.newArrayList(commandsCfg))
+ .withVolumeMounts(volumeMounts)
+ .withVolumes(volumes)
+ .withWorkingDir(workingDir)
+ .createFile();
+ Tasks.addTagDynamically(BrooklynTaskTags.tagForEnvStream(BrooklynTaskTags.STREAM_ENV, env));
+
+ try {
+
+ Duration timeout = EntityInitializers.resolve(config, TIMEOUT);
+
+ ContainerTaskResult result = new ContainerTaskResult();
+ result.interestingJobs = MutableList.of();
+
+ ProcessTaskWrapper<ProcessTaskWrapper<?>> createNsJob = null;
+ if (!Boolean.FALSE.equals(createNamespace)) {
+ ProcessTaskFactory<ProcessTaskWrapper<?>> createNsJobF = newSimpleTaskFactory(
+ String.format(NAMESPACE_CREATE_CMD, namespace)
+ //, String.format(NAMESPACE_SET_CMD, namespace)
+ ).summary("Set up namespace").returning(x -> x);
+ if (createNamespace==null) {
+ createNsJobF.allowingNonZeroExitCode();
+ }
+ createNsJob = DynamicTasks.queue(createNsJobF.newTask());
+ }
+
+ // delete only if explicitly told to; when unspecified, delete only if we create the namespace here (or hit an unexpected creation error)
+ boolean deleteNamespaceHere = Boolean.TRUE.equals(deleteNamespace);
+ try {
+ if (createNsJob!=null) {
+ ProcessTaskWrapper<?> nsDetails = createNsJob.get();
+ if (nsDetails.getExitCode()==0) {
+ LOG.debug("Namespace created");
+ if (deleteNamespace==null) deleteNamespaceHere = true;
+ } else if (nsDetails.getExitCode()==1 && nsDetails.getStderr().contains("AlreadyExists")) {
+ if (Boolean.TRUE.equals(createNamespace)) {
+ LOG.warn("Namespace "+namespace+" already exists; failing");
+ throw new IllegalStateException("Namespace "+namespace+" exists when creating a job that expects to create this namespace");
+ } else {
+ LOG.debug("Namespace exists already; reusing it");
+ }
+ } else {
+ LOG.warn("Unexpected namespace creation problem: "+nsDetails.getStderr()+ "(code "+nsDetails.getExitCode()+")");
+ if (deleteNamespace==null) deleteNamespaceHere = true;
+ throw new IllegalStateException("Unexpected namespace creation problem ("+namespace+"); see log for more details");
+ }
+ }
+
+ result.interestingJobs.add(DynamicTasks.queue(
+ newSimpleTaskFactory(String.format(JOBS_CREATE_CMD, jobYaml.getFile().getAbsolutePath(), namespace)).summary("Submit job").newTask()));
+
+ final CountdownTimer timer = CountdownTimer.newInstanceStarted(timeout);
+
+ // use `wait --for` api, but in a 5s loop in case there are other issues
+ boolean succeeded = DynamicTasks.queue(Tasks.<Boolean>builder().dynamic(true).displayName("Wait for success or failure").body(() -> {
+ while (true) {
+ LOG.debug("Container job submitted, now waiting on success or failure");
+
+ long secondsLeft = Math.min(Math.max(1, timer.getDurationRemaining().toSeconds()), 5);
+ final AtomicInteger finishCount = new AtomicInteger(0);
+
+ ProcessTaskWrapper<String> waitForSuccess = Entities.submit(entity, newSimpleTaskFactory(String.format(JOBS_FEED_CMD, secondsLeft, containerName, namespace))
+ .summary("Wait for success ('complete')").allowingNonZeroExitCode().newTask());
+ Entities.submit(entity, Tasks.create("Wait for success then notify", () -> {
+ try {
+ if (waitForSuccess.get().contains("condition met")) LOG.debug("Container job "+namespace+" detected as completed (succeeded) in kubernetes");
+ } finally {
+ synchronized (finishCount) {
+ finishCount.incrementAndGet();
+ finishCount.notifyAll();
+ }
+ }
+ }));
+
+ ProcessTaskWrapper<String> waitForFailed = Entities.submit(entity, newSimpleTaskFactory(String.format(JOBS_FEED_FAILED_CMD, secondsLeft, containerName, namespace))
+ .summary("Wait for failed").allowingNonZeroExitCode().newTask());
+ Entities.submit(entity, Tasks.create("Wait for failed then notify", () -> {
+ try {
+ if (waitForFailed.get().contains("condition met")) LOG.debug("Container job "+namespace+" detected as failed in kubernetes (may be valid non-zero exit)");
+ } finally {
+ synchronized (finishCount) {
+ finishCount.incrementAndGet();
+ finishCount.notifyAll();
+ }
+ }
+ }));
+
+ while (finishCount.get() == 0) {
+ LOG.debug("Container job "+namespace+" waiting on complete or failed");
+ try {
+ synchronized (finishCount) {
+ finishCount.wait(Duration.TEN_SECONDS.toMilliseconds());
+ }
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ }
+ }
+
+ if (waitForSuccess.isDone() && waitForSuccess.getExitCode() == 0) return true;
+ if (waitForFailed.isDone() && waitForFailed.getExitCode() == 0) return false;
+ LOG.debug("Container job "+namespace+" not yet complete, will retry");
+ // probably timed out or job not yet available; short wait then retry
+ Time.sleep(Duration.millis(50));
+ if (timer.isExpired())
+ throw new IllegalStateException("Timeout waiting for success or failure");
+
+ // any other one-off checks for job error, we could do here
+
+ // finally get the partial log for reporting
+ ProcessTaskWrapper<String> outputSoFarCmd = DynamicTasks.queue(newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, containerName, namespace)).summary("Retrieve output so far").newTask());
+ BrooklynTaskTags.setTransient(outputSoFarCmd.asTask());
+ String outputSoFar = outputSoFarCmd.get();
+ String newOutput = outputSoFar.substring(stdout.size());
+ LOG.debug("Container job "+namespace+" output: "+newOutput);
+ stdout.write(newOutput.getBytes(StandardCharsets.UTF_8));
+ }
+
+ }).build()).getUnchecked();
+ LOG.debug("Container job "+namespace+" completed, success "+succeeded);
+
+ ProcessTaskWrapper<String> retrieveOutput = DynamicTasks.queue(newSimpleTaskFactory(String.format(JOBS_LOGS_CMD, containerName, namespace)).summary("Retrieve output").newTask());
+ result.interestingJobs.add(retrieveOutput);
+
+ ProcessTaskWrapper<String> retrieveExitCode = DynamicTasks.queue(newSimpleTaskFactory(String.format(PODS_EXIT_CODE_CMD, namespace)).summary("Retrieve exit code").newTask());
+ result.interestingJobs.add(retrieveExitCode);
+
+ DynamicTasks.waitForLast();
+ result.mainStdout = retrieveOutput.get();
+ stdout.write(result.mainStdout.substring(stdout.size()).getBytes(StandardCharsets.UTF_8));
+
+ String exitCodeS = retrieveExitCode.getStdout();
+ if (Strings.isNonBlank(exitCodeS)) result.mainExitCode = Integer.parseInt(exitCodeS.trim());
+ else result.mainExitCode = -1;
+
+ if (result.mainExitCode!=0 && config.get(REQUIRE_EXIT_CODE_ZERO)) {
+ LOG.info("Failed container job "+namespace+" (exit code "+result.mainExitCode+") output: "+result.mainExitCode);
+ throw new IllegalStateException("Non-zero exit code (" + result.mainExitCode + ") disallowed");
+ }
+
+ return result;
+
+ } finally {
+ // clean up - delete namespace
+ if (!devMode && deleteNamespaceHere) {
+ LOG.debug("Deleting namespace "+namespace);
+ // do this not as a subtask so we can run even if the main queue fails
+ Entities.submit(entity, newSimpleTaskFactory(String.format(NAMESPACE_DELETE_CMD, namespace)).summary("Tear down containers").newTask()).block();
+ }
+ System.runFinalization();
+ }
+ } catch (Exception e) {
+ throw Exceptions.propagate(e);
+ } finally {
+ jobYaml.deleteIfTemp();
+ }
+ });
- TaskBuilder<RET> taskBuilder = Tasks.<RET>builder()
- .displayName(this.summary)
- .add(buildKubeTask(configBag, "Set Up and Run",
- String.format(NAMESPACE_CREATE_CMD,containerName),
- String.format(NAMESPACE_SET_CMD,containerName)))
- .add(runCommandsTask)
- .add(waitTask)
- .add(buildKubeTask(configBag, "Retrieve Output", String.format(JOBS_LOGS_CMD,containerName)));
-
- if(!devMode) {
- taskBuilder.add(buildKubeTask(configBag, "Tear Down", String.format(NAMESPACE_DELETE_CMD,containerName)));
- }
return taskBuilder.build();
}
- public T summary(String summary) {
- this.summary = summary;
+ public T summary(String summary) { this.summary = summary; return self(); }
+ public T timeout(Duration timeout) { config.put(TIMEOUT, timeout); return self(); }
+ public T command(List<String> commands) { config.put(COMMAND, commands); return self(); }
+ public T command(String baseCommandWithNoArgs, String ...extraCommandArguments) { config.put(COMMAND, MutableList.of(baseCommandWithNoArgs).appendAll(Arrays.asList(extraCommandArguments))); return self(); }
+ public T bashScriptCommands(List<String> commands) {
+ config.put(BASH_SCRIPT, commands);
+ return self();
+ }
+ public T bashScriptCommands(String firstCommandAndArgs, String ...otherCommandAndArgs) { return bashScriptCommands(MutableList.of(firstCommandAndArgs).appendAll(Arrays.asList(otherCommandAndArgs))); }
+ public T image(String image) { config.put(CONTAINER_IMAGE, image); return self(); }
+ public T allowNonZeroExitCode() { return allowNonZeroExitCode(true); }
+ public T allowNonZeroExitCode(boolean allowNonZero) { config.put(REQUIRE_EXIT_CODE_ZERO, !allowNonZero); return self(); }
+ public T imagePullPolicy(PullPolicy policy) { config.put(CONTAINER_IMAGE_PULL_POLICY, policy); return self(); }
+ public T env(Map<String,?> map) {
+ config.put(BrooklynConfigKeys.SHELL_ENVIRONMENT, MutableMap.copyOf( map ) );
+ return self();
+ }
+ public T env(String key, Object val) {
+ return env(MutableMap.copyOf( config.get(BrooklynConfigKeys.SHELL_ENVIRONMENT) ).add(key, val));
+ }
+
+ /** specify the namespace to use, and whether to create or delete it. by default a randomly generated namespace is used and always cleaned up,
+ * but by using this, a caller can ensure the namespace persists. if using this, it is the caller's responsibility to avoid collisions.
+ *
+ * @param namespace namespace to use
+ * @param create whether to create; null (default) is to auto-detect, create it if it doesn't exist
+ * @param delete whether to delete; null (default) means to delete it if and only if we created it (and not dev mode)
+ */
+ public T useNamespace(String namespace, Boolean create, Boolean delete) {
+ this.namespace = namespace;
+ this.createNamespace = create;
+ this.deleteNamespace = delete;
return self();
}
- public T tag(String tag) {
- this.tag = tag;
+ public T deleteNamespace(Boolean delete) { this.deleteNamespace = delete; return self(); }
+
+ /** visible in the container environment */
+ public T jobIdentifier(String jobIdentifier) {
+ this.jobIdentifier = jobIdentifier;
return self();
}
@@ -127,23 +337,36 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> im
return self();
}
- public static ProcessTaskWrapper<String> buildKubeTask(final ConfigBag configBag, final String taskSummary, final String... kubeCommands) {
- Map<String, Object> env = EntityInitializers.resolve(configBag, BrooklynConfigKeys.SHELL_ENVIRONMENT);
- Map<String, String> envVars = MutableMap.of();
- if(env != null && env.size() > 0) {
- env.forEach((k,v) -> envVars.put(k, v.toString()));
- }
+ private ProcessTaskFactory<String> newSimpleTaskFactory(final String... kubeCommands) {
return new SystemProcessTaskFactory.ConcreteSystemProcessTaskFactory<String>(kubeCommands)
- .summary(taskSummary)
- .configure(configBag.getAllConfig())
- .environmentVariables(envVars) // needed to be shown in the UI ;)
+ //i think we don't care about any of these configs, and most cause debug messages about them being ignored
+ //.configure(config.getAllConfig())
+ //.environmentVariables(envVars) // needed to be shown in the UI ;)
+
.<String>returning(ProcessTaskStub.ScriptReturnType.STDOUT_STRING)
- .requiringExitCodeZero().newTask();
+ .requiringExitCodeZero();
}
- public static class ConcreteContainerTaskFactory<RET> extends ContainerTaskFactory<ConcreteContainerTaskFactory<RET>, RET> {
+ public static class ConcreteContainerTaskFactory extends ContainerTaskFactory<ConcreteContainerTaskFactory> {
public ConcreteContainerTaskFactory() {
super();
}
}
+
+ public static class ContainerTaskResult {
+ private List<ProcessTaskWrapper<?>> interestingJobs;
+ private String mainStdout;
+ private Integer mainExitCode;
+
+ /** This will be 0 unless allowNonZeroExitCode was specified */
+ public Integer getMainExitCode() {
+ return mainExitCode;
+ }
+ public String getMainStdout() {
+ return mainStdout;
+ }
+ public List<ProcessTaskWrapper<?>> getInterestingJobs() {
+ return interestingJobs;
+ }
+ }
}
diff --git a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/JobBuilder.java b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/KubeJobFileCreator.java
similarity index 88%
rename from software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/JobBuilder.java
rename to software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/KubeJobFileCreator.java
index 9249393f44..5b090d7442 100644
--- a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/JobBuilder.java
+++ b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/KubeJobFileCreator.java
@@ -20,6 +20,7 @@ package org.apache.brooklyn.tasks.kubectl;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.brooklyn.core.mgmt.ha.BrooklynBomOsgiArchiveInstaller;
import org.apache.brooklyn.util.text.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,8 +40,8 @@ import java.util.stream.Collectors;
/**
* This was needed to ensure our Kubernetes Yaml Job configurations are valid.
*/
-public class JobBuilder {
- private static final Logger LOG = LoggerFactory.getLogger(JobBuilder.class);
+public class KubeJobFileCreator {
+ private static final Logger LOG = LoggerFactory.getLogger(KubeJobFileCreator.class);
String jobName;
String imageName;
@@ -51,21 +52,21 @@ public class JobBuilder {
String prefix = "brooklyn-job";
- List<String> commands = Lists.newArrayList();
+ List<String> command = Lists.newArrayList();
List<String> args = Lists.newArrayList();
- Map<String, Object> env = Maps.newHashMap();
+ Map<String, String> env = Maps.newHashMap();
List<Map<String,String>> volumeMounts = Lists.newArrayList();
List<Map<String, Object>> volumes = Lists.newArrayList();
- public JobBuilder withName(final String name) {
+ public KubeJobFileCreator withName(final String name) {
this.jobName = name;
return this;
}
- public JobBuilder withImage(final String image){
+ public KubeJobFileCreator withImage(final String image){
this.imageName = image;
return this;
}
@@ -75,59 +76,59 @@ public class JobBuilder {
* @param eimagePullPolicy
* @return
*/
- public JobBuilder withImagePullPolicy(final PullPolicy eimagePullPolicy){
+ public KubeJobFileCreator withImagePullPolicy(final PullPolicy eimagePullPolicy){
if (eimagePullPolicy != null) {
this.imagePullPolicy = eimagePullPolicy.val();
}
return this;
}
- public JobBuilder withCommands(final List<String> commandsArg){
- if (commandsArg != null) {
- this.commands.addAll(commandsArg);
+ public KubeJobFileCreator withCommand(final List<String> commandAndEntryPointArgs){
+ if (commandAndEntryPointArgs != null) {
+ this.command.addAll(commandAndEntryPointArgs);
}
return this;
}
- public JobBuilder withArgs(final List<String> args){
+ public KubeJobFileCreator withArgs(final List<String> args){
if (args != null) {
this.args.addAll(args);
}
return this;
}
- public JobBuilder withVolumeMounts(final Set<Map<String,String>> volumeMounts) {
+ public KubeJobFileCreator withVolumeMounts(final Set<Map<String,String>> volumeMounts) {
if (volumeMounts != null) {
this.volumeMounts.addAll(volumeMounts);
}
return this;
}
- public JobBuilder withVolumes(final Set<Map<String, Object>> volumes) {
+ public KubeJobFileCreator withVolumes(final Set<Map<String, Object>> volumes) {
if (volumes != null) {
this.volumes.addAll(volumes);
}
return this;
}
- public JobBuilder withWorkingDir(String workingDir) {
+ public KubeJobFileCreator withWorkingDir(String workingDir) {
this.workingDir = workingDir;
return this;
}
- public JobBuilder withPrefix(final String prefixArg){
+ public KubeJobFileCreator withPrefix(final String prefixArg){
this.prefix = prefixArg;
return this;
}
- public JobBuilder withEnv(final Map<String,Object> env){
+ public KubeJobFileCreator withEnv(final Map<String,String> env){
if (env != null) {
this.env.putAll(env);
}
return this;
}
- public String build(){
+ public BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<File> createFile(){
JobTemplate jobTemplate = new JobTemplate(jobName);
ContainerSpec containerSpec = jobTemplate.getSpec().getTemplate().getContainerSpec(0);
@@ -142,13 +143,13 @@ public class JobBuilder {
List<Map<String,String>> envList = env.entrySet().stream().map (e -> {
Map<String,String> envItem = new HashMap<>();
envItem.put("name", e.getKey());
- envItem.put("value", e.getValue().toString());
+ envItem.put("value", e.getValue());
return envItem;
}).collect(Collectors.toList());
containerSpec.setEnv(envList);
}
- if (!commands.isEmpty()) {
- containerSpec.setCommand(this.commands);
+ if (!command.isEmpty()) {
+ containerSpec.setCommand(this.command);
}
if (!args.isEmpty()) {
containerSpec.setArgs(this.args);
@@ -177,7 +178,7 @@ public class JobBuilder {
return serializeAndWriteToTempFile(jobTemplate);
}
- private String serializeAndWriteToTempFile(JobTemplate jobTemplate) {
+ private BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<File> serializeAndWriteToTempFile(JobTemplate jobTemplate) {
DumperOptions options = new DumperOptions();
options.setIndent(2);
options.setPrettyFlow(true);
@@ -203,8 +204,8 @@ public class JobBuilder {
PrintWriter sw = new PrintWriter(jobBodyPath);
Yaml yaml = new Yaml(representer, options);
yaml.dump(jobTemplate, sw);
- LOG.info("Job body dumped at: {}" , jobBodyPath.getAbsolutePath());
- return jobBodyPath.getAbsolutePath();
+ LOG.debug("Job body dumped at: {}" , jobBodyPath.getAbsolutePath());
+ return new BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<>(jobBodyPath, true);
} catch (IOException e) {
throw new RuntimeException("Failed to create temp file for container", e);
}
@@ -226,7 +227,7 @@ class TemplateSpec {
/**
* To do so, set .spec.backoffLimit to specify the number of retries before considering a Job as failed. The back-off limit is set by default to 6.
*/
- Integer backoffLimit = 1;
+ Integer backoffLimit = 0;
JobSpec template;
diff --git a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerEffectorTest.java b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerEffectorTest.java
index cee55084e3..d882b2fd06 100644
--- a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerEffectorTest.java
+++ b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerEffectorTest.java
@@ -49,7 +49,8 @@ public class ContainerEffectorTest extends BrooklynAppUnitTestSupport {
ContainerEffector.EFFECTOR_NAME, "test-container-effector",
ContainerCommons.CONTAINER_IMAGE, "perl",
ContainerCommons.CONTAINER_IMAGE_PULL_POLICY, PullPolicy.IF_NOT_PRESENT,
- ContainerCommons.COMMANDS, ImmutableList.of("/bin/bash", "-c", "echo " + message),
+// ContainerCommons.COMMAND, ImmutableList.of("/bin/bash", "-c", "echo " + message),
+ ContainerCommons.BASH_SCRIPT, "echo "+message+" ${HELLO}",
BrooklynConfigKeys.SHELL_ENVIRONMENT, ImmutableMap.<String, Object>of("HELLO", "WORLD")));
ContainerEffector initializer = new ContainerEffector(parameters);
@@ -58,7 +59,7 @@ public class ContainerEffectorTest extends BrooklynAppUnitTestSupport {
EntityAsserts.assertAttributeEqualsEventually(parentEntity, Attributes.SERVICE_UP, true);
Object output = Entities.invokeEffector(app, parentEntity, parentEntity.getEffector("test-container-effector")).getUnchecked(Duration.ONE_MINUTE);
- assertTrue(output.toString().contains(message));
+ assertTrue(output.toString().contains(message+" WORLD"));
}
@Test
diff --git a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerSensorTest.java b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerSensorTest.java
index 64a2f5f1c7..5de54807d3 100644
--- a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerSensorTest.java
+++ b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerSensorTest.java
@@ -41,7 +41,7 @@ public class ContainerSensorTest extends BrooklynAppUnitTestSupport {
ConfigBag parameters = ConfigBag.newInstance(ImmutableMap.of(
ContainerCommons.CONTAINER_IMAGE, "perl",
ContainerCommons.CONTAINER_IMAGE_PULL_POLICY, PullPolicy.IF_NOT_PRESENT,
- ContainerCommons.COMMANDS, ImmutableList.of("/bin/bash", "-c","echo " + message) ,
+ ContainerCommons.COMMAND, ImmutableList.of("/bin/bash", "-c","echo " + message) ,
ContainerSensor.SENSOR_PERIOD, "1s",
ContainerSensor.SENSOR_NAME, "test-echo-sensor"));
@@ -59,7 +59,7 @@ public class ContainerSensorTest extends BrooklynAppUnitTestSupport {
ConfigBag parameters = ConfigBag.newInstance(ImmutableMap.of(
ContainerCommons.CONTAINER_IMAGE, "perl",
- ContainerCommons.COMMANDS, ImmutableList.of("/bin/bash") ,
+ ContainerCommons.COMMAND, ImmutableList.of("/bin/bash") ,
ContainerCommons.ARGUMENTS, ImmutableList.of("-c", "echo " + message) ,
ContainerSensor.SENSOR_PERIOD, "1s",
ContainerSensor.SENSOR_NAME, "test-echo-sensor"));
@@ -112,7 +112,7 @@ public class ContainerSensorTest extends BrooklynAppUnitTestSupport {
public void testTfVersionSensor() {
ConfigBag parameters = ConfigBag.newInstance(ImmutableMap.of(
ContainerCommons.CONTAINER_IMAGE, "hashicorp/terraform:1.3.0-alpha20220622",
- ContainerCommons.COMMANDS, ImmutableList.of("terraform", "version" ),
+ ContainerCommons.COMMAND, ImmutableList.of("terraform", "version" ),
ContainerSensor.SENSOR_PERIOD, "1s",
ContainerSensor.SENSOR_NAME, "tf-version-sensor"));
diff --git a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskTest.java b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskTest.java
index 36fe0acc08..4dda230118 100644
--- a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskTest.java
+++ b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskTest.java
@@ -19,27 +19,21 @@
package org.apache.brooklyn.tasks.kubectl;
import com.beust.jcommander.internal.Maps;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
+import com.google.common.base.Stopwatch;
import com.google.common.collect.Sets;
import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.mgmt.HasTaskChildren;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
import org.apache.brooklyn.core.test.entity.TestEntity;
import org.apache.brooklyn.test.Asserts;
-import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.text.Identifiers;
import org.apache.brooklyn.util.time.Duration;
import org.testng.annotations.Test;
import java.util.HashMap;
-import java.util.List;
-
import java.util.Map;
-import java.util.concurrent.TimeUnit;
import static org.testng.AssertJUnit.assertTrue;
@@ -53,107 +47,181 @@ public class ContainerTaskTest extends BrooklynAppUnitTestSupport {
public void testSuccessfulContainerTask() {
TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
- Map<String,Object> configBag = new HashMap<>();
- configBag.put("name", "test-container-task");
- configBag.put("image", "perl");
- configBag.put("commands", Lists.newArrayList("/bin/bash", "-c","echo 'hello test'"));
- configBag.put("imagePullPolicy", PullPolicy.IF_NOT_PRESENT);
-
- Task<String> containerTask = new ContainerTaskFactory.ConcreteContainerTaskFactory<String>()
+ Task<ContainerTaskFactory.ContainerTaskResult> containerTask = new ContainerTaskFactory.ConcreteContainerTaskFactory()
.summary("Running container task")
- .configure(configBag)
+ .jobIdentifier("test-container-task")
+ .imagePullPolicy(PullPolicy.IF_NOT_PRESENT)
+ .timeout(Duration.TWO_MINUTES)
+ .image("perl")
+ .command( "/bin/bash", "-c","echo 'hello test'" )
.newTask();
+
DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
- Object result = containerTask.getUnchecked(Duration.of(5, TimeUnit.MINUTES));
- List<String> res = (List<String>) result;
- while(!res.isEmpty() && Iterables.getLast(res).matches("namespace .* deleted\\s*")) res = res.subList(0, res.size()-1);
+ ContainerTaskFactory.ContainerTaskResult result = containerTask.getUnchecked(Duration.ONE_MINUTE);
+ Asserts.assertEquals(result.getMainStdout().trim(), "hello test");
+ }
+
+ @Test
+ public void testContainerTaskWithVar() {
+ TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+
+ Task<ContainerTaskFactory.ContainerTaskResult> containerTask = new ContainerTaskFactory.ConcreteContainerTaskFactory()
+ .summary("Running container task")
+ .jobIdentifier("test-container-task")
+ .imagePullPolicy(PullPolicy.IF_NOT_PRESENT)
+ .timeout(Duration.TWO_MINUTES)
+ .image("perl")
+ .env("test_name", "EnvTest")
+ .bashScriptCommands("echo hello ${test_name}" )
+ .newTask();
- String res2 = res.isEmpty() ? null : Iterables.getLast(res);
- assertTrue(res2.startsWith("hello test"));
+ DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
+ ContainerTaskFactory.ContainerTaskResult result = containerTask.getUnchecked(Duration.ONE_MINUTE);
+ Asserts.assertEquals(result.getMainStdout().trim(), "hello EnvTest");
+ Asserts.assertEquals(BrooklynTaskTags.stream(containerTask, BrooklynTaskTags.STREAM_ENV).streamContents.get().trim(), "test_name=\"EnvTest\"");
}
@Test
public void testSuccessfulContainerTerraformTask() {
TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
- Map<String,Object> configBag = new HashMap<>();
- configBag.put("name", "test-container-task");
- configBag.put("image", "hashicorp/terraform:latest");
- configBag.put("commands", ImmutableList.of("terraform", "version" ));
- configBag.put("imagePullPolicy", PullPolicy.IF_NOT_PRESENT);
-
- Task<String> containerTask = new ContainerTaskFactory.ConcreteContainerTaskFactory<String>()
+ Task<ContainerTaskFactory.ContainerTaskResult> containerTask = new ContainerTaskFactory.ConcreteContainerTaskFactory()
.summary("Running terraform-container task")
- .configure(configBag)
+ .jobIdentifier("test-container-task")
+ .timeout(Duration.TWO_MINUTES)
+ .image("hashicorp/terraform:latest")
+ .imagePullPolicy(PullPolicy.IF_NOT_PRESENT)
+ .command( "terraform", "version" )
.newTask();
DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
- Object result = containerTask.getUnchecked(Duration.of(5, TimeUnit.MINUTES));
- List<String> res = (List<String>) result;
- while(!res.isEmpty() && Iterables.getLast(res).matches("namespace .* deleted\\s*")) res = res.subList(0, res.size()-1);
- String res2 = res.isEmpty() ? null : Iterables.getLast(res);
- assertTrue(res2.startsWith("Terraform"));
+ ContainerTaskFactory.ContainerTaskResult result = containerTask.getUnchecked();
+ assertTrue(result.getMainStdout().startsWith("Terraform"));
}
- @Test// tries to execute local command, wants it to fail, but even so best as integration
- public void testFailingContainerTask() {
+ @Test // execute local command, assert we get exit code, and it fails
+ public void testExpectedFailingContainerTask() {
TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
- List<String> commands = MutableList.of("/bin/bash", "-c","echo 'hello test' & exit 1");
+ Task<ContainerTaskFactory.ContainerTaskResult> containerTask = new ContainerTaskFactory.ConcreteContainerTaskFactory()
+ .summary("Running container task")
+ .jobIdentifier("test-container-task")
+ .imagePullPolicy(PullPolicy.IF_NOT_PRESENT)
+ .timeout(Duration.TWO_MINUTES)
+ .image("perl")
+ .command( "/bin/bash", "-c","echo 'hello test' && exit 42" )
+ .allowNonZeroExitCode()
+ .newTask();
+ DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
+
+ Asserts.assertTrue(containerTask.blockUntilEnded(Duration.ONE_MINUTE)); // should complete in 1 minute, i.e. we detect the failure
+ Asserts.assertTrue(containerTask.isDone());
+ Asserts.assertEquals((int)containerTask.getUnchecked().getMainExitCode(), 42);
+ Asserts.assertEquals(containerTask.getUnchecked().getMainStdout().trim(), "hello test");
+ }
- Map<String,Object> configBag = new HashMap<>();
- configBag.put("name", "test-container-task");
- configBag.put("image", "perl");
- configBag.put("commands", commands);
- configBag.put("timeout", "1m");
-
- Task<String> containerTask = new ContainerTaskFactory.ConcreteContainerTaskFactory<String>()
- .summary("Running docker task")
- .configure(configBag)
+
+ @Test // execute local command, assert we get exit code, and it fails
+ public void testSleepingAndExpectedFailingContainerTask() {
+ TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+
+ Task<ContainerTaskFactory.ContainerTaskResult> containerTask = new ContainerTaskFactory.ConcreteContainerTaskFactory()
+ .summary("Running container task")
+ .jobIdentifier("test-container-task")
+ .imagePullPolicy(PullPolicy.IF_NOT_PRESENT)
+ .timeout(Duration.TWO_MINUTES)
+ .image("perl")
+ .bashScriptCommands("echo starting", "sleep 6", "echo done", "exit 42", "echo ignored")
+ .allowNonZeroExitCode()
.newTask();
- try {
- DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity).getTask().get();
- if (containerTask instanceof HasTaskChildren) {
- for (Task<?> child: ((HasTaskChildren)containerTask).getChildren()) {
- if(child.getTags().contains(BrooklynTaskTags.INESSENTIAL_TASK) && child.isError()) {
- child.get();
- }
- }
- }
- } catch (Exception e) {
- Asserts.expectedFailureContains(e, "Process task ended with exit code", "when 0 was required");
- }
+ DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
+
+ Asserts.eventually(() -> {
+ BrooklynTaskTags.WrappedStream stream = BrooklynTaskTags.stream(containerTask, BrooklynTaskTags.STREAM_STDOUT);
+ if (stream==null) return null;
+ return stream.streamContents.get().trim();
+ }, x -> x.equals("starting"));
+ Asserts.assertFalse(containerTask.isDone());
+
+ // may be occasional timing glitches here, eg if namespace cleanup takes > 3s
+ Stopwatch sw = Stopwatch.createStarted();
+ // should complete in under 10s, i.e. we detect the failure less than 5s after it happens
+ Asserts.assertTrue(containerTask.blockUntilEnded(Duration.seconds(15)), "should definitely finish within 15s of starting");
+ Asserts.assertThat(Duration.of(sw.elapsed()), dur -> dur.isShorterThan(Duration.seconds(10)));
+
+ Asserts.assertTrue(containerTask.isDone());
+ Asserts.assertEquals(BrooklynTaskTags.stream(containerTask, BrooklynTaskTags.STREAM_STDOUT).streamContents.get().trim(), "starting\ndone");
+ Asserts.assertEquals((int)containerTask.getUnchecked().getMainExitCode(), 42);
+ Asserts.assertEquals(containerTask.getUnchecked().getMainStdout().trim(), "starting\ndone");
+ }
+
+ @Test // execute local command, assert the task fails when a non-zero exit code is not allowed
+ public void testNotExpectedFailingContainerTask() {
+ TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+
+ Task<ContainerTaskFactory.ContainerTaskResult> containerTask = new ContainerTaskFactory.ConcreteContainerTaskFactory()
+ .summary("Running container task")
+ .jobIdentifier("test-container-task")
+ .imagePullPolicy(PullPolicy.IF_NOT_PRESENT)
+ .timeout(Duration.TWO_MINUTES)
+ .image("perl")
+ .command( "/bin/bash", "-c","echo 'hello test' && exit 42" )
+// .allowNonZeroExitCode()
+ .newTask();
+ DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
+
+ Asserts.assertTrue(containerTask.blockUntilEnded(Duration.ONE_MINUTE)); // should complete in 1 minute, i.e. we detect the failure
+ Asserts.assertTrue(containerTask.isDone());
+ Asserts.assertTrue(containerTask.isError());
+ Asserts.assertFailsWith(() -> containerTask.getUnchecked(), error -> Asserts.expectedFailureContainsIgnoreCase(error, "Non-zero", "42"));
}
@Test
public void testScriptContainerTask() {
TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
Map<String,Object> volumes = Maps.newHashMap();
- volumes.put("name", "tf-ws");
- volumes.put("hostPath", Maps.newHashMap("path", "/tfws"));
-
- List<String> commands = MutableList.of("./hello.sh");
+ String volumeId = "brooklyn-container-task-test-volume";
+ String uid = Identifiers.makeRandomId(8).toLowerCase();
+ volumes.put("name", volumeId);
+ // sometimes this can be mounted from local drive, but does not always need to be
+ volumes.put("hostPath", Maps.newHashMap("path", "/tmp/brooklyn-container-task-test-shared"));
Map<String,Object> configBag = new HashMap<>();
- configBag.put("name", "test-container-task");
- configBag.put("image", "hhwang927/ubuntu_base");
- configBag.put("imagePullPolicy", "never");
- configBag.put("commands", commands);
- configBag.put("workingDir", "/tfws/scripts");
+ configBag.put("workingDir", "/brooklyn-mount-dir/scripts");
configBag.put("volumes", Sets.newHashSet(volumes));
- configBag.put("volumeMounts", Sets.newHashSet(Maps.newHashMap("name", "tf-ws", "mountPath", "/tfws")));
+ configBag.put("volumeMounts", Sets.newHashSet(Maps.newHashMap("name", volumeId, "mountPath", "/brooklyn-mount-dir")));
- Task<String> containerTask = new ContainerTaskFactory.ConcreteContainerTaskFactory<String>()
- .summary("Running docker task")
- .configure(configBag)
- .newTask();
- DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
- Object result = containerTask.getUnchecked(Duration.of(5, TimeUnit.MINUTES));
- List<String> res = (List<String>) result;
- while(!res.isEmpty() && Iterables.getLast(res).matches("namespace .* deleted\\s*")) res = res.subList(0, res.size()-1);
+ ContainerTaskFactory.ConcreteContainerTaskFactory baseFactory = new ContainerTaskFactory.ConcreteContainerTaskFactory()
+ .summary("Running container task")
+ .jobIdentifier("test-container-task")
+ .imagePullPolicy(PullPolicy.IF_NOT_PRESENT)
+ .timeout(Duration.TWO_MINUTES)
+ .image("hhwang927/ubuntu_base")
+ .useNamespace("brooklyn-container-test-"+uid, null, false)
+ .configure(configBag);
- String res2 = res.isEmpty() ? null : Iterables.getLast(res);
- assertTrue(res2.contains("hello"));
+ try {
+ // first create the script
+ Task<ContainerTaskFactory.ContainerTaskResult> setup = baseFactory.bashScriptCommands(
+ "mkdir -p /brooklyn-mount-dir/scripts",
+ "cd /brooklyn-mount-dir/scripts",
+ "echo echo hello " + uid + " > hello-"+uid+".sh",
+ "chmod +x hello-"+uid+".sh"
+ ).newTask();
+ DynamicTasks.queueIfPossible(setup).orSubmitAsync(entity).getTask().getUnchecked();
+
+ // now make a new container that should see the same mount point, and try running the command
+ Task<ContainerTaskFactory.ContainerTaskResult> containerTask = baseFactory.bashScriptCommands(
+ "./hello-"+uid+".sh"
+ )
+ .newTask();
+ DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
+ ContainerTaskFactory.ContainerTaskResult result = containerTask.getUnchecked(Duration.ONE_MINUTE);
+ Asserts.assertEquals(result.getMainStdout().trim(), "hello " + uid);
+
+ } finally {
+ DynamicTasks.queueIfPossible( baseFactory.summary("cleaning up").deleteNamespace(true).bashScriptCommands("rm hello-"+uid+".sh") ).orSubmitAsync(entity);
+ }
}
}
diff --git a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/JobBuilderTest.java b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/KubeJobSpecCreatorTest.java
similarity index 74%
rename from software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/JobBuilderTest.java
rename to software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/KubeJobSpecCreatorTest.java
index 1522271150..fca932cd95 100644
--- a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/JobBuilderTest.java
+++ b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/KubeJobSpecCreatorTest.java
@@ -21,35 +21,36 @@ package org.apache.brooklyn.tasks.kubectl;
import com.beust.jcommander.internal.Maps;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import org.apache.brooklyn.core.mgmt.ha.BrooklynBomOsgiArchiveInstaller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
+import java.io.File;
import java.nio.file.Files;
-import java.nio.file.Paths;
import java.util.Map;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
-public class JobBuilderTest {
- private static final Logger LOG = LoggerFactory.getLogger(JobBuilderTest.class);
+public class KubeJobSpecCreatorTest {
+ private static final Logger LOG = LoggerFactory.getLogger(KubeJobSpecCreatorTest.class);
@Test
public void testPerlWithArgs() throws Exception{
- String yamlJobLocation =
- new JobBuilder().withImage("perl").withName("perl-args-test")
+ BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<File> yamlJobLocation =
+ new KubeJobFileCreator().withImage("perl").withName("perl-args-test")
.withArgs(Lists.newArrayList( "echo", "aaa"))
.withImagePullPolicy(PullPolicy.ALWAYS) // explicit "Always"
- .build();
+ .createFile();
assertNotNull(yamlJobLocation);
- String actual = String.join("\n", Files.readAllLines(Paths.get(yamlJobLocation)));
+ String actual = String.join("\n", Files.readAllLines(yamlJobLocation.getFile().toPath()));
String expected = "apiVersion: batch/v1\n" +
"kind: Job\n" +
"metadata:\n" +
" name: perl-args-test\n" +
"spec:\n" +
- " backoffLimit: 1\n" +
+ " backoffLimit: 0\n" +
" completions: 1\n" +
" parallelism: 1\n" +
" template:\n" +
@@ -68,20 +69,20 @@ public class JobBuilderTest {
@Test
public void testPerlWithArgsAndCommand() throws Exception{
- String yamlJobLocation =
- new JobBuilder().withImage("perl").withName("perl-args-and-command-test")
- .withCommands(Lists.newArrayList("/bin/bash"))
+ BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<File> yamlJobLocation =
+ new KubeJobFileCreator().withImage("perl").withName("perl-args-and-command-test")
+ .withCommand(Lists.newArrayList("/bin/bash"))
.withArgs(Lists.newArrayList("-c", "echo aaa"))
.withImagePullPolicy(PullPolicy.NEVER)
- .build();
+ .createFile();
assertNotNull(yamlJobLocation);
- String actual = String.join("\n", Files.readAllLines(Paths.get(yamlJobLocation)));
+ String actual = String.join("\n", Files.readAllLines(yamlJobLocation.getFile().toPath()));
String expected = "apiVersion: batch/v1\n" +
"kind: Job\n" +
"metadata:\n" +
" name: perl-args-and-command-test\n" +
"spec:\n" +
- " backoffLimit: 1\n" +
+ " backoffLimit: 0\n" +
" completions: 1\n" +
" parallelism: 1\n" +
" template:\n" +
@@ -102,19 +103,19 @@ public class JobBuilderTest {
@Test
public void testPerlCommand() throws Exception{
- String yamlJobLocation =
- new JobBuilder().withImage("perl").withName("perl-command-test")
- .withCommands(Lists.newArrayList("/bin/bash", "-c", "echo aaa"))
+ BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<File> yamlJobLocation =
+ new KubeJobFileCreator().withImage("perl").withName("perl-command-test")
+ .withCommand(Lists.newArrayList("/bin/bash", "-c", "echo aaa"))
.withImagePullPolicy(PullPolicy.IF_NOT_PRESENT)
- .build();
+ .createFile();
assertNotNull(yamlJobLocation);
- String actual = String.join("\n", Files.readAllLines(Paths.get(yamlJobLocation)));
+ String actual = String.join("\n", Files.readAllLines(yamlJobLocation.getFile().toPath()));
String expected = "apiVersion: batch/v1\n" +
"kind: Job\n" +
"metadata:\n" +
" name: perl-command-test\n" +
"spec:\n" +
- " backoffLimit: 1\n" +
+ " backoffLimit: 0\n" +
" completions: 1\n" +
" parallelism: 1\n" +
" template:\n" +
@@ -137,21 +138,20 @@ public class JobBuilderTest {
Map<String,Object> volumes = Maps.newHashMap();
volumes.put("name", "tf-ws");
volumes.put("hostPath", Maps.newHashMap("path", "/tfws"));
- String yamlJobLocation =
- new JobBuilder().withImage("hashicorp/terraform").withName("tf-version")
- .withVolumes(Sets.newHashSet(volumes))
- .withVolumeMounts(Sets.newHashSet(Maps.newHashMap("name", "tf-ws", "mountPath", "/tfws")))
- .withCommands(Lists.newArrayList("terraform", "version"))
- .withWorkingDir("/tfws/app1")
- .build();
+ BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<File> yamlJobLocation = new KubeJobFileCreator().withImage("hashicorp/terraform").withName("tf-version")
+ .withVolumes(Sets.newHashSet(volumes))
+ .withVolumeMounts(Sets.newHashSet(Maps.newHashMap("name", "tf-ws", "mountPath", "/tfws")))
+ .withCommand(Lists.newArrayList("terraform", "version"))
+ .withWorkingDir("/tfws/app1")
+ .createFile();
assertNotNull(yamlJobLocation);
- String actual = String.join("\n", Files.readAllLines(Paths.get(yamlJobLocation)));
+ String actual = String.join("\n", Files.readAllLines(yamlJobLocation.getFile().toPath()));
String expected = "apiVersion: batch/v1\n" +
"kind: Job\n" +
"metadata:\n" +
" name: tf-version\n" +
"spec:\n" +
- " backoffLimit: 1\n" +
+ " backoffLimit: 0\n" +
" completions: 1\n" +
" parallelism: 1\n" +
" template:\n" +