You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by ha...@apache.org on 2015/08/17 21:17:41 UTC

[10/42] incubator-brooklyn git commit: [BROOKLYN-162] Refactor package in ./core/util

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/DynamicSequentialTaskTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/DynamicSequentialTaskTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/DynamicSequentialTaskTest.java
new file mode 100644
index 0000000..c4f8d4c
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/task/DynamicSequentialTaskTest.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.management.HasTaskChildren;
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.core.util.task.BasicExecutionContext;
+import org.apache.brooklyn.core.util.task.BasicExecutionManager;
+import org.apache.brooklyn.core.util.task.DynamicSequentialTask;
+import org.apache.brooklyn.core.util.task.DynamicTasks;
+import org.apache.brooklyn.core.util.task.TaskTags;
+import org.apache.brooklyn.core.util.task.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.test.Asserts;
+import brooklyn.util.collections.MutableList;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.time.CountdownTimer;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+public class DynamicSequentialTaskTest {
+
+    private static final Logger log = LoggerFactory.getLogger(DynamicSequentialTaskTest.class);
+    
+    public static final Duration TIMEOUT = Duration.TEN_SECONDS;
+    public static final Duration TINY_TIME = Duration.millis(20);
+    
+    BasicExecutionManager em;
+    BasicExecutionContext ec;
+    List<String> messages;
+    Semaphore cancellations;
+    Stopwatch stopwatch;
+    Map<String,Semaphore> monitorableJobSemaphoreMap;
+    Map<String,Task<String>> monitorableTasksMap;
+
+    @BeforeMethod(alwaysRun=true)
+    public void setUp() {
+        em = new BasicExecutionManager("mycontext");
+        ec = new BasicExecutionContext(em);
+        cancellations = new Semaphore(0);
+        messages = new ArrayList<String>();
+        monitorableJobSemaphoreMap = MutableMap.of();
+        monitorableTasksMap = MutableMap.of();
+        monitorableTasksMap.clear();
+        stopwatch = Stopwatch.createStarted();
+    }
+    
+    @AfterMethod(alwaysRun=true)
+    public void tearDown() throws Exception {
+        if (em != null) em.shutdownNow();
+    }
+
+    @Test
+    public void testSimple() throws InterruptedException, ExecutionException {
+        Callable<String> mainJob = new Callable<String>() {
+            public String call() {
+                log.info("main job - "+Tasks.current());
+                messages.add("main");
+                DynamicTasks.queue( sayTask("world") );
+                return "bye";
+            }            
+        };
+        DynamicSequentialTask<String> t = new DynamicSequentialTask<String>(mainJob);
+        // this should be added before anything added when the task is invoked
+        t.queue(sayTask("hello"));
+        
+        Assert.assertEquals(messages, Lists.newArrayList());
+        Assert.assertEquals(t.isBegun(), false);
+        Assert.assertEquals(Iterables.size(t.getChildren()), 1);
+        
+        ec.submit(t);
+        Assert.assertEquals(t.isSubmitted(), true);
+        Assert.assertEquals(t.getUnchecked(Duration.ONE_SECOND), "bye");
+        long elapsed = t.getEndTimeUtc() - t.getSubmitTimeUtc();
+        Assert.assertTrue(elapsed < 1000, "elapsed time should have been less than 1s but was "+
+                Time.makeTimeString(elapsed, true));
+        Assert.assertEquals(Iterables.size(t.getChildren()), 2);
+        Assert.assertEquals(messages.size(), 3, "expected 3 entries, but had "+messages);
+        // either main or hello can be first, but world should be last 
+        Assert.assertEquals(messages.get(2), "world");
+    }
+    
+    public Callable<String> sayCallable(final String message, final Duration duration, final String message2) {
+        return new Callable<String>() {
+            public String call() {
+                try {
+                    if (message != null) {
+                        log.info("saying: "+message+ " - "+Tasks.current());
+                        synchronized (messages) {
+                            messages.add(message);
+                            messages.notifyAll();
+                        }
+                    }
+                    if (message2 != null) {
+                        log.info("will say "+message2+" after "+duration);
+                    }
+                    if (duration != null && duration.toMilliseconds() > 0) {
+                        Thread.sleep(duration.toMillisecondsRoundingUp());
+                    }
+                } catch (InterruptedException e) {
+                    cancellations.release();
+                    throw Exceptions.propagate(e);
+                }
+                if (message2 != null) {
+                    log.info("saying: "+message2+ " - "+Tasks.current());
+                    synchronized (messages) {
+                        messages.add(message2);
+                        messages.notifyAll();
+                    }
+                }
+                return message;
+            }            
+        };
+    }
+    
+    public Task<String> sayTask(String message) {
+        return sayTask(message, null, null);
+    }
+    
+    public Task<String> sayTask(String message, Duration duration, String message2) {
+        return Tasks.<String>builder().body(sayCallable(message, duration, message2)).build();
+    }
+    
+    @Test
+    public void testComplex() throws InterruptedException, ExecutionException {
+        Task<List<?>> t = Tasks.sequential(
+                sayTask("1"),
+                sayTask("2"),
+                Tasks.parallel(sayTask("4"), sayTask("3")),
+                sayTask("5")
+            );
+        ec.submit(t);
+        Assert.assertEquals(t.get().size(), 4); 
+        Asserts.assertEqualsIgnoringOrder((List<?>)t.get().get(2), ImmutableSet.of("3", "4"));
+        Assert.assertTrue(messages.equals(Arrays.asList("1", "2", "3", "4", "5")) || messages.equals(Arrays.asList("1", "2", "4", "3", "5")), "messages="+messages);
+    }
+    
+    @Test
+    public void testCancelled() throws InterruptedException, ExecutionException {
+        Task<List<?>> t = Tasks.sequential(
+                sayTask("1"),
+                sayTask("2a", Duration.THIRTY_SECONDS, "2b"),
+                sayTask("3"));
+        ec.submit(t);
+        synchronized (messages) {
+            while (messages.size() <= 1)
+                messages.wait();
+        }
+        Assert.assertEquals(messages, Arrays.asList("1", "2a"));
+        Time.sleep(Duration.millis(50));
+        t.cancel(true);
+        Assert.assertTrue(t.isDone());
+        // 2 should get cancelled, and invoke the cancellation semaphore
+        // 3 should get cancelled and not run at all
+        Assert.assertEquals(messages, Arrays.asList("1", "2a"));
+        
+        // Need to ensure that 2 has been started; race where we might cancel it before its run method
+        // is even begun. Hence doing "2a; pause; 2b" where nothing is interruptable before pause.
+        Assert.assertTrue(cancellations.tryAcquire(10, TimeUnit.SECONDS));
+        
+        Iterator<Task<?>> ci = ((HasTaskChildren)t).getChildren().iterator();
+        Assert.assertEquals(ci.next().get(), "1");
+        Task<?> task2 = ci.next();
+        Assert.assertTrue(task2.isBegun());
+        Assert.assertTrue(task2.isDone());
+        Assert.assertTrue(task2.isCancelled());
+        
+        Task<?> task3 = ci.next();
+        Assert.assertFalse(task3.isBegun());
+        Assert.assertTrue(task2.isDone());
+        Assert.assertTrue(task2.isCancelled());
+        
+        // but we do _not_ get a mutex from task3 as it does not run (is not interrupted)
+        Assert.assertEquals(cancellations.availablePermits(), 0);
+    }
+
+    protected Task<String> monitorableTask(final String id) {
+        return monitorableTask(null, id, null);
+    }
+    protected Task<String> monitorableTask(final Runnable pre, final String id, final Callable<String> post) {
+        Task<String> t = Tasks.<String>builder().body(monitorableJob(pre, id, post)).build();
+        monitorableTasksMap.put(id, t);
+        return t;
+    }
+    protected Callable<String> monitorableJob(final String id) {
+        return monitorableJob(null, id, null);
+    }
+    protected Callable<String> monitorableJob(final Runnable pre, final String id, final Callable<String> post) {
+        monitorableJobSemaphoreMap.put(id, new Semaphore(0));
+        return new Callable<String>() {
+            @Override
+            public String call() throws Exception {
+                if (pre!=null) pre.run();
+                // wait for semaphore
+                if (!monitorableJobSemaphoreMap.get(id).tryAcquire(1, TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS))
+                    throw new IllegalStateException("timeout for "+id);
+                synchronized (messages) {
+                    messages.add(id);
+                    messages.notifyAll();
+                }
+                if (post!=null) return post.call();
+                return id;
+            }
+        };
+    }
+    protected void releaseMonitorableJob(final String id) {
+        monitorableJobSemaphoreMap.get(id).release();
+    }
+    protected void waitForMessage(final String id) {
+        CountdownTimer timer = CountdownTimer.newInstanceStarted(TIMEOUT);
+        synchronized (messages) {
+            while (!timer.isExpired()) {
+                if (messages.contains(id)) return;
+                timer.waitOnForExpiryUnchecked(messages);
+            }
+        }
+        Assert.fail("Did not see message "+id);
+    }
+    protected void releaseAndWaitForMonitorableJob(final String id) {
+        releaseMonitorableJob(id);
+        waitForMessage(id);
+    }
+    
+    @Test
+    public void testChildrenRunConcurrentlyWithPrimary() {
+        Task<String> t = Tasks.<String>builder().dynamic(true)
+            .body(monitorableJob("main"))
+            .add(monitorableTask("1")).add(monitorableTask("2")).build();
+        ec.submit(t);
+        releaseAndWaitForMonitorableJob("1");
+        releaseAndWaitForMonitorableJob("main");
+        Assert.assertFalse(t.blockUntilEnded(TINY_TIME));
+        releaseMonitorableJob("2");
+        
+        Assert.assertTrue(t.blockUntilEnded(TIMEOUT));
+        Assert.assertEquals(messages, MutableList.of("1", "main", "2"));
+        Assert.assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) < TIMEOUT.toMilliseconds(), "took too long: "+stopwatch);
+        Assert.assertFalse(t.isError());
+    }
+    
+    protected static class FailRunnable implements Runnable {
+        @Override public void run() { throw new RuntimeException("Planned exception for test"); }
+    }
+    protected static class FailCallable implements Callable<String> {
+        @Override public String call() { throw new RuntimeException("Planned exception for test"); }
+    }
+    
+    @Test
+    public void testByDefaultChildrenFailureAbortsSecondaryFailsPrimaryButNotAbortsPrimary() {
+        Task<String> t1 = monitorableTask(null, "1", new FailCallable());
+        Task<String> t = Tasks.<String>builder().dynamic(true)
+            .body(monitorableJob("main"))
+            .add(t1).add(monitorableTask("2")).build();
+        ec.submit(t);
+        releaseAndWaitForMonitorableJob("1");
+        Assert.assertFalse(t.blockUntilEnded(TINY_TIME));
+        releaseMonitorableJob("main");
+        
+        Assert.assertTrue(t.blockUntilEnded(TIMEOUT));
+        Assert.assertEquals(messages, MutableList.of("1", "main"));
+        Assert.assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) < TIMEOUT.toMilliseconds(), "took too long: "+stopwatch);
+        Assert.assertTrue(t.isError());
+        Assert.assertTrue(t1.isError());
+    }
+
+    @Test
+    public void testWhenSwallowingChildrenFailureDoesNotAbortSecondaryOrFailPrimary() {
+        Task<String> t1 = monitorableTask(null, "1", new FailCallable());
+        Task<String> t = Tasks.<String>builder().dynamic(true)
+            .body(monitorableJob("main"))
+            .add(t1).add(monitorableTask("2")).swallowChildrenFailures(true).build();
+        ec.submit(t);
+        releaseAndWaitForMonitorableJob("1");
+        Assert.assertFalse(t.blockUntilEnded(TINY_TIME));
+        releaseAndWaitForMonitorableJob("2");
+        Assert.assertFalse(t.blockUntilEnded(TINY_TIME));
+        releaseMonitorableJob("main");
+        Assert.assertTrue(t.blockUntilEnded(TIMEOUT));
+        Assert.assertEquals(messages, MutableList.of("1", "2", "main"));
+        Assert.assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) < TIMEOUT.toMilliseconds(), "took too long: "+stopwatch);
+        Assert.assertFalse(t.isError());
+        Assert.assertTrue(t1.isError());
+    }
+
+    @Test
+    public void testInessentialChildrenFailureDoesNotAbortSecondaryOrFailPrimary() {
+        Task<String> t1 = monitorableTask(null, "1", new FailCallable());
+        TaskTags.markInessential(t1);
+        Task<String> t = Tasks.<String>builder().dynamic(true)
+            .body(monitorableJob("main"))
+            .add(t1).add(monitorableTask("2")).build();
+        ec.submit(t);
+        releaseAndWaitForMonitorableJob("1");
+        Assert.assertFalse(t.blockUntilEnded(TINY_TIME));
+        releaseAndWaitForMonitorableJob("2");
+        Assert.assertFalse(t.blockUntilEnded(TINY_TIME));
+        releaseMonitorableJob("main");
+        Assert.assertTrue(t.blockUntilEnded(TIMEOUT));
+        Assert.assertEquals(messages, MutableList.of("1", "2", "main"));
+        Assert.assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) < TIMEOUT.toMilliseconds(), "took too long: "+stopwatch);
+        Assert.assertFalse(t.isError());
+        Assert.assertTrue(t1.isError());
+    }
+
+    @Test
+    public void testTaskBuilderUsingAddVarargChildren() {
+        Task<String> t = Tasks.<String>builder().dynamic(true)
+            .body(monitorableJob("main"))
+            .add(monitorableTask("1"), monitorableTask("2"))
+            .build();
+        ec.submit(t);
+        releaseAndWaitForMonitorableJob("1");
+        releaseAndWaitForMonitorableJob("2");
+        releaseAndWaitForMonitorableJob("main");
+        
+        Assert.assertEquals(messages, MutableList.of("1", "2", "main"));
+    }
+    
+    @Test
+    public void testTaskBuilderUsingAddAllChildren() {
+        Task<String> t = Tasks.<String>builder().dynamic(true)
+            .body(monitorableJob("main"))
+            .addAll(ImmutableList.of(monitorableTask("1"), monitorableTask("2")))
+            .build();
+        ec.submit(t);
+        releaseAndWaitForMonitorableJob("1");
+        releaseAndWaitForMonitorableJob("2");
+        releaseAndWaitForMonitorableJob("main");
+        
+        Assert.assertEquals(messages, MutableList.of("1", "2", "main"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/NonBasicTaskExecutionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/NonBasicTaskExecutionTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/NonBasicTaskExecutionTest.java
new file mode 100644
index 0000000..980a701
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/task/NonBasicTaskExecutionTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.core.util.task.BasicExecutionManager;
+import org.apache.brooklyn.core.util.task.BasicTask;
+import org.apache.brooklyn.core.util.task.ForwardingTask;
+import org.apache.brooklyn.core.util.task.TaskInternal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.test.Asserts;
+import brooklyn.util.collections.MutableMap;
+
+/**
+ * Test the operation of the {@link BasicTask} class.
+ *
+ * TODO clarify test purpose
+ */
+public class NonBasicTaskExecutionTest {
+    private static final Logger log = LoggerFactory.getLogger(NonBasicTaskExecutionTest.class);
+ 
+    private static final int TIMEOUT_MS = 10*1000;
+    
+    public static class ConcreteForwardingTask<T> extends ForwardingTask<T> {
+        private final TaskInternal<T> delegate;
+
+        ConcreteForwardingTask(TaskInternal<T> delegate) {
+            this.delegate = delegate;
+        }
+        
+        @Override
+        protected TaskInternal<T> delegate() {
+            return delegate;
+        }
+    }
+    
+    private BasicExecutionManager em;
+    private Map<Integer,String> data;
+
+    @BeforeMethod(alwaysRun=true)
+    public void setUp() throws Exception {
+        em = new BasicExecutionManager("mycontext");
+        data = Collections.synchronizedMap(new HashMap<Integer,String>());
+    }
+    
+    @AfterMethod(alwaysRun=true)
+    public void tearDown() throws Exception {
+        if (em != null) em.shutdownNow();
+    }
+    
+    @Test
+    public void runSimpleTask() throws Exception {
+        TaskInternal<Object> t = new ConcreteForwardingTask<Object>(new BasicTask<Object>(new Callable<Object>() {
+            @Override public Object call() {
+                return data.put(1, "b");
+            }}));
+        data.put(1, "a");
+        Task<?> t2 = em.submit(MutableMap.of("tag", "A"), t);
+        assertEquals("a", t.get());
+        assertEquals("a", t2.get());
+        assertSame(t, t2, "t="+t+"; t2="+t2);
+        assertEquals("b", data.get(1));
+    }
+    
+    @Test
+    public void runBasicTaskWithWaits() throws Exception {
+        final CountDownLatch signalStarted = new CountDownLatch(1);
+        final CountDownLatch allowCompletion = new CountDownLatch(1);
+        final TaskInternal<Object> t = new ConcreteForwardingTask<Object>(new BasicTask<Object>(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                Object result = data.put(1, "b");
+                signalStarted.countDown();
+                assertTrue(allowCompletion.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
+                return result;
+            }}));
+        data.put(1, "a");
+
+        Task<?> t2 = em.submit(MutableMap.of("tag", "A"), t);
+        assertEquals(t, t2);
+        assertFalse(t.isDone());
+        
+        assertTrue(signalStarted.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
+        assertEquals("b", data.get(1));
+        assertFalse(t.isDone());
+        
+        log.debug("runBasicTaskWithWaits, BasicTask status: {}", t.getStatusDetail(false));
+        
+        Asserts.succeedsEventually(new Runnable() {
+            @Override public void run() {
+                t.getStatusDetail(false).toLowerCase().contains("waiting");
+            }});
+        // "details="+t.getStatusDetail(false))
+        
+        allowCompletion.countDown();
+        assertEquals("a", t.get());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/ScheduledExecutionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/ScheduledExecutionTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/ScheduledExecutionTest.java
new file mode 100644
index 0000000..3b338da
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/task/ScheduledExecutionTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.core.util.task.BasicExecutionManager;
+import org.apache.brooklyn.core.util.task.BasicTask;
+import org.apache.brooklyn.core.util.task.ScheduledTask;
+import org.apache.brooklyn.core.util.task.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import brooklyn.test.Asserts;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.exceptions.RuntimeInterruptedException;
+import brooklyn.util.javalang.JavaClassNames;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+
+@SuppressWarnings({"unchecked","rawtypes"})
+public class ScheduledExecutionTest {
+
+    public static final Logger log = LoggerFactory.getLogger(ScheduledExecutionTest.class);
+    
+    @Test
+    public void testScheduledTask() throws Exception {
+        int PERIOD = 20;
+        BasicExecutionManager m = new BasicExecutionManager("mycontextid");
+        final AtomicInteger i = new AtomicInteger(0);
+        ScheduledTask t = new ScheduledTask(MutableMap.of("delay", 2*PERIOD, "period", PERIOD, "maxIterations", 5), new Callable<Task<?>>() {
+            public Task<?> call() throws Exception {
+                return new BasicTask<Integer>(new Callable<Integer>() {
+                    public Integer call() {
+                        log.debug("task running: "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
+                        return i.incrementAndGet();
+                    }});
+            }});
+    
+        log.info("submitting {} {}", t, t.getStatusDetail(false));
+        m.submit(t);
+        log.info("submitted {} {}", t, t.getStatusDetail(false));
+        Integer interimResult = (Integer) t.get();
+        log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
+        assertTrue(i.get() > 0, "i="+i);
+        t.blockUntilEnded();
+        Integer finalResult = (Integer) t.get();
+        log.info("ended ({}) {} {}", new Object[] {finalResult, t, t.getStatusDetail(false)});
+        assertEquals(finalResult, (Integer)5);
+        assertEquals(i.get(), 5);
+    }
+
+    /** like testScheduledTask but the loop is terminated by the task itself adjusting the period */
+    @Test
+    public void testScheduledTaskSelfEnding() throws Exception {
+        int PERIOD = 20;
+        BasicExecutionManager m = new BasicExecutionManager("mycontextid");
+        final AtomicInteger i = new AtomicInteger(0);
+        ScheduledTask t = new ScheduledTask(MutableMap.of("delay", 2*PERIOD, "period", PERIOD), new Callable<Task<?>>() {
+            public Task<?> call() throws Exception {
+                return new BasicTask<Integer>(new Callable<Integer>() {
+                    public Integer call() {
+                        ScheduledTask submitter = (ScheduledTask) ((BasicTask)Tasks.current()).getSubmittedByTask();
+                        if (i.get() >= 4) submitter.period = null;
+                        log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
+                        return i.incrementAndGet();
+                    }});
+            }});
+    
+        log.info("submitting {} {}", t, t.getStatusDetail(false));
+        m.submit(t);
+        log.info("submitted {} {}", t, t.getStatusDetail(false));
+        Integer interimResult = (Integer) t.get();
+        log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
+        assertTrue(i.get() > 0);
+        t.blockUntilEnded();
+        Integer finalResult = (Integer) t.get();
+        log.info("ended ({}) {} {}", new Object[] {finalResult, t, t.getStatusDetail(false)});
+        assertEquals(finalResult, (Integer)5);
+        assertEquals(i.get(), 5);
+    }
+
+    @Test
+    public void testScheduledTaskCancelEnding() throws Exception {
+        Duration PERIOD = Duration.millis(20);
+        BasicExecutionManager m = new BasicExecutionManager("mycontextid");
+        final AtomicInteger i = new AtomicInteger();
+        ScheduledTask t = new ScheduledTask(MutableMap.of("delay", PERIOD.times(2), "period", PERIOD), new Callable<Task<?>>() {
+            public Task<?> call() throws Exception {
+                return new BasicTask<Integer>(new Callable<Integer>() {
+                    public Integer call() {
+                        log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
+                        ScheduledTask submitter = (ScheduledTask) ((BasicTask)Tasks.current()).getSubmittedByTask();
+                        i.incrementAndGet();
+                        if (i.get() >= 5) submitter.cancel();
+                        return i.get();
+                    }});
+            }});
+    
+        log.info(JavaClassNames.niceClassAndMethod()+" - submitting {} {}", t, t.getStatusDetail(false));
+        m.submit(t);
+        log.info("submitted {} {}", t, t.getStatusDetail(false));
+        Integer interimResult = (Integer) t.get();
+        log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
+        assertTrue(i.get() > 0);
+        t.blockUntilEnded();
+//      int finalResult = t.get()
+        log.info("ended ({}) {} {}", new Object[] {i, t, t.getStatusDetail(false)});
+//      assertEquals(finalResult, 5)
+        assertEquals(i.get(), 5);
+    }
+
+    @Test(groups="Integration")
+    public void testScheduledTaskCancelOuter() throws Exception {
+        final Duration PERIOD = Duration.millis(20);
+        final Duration CYCLE_DELAY = Duration.ONE_SECOND;
+        // this should be enough to start the next cycle, but not so much that the cycle ends;
+        // and enough that when a task is interrupted it terminates within this period
+        final Duration SMALL_FRACTION_OF_CYCLE_DELAY = PERIOD.add(CYCLE_DELAY.multiply(0.1));
+        
+        BasicExecutionManager m = new BasicExecutionManager("mycontextid");
+        final AtomicInteger i = new AtomicInteger();
+        ScheduledTask t = new ScheduledTask(MutableMap.of("delay", PERIOD.times(2), "period", PERIOD), new Callable<Task<?>>() {
+            public Task<?> call() throws Exception {
+                return new BasicTask<Integer>(new Callable<Integer>() {
+                    public Integer call() {
+                        log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
+                        Time.sleep(CYCLE_DELAY);
+                        i.incrementAndGet();
+                        return i.get();
+                    }});
+            }});
+    
+        log.info(JavaClassNames.niceClassAndMethod()+" - submitting {} {}", t, t.getStatusDetail(false));
+        m.submit(t);
+        log.info("submitted {} {}", t, t.getStatusDetail(false));
+        Integer interimResult = (Integer) t.get();
+        log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
+        assertEquals(i.get(), 1);
+        
+        Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY);
+        assertEquals(t.get(), 2);
+        
+        Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY);
+        Stopwatch timer = Stopwatch.createUnstarted();
+        t.cancel(true);
+        t.blockUntilEnded();
+//      int finalResult = t.get()
+        log.info("blocked until ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)});
+        try {
+            t.get();
+            Assert.fail("Should have failed getting result of cancelled "+t);
+        } catch (Exception e) {
+            /* expected */
+        }
+        assertEquals(i.get(), 2);
+        log.info("ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)});
+        Assert.assertTrue(Duration.of(timer).isShorterThan(SMALL_FRACTION_OF_CYCLE_DELAY));
+    }
+
+    @Test(groups="Integration")
+    public void testScheduledTaskCancelInterrupts() throws Exception {
+        final Duration PERIOD = Duration.millis(20);
+        final Duration CYCLE_DELAY = Duration.ONE_SECOND;
+        // this should be enough to start the next cycle, but not so much that the cycle ends;
+        // and enough that when a task is interrupted it terminates within this period
+        final Duration SMALL_FRACTION_OF_CYCLE_DELAY = PERIOD.add(CYCLE_DELAY.multiply(0.1));
+        
+        BasicExecutionManager m = new BasicExecutionManager("mycontextid");
+        final Semaphore interruptedSemaphore = new Semaphore(0);
+        final AtomicInteger i = new AtomicInteger();
+        ScheduledTask t = new ScheduledTask(MutableMap.of("delay", PERIOD.times(2), "period", PERIOD), new Callable<Task<?>>() {
+            public Task<?> call() throws Exception {
+                return new BasicTask<Integer>(new Callable<Integer>() {
+                    public Integer call() {
+                        try {
+                            log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
+                            Time.sleep(CYCLE_DELAY);
+                            i.incrementAndGet();
+                            return i.get();
+                        } catch (RuntimeInterruptedException e) {
+                            interruptedSemaphore.release();
+                            throw Exceptions.propagate(e);
+                        }
+                    }});
+            }});
+    
+        log.info(JavaClassNames.niceClassAndMethod()+" - submitting {} {}", t, t.getStatusDetail(false));
+        m.submit(t);
+        log.info("submitted {} {}", t, t.getStatusDetail(false));
+        Integer interimResult = (Integer) t.get();
+        log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
+        assertEquals(i.get(), 1);
+        
+        Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY);
+        assertEquals(t.get(), 2);
+        
+        Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY);
+        Stopwatch timer = Stopwatch.createUnstarted();
+        t.cancel(true);
+        t.blockUntilEnded();
+//      int finalResult = t.get()
+        log.info("blocked until ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)});
+        try {
+            t.get();
+            Assert.fail("Should have failed getting result of cancelled "+t);
+        } catch (Exception e) {
+            /* expected */
+        }
+        assertEquals(i.get(), 2);
+        Assert.assertTrue(interruptedSemaphore.tryAcquire(1, SMALL_FRACTION_OF_CYCLE_DELAY.toMilliseconds(), TimeUnit.MILLISECONDS), "child thread was not interrupted");
+        log.info("ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)});
+        Assert.assertTrue(Duration.of(timer).isShorterThan(SMALL_FRACTION_OF_CYCLE_DELAY));
+    }
+
+    @Test(groups="Integration")
+    public void testScheduledTaskTakesLongerThanPeriod() throws Exception {
+        final int PERIOD = 1;
+        final int SLEEP_TIME = 100;
+        final int EARLY_RETURN_GRACE = 10;
+        BasicExecutionManager m = new BasicExecutionManager("mycontextid");
+        final List<Long> execTimes = new CopyOnWriteArrayList<Long>();
+        
+        ScheduledTask t = new ScheduledTask(MutableMap.of("delay", PERIOD, "period", PERIOD), new Callable<Task<?>>() {
+            public Task<?> call() throws Exception {
+                return new BasicTask<Void>(new Runnable() {
+                    public void run() {
+                        execTimes.add(System.currentTimeMillis());
+                        try {
+                            Thread.sleep(100);
+                        } catch (InterruptedException e) {
+                            throw Exceptions.propagate(e);
+                        }
+                    }});
+            }});
+    
+        m.submit(t);
+        
+        Asserts.succeedsEventually(new Runnable() {
+            public void run() {
+                assertTrue(execTimes.size() > 3, "size="+execTimes.size());
+            }});
+        
+        List<Long> timeDiffs = Lists.newArrayList();
+        long prevExecTime = -1;
+        for (Long execTime : execTimes) {
+            if (prevExecTime == -1) {
+                prevExecTime = execTime;
+            } else {
+                timeDiffs.add(execTime - prevExecTime);
+                prevExecTime = execTime;
+            }
+        }
+        
+        for (Long timeDiff : timeDiffs) {
+            if (timeDiff < (SLEEP_TIME - EARLY_RETURN_GRACE)) fail("timeDiffs="+timeDiffs+"; execTimes="+execTimes);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/SingleThreadedSchedulerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/SingleThreadedSchedulerTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/SingleThreadedSchedulerTest.java
new file mode 100644
index 0000000..e3420c8
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/task/SingleThreadedSchedulerTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.fail;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.brooklyn.core.util.task.BasicExecutionManager;
+import org.apache.brooklyn.core.util.task.BasicTask;
+import org.apache.brooklyn.core.util.task.SingleThreadedScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.test.Asserts;
+import brooklyn.util.collections.MutableMap;
+
+import com.google.common.util.concurrent.Callables;
+
+public class SingleThreadedSchedulerTest {
+
+    private static final Logger log = LoggerFactory.getLogger(SingleThreadedSchedulerTest.class);
+    
+    private BasicExecutionManager em;
+    
+    @BeforeMethod
+    public void setUp() {
+        em = new BasicExecutionManager("mycontextid");
+        em.setTaskSchedulerForTag("category1", SingleThreadedScheduler.class);
+    }
+    
+    @AfterMethod
+    public void tearDown() {
+        if (em != null) em.shutdownNow();
+    }
+    
+    @Test
+    public void testExecutesInOrder() throws Exception {
+        final int NUM_TIMES = 1000;
+        final List<Integer> result = new CopyOnWriteArrayList<Integer>();
+        for (int i = 0; i < NUM_TIMES; i++) {
+            final int counter = i;
+            em.submit(MutableMap.of("tag", "category1"), new Runnable() {
+                public void run() {
+                    result.add(counter);
+                }});
+        }
+        
+        Asserts.succeedsEventually(new Runnable() {
+            @Override public void run() {
+                assertEquals(result.size(), NUM_TIMES);
+            }});
+
+        for (int i = 0; i < NUM_TIMES; i++) {
+            assertEquals(result.get(i), (Integer)i);
+        }        
+    }
+    
+    @Test
+    public void testLargeQueueDoesNotConsumeTooManyThreads() throws Exception {
+        final int NUM_TIMES = 3000;
+        final CountDownLatch latch = new CountDownLatch(1);
+        BasicTask<Void> blockingTask = new BasicTask<Void>(newLatchAwaiter(latch));
+        em.submit(MutableMap.of("tag", "category1"), blockingTask);
+        
+        final AtomicInteger counter = new AtomicInteger(0);
+        for (int i = 0; i < NUM_TIMES; i++) {
+            BasicTask<Void> t = new BasicTask<Void>(new Runnable() {
+                public void run() {
+                    counter.incrementAndGet();
+                }});
+            em.submit(MutableMap.of("tag", "category1"), t);
+            if (i % 500 == 0) log.info("Submitted "+i+" jobs...");
+        }
+
+        Thread.sleep(100); // give it more of a chance to create the threads before we let them execute
+        latch.countDown();
+
+        Asserts.succeedsEventually(new Runnable() {
+            @Override public void run() {
+                assertEquals(counter.get(), NUM_TIMES);
+            }});
+    }
+    
+    @Test
+    public void testGetResultOfQueuedTaskBeforeItExecutes() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+        em.submit(MutableMap.of("tag", "category1"), newLatchAwaiter(latch));
+        
+        BasicTask<Integer> t = new BasicTask<Integer>(Callables.returning(123));
+        Future<Integer> future = em.submit(MutableMap.of("tag", "category1"), t);
+
+        Thread thread = new Thread(new Runnable() {
+            public void run() {
+                try {
+                    Thread.sleep(10);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+                latch.countDown();
+            }});
+        thread.start();
+        assertEquals(future.get(), (Integer)123);
+    }
+    
+    @Test
+    public void testGetResultOfQueuedTaskBeforeItExecutesWithTimeout() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+        em.submit(MutableMap.of("tag", "category1"), newLatchAwaiter(latch));
+        
+        BasicTask<Integer> t = new BasicTask<Integer>(Callables.returning(123));
+        Future<Integer> future = em.submit(MutableMap.of("tag", "category1"), t);
+
+        try {
+            assertEquals(future.get(10, TimeUnit.MILLISECONDS), (Integer)123);
+            fail();
+        } catch (TimeoutException e) {
+            // success
+        }
+    }
+    
+    @Test
+    public void testCancelQueuedTaskBeforeItExecutes() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+        em.submit(MutableMap.of("tag", "category1"), newLatchAwaiter(latch));
+        
+        final AtomicBoolean executed = new AtomicBoolean();
+        BasicTask<?> t = new BasicTask<Void>(new Runnable() {
+            public void run() {
+                executed.set(true);
+            }});
+        Future<?> future = em.submit(MutableMap.of("tag", "category1"), t);
+
+        future.cancel(true);
+        latch.countDown();
+        Thread.sleep(10);
+        try {
+            future.get();
+        } catch (CancellationException e) {
+            // success
+        }
+        assertFalse(executed.get());
+    }
+    
+    @Test
+    public void testGetResultOfQueuedTaskAfterItExecutes() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+        em.submit(MutableMap.of("tag", "category1"), newLatchAwaiter(latch));
+        
+        BasicTask<Integer> t = new BasicTask<Integer>(Callables.returning(123));
+        Future<Integer> future = em.submit(MutableMap.of("tag", "category1"), t);
+
+        latch.countDown();
+        assertEquals(future.get(), (Integer)123);
+    }
+    
+    private Callable<Void> newLatchAwaiter(final CountDownLatch latch) {
+        return new Callable<Void>() {
+            public Void call() throws Exception {
+                latch.await();
+                return null;
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/TaskFinalizationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/TaskFinalizationTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/TaskFinalizationTest.java
new file mode 100644
index 0000000..1ff181b
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/task/TaskFinalizationTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.core.util.task.BasicTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Stopwatch;
+
+public class TaskFinalizationTest {
+
+    private static final Logger log = LoggerFactory.getLogger(TaskFinalizationTest.class);
+    
+    // integration because it can take a while (and finalizers aren't even guaranteed)
+    @Test(groups="Integration")
+    public void testFinalizerInvoked() throws InterruptedException {
+        BasicTask<?> t = new BasicTask<Void>(new Runnable() { public void run() { /* no op */ }});
+        final Semaphore x = new Semaphore(0);
+        t.setFinalizer(new BasicTask.TaskFinalizer() {
+            public void onTaskFinalization(Task<?> t) {
+                synchronized (x) { 
+                    x.release();
+                }
+            }
+        });
+        t = null;
+        Stopwatch watch = Stopwatch.createStarted();
+        for (int i=0; i<30; i++) {
+            System.gc(); System.gc();
+            if (x.tryAcquire(1, TimeUnit.SECONDS)) {
+                log.info("finalizer ran after "+Time.makeTimeStringRounded(watch));
+                return;
+            }
+        }
+        Assert.fail("finalizer did not run in time");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/TasksTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/TasksTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/TasksTest.java
new file mode 100644
index 0000000..0800984
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/task/TasksTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import static brooklyn.event.basic.DependentConfiguration.attributeWhenReady;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import org.apache.brooklyn.api.management.ExecutionContext;
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.core.util.task.TaskInternal;
+import org.apache.brooklyn.core.util.task.Tasks;
+import org.apache.brooklyn.core.util.task.ValueResolver;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.apache.brooklyn.test.entity.TestEntity;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.BrooklynAppUnitTestSupport;
+import brooklyn.entity.basic.EntityFunctions;
+import brooklyn.util.guava.Functionals;
+import brooklyn.util.repeat.Repeater;
+import brooklyn.util.time.Duration;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Callables;
+
+
+public class TasksTest extends BrooklynAppUnitTestSupport {
+
+    private ExecutionContext executionContext;
+
+    @BeforeMethod(alwaysRun=true)
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        executionContext = app.getExecutionContext();
+    }
+    
+    @Test
+    public void testResolveNull() throws Exception {
+        assertResolvesValue(null, String.class, null);
+    }
+    
+    @Test
+    public void testResolveValueCastsToType() throws Exception {
+        assertResolvesValue(123, String.class, "123");
+    }
+    
+    @Test
+    public void testResolvesAttributeWhenReady() throws Exception {
+        app.setAttribute(TestApplication.MY_ATTRIBUTE, "myval");
+        assertResolvesValue(attributeWhenReady(app, TestApplication.MY_ATTRIBUTE), String.class, "myval");
+    }
+    
+    @Test
+    public void testResolvesMapWithAttributeWhenReady() throws Exception {
+        app.setAttribute(TestApplication.MY_ATTRIBUTE, "myval");
+        Map<?,?> orig = ImmutableMap.of("mykey", attributeWhenReady(app, TestApplication.MY_ATTRIBUTE));
+        Map<?,?> expected = ImmutableMap.of("mykey", "myval");
+        assertResolvesValue(orig, String.class, expected);
+    }
+    
+    @Test
+    public void testResolvesSetWithAttributeWhenReady() throws Exception {
+        app.setAttribute(TestApplication.MY_ATTRIBUTE, "myval");
+        Set<?> orig = ImmutableSet.of(attributeWhenReady(app, TestApplication.MY_ATTRIBUTE));
+        Set<?> expected = ImmutableSet.of("myval");
+        assertResolvesValue(orig, String.class, expected);
+    }
+    
+    @Test
+    public void testResolvesMapOfMapsWithAttributeWhenReady() throws Exception {
+        app.setAttribute(TestApplication.MY_ATTRIBUTE, "myval");
+        Map<?,?> orig = ImmutableMap.of("mykey", ImmutableMap.of("mysubkey", attributeWhenReady(app, TestApplication.MY_ATTRIBUTE)));
+        Map<?,?> expected = ImmutableMap.of("mykey", ImmutableMap.of("mysubkey", "myval"));
+        assertResolvesValue(orig, String.class, expected);
+    }
+    
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testResolvesIterableOfMapsWithAttributeWhenReady() throws Exception {
+        app.setAttribute(TestApplication.MY_ATTRIBUTE, "myval");
+        // using Iterables.concat so that orig is of type FluentIterable rather than List etc
+        Iterable<?> orig = Iterables.concat(ImmutableList.of(ImmutableMap.of("mykey", attributeWhenReady(app, TestApplication.MY_ATTRIBUTE))));
+        Iterable<Map<?,?>> expected = ImmutableList.<Map<?,?>>of(ImmutableMap.of("mykey", "myval"));
+        assertResolvesValue(orig, String.class, expected);
+    }
+    
+    private void assertResolvesValue(Object actual, Class<?> type, Object expected) throws Exception {
+        Object result = Tasks.resolveValue(actual, type, executionContext);
+        assertEquals(result, expected);
+    }
+    
+    @Test
+    public void testErrorsResolvingPropagatesOrSwallowedAllCorrectly() throws Exception {
+        app.setConfig(TestEntity.CONF_OBJECT, ValueResolverTest.newThrowTask(Duration.ZERO));
+        Task<Object> t = Tasks.builder().body(Functionals.callable(EntityFunctions.config(TestEntity.CONF_OBJECT), app)).build();
+        ValueResolver<Object> v = Tasks.resolving(t).as(Object.class).context(app.getExecutionContext());
+        
+        ValueResolverTest.assertThrowsOnMaybe(v);
+        ValueResolverTest.assertThrowsOnGet(v);
+        
+        v.swallowExceptions();
+        ValueResolverTest.assertMaybeIsAbsent(v);
+        ValueResolverTest.assertThrowsOnGet(v);
+        
+        v.defaultValue("foo");
+        ValueResolverTest.assertMaybeIsAbsent(v);
+        assertEquals(v.clone().get(), "foo");
+        assertResolvesValue(v, Object.class, "foo");
+    }
+
+    @Test
+    public void testRepeater() throws Exception {
+        Task<?> t;
+        
+        t = Tasks.requiring(Repeater.create().until(Callables.returning(true)).every(Duration.millis(1))).build();
+        app.getExecutionContext().submit(t);
+        t.get(Duration.TEN_SECONDS);
+        
+        t = Tasks.testing(Repeater.create().until(Callables.returning(true)).every(Duration.millis(1))).build();
+        app.getExecutionContext().submit(t);
+        Assert.assertEquals(t.get(Duration.TEN_SECONDS), true);
+        
+        t = Tasks.requiring(Repeater.create().until(Callables.returning(false)).limitIterationsTo(2).every(Duration.millis(1))).build();
+        app.getExecutionContext().submit(t);
+        try {
+            t.get(Duration.TEN_SECONDS);
+            Assert.fail("Should have failed");
+        } catch (Exception e) {
+            // expected
+        }
+
+        t = Tasks.testing(Repeater.create().until(Callables.returning(false)).limitIterationsTo(2).every(Duration.millis(1))).build();
+        app.getExecutionContext().submit(t);
+        Assert.assertEquals(t.get(Duration.TEN_SECONDS), false);
+    }
+
+    @Test
+    public void testRepeaterDescription() throws Exception{
+        final String description = "task description";
+        Repeater repeater = Repeater.create(description)
+            .repeat(Callables.returning(null))
+            .every(Duration.ONE_MILLISECOND)
+            .limitIterationsTo(1)
+            .until(new Callable<Boolean>() {
+                @Override
+                public Boolean call() {
+                    TaskInternal<?> current = (TaskInternal<?>)Tasks.current();
+                    assertEquals(current.getBlockingDetails(), description);
+                    return true;
+                }
+            });
+        Task<Boolean> t = Tasks.testing(repeater).build();
+        app.getExecutionContext().submit(t);
+        assertTrue(t.get(Duration.TEN_SECONDS));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/ValueResolverTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/ValueResolverTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/ValueResolverTest.java
new file mode 100644
index 0000000..9f65bc4
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/task/ValueResolverTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.concurrent.Callable;
+
+import org.apache.brooklyn.api.management.ExecutionContext;
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.core.util.task.Tasks;
+import org.apache.brooklyn.core.util.task.ValueResolver;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.BrooklynAppUnitTestSupport;
+import brooklyn.util.guava.Maybe;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+/**
+ * see also {@link TasksTest} for more tests
+ */
+@Test
+public class ValueResolverTest extends BrooklynAppUnitTestSupport {
+
+    private ExecutionContext executionContext;
+
+    @BeforeMethod(alwaysRun=true)
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        executionContext = app.getExecutionContext();
+    }
+    
+    public static final Task<String> newSleepTask(final Duration timeout, final String result) {
+        return Tasks.<String>builder().body(new Callable<String>() { 
+            public String call() { 
+                Time.sleep(timeout); 
+                return result; 
+            }}
+        ).build();
+    }
+    
+    public static final Task<String> newThrowTask(final Duration timeout) {
+        return Tasks.<String>builder().body(new Callable<String>() { 
+            public String call() {
+                Time.sleep(timeout); 
+                throw new IllegalStateException("intended, during tests");
+            }}
+        ).build();
+    }
+    
+    public void testTimeoutZero() {
+        Maybe<String> result = Tasks.resolving(newSleepTask(Duration.TEN_SECONDS, "foo")).as(String.class).context(executionContext).timeout(Duration.ZERO).getMaybe();
+        Assert.assertFalse(result.isPresent());
+    }
+    
+    public void testTimeoutBig() {
+        Maybe<String> result = Tasks.resolving(newSleepTask(Duration.ZERO, "foo")).as(String.class).context(executionContext).timeout(Duration.TEN_SECONDS).getMaybe();
+        Assert.assertEquals(result.get(), "foo");
+    }
+
+    public void testNoExecutionContextOnCompleted() {
+        Task<String> t = newSleepTask(Duration.ZERO, "foo");
+        executionContext.submit(t).getUnchecked();
+        Maybe<String> result = Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe();
+        Assert.assertEquals(result.get(), "foo");
+    }
+
+    public static Throwable assertThrowsOnMaybe(ValueResolver<?> result) {
+        try {
+            result = result.clone();
+            result.getMaybe();
+            Assert.fail("should have thrown");
+            return null;
+        } catch (Exception e) { return e; }
+    }
+    public static Throwable assertThrowsOnGet(ValueResolver<?> result) {
+        result = result.clone();
+        try {
+            result.get();
+            Assert.fail("should have thrown");
+            return null;
+        } catch (Exception e) { return e; }
+    }
+    public static <T> Maybe<T> assertMaybeIsAbsent(ValueResolver<T> result) {
+        result = result.clone();
+        Maybe<T> maybe = result.getMaybe();
+        Assert.assertFalse(maybe.isPresent());
+        return maybe;
+    }
+    
+    public void testSwallowError() {
+        ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(executionContext).swallowExceptions();
+        assertMaybeIsAbsent(result);
+        assertThrowsOnGet(result);
+    }
+
+
+    public void testDontSwallowError() {
+        ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(executionContext);
+        assertThrowsOnMaybe(result);
+        assertThrowsOnGet(result);
+    }
+
+    public void testDefaultWhenSwallowError() {
+        ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(executionContext).swallowExceptions().defaultValue("foo");
+        assertMaybeIsAbsent(result);
+        Assert.assertEquals(result.get(), "foo");
+    }
+
+    public void testDefaultBeforeDelayAndError() {
+        ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.TEN_SECONDS)).as(String.class).context(executionContext).timeout(Duration.ZERO).defaultValue("foo");
+        assertMaybeIsAbsent(result);
+        Assert.assertEquals(result.get(), "foo");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/ssh/SshTasksTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/ssh/SshTasksTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/ssh/SshTasksTest.java
new file mode 100644
index 0000000..94fe3e6
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/task/ssh/SshTasksTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task.ssh;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.brooklyn.api.location.LocationSpec;
+import org.apache.brooklyn.api.management.ManagementContext;
+import org.apache.brooklyn.core.management.internal.LocalManagementContext;
+import org.apache.brooklyn.core.util.ssh.BashCommandsIntegrationTest;
+import org.apache.brooklyn.core.util.task.ssh.SshFetchTaskFactory;
+import org.apache.brooklyn.core.util.task.ssh.SshFetchTaskWrapper;
+import org.apache.brooklyn.core.util.task.ssh.SshPutTaskFactory;
+import org.apache.brooklyn.core.util.task.ssh.SshPutTaskWrapper;
+import org.apache.brooklyn.core.util.task.ssh.SshTasks;
+import org.apache.brooklyn.core.util.task.ssh.SshTasksTest;
+import org.apache.brooklyn.core.util.task.system.ProcessTaskFactory;
+import org.apache.brooklyn.core.util.task.system.ProcessTaskWrapper;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.BrooklynConfigKeys;
+import brooklyn.entity.basic.Entities;
+
+import org.apache.brooklyn.location.basic.LocalhostMachineProvisioningLocation;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+
+import brooklyn.util.net.Urls;
+import brooklyn.util.os.Os;
+
+/**
+ * Some tests for {@link SshTasks}. Note more tests in {@link BashCommandsIntegrationTest}, 
+ * {@link SshEffectorTasksTest}, and {@link SoftwareEffectorTest}.
+ */
+public class SshTasksTest {
+
+    private static final Logger log = LoggerFactory.getLogger(SshTasksTest.class);
+    
+    ManagementContext mgmt;
+    SshMachineLocation host;
+    File tempDir;
+    
+    boolean failureExpected;
+
+    @BeforeMethod(alwaysRun=true)
+    public void setup() throws Exception {
+        mgmt = new LocalManagementContext();
+        
+        LocalhostMachineProvisioningLocation lhc = mgmt.getLocationManager().createLocation(LocationSpec.create(LocalhostMachineProvisioningLocation.class));
+        host = lhc.obtain();
+        clearExpectedFailure();
+        tempDir = Os.newTempDir(getClass());
+    }
+    
+    @AfterMethod(alwaysRun=true)
+    public void tearDown() throws Exception {
+        if (mgmt != null) Entities.destroyAll(mgmt);
+        mgmt = null;
+        tempDir = Os.deleteRecursively(tempDir).asNullOrThrowing();
+        checkExpectedFailure();
+    }
+
+    protected void checkExpectedFailure() {
+        if (failureExpected) {
+            clearExpectedFailure();
+            Assert.fail("Test should have thrown an exception but it did not.");
+        }
+    }
+    
+    protected void clearExpectedFailure() {
+        failureExpected = false;
+    }
+
+    protected void setExpectingFailure() {
+        failureExpected = true;
+    }
+
+
+    protected <T> ProcessTaskWrapper<T> submit(final ProcessTaskFactory<T> tf) {
+        tf.machine(host);
+        ProcessTaskWrapper<T> t = tf.newTask();
+        mgmt.getExecutionManager().submit(t);
+        return t;
+    }
+
+    protected SshPutTaskWrapper submit(final SshPutTaskFactory tf) {
+        SshPutTaskWrapper t = tf.newTask();
+        mgmt.getExecutionManager().submit(t);
+        return t;
+    }
+
+    @Test(groups="Integration")
+    public void testSshEchoHello() {
+        ProcessTaskWrapper<Integer> t = submit(SshTasks.newSshExecTaskFactory(host, "sleep 1 ; echo hello world"));
+        Assert.assertFalse(t.isDone());
+        Assert.assertEquals(t.get(), (Integer)0);
+        Assert.assertEquals(t.getTask().getUnchecked(), (Integer)0);
+        Assert.assertEquals(t.getStdout().trim(), "hello world");
+    }
+
+    @Test(groups="Integration")
+    public void testCopyTo() throws IOException {
+        String fn = Urls.mergePaths(tempDir.getPath(), "f1");
+        SshPutTaskWrapper t = submit(SshTasks.newSshPutTaskFactory(host, fn).contents("hello world"));
+        t.block();
+        Assert.assertEquals(FileUtils.readFileToString(new File(fn)), "hello world");
+        // and make sure this doesn't throw
+        Assert.assertTrue(t.isDone());
+        Assert.assertTrue(t.isSuccessful());
+        Assert.assertEquals(t.get(), null);
+        Assert.assertEquals(t.getExitCode(), (Integer)0);
+    }
+    
+    @Test(groups="Integration")
+    public void testCopyToFailBadSubdir() throws IOException {
+        String fn = Urls.mergePaths(tempDir.getPath(), "non-existent-subdir/file");
+        SshPutTaskWrapper t = submit(SshTasks.newSshPutTaskFactory(host, fn).contents("hello world"));
+        //this doesn't fail
+        t.block();        
+        Assert.assertTrue(t.isDone());
+        setExpectingFailure();
+        try {
+            // but this does
+            t.get();
+        } catch (Exception e) {
+            log.info("The error if file cannot be written is: "+e);
+            clearExpectedFailure();
+        }
+        checkExpectedFailure();
+        // and the results indicate failure
+        Assert.assertFalse(t.isSuccessful());
+        Assert.assertNotNull(t.getException());
+        Assert.assertNotEquals(t.getExitCode(), (Integer)0);
+    }
+
+    @Test(groups="Integration")
+    public void testCopyToFailBadSubdirAllow() throws IOException {
+        String fn = Urls.mergePaths(tempDir.getPath(), "non-existent-subdir/file");
+        SshPutTaskWrapper t = submit(SshTasks.newSshPutTaskFactory(host, fn).contents("hello world").allowFailure());
+        //this doesn't fail
+        t.block();        
+        Assert.assertTrue(t.isDone());
+        // and this doesn't fail either
+        Assert.assertEquals(t.get(), null);
+        // but it's not successful
+        Assert.assertNotNull(t.getException());
+        Assert.assertFalse(t.isSuccessful());
+        // exit code probably null, but won't be zero
+        Assert.assertNotEquals(t.getExitCode(), (Integer)0);
+    }
+
+    @Test(groups="Integration")
+    public void testCopyToFailBadSubdirCreate() throws IOException {
+        String fn = Urls.mergePaths(tempDir.getPath(), "non-existent-subdir-to-create/file");
+        SshPutTaskWrapper t = submit(SshTasks.newSshPutTaskFactory(host, fn).contents("hello world").createDirectory());
+        t.block();
+        // directory should be created, and file readable now
+        Assert.assertEquals(FileUtils.readFileToString(new File(fn)), "hello world");
+        Assert.assertEquals(t.getExitCode(), (Integer)0);
+    }
+
+    @Test(groups="Integration")
+    public void testSshFetch() throws IOException {
+        String fn = Urls.mergePaths(tempDir.getPath(), "f2");
+        FileUtils.write(new File(fn), "hello fetched world");
+        
+        SshFetchTaskFactory tf = SshTasks.newSshFetchTaskFactory(host, fn);
+        SshFetchTaskWrapper t = tf.newTask();
+        mgmt.getExecutionManager().submit(t);
+
+        t.block();
+        Assert.assertTrue(t.isDone());
+        Assert.assertEquals(t.get(), "hello fetched world");
+        Assert.assertEquals(t.getBytes(), "hello fetched world".getBytes());
+    }
+
+    @Test(groups="Integration")
+    public void testSshWithHeaderProperty() {
+        host.setConfig(BrooklynConfigKeys.SSH_CONFIG_SCRIPT_HEADER, "#!/bin/bash -e\necho foo\n");
+        ProcessTaskWrapper<Integer> t = submit(SshTasks.newSshExecTaskFactory(host, "echo bar"));
+        Assert.assertTrue(t.block().getStdout().trim().matches("foo\\s+bar"), "mismatched output was: "+t.getStdout());
+    }
+
+    @Test(groups="Integration")
+    public void testSshIgnoringHeaderProperty() {
+        host.setConfig(BrooklynConfigKeys.SSH_CONFIG_SCRIPT_HEADER, "#!/bin/bash -e\necho foo\n");
+        ProcessTaskWrapper<Integer> t = submit(SshTasks.newSshExecTaskFactory(host, false, "echo bar"));
+        Assert.assertTrue(t.block().getStdout().trim().matches("bar"), "mismatched output was: "+t.getStdout());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/system/SystemTasksTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/system/SystemTasksTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/system/SystemTasksTest.java
new file mode 100644
index 0000000..b673056
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/task/system/SystemTasksTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task.system;
+
+import java.io.File;
+
+import org.apache.brooklyn.api.management.ManagementContext;
+import org.apache.brooklyn.core.management.internal.LocalManagementContext;
+import org.apache.brooklyn.core.util.task.ssh.SshTasks;
+import org.apache.brooklyn.core.util.task.system.ProcessTaskFactory;
+import org.apache.brooklyn.core.util.task.system.ProcessTaskWrapper;
+import org.apache.brooklyn.core.util.task.system.SystemTasks;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.Entities;
+import brooklyn.util.os.Os;
+
+/**
+ * Some tests for {@link SystemTasks}. See {@link SshTasks}.
+ */
+public class SystemTasksTest {
+
+    ManagementContext mgmt;
+    File tempDir;
+    
+    boolean failureExpected;
+
+    @BeforeMethod(alwaysRun=true)
+    public void setup() throws Exception {
+        mgmt = new LocalManagementContext();
+        
+        clearExpectedFailure();
+        tempDir = Os.newTempDir(getClass());
+    }
+    
+    @AfterMethod(alwaysRun=true)
+    public void tearDown() throws Exception {
+        if (mgmt != null) Entities.destroyAll(mgmt);
+        mgmt = null;
+        tempDir = Os.deleteRecursively(tempDir).asNullOrThrowing();
+        checkExpectedFailure();
+    }
+
+    protected void checkExpectedFailure() {
+        if (failureExpected) {
+            clearExpectedFailure();
+            Assert.fail("Test should have thrown an exception but it did not.");
+        }
+    }
+    
+    protected void clearExpectedFailure() {
+        failureExpected = false;
+    }
+
+    protected void setExpectingFailure() {
+        failureExpected = true;
+    }
+
+
+    protected <T> ProcessTaskWrapper<T> submit(final ProcessTaskFactory<T> tf) {
+        ProcessTaskWrapper<T> t = tf.newTask();
+        mgmt.getExecutionManager().submit(t);
+        return t;
+    }
+
+    @Test(groups="Integration")
+    public void testExecEchoHello() {
+        ProcessTaskWrapper<Integer> t = submit(SystemTasks.exec("sleep 1 ; echo hello world"));
+        Assert.assertFalse(t.isDone());
+        Assert.assertEquals(t.get(), (Integer)0);
+        Assert.assertEquals(t.getTask().getUnchecked(), (Integer)0);
+        Assert.assertEquals(t.getStdout().trim(), "hello world");
+    }
+
+    // FIXME Behaviour of Bash shell changes from 3.x to 4.x so test is disabled
+    @Test(groups="Integration", enabled=false)
+    public void testSubshellExitScriptDoesNotExit() {
+        checkSubshellExitDoesNotExit(taskSubshellExit().runAsScript());
+    }
+
+    @Test(groups="Integration")
+    public void testSubshellExitCommandDoesNotExit() {
+        checkSubshellExitDoesNotExit(taskSubshellExit().runAsCommand());
+    }
+
+    public ProcessTaskFactory<Integer> taskSubshellExit() {
+        return SystemTasks.exec("echo hello", "( exit 1 )", "echo bye code $?");
+    }
+
+    public void checkSubshellExitDoesNotExit(ProcessTaskFactory<Integer> task) {
+        ProcessTaskWrapper<Integer> t = submit(task);
+        t.block();
+        Assert.assertEquals(t.get(), (Integer)0);
+        Assert.assertTrue(t.getStdout().contains("bye code 1"), "stdout is: "+t.getStdout());
+    }
+
+    @Test(groups="Integration")
+    public void testGroupExitScriptDoesNotExit() {
+        checkGroupExitDoesExit(taskGroupExit().runAsScript());
+    }
+
+    @Test(groups="Integration")
+    public void testGroupExitCommandDoesNotExit() {
+        checkGroupExitDoesExit(taskGroupExit().runAsCommand());
+    }
+
+    public ProcessTaskFactory<Integer> taskGroupExit() {
+        return SystemTasks.exec("echo hello", "{ exit 1 ; }", "echo bye code $?");
+    }
+
+    public void checkGroupExitDoesExit(ProcessTaskFactory<Integer> task) {
+        ProcessTaskWrapper<Integer> t = submit(task);
+        t.block();
+        Assert.assertEquals(t.get(), (Integer)1);
+        Assert.assertFalse(t.getStdout().contains("bye"), "stdout is: "+t.getStdout());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/text/DataUriSchemeParserTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/text/DataUriSchemeParserTest.java b/core/src/test/java/org/apache/brooklyn/core/util/text/DataUriSchemeParserTest.java
new file mode 100644
index 0000000..73794a3
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/text/DataUriSchemeParserTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.text;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+
+import org.apache.brooklyn.core.util.text.DataUriSchemeParser;
+import org.bouncycastle.util.encoders.Base64;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class DataUriSchemeParserTest {
+
+    @Test
+    public void testSimple() {
+        Assert.assertEquals(new DataUriSchemeParser("data:,hello").parse().getDataAsString(), "hello");
+        Assert.assertEquals(DataUriSchemeParser.toString("data:,hello"), "hello");
+    }
+
+    @Test
+    public void testMimeType() throws UnsupportedEncodingException {
+        DataUriSchemeParser p = new DataUriSchemeParser("data:application/json,"+URLEncoder.encode("{ }", "US-ASCII")).parse();
+        Assert.assertEquals(p.getMimeType(), "application/json");
+        Assert.assertEquals(p.getData(), "{ }".getBytes());
+    }
+
+    @Test
+    public void testBase64() {
+        Assert.assertEquals(DataUriSchemeParser.toString(
+                "data:;base64,"+new String(Base64.encode("hello".getBytes()))), 
+            "hello");
+    }
+
+    // TODO test pictures, etc
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/text/TemplateProcessorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/text/TemplateProcessorTest.java b/core/src/test/java/org/apache/brooklyn/core/util/text/TemplateProcessorTest.java
new file mode 100644
index 0000000..05f4fde
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/text/TemplateProcessorTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.text;
+
+import static org.testng.Assert.assertEquals;
+
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.core.management.internal.ManagementContextInternal;
+import org.apache.brooklyn.core.util.text.TemplateProcessor;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.apache.brooklyn.test.entity.TestEntity;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.BrooklynAppUnitTestSupport;
+import brooklyn.event.basic.DependentConfiguration;
+import brooklyn.test.FixedLocaleTest;
+
+import com.google.common.collect.ImmutableMap;
+
+public class TemplateProcessorTest extends BrooklynAppUnitTestSupport {
+    private FixedLocaleTest localeFix = new FixedLocaleTest();
+
+    @BeforeMethod(alwaysRun=true)
+    public void setUp() throws Exception {
+        super.setUp();
+        localeFix.setUp();
+    }
+
+    @AfterMethod(alwaysRun=true)
+    public void tearDown() throws Exception {
+        super.tearDown();
+        localeFix.tearDown();
+    }
+
+    @Test
+    public void testAdditionalArgs() {
+        String templateContents = "${mykey}";
+        String result = TemplateProcessor.processTemplateContents(templateContents, app, ImmutableMap.of("mykey", "myval"));
+        assertEquals(result, "myval");
+    }
+    
+    @Test
+    public void testEntityConfig() {
+        TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+                .configure(TestEntity.CONF_NAME, "myval"));
+        String templateContents = "${config['"+TestEntity.CONF_NAME.getName()+"']}";
+        String result = TemplateProcessor.processTemplateContents(templateContents, entity, ImmutableMap.<String,Object>of());
+        assertEquals(result, "myval");
+    }
+    
+    @Test
+    public void testEntityConfigNumber() {
+        TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+                .configure(TestEntity.CONF_OBJECT, 123456));
+        String templateContents = "${config['"+TestEntity.CONF_OBJECT.getName()+"']}";
+        String result = TemplateProcessor.processTemplateContents(templateContents, entity, ImmutableMap.<String,Object>of());
+        assertEquals(result, "123,456");
+    }
+    
+    @Test
+    public void testEntityConfigNumberUnadorned() {
+        // ?c is needed to avoid commas (i always forget this!)
+        TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+                .configure(TestEntity.CONF_OBJECT, 123456));
+        String templateContents = "${config['"+TestEntity.CONF_OBJECT.getName()+"']?c}";
+        String result = TemplateProcessor.processTemplateContents(templateContents, entity, ImmutableMap.<String,Object>of());
+        assertEquals(result, "123456");
+    }
+    
+    @Test
+    public void testGetSysProp() {
+        System.setProperty("testGetSysProp", "myval");
+        
+        String templateContents = "${javaSysProps['testGetSysProp']}";
+        String result = TemplateProcessor.processTemplateContents(templateContents, app, ImmutableMap.<String,Object>of());
+        assertEquals(result, "myval");
+    }
+    
+    @Test
+    public void testEntityGetterMethod() {
+        String templateContents = "${entity.id}";
+        String result = TemplateProcessor.processTemplateContents(templateContents, app, ImmutableMap.<String,Object>of());
+        assertEquals(result, app.getId());
+    }
+    
+    @Test
+    public void testManagementContextConfig() {
+        mgmt.getBrooklynProperties().put("globalmykey", "myval");
+        String templateContents = "${mgmt.globalmykey}";
+        String result = TemplateProcessor.processTemplateContents(templateContents, app, ImmutableMap.<String,Object>of());
+        assertEquals(result, "myval");
+    }
+    
+    @Test
+    public void testManagementContextDefaultValue() {
+        String templateContents = "${(missing)!\"defval\"}";
+        Object result = TemplateProcessor.processTemplateContents(templateContents, app, ImmutableMap.<String,Object>of());
+        assertEquals(result, "defval");
+    }
+    
+    @Test
+    public void testManagementContextDefaultValueInDotMissingValue() {
+        String templateContents = "${(mgmt.missing.more_missing)!\"defval\"}";
+        Object result = TemplateProcessor.processTemplateContents(templateContents, app, ImmutableMap.<String,Object>of());
+        assertEquals(result, "defval");
+    }
+    
+    @Test
+    public void testManagementContextConfigWithDot() {
+        mgmt.getBrooklynProperties().put("global.mykey", "myval");
+        String templateContents = "${mgmt['global.mykey']}";
+        String result = TemplateProcessor.processTemplateContents(templateContents, app, ImmutableMap.<String,Object>of());
+        assertEquals(result, "myval");
+    }
+    
+    @Test
+    public void testManagementContextErrors() {
+        try {
+            // NB: dot has special meaning so this should fail; must be accessed using bracket notation as above
+            mgmt.getBrooklynProperties().put("global.mykey", "myval");
+            String templateContents = "${mgmt.global.mykey}";
+            TemplateProcessor.processTemplateContents(templateContents, app, ImmutableMap.<String,Object>of());
+            Assert.fail("Should not have found value with intermediate dot");
+        } catch (Exception e) {
+            Assert.assertTrue(e.toString().contains("global"), "Should have mentioned missing key 'global' in error");
+        }
+    }
+    
+    @Test
+    public void testApplyTemplatedConfigWithAttributeWhenReady() {
+        app.setAttribute(TestApplication.MY_ATTRIBUTE, "myval");
+
+        TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)
+                .configure(TestEntity.CONF_NAME, DependentConfiguration.attributeWhenReady(app, TestApplication.MY_ATTRIBUTE)));
+        
+        String templateContents = "${config['"+TestEntity.CONF_NAME.getName()+"']}";
+        String result = TemplateProcessor.processTemplateContents(templateContents, entity, ImmutableMap.<String,Object>of());
+        assertEquals(result, "myval");
+    }
+    
+    @Test
+    public void testDotSeparatedKey() {
+        String templateContents = "${a.b}";
+        String result = TemplateProcessor.processTemplateContents(templateContents, (ManagementContextInternal)null, 
+            ImmutableMap.<String,Object>of("a.b", "myval"));
+        assertEquals(result, "myval");
+    }
+    
+    @Test
+    public void testDotSeparatedKeyCollisionFailure() {
+        String templateContents = "${aaa.bbb}";
+        try {
+            TemplateProcessor.processTemplateContents(templateContents, (ManagementContextInternal)null, 
+                ImmutableMap.<String,Object>of("aaa.bbb", "myval", "aaa", "blocker"));
+            Assert.fail("Should not have found value with intermediate dot where prefix is overridden");
+        } catch (Exception e) {
+            Assert.assertTrue(e.toString().contains("aaa"), "Should have mentioned missing key 'aaa' in error");
+        }
+    }
+
+}