You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by ha...@apache.org on 2015/08/17 21:17:41 UTC
[10/42] incubator-brooklyn git commit: [BROOKLYN-162] Refactor
package in ./core/util
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/DynamicSequentialTaskTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/DynamicSequentialTaskTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/DynamicSequentialTaskTest.java
new file mode 100644
index 0000000..c4f8d4c
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/task/DynamicSequentialTaskTest.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.management.HasTaskChildren;
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.core.util.task.BasicExecutionContext;
+import org.apache.brooklyn.core.util.task.BasicExecutionManager;
+import org.apache.brooklyn.core.util.task.DynamicSequentialTask;
+import org.apache.brooklyn.core.util.task.DynamicTasks;
+import org.apache.brooklyn.core.util.task.TaskTags;
+import org.apache.brooklyn.core.util.task.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.test.Asserts;
+import brooklyn.util.collections.MutableList;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.time.CountdownTimer;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+public class DynamicSequentialTaskTest {
+
+ private static final Logger log = LoggerFactory.getLogger(DynamicSequentialTaskTest.class);
+
+ public static final Duration TIMEOUT = Duration.TEN_SECONDS;
+ public static final Duration TINY_TIME = Duration.millis(20);
+
+ BasicExecutionManager em;
+ BasicExecutionContext ec;
+ List<String> messages;
+ Semaphore cancellations;
+ Stopwatch stopwatch;
+ Map<String,Semaphore> monitorableJobSemaphoreMap;
+ Map<String,Task<String>> monitorableTasksMap;
+
+ @BeforeMethod(alwaysRun=true)
+ public void setUp() { // rebuilds all per-test fixtures so state does not carry over between tests
+ em = new BasicExecutionManager("mycontext");
+ ec = new BasicExecutionContext(em);
+ cancellations = new Semaphore(0); // released by sayCallable's job when its sleep is interrupted
+ messages = new ArrayList<String>(); // also serves as the monitor for the wait/notifyAll calls in this class
+ monitorableJobSemaphoreMap = MutableMap.of();
+ monitorableTasksMap = MutableMap.of();
+ monitorableTasksMap.clear(); // NOTE(review): looks redundant - the map was just created on the previous line
+ stopwatch = Stopwatch.createStarted(); // started per test; several tests assert elapsed time against TIMEOUT
+ }
+
+ @AfterMethod(alwaysRun=true)
+ public void tearDown() throws Exception {
+ if (em != null) em.shutdownNow();
+ }
+
+ @Test
+ public void testSimple() throws InterruptedException, ExecutionException { // one child queued before submission, one queued by the running job
+ Callable<String> mainJob = new Callable<String>() {
+ public String call() {
+ log.info("main job - "+Tasks.current());
+ messages.add("main");
+ DynamicTasks.queue( sayTask("world") ); // queued from inside the running main job
+ return "bye";
+ }
+ };
+ DynamicSequentialTask<String> t = new DynamicSequentialTask<String>(mainJob);
+ // this should be added before anything added when the task is invoked
+ t.queue(sayTask("hello"));
+
+ Assert.assertEquals(messages, Lists.newArrayList()); // nothing has run before submission
+ Assert.assertEquals(t.isBegun(), false);
+ Assert.assertEquals(Iterables.size(t.getChildren()), 1); // only "hello" has been queued so far
+
+ ec.submit(t);
+ Assert.assertEquals(t.isSubmitted(), true);
+ Assert.assertEquals(t.getUnchecked(Duration.ONE_SECOND), "bye"); // the task's result is the main job's return value
+ long elapsed = t.getEndTimeUtc() - t.getSubmitTimeUtc();
+ Assert.assertTrue(elapsed < 1000, "elapsed time should have been less than 1s but was "+
+ Time.makeTimeString(elapsed, true));
+ Assert.assertEquals(Iterables.size(t.getChildren()), 2); // "hello" plus the "world" task queued by the main job
+ Assert.assertEquals(messages.size(), 3, "expected 3 entries, but had "+messages);
+ // either main or hello can be first, but world should be last
+ Assert.assertEquals(messages.get(2), "world");
+ }
+
+ public Callable<String> sayCallable(final String message, final Duration duration, final String message2) { // job: record message, optionally sleep for duration, then record message2
+ return new Callable<String>() {
+ public String call() {
+ try {
+ if (message != null) {
+ log.info("saying: "+message+ " - "+Tasks.current());
+ synchronized (messages) {
+ messages.add(message);
+ messages.notifyAll(); // wake any test thread waiting on the messages monitor
+ }
+ }
+ if (message2 != null) {
+ log.info("will say "+message2+" after "+duration);
+ }
+ if (duration != null && duration.toMilliseconds() > 0) { // null or non-positive duration means no pause
+ Thread.sleep(duration.toMillisecondsRoundingUp());
+ }
+ } catch (InterruptedException e) {
+ cancellations.release(); // one permit per interrupted sleep, so tests can detect the interruption
+ throw Exceptions.propagate(e);
+ }
+ if (message2 != null) { // only reached when the sleep was not interrupted
+ log.info("saying: "+message2+ " - "+Tasks.current());
+ synchronized (messages) {
+ messages.add(message2);
+ messages.notifyAll();
+ }
+ }
+ return message; // the result is always the first message, even when message2 was also recorded
+ }
+ };
+ }
+
+ public Task<String> sayTask(String message) {
+ return sayTask(message, null, null);
+ }
+
+ public Task<String> sayTask(String message, Duration duration, String message2) {
+ return Tasks.<String>builder().body(sayCallable(message, duration, message2)).build();
+ }
+
+ @Test
+ public void testComplex() throws InterruptedException, ExecutionException {
+ Task<List<?>> t = Tasks.sequential(
+ sayTask("1"),
+ sayTask("2"),
+ Tasks.parallel(sayTask("4"), sayTask("3")),
+ sayTask("5")
+ );
+ ec.submit(t);
+ Assert.assertEquals(t.get().size(), 4);
+ Asserts.assertEqualsIgnoringOrder((List<?>)t.get().get(2), ImmutableSet.of("3", "4"));
+ Assert.assertTrue(messages.equals(Arrays.asList("1", "2", "3", "4", "5")) || messages.equals(Arrays.asList("1", "2", "4", "3", "5")), "messages="+messages);
+ }
+
+ @Test
+ public void testCancelled() throws InterruptedException, ExecutionException { // cancels a sequential task while its second child is sleeping
+ Task<List<?>> t = Tasks.sequential(
+ sayTask("1"),
+ sayTask("2a", Duration.THIRTY_SECONDS, "2b"),
+ sayTask("3"));
+ ec.submit(t);
+ synchronized (messages) {
+ while (messages.size() <= 1) // wait until both "1" and "2a" have been recorded
+ messages.wait();
+ }
+ Assert.assertEquals(messages, Arrays.asList("1", "2a"));
+ Time.sleep(Duration.millis(50));
+ t.cancel(true);
+ Assert.assertTrue(t.isDone());
+ // 2 should get cancelled, and invoke the cancellation semaphore
+ // 3 should get cancelled and not run at all
+ Assert.assertEquals(messages, Arrays.asList("1", "2a"));
+
+ // Need to ensure that 2 has been started; race where we might cancel it before its run method
+ // is even begun. Hence doing "2a; pause; 2b" where nothing is interruptable before pause.
+ Assert.assertTrue(cancellations.tryAcquire(10, TimeUnit.SECONDS));
+
+ Iterator<Task<?>> ci = ((HasTaskChildren)t).getChildren().iterator();
+ Assert.assertEquals(ci.next().get(), "1");
+ Task<?> task2 = ci.next();
+ Assert.assertTrue(task2.isBegun());
+ Assert.assertTrue(task2.isDone());
+ Assert.assertTrue(task2.isCancelled());
+
+ Task<?> task3 = ci.next();
+ Assert.assertFalse(task3.isBegun());
+ Assert.assertTrue(task3.isDone()); // was re-asserting task2; task3 is the child that never ran
+ Assert.assertTrue(task3.isCancelled()); // was re-asserting task2; verifies "3 should get cancelled"
+
+ // but we do _not_ get a mutex from task3 as it does not run (is not interrupted)
+ Assert.assertEquals(cancellations.availablePermits(), 0);
+ }
+
+ protected Task<String> monitorableTask(final String id) {
+ return monitorableTask(null, id, null);
+ }
+ protected Task<String> monitorableTask(final Runnable pre, final String id, final Callable<String> post) {
+ Task<String> t = Tasks.<String>builder().body(monitorableJob(pre, id, post)).build();
+ monitorableTasksMap.put(id, t);
+ return t;
+ }
+ protected Callable<String> monitorableJob(final String id) {
+ return monitorableJob(null, id, null);
+ }
+ protected Callable<String> monitorableJob(final Runnable pre, final String id, final Callable<String> post) { // job that blocks until releaseMonitorableJob(id), then records id
+ monitorableJobSemaphoreMap.put(id, new Semaphore(0)); // registered eagerly, when the job is created rather than when it runs
+ return new Callable<String>() {
+ @Override
+ public String call() throws Exception {
+ if (pre!=null) pre.run(); // optional hook, runs before blocking on the semaphore
+ // wait for semaphore
+ if (!monitorableJobSemaphoreMap.get(id).tryAcquire(1, TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS))
+ throw new IllegalStateException("timeout for "+id);
+ synchronized (messages) {
+ messages.add(id);
+ messages.notifyAll(); // lets waitForMessage(id) observe the new entry
+ }
+ if (post!=null) return post.call(); // optional hook; its result (or exception) replaces the default result
+ return id;
+ }
+ };
+ }
+ protected void releaseMonitorableJob(final String id) {
+ monitorableJobSemaphoreMap.get(id).release();
+ }
+ protected void waitForMessage(final String id) { // blocks until messages contains id; fails the test once TIMEOUT has expired
+ CountdownTimer timer = CountdownTimer.newInstanceStarted(TIMEOUT);
+ synchronized (messages) {
+ while (!timer.isExpired()) {
+ if (messages.contains(id)) return;
+ timer.waitOnForExpiryUnchecked(messages); // presumably waits on the messages monitor until notified or the timer expires - confirm against CountdownTimer
+ }
+ }
+ Assert.fail("Did not see message "+id);
+ }
+ protected void releaseAndWaitForMonitorableJob(final String id) {
+ releaseMonitorableJob(id);
+ waitForMessage(id);
+ }
+
+ @Test
+ public void testChildrenRunConcurrentlyWithPrimary() {
+ Task<String> t = Tasks.<String>builder().dynamic(true)
+ .body(monitorableJob("main"))
+ .add(monitorableTask("1")).add(monitorableTask("2")).build();
+ ec.submit(t);
+ releaseAndWaitForMonitorableJob("1");
+ releaseAndWaitForMonitorableJob("main");
+ Assert.assertFalse(t.blockUntilEnded(TINY_TIME));
+ releaseMonitorableJob("2");
+
+ Assert.assertTrue(t.blockUntilEnded(TIMEOUT));
+ Assert.assertEquals(messages, MutableList.of("1", "main", "2"));
+ Assert.assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) < TIMEOUT.toMilliseconds(), "took too long: "+stopwatch);
+ Assert.assertFalse(t.isError());
+ }
+
+ protected static class FailRunnable implements Runnable {
+ @Override public void run() { throw new RuntimeException("Planned exception for test"); }
+ }
+ protected static class FailCallable implements Callable<String> {
+ @Override public String call() { throw new RuntimeException("Planned exception for test"); }
+ }
+
+ @Test
+ public void testByDefaultChildrenFailureAbortsSecondaryFailsPrimaryButNotAbortsPrimary() {
+ Task<String> t1 = monitorableTask(null, "1", new FailCallable());
+ Task<String> t = Tasks.<String>builder().dynamic(true)
+ .body(monitorableJob("main"))
+ .add(t1).add(monitorableTask("2")).build();
+ ec.submit(t);
+ releaseAndWaitForMonitorableJob("1");
+ Assert.assertFalse(t.blockUntilEnded(TINY_TIME));
+ releaseMonitorableJob("main");
+
+ Assert.assertTrue(t.blockUntilEnded(TIMEOUT));
+ Assert.assertEquals(messages, MutableList.of("1", "main"));
+ Assert.assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) < TIMEOUT.toMilliseconds(), "took too long: "+stopwatch);
+ Assert.assertTrue(t.isError());
+ Assert.assertTrue(t1.isError());
+ }
+
+ @Test
+ public void testWhenSwallowingChildrenFailureDoesNotAbortSecondaryOrFailPrimary() {
+ Task<String> t1 = monitorableTask(null, "1", new FailCallable());
+ Task<String> t = Tasks.<String>builder().dynamic(true)
+ .body(monitorableJob("main"))
+ .add(t1).add(monitorableTask("2")).swallowChildrenFailures(true).build();
+ ec.submit(t);
+ releaseAndWaitForMonitorableJob("1");
+ Assert.assertFalse(t.blockUntilEnded(TINY_TIME));
+ releaseAndWaitForMonitorableJob("2");
+ Assert.assertFalse(t.blockUntilEnded(TINY_TIME));
+ releaseMonitorableJob("main");
+ Assert.assertTrue(t.blockUntilEnded(TIMEOUT));
+ Assert.assertEquals(messages, MutableList.of("1", "2", "main"));
+ Assert.assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) < TIMEOUT.toMilliseconds(), "took too long: "+stopwatch);
+ Assert.assertFalse(t.isError());
+ Assert.assertTrue(t1.isError());
+ }
+
+ @Test
+ public void testInessentialChildrenFailureDoesNotAbortSecondaryOrFailPrimary() {
+ Task<String> t1 = monitorableTask(null, "1", new FailCallable());
+ TaskTags.markInessential(t1);
+ Task<String> t = Tasks.<String>builder().dynamic(true)
+ .body(monitorableJob("main"))
+ .add(t1).add(monitorableTask("2")).build();
+ ec.submit(t);
+ releaseAndWaitForMonitorableJob("1");
+ Assert.assertFalse(t.blockUntilEnded(TINY_TIME));
+ releaseAndWaitForMonitorableJob("2");
+ Assert.assertFalse(t.blockUntilEnded(TINY_TIME));
+ releaseMonitorableJob("main");
+ Assert.assertTrue(t.blockUntilEnded(TIMEOUT));
+ Assert.assertEquals(messages, MutableList.of("1", "2", "main"));
+ Assert.assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) < TIMEOUT.toMilliseconds(), "took too long: "+stopwatch);
+ Assert.assertFalse(t.isError());
+ Assert.assertTrue(t1.isError());
+ }
+
+ @Test
+ public void testTaskBuilderUsingAddVarargChildren() {
+ Task<String> t = Tasks.<String>builder().dynamic(true)
+ .body(monitorableJob("main"))
+ .add(monitorableTask("1"), monitorableTask("2"))
+ .build();
+ ec.submit(t);
+ releaseAndWaitForMonitorableJob("1");
+ releaseAndWaitForMonitorableJob("2");
+ releaseAndWaitForMonitorableJob("main");
+
+ Assert.assertEquals(messages, MutableList.of("1", "2", "main"));
+ }
+
+ @Test
+ public void testTaskBuilderUsingAddAllChildren() {
+ Task<String> t = Tasks.<String>builder().dynamic(true)
+ .body(monitorableJob("main"))
+ .addAll(ImmutableList.of(monitorableTask("1"), monitorableTask("2")))
+ .build();
+ ec.submit(t);
+ releaseAndWaitForMonitorableJob("1");
+ releaseAndWaitForMonitorableJob("2");
+ releaseAndWaitForMonitorableJob("main");
+
+ Assert.assertEquals(messages, MutableList.of("1", "2", "main"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/NonBasicTaskExecutionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/NonBasicTaskExecutionTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/NonBasicTaskExecutionTest.java
new file mode 100644
index 0000000..980a701
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/task/NonBasicTaskExecutionTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.core.util.task.BasicExecutionManager;
+import org.apache.brooklyn.core.util.task.BasicTask;
+import org.apache.brooklyn.core.util.task.ForwardingTask;
+import org.apache.brooklyn.core.util.task.TaskInternal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.test.Asserts;
+import brooklyn.util.collections.MutableMap;
+
+/**
+ * Test the operation of a {@link ForwardingTask} that delegates to a {@link BasicTask}.
+ *
+ * TODO clarify test purpose
+ */
+public class NonBasicTaskExecutionTest {
+ private static final Logger log = LoggerFactory.getLogger(NonBasicTaskExecutionTest.class);
+
+ private static final int TIMEOUT_MS = 10*1000;
+
+ public static class ConcreteForwardingTask<T> extends ForwardingTask<T> {
+ private final TaskInternal<T> delegate;
+
+ ConcreteForwardingTask(TaskInternal<T> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ protected TaskInternal<T> delegate() {
+ return delegate;
+ }
+ }
+
+ private BasicExecutionManager em;
+ private Map<Integer,String> data;
+
+ @BeforeMethod(alwaysRun=true)
+ public void setUp() throws Exception {
+ em = new BasicExecutionManager("mycontext");
+ data = Collections.synchronizedMap(new HashMap<Integer,String>());
+ }
+
+ @AfterMethod(alwaysRun=true)
+ public void tearDown() throws Exception {
+ if (em != null) em.shutdownNow();
+ }
+
+ @Test
+ public void runSimpleTask() throws Exception { // a forwarding task returns its delegate's result and is the same instance handed back by submit
+ TaskInternal<Object> t = new ConcreteForwardingTask<Object>(new BasicTask<Object>(new Callable<Object>() {
+ @Override public Object call() {
+ return data.put(1, "b"); // Map.put returns the previous value, so the task result is "a"
+ }}));
+ data.put(1, "a");
+ Task<?> t2 = em.submit(MutableMap.of("tag", "A"), t);
+ assertEquals(t.get(), "a"); // TestNG order is (actual, expected)
+ assertEquals(t2.get(), "a");
+ assertSame(t, t2, "t="+t+"; t2="+t2);
+ assertEquals(data.get(1), "b");
+ }
+
+ @Test
+ public void runBasicTaskWithWaits() throws Exception { // checks status reporting while the job is blocked on a latch
+ final CountDownLatch signalStarted = new CountDownLatch(1);
+ final CountDownLatch allowCompletion = new CountDownLatch(1);
+ final TaskInternal<Object> t = new ConcreteForwardingTask<Object>(new BasicTask<Object>(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ Object result = data.put(1, "b"); // previous value, i.e. "a"
+ signalStarted.countDown();
+ assertTrue(allowCompletion.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
+ return result;
+ }}));
+ data.put(1, "a");
+
+ Task<?> t2 = em.submit(MutableMap.of("tag", "A"), t);
+ assertEquals(t, t2);
+ assertFalse(t.isDone());
+
+ assertTrue(signalStarted.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
+ assertEquals(data.get(1), "b"); // TestNG order is (actual, expected)
+ assertFalse(t.isDone());
+
+ log.debug("runBasicTaskWithWaits, BasicTask status: {}", t.getStatusDetail(false));
+
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ assertTrue(t.getStatusDetail(false).toLowerCase().contains("waiting"), "details="+t.getStatusDetail(false));
+ }});
+ // the job is still blocked on allowCompletion here, so the status above must have reported it as waiting
+
+ allowCompletion.countDown();
+ assertEquals(t.get(), "a");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/ScheduledExecutionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/ScheduledExecutionTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/ScheduledExecutionTest.java
new file mode 100644
index 0000000..3b338da
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/task/ScheduledExecutionTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.core.util.task.BasicExecutionManager;
+import org.apache.brooklyn.core.util.task.BasicTask;
+import org.apache.brooklyn.core.util.task.ScheduledTask;
+import org.apache.brooklyn.core.util.task.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import brooklyn.test.Asserts;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.exceptions.RuntimeInterruptedException;
+import brooklyn.util.javalang.JavaClassNames;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+
+@SuppressWarnings({"unchecked","rawtypes"})
+public class ScheduledExecutionTest {
+
+ public static final Logger log = LoggerFactory.getLogger(ScheduledExecutionTest.class);
+
+ @Test
+ public void testScheduledTask() throws Exception {
+ int PERIOD = 20;
+ BasicExecutionManager m = new BasicExecutionManager("mycontextid");
+ final AtomicInteger i = new AtomicInteger(0);
+ ScheduledTask t = new ScheduledTask(MutableMap.of("delay", 2*PERIOD, "period", PERIOD, "maxIterations", 5), new Callable<Task<?>>() {
+ public Task<?> call() throws Exception {
+ return new BasicTask<Integer>(new Callable<Integer>() {
+ public Integer call() {
+ log.debug("task running: "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
+ return i.incrementAndGet();
+ }});
+ }});
+
+ log.info("submitting {} {}", t, t.getStatusDetail(false));
+ m.submit(t);
+ log.info("submitted {} {}", t, t.getStatusDetail(false));
+ Integer interimResult = (Integer) t.get();
+ log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
+ assertTrue(i.get() > 0, "i="+i);
+ t.blockUntilEnded();
+ Integer finalResult = (Integer) t.get();
+ log.info("ended ({}) {} {}", new Object[] {finalResult, t, t.getStatusDetail(false)});
+ assertEquals(finalResult, (Integer)5);
+ assertEquals(i.get(), 5);
+ }
+
+ /** like testScheduledTask but the loop is terminated by the task itself adjusting the period */
+ @Test
+ public void testScheduledTaskSelfEnding() throws Exception {
+ int PERIOD = 20;
+ BasicExecutionManager m = new BasicExecutionManager("mycontextid");
+ final AtomicInteger i = new AtomicInteger(0);
+ ScheduledTask t = new ScheduledTask(MutableMap.of("delay", 2*PERIOD, "period", PERIOD), new Callable<Task<?>>() {
+ public Task<?> call() throws Exception {
+ return new BasicTask<Integer>(new Callable<Integer>() {
+ public Integer call() {
+ ScheduledTask submitter = (ScheduledTask) ((BasicTask)Tasks.current()).getSubmittedByTask();
+ if (i.get() >= 4) submitter.period = null;
+ log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
+ return i.incrementAndGet();
+ }});
+ }});
+
+ log.info("submitting {} {}", t, t.getStatusDetail(false));
+ m.submit(t);
+ log.info("submitted {} {}", t, t.getStatusDetail(false));
+ Integer interimResult = (Integer) t.get();
+ log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
+ assertTrue(i.get() > 0);
+ t.blockUntilEnded();
+ Integer finalResult = (Integer) t.get();
+ log.info("ended ({}) {} {}", new Object[] {finalResult, t, t.getStatusDetail(false)});
+ assertEquals(finalResult, (Integer)5);
+ assertEquals(i.get(), 5);
+ }
+
+ @Test
+ public void testScheduledTaskCancelEnding() throws Exception {
+ Duration PERIOD = Duration.millis(20);
+ BasicExecutionManager m = new BasicExecutionManager("mycontextid");
+ final AtomicInteger i = new AtomicInteger();
+ ScheduledTask t = new ScheduledTask(MutableMap.of("delay", PERIOD.times(2), "period", PERIOD), new Callable<Task<?>>() {
+ public Task<?> call() throws Exception {
+ return new BasicTask<Integer>(new Callable<Integer>() {
+ public Integer call() {
+ log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
+ ScheduledTask submitter = (ScheduledTask) ((BasicTask)Tasks.current()).getSubmittedByTask();
+ i.incrementAndGet();
+ if (i.get() >= 5) submitter.cancel();
+ return i.get();
+ }});
+ }});
+
+ log.info(JavaClassNames.niceClassAndMethod()+" - submitting {} {}", t, t.getStatusDetail(false));
+ m.submit(t);
+ log.info("submitted {} {}", t, t.getStatusDetail(false));
+ Integer interimResult = (Integer) t.get();
+ log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
+ assertTrue(i.get() > 0);
+ t.blockUntilEnded();
+// int finalResult = t.get()
+ log.info("ended ({}) {} {}", new Object[] {i, t, t.getStatusDetail(false)});
+// assertEquals(finalResult, 5)
+ assertEquals(i.get(), 5);
+ }
+
+ @Test(groups="Integration")
+ public void testScheduledTaskCancelOuter() throws Exception { // cancelling the scheduled task mid-cycle must end it promptly
+ final Duration PERIOD = Duration.millis(20);
+ final Duration CYCLE_DELAY = Duration.ONE_SECOND;
+ // this should be enough to start the next cycle, but not so much that the cycle ends;
+ // and enough that when a task is interrupted it terminates within this period
+ final Duration SMALL_FRACTION_OF_CYCLE_DELAY = PERIOD.add(CYCLE_DELAY.multiply(0.1));
+
+ BasicExecutionManager m = new BasicExecutionManager("mycontextid");
+ final AtomicInteger i = new AtomicInteger();
+ ScheduledTask t = new ScheduledTask(MutableMap.of("delay", PERIOD.times(2), "period", PERIOD), new Callable<Task<?>>() {
+ public Task<?> call() throws Exception {
+ return new BasicTask<Integer>(new Callable<Integer>() {
+ public Integer call() {
+ log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
+ Time.sleep(CYCLE_DELAY);
+ i.incrementAndGet();
+ return i.get();
+ }});
+ }});
+
+ log.info(JavaClassNames.niceClassAndMethod()+" - submitting {} {}", t, t.getStatusDetail(false));
+ m.submit(t);
+ log.info("submitted {} {}", t, t.getStatusDetail(false));
+ Integer interimResult = (Integer) t.get();
+ log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
+ assertEquals(i.get(), 1);
+
+ Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY);
+ assertEquals(t.get(), 2);
+
+ Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY);
+ Stopwatch timer = Stopwatch.createStarted(); // must be running, otherwise the elapsed-time assertion below is always zero
+ t.cancel(true);
+ t.blockUntilEnded();
+// int finalResult = t.get()
+ log.info("blocked until ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)});
+ try {
+ t.get();
+ Assert.fail("Should have failed getting result of cancelled "+t);
+ } catch (Exception e) {
+ /* expected */
+ }
+ assertEquals(i.get(), 2); // the third cycle was cancelled before it could increment
+ log.info("ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)});
+ Assert.assertTrue(Duration.of(timer).isShorterThan(SMALL_FRACTION_OF_CYCLE_DELAY));
+ }
+
+ @Test(groups="Integration")
+ public void testScheduledTaskCancelInterrupts() throws Exception { // cancelling the scheduled task must interrupt the running child
+ final Duration PERIOD = Duration.millis(20);
+ final Duration CYCLE_DELAY = Duration.ONE_SECOND;
+ // this should be enough to start the next cycle, but not so much that the cycle ends;
+ // and enough that when a task is interrupted it terminates within this period
+ final Duration SMALL_FRACTION_OF_CYCLE_DELAY = PERIOD.add(CYCLE_DELAY.multiply(0.1));
+
+ BasicExecutionManager m = new BasicExecutionManager("mycontextid");
+ final Semaphore interruptedSemaphore = new Semaphore(0);
+ final AtomicInteger i = new AtomicInteger();
+ ScheduledTask t = new ScheduledTask(MutableMap.of("delay", PERIOD.times(2), "period", PERIOD), new Callable<Task<?>>() {
+ public Task<?> call() throws Exception {
+ return new BasicTask<Integer>(new Callable<Integer>() {
+ public Integer call() {
+ try {
+ log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
+ Time.sleep(CYCLE_DELAY);
+ i.incrementAndGet();
+ return i.get();
+ } catch (RuntimeInterruptedException e) {
+ interruptedSemaphore.release(); // signals the test thread that the child saw the interrupt
+ throw Exceptions.propagate(e);
+ }
+ }});
+ }});
+
+ log.info(JavaClassNames.niceClassAndMethod()+" - submitting {} {}", t, t.getStatusDetail(false));
+ m.submit(t);
+ log.info("submitted {} {}", t, t.getStatusDetail(false));
+ Integer interimResult = (Integer) t.get();
+ log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
+ assertEquals(i.get(), 1);
+
+ Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY);
+ assertEquals(t.get(), 2);
+
+ Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY);
+ Stopwatch timer = Stopwatch.createStarted(); // must be running, otherwise the elapsed-time assertion below is always zero
+ t.cancel(true);
+ t.blockUntilEnded();
+// int finalResult = t.get()
+ log.info("blocked until ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)});
+ try {
+ t.get();
+ Assert.fail("Should have failed getting result of cancelled "+t);
+ } catch (Exception e) {
+ /* expected */
+ }
+ assertEquals(i.get(), 2); // the third cycle was interrupted before it could increment
+ Assert.assertTrue(interruptedSemaphore.tryAcquire(1, SMALL_FRACTION_OF_CYCLE_DELAY.toMilliseconds(), TimeUnit.MILLISECONDS), "child thread was not interrupted");
+ log.info("ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)});
+ Assert.assertTrue(Duration.of(timer).isShorterThan(SMALL_FRACTION_OF_CYCLE_DELAY));
+ }
+
+ @Test(groups="Integration")
+ public void testScheduledTaskTakesLongerThanPeriod() throws Exception { // runs must not overlap when each run outlasts the period
+ final int PERIOD = 1;
+ final int SLEEP_TIME = 100;
+ final int EARLY_RETURN_GRACE = 10; // tolerance in ms for a sleep returning slightly early
+ BasicExecutionManager m = new BasicExecutionManager("mycontextid");
+ final List<Long> execTimes = new CopyOnWriteArrayList<Long>();
+
+ ScheduledTask t = new ScheduledTask(MutableMap.of("delay", PERIOD, "period", PERIOD), new Callable<Task<?>>() {
+ public Task<?> call() throws Exception {
+ return new BasicTask<Void>(new Runnable() {
+ public void run() {
+ execTimes.add(System.currentTimeMillis()); // start time of each run
+ try {
+ Thread.sleep(SLEEP_TIME); // was a literal 100; keeps the sleep in step with the check below
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ }
+ }});
+ }});
+
+ m.submit(t);
+
+ Asserts.succeedsEventually(new Runnable() {
+ public void run() {
+ assertTrue(execTimes.size() > 3, "size="+execTimes.size());
+ }});
+
+ List<Long> timeDiffs = Lists.newArrayList();
+ long prevExecTime = -1; // -1 marks "no previous run yet"
+ for (Long execTime : execTimes) {
+ if (prevExecTime == -1) {
+ prevExecTime = execTime;
+ } else {
+ timeDiffs.add(execTime - prevExecTime);
+ prevExecTime = execTime;
+ }
+ }
+
+ for (Long timeDiff : timeDiffs) { // consecutive starts must be at least one sleep apart, less the grace
+ if (timeDiff < (SLEEP_TIME - EARLY_RETURN_GRACE)) fail("timeDiffs="+timeDiffs+"; execTimes="+execTimes);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/SingleThreadedSchedulerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/SingleThreadedSchedulerTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/SingleThreadedSchedulerTest.java
new file mode 100644
index 0000000..e3420c8
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/task/SingleThreadedSchedulerTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.fail;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.brooklyn.core.util.task.BasicExecutionManager;
+import org.apache.brooklyn.core.util.task.BasicTask;
+import org.apache.brooklyn.core.util.task.SingleThreadedScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.test.Asserts;
+import brooklyn.util.collections.MutableMap;
+
+import com.google.common.util.concurrent.Callables;
+
+public class SingleThreadedSchedulerTest {
+
+ private static final Logger log = LoggerFactory.getLogger(SingleThreadedSchedulerTest.class);
+
+ private BasicExecutionManager em;
+
+ @BeforeMethod
+ public void setUp() {
+ em = new BasicExecutionManager("mycontextid");
+ em.setTaskSchedulerForTag("category1", SingleThreadedScheduler.class);
+ }
+
+ @AfterMethod
+ public void tearDown() {
+ if (em != null) em.shutdownNow();
+ }
+
+ @Test
+ public void testExecutesInOrder() throws Exception {
+ final int NUM_TIMES = 1000;
+ final List<Integer> result = new CopyOnWriteArrayList<Integer>();
+ for (int i = 0; i < NUM_TIMES; i++) {
+ final int counter = i;
+ em.submit(MutableMap.of("tag", "category1"), new Runnable() {
+ public void run() {
+ result.add(counter);
+ }});
+ }
+
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ assertEquals(result.size(), NUM_TIMES);
+ }});
+
+ for (int i = 0; i < NUM_TIMES; i++) {
+ assertEquals(result.get(i), (Integer)i);
+ }
+ }
+
+ @Test
+ public void testLargeQueueDoesNotConsumeTooManyThreads() throws Exception {
+ final int NUM_TIMES = 3000;
+ final CountDownLatch latch = new CountDownLatch(1);
+ BasicTask<Void> blockingTask = new BasicTask<Void>(newLatchAwaiter(latch));
+ em.submit(MutableMap.of("tag", "category1"), blockingTask);
+
+ final AtomicInteger counter = new AtomicInteger(0);
+ for (int i = 0; i < NUM_TIMES; i++) {
+ BasicTask<Void> t = new BasicTask<Void>(new Runnable() {
+ public void run() {
+ counter.incrementAndGet();
+ }});
+ em.submit(MutableMap.of("tag", "category1"), t);
+ if (i % 500 == 0) log.info("Submitted "+i+" jobs...");
+ }
+
+ Thread.sleep(100); // give it more of a chance to create the threads before we let them execute
+ latch.countDown();
+
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ assertEquals(counter.get(), NUM_TIMES);
+ }});
+ }
+
+ @Test
+ public void testGetResultOfQueuedTaskBeforeItExecutes() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ em.submit(MutableMap.of("tag", "category1"), newLatchAwaiter(latch));
+
+ BasicTask<Integer> t = new BasicTask<Integer>(Callables.returning(123));
+ Future<Integer> future = em.submit(MutableMap.of("tag", "category1"), t);
+
+ Thread thread = new Thread(new Runnable() {
+ public void run() {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ latch.countDown();
+ }});
+ thread.start();
+ assertEquals(future.get(), (Integer)123);
+ }
+
+ @Test
+ public void testGetResultOfQueuedTaskBeforeItExecutesWithTimeout() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ em.submit(MutableMap.of("tag", "category1"), newLatchAwaiter(latch));
+
+ BasicTask<Integer> t = new BasicTask<Integer>(Callables.returning(123));
+ Future<Integer> future = em.submit(MutableMap.of("tag", "category1"), t);
+
+ try {
+ assertEquals(future.get(10, TimeUnit.MILLISECONDS), (Integer)123);
+ fail();
+ } catch (TimeoutException e) {
+ // success
+ }
+ }
+
+ @Test
+ public void testCancelQueuedTaskBeforeItExecutes() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ em.submit(MutableMap.of("tag", "category1"), newLatchAwaiter(latch));
+
+ final AtomicBoolean executed = new AtomicBoolean();
+ BasicTask<?> t = new BasicTask<Void>(new Runnable() {
+ public void run() {
+ executed.set(true);
+ }});
+ Future<?> future = em.submit(MutableMap.of("tag", "category1"), t);
+
+ future.cancel(true);
+ latch.countDown();
+ Thread.sleep(10);
+ try {
+ future.get();
+ } catch (CancellationException e) {
+ // success
+ }
+ assertFalse(executed.get());
+ }
+
+ @Test
+ public void testGetResultOfQueuedTaskAfterItExecutes() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ em.submit(MutableMap.of("tag", "category1"), newLatchAwaiter(latch));
+
+ BasicTask<Integer> t = new BasicTask<Integer>(Callables.returning(123));
+ Future<Integer> future = em.submit(MutableMap.of("tag", "category1"), t);
+
+ latch.countDown();
+ assertEquals(future.get(), (Integer)123);
+ }
+
+ private Callable<Void> newLatchAwaiter(final CountDownLatch latch) {
+ return new Callable<Void>() {
+ public Void call() throws Exception {
+ latch.await();
+ return null;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/TaskFinalizationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/TaskFinalizationTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/TaskFinalizationTest.java
new file mode 100644
index 0000000..1ff181b
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/task/TaskFinalizationTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.core.util.task.BasicTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Stopwatch;
+
+public class TaskFinalizationTest {
+
+ private static final Logger log = LoggerFactory.getLogger(TaskFinalizationTest.class);
+
+ // integration because it can take a while (and finalizers aren't even guaranteed)
+ @Test(groups="Integration")
+ public void testFinalizerInvoked() throws InterruptedException {
+ BasicTask<?> t = new BasicTask<Void>(new Runnable() { public void run() { /* no op */ }});
+ // released by the finalizer callback; Semaphore is itself thread-safe, so no monitor lock is taken on it
+ final Semaphore finalized = new Semaphore(0);
+ t.setFinalizer(new BasicTask.TaskFinalizer() {
+ public void onTaskFinalization(Task<?> finalizedTask) {
+ finalized.release();
+ }
+ });
+ // drop this test's reference so the task can become eligible for garbage collection
+ t = null;
+ Stopwatch watch = Stopwatch.createStarted();
+ for (int i=0; i<30; i++) {
+ System.gc(); System.gc();
+ if (finalized.tryAcquire(1, TimeUnit.SECONDS)) {
+ log.info("finalizer ran after {}", Time.makeTimeStringRounded(watch));
+ return;
+ }
+ }
+ Assert.fail("finalizer did not run in time");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/TasksTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/TasksTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/TasksTest.java
new file mode 100644
index 0000000..0800984
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/task/TasksTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import static brooklyn.event.basic.DependentConfiguration.attributeWhenReady;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import org.apache.brooklyn.api.management.ExecutionContext;
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.core.util.task.TaskInternal;
+import org.apache.brooklyn.core.util.task.Tasks;
+import org.apache.brooklyn.core.util.task.ValueResolver;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.apache.brooklyn.test.entity.TestEntity;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.BrooklynAppUnitTestSupport;
+import brooklyn.entity.basic.EntityFunctions;
+import brooklyn.util.guava.Functionals;
+import brooklyn.util.repeat.Repeater;
+import brooklyn.util.time.Duration;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Callables;
+
+
+public class TasksTest extends BrooklynAppUnitTestSupport {
+ // Tests for Tasks.resolveValue / Tasks.resolving and for the Repeater-backed builders Tasks.requiring / Tasks.testing.
+ private ExecutionContext executionContext;
+
+ @BeforeMethod(alwaysRun=true)
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ executionContext = app.getExecutionContext();
+ }
+ // A null input resolves to null.
+ @Test
+ public void testResolveNull() throws Exception {
+ assertResolvesValue(null, String.class, null);
+ }
+ // A plain value is converted to the requested type (Integer 123 becomes "123").
+ @Test
+ public void testResolveValueCastsToType() throws Exception {
+ assertResolvesValue(123, String.class, "123");
+ }
+ // An attributeWhenReady value resolves to the attribute value set on the app.
+ @Test
+ public void testResolvesAttributeWhenReady() throws Exception {
+ app.setAttribute(TestApplication.MY_ATTRIBUTE, "myval");
+ assertResolvesValue(attributeWhenReady(app, TestApplication.MY_ATTRIBUTE), String.class, "myval");
+ }
+ // An attributeWhenReady value used as a map value is resolved inside the map.
+ @Test
+ public void testResolvesMapWithAttributeWhenReady() throws Exception {
+ app.setAttribute(TestApplication.MY_ATTRIBUTE, "myval");
+ Map<?,?> orig = ImmutableMap.of("mykey", attributeWhenReady(app, TestApplication.MY_ATTRIBUTE));
+ Map<?,?> expected = ImmutableMap.of("mykey", "myval");
+ assertResolvesValue(orig, String.class, expected);
+ }
+ // An attributeWhenReady value used as a set element is resolved inside the set.
+ @Test
+ public void testResolvesSetWithAttributeWhenReady() throws Exception {
+ app.setAttribute(TestApplication.MY_ATTRIBUTE, "myval");
+ Set<?> orig = ImmutableSet.of(attributeWhenReady(app, TestApplication.MY_ATTRIBUTE));
+ Set<?> expected = ImmutableSet.of("myval");
+ assertResolvesValue(orig, String.class, expected);
+ }
+ // Resolution also reaches values nested one map deeper.
+ @Test
+ public void testResolvesMapOfMapsWithAttributeWhenReady() throws Exception {
+ app.setAttribute(TestApplication.MY_ATTRIBUTE, "myval");
+ Map<?,?> orig = ImmutableMap.of("mykey", ImmutableMap.of("mysubkey", attributeWhenReady(app, TestApplication.MY_ATTRIBUTE)));
+ Map<?,?> expected = ImmutableMap.of("mykey", ImmutableMap.of("mysubkey", "myval"));
+ assertResolvesValue(orig, String.class, expected);
+ }
+ // Resolution also reaches maps held in a non-List Iterable; the result is compared against a List.
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testResolvesIterableOfMapsWithAttributeWhenReady() throws Exception {
+ app.setAttribute(TestApplication.MY_ATTRIBUTE, "myval");
+ // using Iterables.concat so that orig is of type FluentIterable rather than List etc
+ Iterable<?> orig = Iterables.concat(ImmutableList.of(ImmutableMap.of("mykey", attributeWhenReady(app, TestApplication.MY_ATTRIBUTE))));
+ Iterable<Map<?,?>> expected = ImmutableList.<Map<?,?>>of(ImmutableMap.of("mykey", "myval"));
+ assertResolvesValue(orig, String.class, expected);
+ }
+ // Resolves 'actual' as 'type' via Tasks.resolveValue in this test's execution context and asserts it equals 'expected'.
+ private void assertResolvesValue(Object actual, Class<?> type, Object expected) throws Exception {
+ Object result = Tasks.resolveValue(actual, type, executionContext);
+ assertEquals(result, expected);
+ }
+ // Walks one resolver over a throwing config task through three setups: plain, swallowExceptions(), then defaultValue("foo").
+ @Test
+ public void testErrorsResolvingPropagatesOrSwallowedAllCorrectly() throws Exception {
+ app.setConfig(TestEntity.CONF_OBJECT, ValueResolverTest.newThrowTask(Duration.ZERO));
+ Task<Object> t = Tasks.builder().body(Functionals.callable(EntityFunctions.config(TestEntity.CONF_OBJECT), app)).build();
+ ValueResolver<Object> v = Tasks.resolving(t).as(Object.class).context(app.getExecutionContext());
+
+ ValueResolverTest.assertThrowsOnMaybe(v);
+ ValueResolverTest.assertThrowsOnGet(v);
+
+ v.swallowExceptions();
+ ValueResolverTest.assertMaybeIsAbsent(v);
+ ValueResolverTest.assertThrowsOnGet(v);
+
+ v.defaultValue("foo");
+ ValueResolverTest.assertMaybeIsAbsent(v);
+ assertEquals(v.clone().get(), "foo");
+ assertResolvesValue(v, Object.class, "foo");
+ }
+ // Tasks.requiring is expected to fail on get() when the condition never holds; Tasks.testing returns the boolean outcome.
+ @Test
+ public void testRepeater() throws Exception {
+ Task<?> t;
+
+ t = Tasks.requiring(Repeater.create().until(Callables.returning(true)).every(Duration.millis(1))).build();
+ app.getExecutionContext().submit(t);
+ t.get(Duration.TEN_SECONDS);
+
+ t = Tasks.testing(Repeater.create().until(Callables.returning(true)).every(Duration.millis(1))).build();
+ app.getExecutionContext().submit(t);
+ Assert.assertEquals(t.get(Duration.TEN_SECONDS), true);
+
+ t = Tasks.requiring(Repeater.create().until(Callables.returning(false)).limitIterationsTo(2).every(Duration.millis(1))).build();
+ app.getExecutionContext().submit(t);
+ try {
+ t.get(Duration.TEN_SECONDS);
+ Assert.fail("Should have failed"); // throws AssertionError, which the catch (Exception) below does not swallow
+ } catch (Exception e) {
+ // expected
+ }
+
+ t = Tasks.testing(Repeater.create().until(Callables.returning(false)).limitIterationsTo(2).every(Duration.millis(1))).build();
+ app.getExecutionContext().submit(t);
+ Assert.assertEquals(t.get(Duration.TEN_SECONDS), false);
+ }
+ // While the repeater's condition runs, the current task's blocking details should equal the repeater's description.
+ @Test
+ public void testRepeaterDescription() throws Exception{
+ final String description = "task description";
+ Repeater repeater = Repeater.create(description)
+ .repeat(Callables.returning(null))
+ .every(Duration.ONE_MILLISECOND)
+ .limitIterationsTo(1)
+ .until(new Callable<Boolean>() {
+ @Override
+ public Boolean call() {
+ TaskInternal<?> current = (TaskInternal<?>)Tasks.current();
+ assertEquals(current.getBlockingDetails(), description);
+ return true;
+ }
+ });
+ Task<Boolean> t = Tasks.testing(repeater).build();
+ app.getExecutionContext().submit(t);
+ assertTrue(t.get(Duration.TEN_SECONDS));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/ValueResolverTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/ValueResolverTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/ValueResolverTest.java
new file mode 100644
index 0000000..9f65bc4
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/task/ValueResolverTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.concurrent.Callable;
+
+import org.apache.brooklyn.api.management.ExecutionContext;
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.core.util.task.Tasks;
+import org.apache.brooklyn.core.util.task.ValueResolver;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.BrooklynAppUnitTestSupport;
+import brooklyn.util.guava.Maybe;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+/**
+ * see also {@link TasksTest} for more tests
+ */
+@Test
+public class ValueResolverTest extends BrooklynAppUnitTestSupport {
+
+ private ExecutionContext executionContext;
+
+ @BeforeMethod(alwaysRun=true)
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ executionContext = app.getExecutionContext();
+ }
+ // Builds (without submitting) a task that sleeps for 'timeout' and then returns 'result'.
+ public static final Task<String> newSleepTask(final Duration timeout, final String result) {
+ return Tasks.<String>builder().body(new Callable<String>() {
+ public String call() {
+ Time.sleep(timeout);
+ return result;
+ }}
+ ).build();
+ }
+ // Builds (without submitting) a task that sleeps for 'timeout' and then throws IllegalStateException.
+ public static final Task<String> newThrowTask(final Duration timeout) {
+ return Tasks.<String>builder().body(new Callable<String>() {
+ public String call() {
+ Time.sleep(timeout);
+ throw new IllegalStateException("intended, during tests");
+ }}
+ ).build();
+ }
+ // With a zero timeout, a task that sleeps ten seconds yields an absent Maybe.
+ public void testTimeoutZero() {
+ Maybe<String> result = Tasks.resolving(newSleepTask(Duration.TEN_SECONDS, "foo")).as(String.class).context(executionContext).timeout(Duration.ZERO).getMaybe();
+ Assert.assertFalse(result.isPresent());
+ }
+ // With a ten-second timeout, a task that does not sleep yields its value.
+ public void testTimeoutBig() {
+ Maybe<String> result = Tasks.resolving(newSleepTask(Duration.ZERO, "foo")).as(String.class).context(executionContext).timeout(Duration.TEN_SECONDS).getMaybe();
+ Assert.assertEquals(result.get(), "foo");
+ }
+ // A task that has already completed can be resolved without passing an execution context to the resolver.
+ public void testNoExecutionContextOnCompleted() {
+ Task<String> t = newSleepTask(Duration.ZERO, "foo");
+ executionContext.submit(t).getUnchecked();
+ Maybe<String> result = Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe();
+ Assert.assertEquals(result.get(), "foo");
+ }
+ // Asserts that getMaybe() on a clone of the resolver throws, and returns that exception.
+ public static Throwable assertThrowsOnMaybe(ValueResolver<?> result) {
+ result = result.clone(); // cloned outside the try, as in assertThrowsOnGet, so a clone() failure is not mistaken for the expected one
+ try {
+ result.getMaybe();
+ Assert.fail("should have thrown");
+ return null;
+ } catch (Exception e) { return e; }
+ }
+ public static Throwable assertThrowsOnGet(ValueResolver<?> result) { // asserts get() on a clone throws; returns the exception
+ result = result.clone();
+ try {
+ result.get();
+ Assert.fail("should have thrown");
+ return null;
+ } catch (Exception e) { return e; }
+ }
+ public static <T> Maybe<T> assertMaybeIsAbsent(ValueResolver<T> result) { // asserts getMaybe() on a clone is absent; returns it
+ result = result.clone();
+ Maybe<T> maybe = result.getMaybe();
+ Assert.assertFalse(maybe.isPresent());
+ return maybe;
+ }
+ // With swallowExceptions(), getMaybe() is absent while get() still throws.
+ public void testSwallowError() {
+ ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(executionContext).swallowExceptions();
+ assertMaybeIsAbsent(result);
+ assertThrowsOnGet(result);
+ }
+
+ // Without swallowExceptions(), both getMaybe() and get() throw.
+ public void testDontSwallowError() {
+ ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(executionContext);
+ assertThrowsOnMaybe(result);
+ assertThrowsOnGet(result);
+ }
+ // With swallowExceptions() and a default value, getMaybe() is absent and get() returns the default.
+ public void testDefaultWhenSwallowError() {
+ ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(executionContext).swallowExceptions().defaultValue("foo");
+ assertMaybeIsAbsent(result);
+ Assert.assertEquals(result.get(), "foo");
+ }
+ // With a zero timeout and a default value, get() returns the default although the task would throw after ten seconds.
+ public void testDefaultBeforeDelayAndError() {
+ ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.TEN_SECONDS)).as(String.class).context(executionContext).timeout(Duration.ZERO).defaultValue("foo");
+ assertMaybeIsAbsent(result);
+ Assert.assertEquals(result.get(), "foo");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/ssh/SshTasksTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/ssh/SshTasksTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/ssh/SshTasksTest.java
new file mode 100644
index 0000000..94fe3e6
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/task/ssh/SshTasksTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task.ssh;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.brooklyn.api.location.LocationSpec;
+import org.apache.brooklyn.api.management.ManagementContext;
+import org.apache.brooklyn.core.management.internal.LocalManagementContext;
+import org.apache.brooklyn.core.util.ssh.BashCommandsIntegrationTest;
+import org.apache.brooklyn.core.util.task.ssh.SshFetchTaskFactory;
+import org.apache.brooklyn.core.util.task.ssh.SshFetchTaskWrapper;
+import org.apache.brooklyn.core.util.task.ssh.SshPutTaskFactory;
+import org.apache.brooklyn.core.util.task.ssh.SshPutTaskWrapper;
+import org.apache.brooklyn.core.util.task.ssh.SshTasks;
+import org.apache.brooklyn.core.util.task.ssh.SshTasksTest;
+import org.apache.brooklyn.core.util.task.system.ProcessTaskFactory;
+import org.apache.brooklyn.core.util.task.system.ProcessTaskWrapper;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.BrooklynConfigKeys;
+import brooklyn.entity.basic.Entities;
+
+import org.apache.brooklyn.location.basic.LocalhostMachineProvisioningLocation;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+
+import brooklyn.util.net.Urls;
+import brooklyn.util.os.Os;
+
+/**
+ * Some tests for {@link SshTasks}. Note more tests in {@link BashCommandsIntegrationTest},
+ * {@link SshEffectorTasksTest}, and {@link SoftwareEffectorTest}.
+ */
+public class SshTasksTest {
+ // Each test method gets a fresh management context, a machine obtained from a localhost provisioner, and a temp dir.
+ private static final Logger log = LoggerFactory.getLogger(SshTasksTest.class);
+
+ ManagementContext mgmt;
+ SshMachineLocation host;
+ File tempDir;
+ // set via setExpectingFailure(); tearDown() fails the test if it is still set
+ boolean failureExpected;
+
+ @BeforeMethod(alwaysRun=true)
+ public void setup() throws Exception {
+ mgmt = new LocalManagementContext();
+
+ LocalhostMachineProvisioningLocation lhc = mgmt.getLocationManager().createLocation(LocationSpec.create(LocalhostMachineProvisioningLocation.class));
+ host = lhc.obtain();
+ clearExpectedFailure();
+ tempDir = Os.newTempDir(getClass());
+ }
+
+ @AfterMethod(alwaysRun=true)
+ public void tearDown() throws Exception {
+ if (mgmt != null) Entities.destroyAll(mgmt);
+ mgmt = null;
+ tempDir = Os.deleteRecursively(tempDir).asNullOrThrowing();
+ checkExpectedFailure();
+ }
+ // Fails the test if a failure was announced with setExpectingFailure() but never cleared; resets the flag first.
+ protected void checkExpectedFailure() {
+ if (failureExpected) {
+ clearExpectedFailure();
+ Assert.fail("Test should have thrown an exception but it did not.");
+ }
+ }
+
+ protected void clearExpectedFailure() {
+ failureExpected = false;
+ }
+
+ protected void setExpectingFailure() {
+ failureExpected = true;
+ }
+
+ // Points the factory at the test host, creates its task, submits it to the execution manager and returns the wrapper.
+ protected <T> ProcessTaskWrapper<T> submit(final ProcessTaskFactory<T> tf) {
+ tf.machine(host);
+ ProcessTaskWrapper<T> t = tf.newTask();
+ mgmt.getExecutionManager().submit(t);
+ return t;
+ }
+ // Creates the put task from the factory, submits it to the execution manager and returns the wrapper.
+ protected SshPutTaskWrapper submit(final SshPutTaskFactory tf) {
+ SshPutTaskWrapper t = tf.newTask();
+ mgmt.getExecutionManager().submit(t);
+ return t;
+ }
+ // Runs "sleep 1 ; echo hello world": not done straight after submit, then exit code 0 and the echoed stdout.
+ @Test(groups="Integration")
+ public void testSshEchoHello() {
+ ProcessTaskWrapper<Integer> t = submit(SshTasks.newSshExecTaskFactory(host, "sleep 1 ; echo hello world"));
+ Assert.assertFalse(t.isDone());
+ Assert.assertEquals(t.get(), (Integer)0);
+ Assert.assertEquals(t.getTask().getUnchecked(), (Integer)0);
+ Assert.assertEquals(t.getStdout().trim(), "hello world");
+ }
+ // Puts string contents to a file in the temp dir and checks the file and the task's success indicators.
+ @Test(groups="Integration")
+ public void testCopyTo() throws IOException {
+ String fn = Urls.mergePaths(tempDir.getPath(), "f1");
+ SshPutTaskWrapper t = submit(SshTasks.newSshPutTaskFactory(host, fn).contents("hello world"));
+ t.block();
+ Assert.assertEquals(FileUtils.readFileToString(new File(fn)), "hello world");
+ // and make sure this doesn't throw
+ Assert.assertTrue(t.isDone());
+ Assert.assertTrue(t.isSuccessful());
+ Assert.assertEquals(t.get(), null);
+ Assert.assertEquals(t.getExitCode(), (Integer)0);
+ }
+ // Putting into a non-existent subdirectory: block() completes, but get() is expected to throw.
+ @Test(groups="Integration")
+ public void testCopyToFailBadSubdir() throws IOException {
+ String fn = Urls.mergePaths(tempDir.getPath(), "non-existent-subdir/file");
+ SshPutTaskWrapper t = submit(SshTasks.newSshPutTaskFactory(host, fn).contents("hello world"));
+ //this doesn't fail
+ t.block();
+ Assert.assertTrue(t.isDone());
+ setExpectingFailure();
+ try {
+ // but this does
+ t.get();
+ } catch (Exception e) {
+ log.info("The error if file cannot be written is: "+e);
+ clearExpectedFailure();
+ }
+ checkExpectedFailure();
+ // and the results indicate failure
+ Assert.assertFalse(t.isSuccessful());
+ Assert.assertNotNull(t.getException());
+ Assert.assertNotEquals(t.getExitCode(), (Integer)0);
+ }
+ // Same bad path with allowFailure(): get() returns null instead of throwing, but the task still reports failure.
+ @Test(groups="Integration")
+ public void testCopyToFailBadSubdirAllow() throws IOException {
+ String fn = Urls.mergePaths(tempDir.getPath(), "non-existent-subdir/file");
+ SshPutTaskWrapper t = submit(SshTasks.newSshPutTaskFactory(host, fn).contents("hello world").allowFailure());
+ //this doesn't fail
+ t.block();
+ Assert.assertTrue(t.isDone());
+ // and this doesn't fail either
+ Assert.assertEquals(t.get(), null);
+ // but it's not successful
+ Assert.assertNotNull(t.getException());
+ Assert.assertFalse(t.isSuccessful());
+ // exit code probably null, but won't be zero
+ Assert.assertNotEquals(t.getExitCode(), (Integer)0);
+ }
+ // With createDirectory(), putting into a missing subdirectory succeeds.
+ @Test(groups="Integration")
+ public void testCopyToFailBadSubdirCreate() throws IOException {
+ String fn = Urls.mergePaths(tempDir.getPath(), "non-existent-subdir-to-create/file");
+ SshPutTaskWrapper t = submit(SshTasks.newSshPutTaskFactory(host, fn).contents("hello world").createDirectory());
+ t.block();
+ // directory should be created, and file readable now
+ Assert.assertEquals(FileUtils.readFileToString(new File(fn)), "hello world");
+ Assert.assertEquals(t.getExitCode(), (Integer)0);
+ }
+ // Writes a local file, fetches it with a fetch task and checks both the string and the byte results.
+ @Test(groups="Integration")
+ public void testSshFetch() throws IOException {
+ String fn = Urls.mergePaths(tempDir.getPath(), "f2");
+ FileUtils.write(new File(fn), "hello fetched world");
+
+ SshFetchTaskFactory tf = SshTasks.newSshFetchTaskFactory(host, fn);
+ SshFetchTaskWrapper t = tf.newTask();
+ mgmt.getExecutionManager().submit(t);
+
+ t.block();
+ Assert.assertTrue(t.isDone());
+ Assert.assertEquals(t.get(), "hello fetched world");
+ Assert.assertEquals(t.getBytes(), "hello fetched world".getBytes());
+ }
+ // With a script header configured on the host, the header's "echo foo" output precedes the command's output.
+ @Test(groups="Integration")
+ public void testSshWithHeaderProperty() {
+ host.setConfig(BrooklynConfigKeys.SSH_CONFIG_SCRIPT_HEADER, "#!/bin/bash -e\necho foo\n");
+ ProcessTaskWrapper<Integer> t = submit(SshTasks.newSshExecTaskFactory(host, "echo bar"));
+ Assert.assertTrue(t.block().getStdout().trim().matches("foo\\s+bar"), "mismatched output was: "+t.getStdout());
+ }
+ // Passing false as the factory's second argument yields only the command's own output, despite the configured header.
+ @Test(groups="Integration")
+ public void testSshIgnoringHeaderProperty() {
+ host.setConfig(BrooklynConfigKeys.SSH_CONFIG_SCRIPT_HEADER, "#!/bin/bash -e\necho foo\n");
+ ProcessTaskWrapper<Integer> t = submit(SshTasks.newSshExecTaskFactory(host, false, "echo bar"));
+ Assert.assertTrue(t.block().getStdout().trim().matches("bar"), "mismatched output was: "+t.getStdout());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/system/SystemTasksTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/system/SystemTasksTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/system/SystemTasksTest.java
new file mode 100644
index 0000000..b673056
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/task/system/SystemTasksTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task.system;
+
+import java.io.File;
+
+import org.apache.brooklyn.api.management.ManagementContext;
+import org.apache.brooklyn.core.management.internal.LocalManagementContext;
+import org.apache.brooklyn.core.util.task.ssh.SshTasks;
+import org.apache.brooklyn.core.util.task.system.ProcessTaskFactory;
+import org.apache.brooklyn.core.util.task.system.ProcessTaskWrapper;
+import org.apache.brooklyn.core.util.task.system.SystemTasks;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.Entities;
+import brooklyn.util.os.Os;
+
+/**
+ * Some tests for {@link SystemTasks}. See {@link SshTasks}.
+ * <p>
+ * Every test method gets a fresh {@link LocalManagementContext} and a fresh temporary
+ * directory; both are discarded again in {@link #tearDown()}.
+ */
+public class SystemTasksTest {
+
+    ManagementContext mgmt;
+    File tempDir;
+
+    /** Raised by {@link #setExpectingFailure()}; if still raised at tear-down the test is failed. */
+    boolean failureExpected;
+
+    /** Creates the management context and temp dir, and resets the expected-failure flag. */
+    @BeforeMethod(alwaysRun=true)
+    public void setup() throws Exception {
+        mgmt = new LocalManagementContext();
+
+        clearExpectedFailure();
+        tempDir = Os.newTempDir(getClass());
+    }
+
+    /**
+     * Destroys the management context, deletes the temp dir and verifies the expected-failure flag.
+     * <p>
+     * Each step runs even if an earlier one throws, so a failure while destroying the
+     * management context does not leave the temporary directory behind.
+     */
+    @AfterMethod(alwaysRun=true)
+    public void tearDown() throws Exception {
+        try {
+            if (mgmt != null) Entities.destroyAll(mgmt);
+        } finally {
+            mgmt = null;
+            try {
+                // alwaysRun=true means we can get here after setup() failed before creating tempDir
+                if (tempDir != null) tempDir = Os.deleteRecursively(tempDir).asNullOrThrowing();
+            } finally {
+                checkExpectedFailure();
+            }
+        }
+    }
+
+    /** Fails the test if a failure was announced via {@link #setExpectingFailure()} but never cleared. */
+    protected void checkExpectedFailure() {
+        if (failureExpected) {
+            clearExpectedFailure();
+            Assert.fail("Test should have thrown an exception but it did not.");
+        }
+    }
+
+    protected void clearExpectedFailure() {
+        failureExpected = false;
+    }
+
+    protected void setExpectingFailure() {
+        failureExpected = true;
+    }
+
+
+    /** Creates a task from the factory, submits it to the execution manager and returns it. */
+    protected <T> ProcessTaskWrapper<T> submit(final ProcessTaskFactory<T> tf) {
+        ProcessTaskWrapper<T> t = tf.newTask();
+        mgmt.getExecutionManager().submit(t);
+        return t;
+    }
+
+    @Test(groups="Integration")
+    public void testExecEchoHello() {
+        ProcessTaskWrapper<Integer> t = submit(SystemTasks.exec("sleep 1 ; echo hello world"));
+        // the leading "sleep 1" is what makes it reasonable to expect the task is still running here
+        Assert.assertFalse(t.isDone());
+        Assert.assertEquals(t.get(), (Integer)0);
+        Assert.assertEquals(t.getTask().getUnchecked(), (Integer)0);
+        Assert.assertEquals(t.getStdout().trim(), "hello world");
+    }
+
+    // FIXME Behaviour of Bash shell changes from 3.x to 4.x so test is disabled
+    @Test(groups="Integration", enabled=false)
+    public void testSubshellExitScriptDoesNotExit() {
+        checkSubshellExitDoesNotExit(taskSubshellExit().runAsScript());
+    }
+
+    @Test(groups="Integration")
+    public void testSubshellExitCommandDoesNotExit() {
+        checkSubshellExitDoesNotExit(taskSubshellExit().runAsCommand());
+    }
+
+    /** Commands where {@code exit 1} runs inside a subshell, i.e. {@code ( ... )}. */
+    public ProcessTaskFactory<Integer> taskSubshellExit() {
+        return SystemTasks.exec("echo hello", "( exit 1 )", "echo bye code $?");
+    }
+
+    /** Expects overall exit code 0 and "bye code 1" in stdout, i.e. execution continued past the subshell. */
+    public void checkSubshellExitDoesNotExit(ProcessTaskFactory<Integer> task) {
+        ProcessTaskWrapper<Integer> t = submit(task);
+        t.block();
+        Assert.assertEquals(t.get(), (Integer)0);
+        Assert.assertTrue(t.getStdout().contains("bye code 1"), "stdout is: "+t.getStdout());
+    }
+
+    // NB: despite the "DoesNotExit" in the two test names below, they assert that the group exit
+    // DOES terminate execution (see checkGroupExitDoesExit); names kept so test identifiers are stable
+    @Test(groups="Integration")
+    public void testGroupExitScriptDoesNotExit() {
+        checkGroupExitDoesExit(taskGroupExit().runAsScript());
+    }
+
+    @Test(groups="Integration")
+    public void testGroupExitCommandDoesNotExit() {
+        checkGroupExitDoesExit(taskGroupExit().runAsCommand());
+    }
+
+    /** Commands where {@code exit 1} runs inside a brace group, i.e. <code>{ ... ; }</code>. */
+    public ProcessTaskFactory<Integer> taskGroupExit() {
+        return SystemTasks.exec("echo hello", "{ exit 1 ; }", "echo bye code $?");
+    }
+
+    /** Expects overall exit code 1 and no "bye" in stdout, i.e. execution stopped at the group exit. */
+    public void checkGroupExitDoesExit(ProcessTaskFactory<Integer> task) {
+        ProcessTaskWrapper<Integer> t = submit(task);
+        t.block();
+        Assert.assertEquals(t.get(), (Integer)1);
+        Assert.assertFalse(t.getStdout().contains("bye"), "stdout is: "+t.getStdout());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/text/DataUriSchemeParserTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/text/DataUriSchemeParserTest.java b/core/src/test/java/org/apache/brooklyn/core/util/text/DataUriSchemeParserTest.java
new file mode 100644
index 0000000..73794a3
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/text/DataUriSchemeParserTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.text;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+
+import org.apache.brooklyn.core.util.text.DataUriSchemeParser;
+import org.bouncycastle.util.encoders.Base64;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/** Tests for {@link DataUriSchemeParser}. */
+public class DataUriSchemeParserTest {
+
+    /** A plain {@code data:,} URI yields its payload, via both the parser instance and the static helper. */
+    @Test
+    public void testSimple() {
+        final String uri = "data:,hello";
+        DataUriSchemeParser parsed = new DataUriSchemeParser(uri).parse();
+        Assert.assertEquals(parsed.getDataAsString(), "hello");
+        Assert.assertEquals(DataUriSchemeParser.toString(uri), "hello");
+    }
+
+    /** A URI carrying a mime type and URL-encoded payload exposes both the type and the decoded bytes. */
+    @Test
+    public void testMimeType() throws UnsupportedEncodingException {
+        String encodedPayload = URLEncoder.encode("{ }", "US-ASCII");
+        DataUriSchemeParser parsed = new DataUriSchemeParser("data:application/json,"+encodedPayload).parse();
+        Assert.assertEquals(parsed.getMimeType(), "application/json");
+        Assert.assertEquals(parsed.getData(), "{ }".getBytes());
+    }
+
+    /** A {@code ;base64} URI is decoded back to the original text. */
+    @Test
+    public void testBase64() {
+        String base64Payload = new String(Base64.encode("hello".getBytes()));
+        String decoded = DataUriSchemeParser.toString("data:;base64,"+base64Payload);
+        Assert.assertEquals(decoded, "hello");
+    }
+
+    // TODO test pictures, etc
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/text/TemplateProcessorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/text/TemplateProcessorTest.java b/core/src/test/java/org/apache/brooklyn/core/util/text/TemplateProcessorTest.java
new file mode 100644
index 0000000..05f4fde
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/text/TemplateProcessorTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.text;
+
+import static org.testng.Assert.assertEquals;
+
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.core.management.internal.ManagementContextInternal;
+import org.apache.brooklyn.core.util.text.TemplateProcessor;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.apache.brooklyn.test.entity.TestEntity;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.BrooklynAppUnitTestSupport;
+import brooklyn.event.basic.DependentConfiguration;
+import brooklyn.test.FixedLocaleTest;
+
+import com.google.common.collect.ImmutableMap;
+
+public class TemplateProcessorTest extends BrooklynAppUnitTestSupport {
+ private FixedLocaleTest localeFix = new FixedLocaleTest();
+
+ @BeforeMethod(alwaysRun=true)
+ public void setUp() throws Exception {
+ super.setUp();
+ localeFix.setUp();
+ }
+
+ @AfterMethod(alwaysRun=true)
+ public void tearDown() throws Exception {
+ super.tearDown();
+ localeFix.tearDown();
+ }
+
+ @Test
+ public void testAdditionalArgs() {
+ String templateContents = "${mykey}";
+ String result = TemplateProcessor.processTemplateContents(templateContents, app, ImmutableMap.of("mykey", "myval"));
+ assertEquals(result, "myval");
+ }
+
+ @Test
+ public void testEntityConfig() {
+ TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+ .configure(TestEntity.CONF_NAME, "myval"));
+ String templateContents = "${config['"+TestEntity.CONF_NAME.getName()+"']}";
+ String result = TemplateProcessor.processTemplateContents(templateContents, entity, ImmutableMap.<String,Object>of());
+ assertEquals(result, "myval");
+ }
+
+ @Test
+ public void testEntityConfigNumber() {
+ TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+ .configure(TestEntity.CONF_OBJECT, 123456));
+ String templateContents = "${config['"+TestEntity.CONF_OBJECT.getName()+"']}";
+ String result = TemplateProcessor.processTemplateContents(templateContents, entity, ImmutableMap.<String,Object>of());
+ assertEquals(result, "123,456");
+ }
+
+ @Test
+ public void testEntityConfigNumberUnadorned() {
+ // ?c is needed to avoid commas (i always forget this!)
+ TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+ .configure(TestEntity.CONF_OBJECT, 123456));
+ String templateContents = "${config['"+TestEntity.CONF_OBJECT.getName()+"']?c}";
+ String result = TemplateProcessor.processTemplateContents(templateContents, entity, ImmutableMap.<String,Object>of());
+ assertEquals(result, "123456");
+ }
+
+ @Test
+ public void testGetSysProp() {
+ System.setProperty("testGetSysProp", "myval");
+
+ String templateContents = "${javaSysProps['testGetSysProp']}";
+ String result = TemplateProcessor.processTemplateContents(templateContents, app, ImmutableMap.<String,Object>of());
+ assertEquals(result, "myval");
+ }
+
+ @Test
+ public void testEntityGetterMethod() {
+ String templateContents = "${entity.id}";
+ String result = TemplateProcessor.processTemplateContents(templateContents, app, ImmutableMap.<String,Object>of());
+ assertEquals(result, app.getId());
+ }
+
+ @Test
+ public void testManagementContextConfig() {
+ mgmt.getBrooklynProperties().put("globalmykey", "myval");
+ String templateContents = "${mgmt.globalmykey}";
+ String result = TemplateProcessor.processTemplateContents(templateContents, app, ImmutableMap.<String,Object>of());
+ assertEquals(result, "myval");
+ }
+
+ @Test
+ public void testManagementContextDefaultValue() {
+ String templateContents = "${(missing)!\"defval\"}";
+ Object result = TemplateProcessor.processTemplateContents(templateContents, app, ImmutableMap.<String,Object>of());
+ assertEquals(result, "defval");
+ }
+
+ @Test
+ public void testManagementContextDefaultValueInDotMissingValue() {
+ String templateContents = "${(mgmt.missing.more_missing)!\"defval\"}";
+ Object result = TemplateProcessor.processTemplateContents(templateContents, app, ImmutableMap.<String,Object>of());
+ assertEquals(result, "defval");
+ }
+
+ @Test
+ public void testManagementContextConfigWithDot() {
+ mgmt.getBrooklynProperties().put("global.mykey", "myval");
+ String templateContents = "${mgmt['global.mykey']}";
+ String result = TemplateProcessor.processTemplateContents(templateContents, app, ImmutableMap.<String,Object>of());
+ assertEquals(result, "myval");
+ }
+
+ @Test
+ public void testManagementContextErrors() {
+ try {
+ // NB: dot has special meaning so this should fail; must be accessed using bracket notation as above
+ mgmt.getBrooklynProperties().put("global.mykey", "myval");
+ String templateContents = "${mgmt.global.mykey}";
+ TemplateProcessor.processTemplateContents(templateContents, app, ImmutableMap.<String,Object>of());
+ Assert.fail("Should not have found value with intermediate dot");
+ } catch (Exception e) {
+ Assert.assertTrue(e.toString().contains("global"), "Should have mentioned missing key 'global' in error");
+ }
+ }
+
+ @Test
+ public void testApplyTemplatedConfigWithAttributeWhenReady() {
+ app.setAttribute(TestApplication.MY_ATTRIBUTE, "myval");
+
+ TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+ .configure(TestEntity.CONF_NAME, DependentConfiguration.attributeWhenReady(app, TestApplication.MY_ATTRIBUTE)));
+
+ String templateContents = "${config['"+TestEntity.CONF_NAME.getName()+"']}";
+ String result = TemplateProcessor.processTemplateContents(templateContents, entity, ImmutableMap.<String,Object>of());
+ assertEquals(result, "myval");
+ }
+
+ @Test
+ public void testDotSeparatedKey() {
+ String templateContents = "${a.b}";
+ String result = TemplateProcessor.processTemplateContents(templateContents, (ManagementContextInternal)null,
+ ImmutableMap.<String,Object>of("a.b", "myval"));
+ assertEquals(result, "myval");
+ }
+
+ @Test
+ public void testDotSeparatedKeyCollisionFailure() {
+ String templateContents = "${aaa.bbb}";
+ try {
+ TemplateProcessor.processTemplateContents(templateContents, (ManagementContextInternal)null,
+ ImmutableMap.<String,Object>of("aaa.bbb", "myval", "aaa", "blocker"));
+ Assert.fail("Should not have found value with intermediate dot where prefix is overridden");
+ } catch (Exception e) {
+ Assert.assertTrue(e.toString().contains("aaa"), "Should have mentioned missing key 'aaa' in error");
+ }
+ }
+
+}