You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by ha...@apache.org on 2015/08/17 21:17:46 UTC
[15/42] incubator-brooklyn git commit: [BROOKLYN-162] Refactor
package in ./core/util
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/brooklyn/util/task/BasicTasksFutureTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/util/task/BasicTasksFutureTest.java b/core/src/test/java/brooklyn/util/task/BasicTasksFutureTest.java
deleted file mode 100644
index f1c1332..0000000
--- a/core/src/test/java/brooklyn/util/task/BasicTasksFutureTest.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.brooklyn.api.management.Task;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.time.Duration;
-import brooklyn.util.time.Time;
-
-import com.google.common.base.Stopwatch;
-
-public class BasicTasksFutureTest {
-
- private static final Logger log = LoggerFactory.getLogger(BasicTasksFutureTest.class);
-
- private BasicExecutionManager em;
- private BasicExecutionContext ec;
- private Map<Object,Object> data;
- private ExecutorService ex;
- private Semaphore started;
- private Semaphore waitInTask;
- private Semaphore cancelledWhileSleeping;
-
- @BeforeMethod(alwaysRun=true)
- public void setUp() {
- em = new BasicExecutionManager("mycontext");
- ec = new BasicExecutionContext(em);
- ex = Executors.newCachedThreadPool();
- data = Collections.synchronizedMap(new LinkedHashMap<Object,Object>());
- started = new Semaphore(0);
- waitInTask = new Semaphore(0);
- cancelledWhileSleeping = new Semaphore(0);
- }
-
- @AfterMethod(alwaysRun=true)
- public void tearDown() throws Exception {
- if (em != null) em.shutdownNow();
- if (ex != null) ex.shutdownNow();
- }
-
- @Test
- public void testBlockAndGetWithTimeoutsAndListenableFuture() throws InterruptedException {
- Task<String> t = waitForSemaphore(Duration.FIVE_SECONDS, true, "x");
-
- Assert.assertFalse(t.blockUntilEnded(Duration.millis(1)));
- Assert.assertFalse(t.blockUntilEnded(Duration.ZERO));
- boolean didNotThrow = false;
-
- try { t.getUnchecked(Duration.millis(1)); didNotThrow = true; }
- catch (Exception e) { /* expected */ }
- Assert.assertFalse(didNotThrow);
-
- try { t.getUnchecked(Duration.ZERO); didNotThrow = true; }
- catch (Exception e) { /* expected */ }
- Assert.assertFalse(didNotThrow);
-
- addFutureListener(t, "before");
- ec.submit(t);
-
- Assert.assertFalse(t.blockUntilEnded(Duration.millis(1)));
- Assert.assertFalse(t.blockUntilEnded(Duration.ZERO));
-
- try { t.getUnchecked(Duration.millis(1)); didNotThrow = true; }
- catch (Exception e) { /* expected */ }
- Assert.assertFalse(didNotThrow);
-
- try { t.getUnchecked(Duration.ZERO); didNotThrow = true; }
- catch (Exception e) { /* expected */ }
- Assert.assertFalse(didNotThrow);
-
- addFutureListener(t, "during");
-
- synchronized (data) {
- // now let it finish
- waitInTask.release();
- Assert.assertTrue(t.blockUntilEnded(Duration.TEN_SECONDS));
-
- Assert.assertEquals(t.getUnchecked(Duration.millis(1)), "x");
- Assert.assertEquals(t.getUnchecked(Duration.ZERO), "x");
-
- Assert.assertNull(data.get("before"));
- Assert.assertNull(data.get("during"));
- // can't set the data(above) until we release the lock (in assert call below)
- assertSoonGetsData("before");
- assertSoonGetsData("during");
- }
-
- // and see that a listener added late also runs
- synchronized (data) {
- addFutureListener(t, "after");
- Assert.assertNull(data.get("after"));
- assertSoonGetsData("after");
- }
- }
-
- private void addFutureListener(Task<String> t, final String key) {
- t.addListener(new Runnable() { public void run() {
- synchronized (data) {
- log.info("notifying for "+key);
- data.notifyAll();
- data.put(key, true);
- }
- }}, ex);
- }
-
- private void assertSoonGetsData(String key) throws InterruptedException {
- for (int i=0; i<10; i++) {
- if (Boolean.TRUE.equals(data.get(key))) {
- log.info("got data for "+key);
- return;
- }
- data.wait(Duration.ONE_SECOND.toMilliseconds());
- }
- Assert.fail("did not get data for '"+key+"' in time");
- }
-
- private <T> Task<T> waitForSemaphore(final Duration time, final boolean requireSemaphore, final T result) {
- return Tasks.<T>builder().body(new Callable<T>() {
- public T call() {
- try {
- started.release();
- log.info("waiting up to "+time+" to acquire before returning "+result);
- if (!waitInTask.tryAcquire(time.toMilliseconds(), TimeUnit.MILLISECONDS)) {
- log.info("did not get semaphore");
- if (requireSemaphore) Assert.fail("task did not get semaphore");
- } else {
- log.info("got semaphore");
- }
- } catch (Exception e) {
- log.info("cancelled before returning "+result);
- cancelledWhileSleeping.release();
- throw Exceptions.propagate(e);
- }
- log.info("task returning "+result);
- return result;
- }
- }).build();
- }
-
- @Test
- public void testCancelAfterStartTriggersListenableFuture() throws Exception {
- doTestCancelTriggersListenableFuture(Duration.millis(50));
- }
- @Test
- public void testCancelImmediateTriggersListenableFuture() throws Exception {
- // if cancel fires after submit but before it passes to the executor,
- // that needs handling separately; this doesn't guarantee this code path,
- // but it happens sometimes (and it should be handled)
- doTestCancelTriggersListenableFuture(Duration.ZERO);
- }
- public void doTestCancelTriggersListenableFuture(Duration delay) throws Exception {
- Task<String> t = waitForSemaphore(Duration.TEN_SECONDS, true, "x");
- addFutureListener(t, "before");
-
- Stopwatch watch = Stopwatch.createStarted();
- ec.submit(t);
-
- addFutureListener(t, "during");
-
- log.info("test cancelling "+t+" ("+t.getClass()+") after "+delay);
- // NB: two different code paths (callers to this method) for notifying futures
- // depending whether task is started
- Time.sleep(delay);
-
- synchronized (data) {
- t.cancel(true);
-
- assertSoonGetsData("before");
- assertSoonGetsData("during");
-
- addFutureListener(t, "after");
- Assert.assertNull(data.get("after"));
- assertSoonGetsData("after");
- }
-
- Assert.assertTrue(t.isDone());
- Assert.assertTrue(t.isCancelled());
- try {
- t.get();
- Assert.fail("should have thrown CancellationException");
- } catch (CancellationException e) { /* expected */ }
-
- Assert.assertTrue(watch.elapsed(TimeUnit.MILLISECONDS) < Duration.FIVE_SECONDS.toMilliseconds(),
- Time.makeTimeStringRounded(watch.elapsed(TimeUnit.MILLISECONDS))+" is too long; should have cancelled very quickly");
-
- if (started.tryAcquire())
- // if the task is begun, this should get released
- Assert.assertTrue(cancelledWhileSleeping.tryAcquire(5, TimeUnit.SECONDS));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/brooklyn/util/task/CompoundTaskExecutionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/util/task/CompoundTaskExecutionTest.java b/core/src/test/java/brooklyn/util/task/CompoundTaskExecutionTest.java
deleted file mode 100644
index 9fe4ba0..0000000
--- a/core/src/test/java/brooklyn/util/task/CompoundTaskExecutionTest.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Semaphore;
-
-import org.apache.brooklyn.api.management.Task;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-import org.testng.collections.Lists;
-
-import brooklyn.util.time.Duration;
-import brooklyn.util.time.Time;
-
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-
-/**
- * Test the operation of the {@link CompoundTask} class.
- */
-public class CompoundTaskExecutionTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(CompoundTaskExecutionTest.class);
-
- BasicExecutionManager em;
- BasicExecutionContext ec;
-
- @BeforeClass
- public void setup() {
- em = new BasicExecutionManager("mycontext");
- ec = new BasicExecutionContext(em);
- }
-
- @AfterClass
- public void teardown() {
- if (em != null) em.shutdownNow();
- em = null;
- }
-
- private BasicTask<String> taskReturning(final String val) {
- return new BasicTask<String>(new Callable<String>() {
- @Override public String call() {
- return val;
- }
- });
- }
-
- private BasicTask<String> slowTaskReturning(final String val, final Duration pauseTime) {
- return new BasicTask<String>(new Callable<String>() {
- @Override public String call() {
- Time.sleep(pauseTime);
- return val;
- }
- });
- }
-
-
- @Test
- public void runSequenceTask() throws Exception {
- BasicTask<String> t1 = taskReturning("a");
- BasicTask<String> t2 = taskReturning("b");
- BasicTask<String> t3 = taskReturning("c");
- BasicTask<String> t4 = taskReturning("d");
- Task<List<String>> tSequence = ec.submit(new SequentialTask<String>(t1, t2, t3, t4));
- assertEquals(tSequence.get(), ImmutableList.of("a", "b", "c", "d"));
- }
-
- @Test
- public void testSequentialTaskFailsWhenIntermediateTaskThrowsException() throws Exception {
- BasicTask<String> t1 = taskReturning("a");
- BasicTask<String> t2 = new BasicTask<String>(new Callable<String>() {
- @Override public String call() throws Exception {
- throw new IllegalArgumentException("forced exception");
- }
- });
- BasicTask<String> t3 = taskReturning("c");
- SequentialTask<String> task = new SequentialTask<String>(t1, t2, t3);
- Task<List<String>> tSequence = ec.submit(task);
-
- try {
- tSequence.get();
- fail("t2 should have thrown an exception");
- } catch (Exception e) {}
-
- assertTrue(task.isDone());
- assertTrue(task.isError());
- assertTrue(t1.isDone());
- assertFalse(t1.isError());
- assertTrue(t2.isDone());
- assertTrue(t2.isError());
- // t3 not run because of t2 exception
- assertFalse(t3.isDone());
- assertFalse(t3.isBegun());
- }
-
- @Test
- public void testParallelTaskFailsWhenIntermediateTaskThrowsException() throws Exception {
- // differs from test above of SequentialTask in that expect t3 to be executed,
- // despite t2 failing.
- // TODO Do we expect tSequence.get() to block for everything to either fail or complete,
- // and then to throw exception? Currently it does *not* do that so test was previously failing.
-
- BasicTask<String> t1 = taskReturning("a");
- BasicTask<String> t2 = new BasicTask<String>(new Callable<String>() {
- @Override public String call() throws Exception {
- throw new IllegalArgumentException("forced exception");
- }
- });
- BasicTask<String> t3 = slowTaskReturning("c", Duration.millis(100));
- ParallelTask<String> task = new ParallelTask<String>(t1, t2, t3);
- Task<List<String>> tSequence = ec.submit(task);
-
- try {
- tSequence.get();
- fail("t2 should have thrown an exception");
- } catch (Exception e) {}
-
- assertTrue(task.isDone());
- assertTrue(task.isError());
- assertTrue(t1.isDone());
- assertFalse(t1.isError());
- assertTrue(t2.isDone());
- assertTrue(t2.isError());
- assertTrue(t3.isBegun());
- assertTrue(t3.isDone());
- assertFalse(t3.isError());
- }
-
- @Test
- public void runParallelTask() throws Exception {
- BasicTask<String> t1 = taskReturning("a");
- BasicTask<String> t2 = taskReturning("b");
- BasicTask<String> t3 = taskReturning("c");
- BasicTask<String> t4 = taskReturning("d");
- Task<List<String>> tSequence = ec.submit(new ParallelTask<String>(t4, t2, t1, t3));
- assertEquals(new HashSet<String>(tSequence.get()), ImmutableSet.of("a", "b", "c", "d"));
- }
-
- @Test
- public void runParallelTaskWithDelay() throws Exception {
- final Semaphore locker = new Semaphore(0);
- BasicTask<String> t1 = new BasicTask<String>(new Callable<String>() {
- @Override public String call() {
- try {
- locker.acquire();
- } catch (InterruptedException e) {
- throw Throwables.propagate(e);
- }
- return "a";
- }
- });
- BasicTask<String> t2 = taskReturning("b");
- BasicTask<String> t3 = taskReturning("c");
- BasicTask<String> t4 = taskReturning("d");
- final Task<List<String>> tSequence = ec.submit(new ParallelTask<String>(t4, t2, t1, t3));
-
- assertEquals(ImmutableSet.of(t2.get(), t3.get(), t4.get()), ImmutableSet.of("b", "c", "d"));
- assertFalse(t1.isDone());
- assertFalse(tSequence.isDone());
-
- // get blocks until tasks have completed
- Thread t = new Thread() {
- @Override public void run() {
- try {
- tSequence.get();
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- locker.release();
- }
- };
- t.start();
- Thread.sleep(30);
- assertTrue(t.isAlive());
-
- locker.release();
-
- assertEquals(new HashSet<String>(tSequence.get()), ImmutableSet.of("a", "b", "c", "d"));
- assertTrue(t1.isDone());
- assertTrue(tSequence.isDone());
-
- locker.acquire();
- }
-
- @Test
- public void testComplexOrdering() throws Exception {
- List<String> data = new CopyOnWriteArrayList<String>();
- SequentialTask<String> taskA = new SequentialTask<String>(
- appendAfterDelay(data, "a1"), appendAfterDelay(data, "a2"), appendAfterDelay(data, "a3"), appendAfterDelay(data, "a4"));
- SequentialTask<String> taskB = new SequentialTask<String>(
- appendAfterDelay(data, "b1"), appendAfterDelay(data, "b2"), appendAfterDelay(data, "b3"), appendAfterDelay(data, "b4"));
- Task<List<String>> t = ec.submit(new ParallelTask<String>(taskA, taskB));
- t.get();
-
- LOG.debug("Tasks happened in order: {}", data);
- assertEquals(data.size(), 8);
- assertEquals(new HashSet<String>(data), ImmutableSet.of("a1", "a2", "a3", "a4", "b1", "b2", "b3", "b4"));
-
- // a1, ..., a4 should be in order
- List<String> as = Lists.newArrayList(), bs = Lists.newArrayList();
- for (String value : data) {
- ((value.charAt(0) == 'a') ? as : bs).add(value);
- }
- assertEquals(as, ImmutableList.of("a1", "a2", "a3", "a4"));
- assertEquals(bs, ImmutableList.of("b1", "b2", "b3", "b4"));
- }
-
- private BasicTask<String> appendAfterDelay(final List<String> list, final String value) {
- return new BasicTask<String>(new Callable<String>() {
- @Override public String call() {
- try {
- Thread.sleep((int) (100 * Math.random()));
- } catch (InterruptedException e) {
- throw Throwables.propagate(e);
- }
- LOG.debug("running {}", value);
- list.add(value);
- return value;
- }
- });
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/brooklyn/util/task/DynamicSequentialTaskTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/util/task/DynamicSequentialTaskTest.java b/core/src/test/java/brooklyn/util/task/DynamicSequentialTaskTest.java
deleted file mode 100644
index a5985fe..0000000
--- a/core/src/test/java/brooklyn/util/task/DynamicSequentialTaskTest.java
+++ /dev/null
@@ -1,365 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.brooklyn.api.management.HasTaskChildren;
-import org.apache.brooklyn.api.management.Task;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import brooklyn.test.Asserts;
-import brooklyn.util.collections.MutableList;
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.time.CountdownTimer;
-import brooklyn.util.time.Duration;
-import brooklyn.util.time.Time;
-
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-public class DynamicSequentialTaskTest {
-
- private static final Logger log = LoggerFactory.getLogger(DynamicSequentialTaskTest.class);
-
- public static final Duration TIMEOUT = Duration.TEN_SECONDS;
- public static final Duration TINY_TIME = Duration.millis(20);
-
- BasicExecutionManager em;
- BasicExecutionContext ec;
- List<String> messages;
- Semaphore cancellations;
- Stopwatch stopwatch;
- Map<String,Semaphore> monitorableJobSemaphoreMap;
- Map<String,Task<String>> monitorableTasksMap;
-
- @BeforeMethod(alwaysRun=true)
- public void setUp() {
- em = new BasicExecutionManager("mycontext");
- ec = new BasicExecutionContext(em);
- cancellations = new Semaphore(0);
- messages = new ArrayList<String>();
- monitorableJobSemaphoreMap = MutableMap.of();
- monitorableTasksMap = MutableMap.of();
- monitorableTasksMap.clear();
- stopwatch = Stopwatch.createStarted();
- }
-
- @AfterMethod(alwaysRun=true)
- public void tearDown() throws Exception {
- if (em != null) em.shutdownNow();
- }
-
- @Test
- public void testSimple() throws InterruptedException, ExecutionException {
- Callable<String> mainJob = new Callable<String>() {
- public String call() {
- log.info("main job - "+Tasks.current());
- messages.add("main");
- DynamicTasks.queue( sayTask("world") );
- return "bye";
- }
- };
- DynamicSequentialTask<String> t = new DynamicSequentialTask<String>(mainJob);
- // this should be added before anything added when the task is invoked
- t.queue(sayTask("hello"));
-
- Assert.assertEquals(messages, Lists.newArrayList());
- Assert.assertEquals(t.isBegun(), false);
- Assert.assertEquals(Iterables.size(t.getChildren()), 1);
-
- ec.submit(t);
- Assert.assertEquals(t.isSubmitted(), true);
- Assert.assertEquals(t.getUnchecked(Duration.ONE_SECOND), "bye");
- long elapsed = t.getEndTimeUtc() - t.getSubmitTimeUtc();
- Assert.assertTrue(elapsed < 1000, "elapsed time should have been less than 1s but was "+
- Time.makeTimeString(elapsed, true));
- Assert.assertEquals(Iterables.size(t.getChildren()), 2);
- Assert.assertEquals(messages.size(), 3, "expected 3 entries, but had "+messages);
- // either main or hello can be first, but world should be last
- Assert.assertEquals(messages.get(2), "world");
- }
-
- public Callable<String> sayCallable(final String message, final Duration duration, final String message2) {
- return new Callable<String>() {
- public String call() {
- try {
- if (message != null) {
- log.info("saying: "+message+ " - "+Tasks.current());
- synchronized (messages) {
- messages.add(message);
- messages.notifyAll();
- }
- }
- if (message2 != null) {
- log.info("will say "+message2+" after "+duration);
- }
- if (duration != null && duration.toMilliseconds() > 0) {
- Thread.sleep(duration.toMillisecondsRoundingUp());
- }
- } catch (InterruptedException e) {
- cancellations.release();
- throw Exceptions.propagate(e);
- }
- if (message2 != null) {
- log.info("saying: "+message2+ " - "+Tasks.current());
- synchronized (messages) {
- messages.add(message2);
- messages.notifyAll();
- }
- }
- return message;
- }
- };
- }
-
- public Task<String> sayTask(String message) {
- return sayTask(message, null, null);
- }
-
- public Task<String> sayTask(String message, Duration duration, String message2) {
- return Tasks.<String>builder().body(sayCallable(message, duration, message2)).build();
- }
-
- @Test
- public void testComplex() throws InterruptedException, ExecutionException {
- Task<List<?>> t = Tasks.sequential(
- sayTask("1"),
- sayTask("2"),
- Tasks.parallel(sayTask("4"), sayTask("3")),
- sayTask("5")
- );
- ec.submit(t);
- Assert.assertEquals(t.get().size(), 4);
- Asserts.assertEqualsIgnoringOrder((List<?>)t.get().get(2), ImmutableSet.of("3", "4"));
- Assert.assertTrue(messages.equals(Arrays.asList("1", "2", "3", "4", "5")) || messages.equals(Arrays.asList("1", "2", "4", "3", "5")), "messages="+messages);
- }
-
- @Test
- public void testCancelled() throws InterruptedException, ExecutionException {
- Task<List<?>> t = Tasks.sequential(
- sayTask("1"),
- sayTask("2a", Duration.THIRTY_SECONDS, "2b"),
- sayTask("3"));
- ec.submit(t);
- synchronized (messages) {
- while (messages.size() <= 1)
- messages.wait();
- }
- Assert.assertEquals(messages, Arrays.asList("1", "2a"));
- Time.sleep(Duration.millis(50));
- t.cancel(true);
- Assert.assertTrue(t.isDone());
- // 2 should get cancelled, and invoke the cancellation semaphore
- // 3 should get cancelled and not run at all
- Assert.assertEquals(messages, Arrays.asList("1", "2a"));
-
- // Need to ensure that 2 has been started; race where we might cancel it before its run method
- // is even begun. Hence doing "2a; pause; 2b" where nothing is interruptable before pause.
- Assert.assertTrue(cancellations.tryAcquire(10, TimeUnit.SECONDS));
-
- Iterator<Task<?>> ci = ((HasTaskChildren)t).getChildren().iterator();
- Assert.assertEquals(ci.next().get(), "1");
- Task<?> task2 = ci.next();
- Assert.assertTrue(task2.isBegun());
- Assert.assertTrue(task2.isDone());
- Assert.assertTrue(task2.isCancelled());
-
- Task<?> task3 = ci.next();
- Assert.assertFalse(task3.isBegun());
- Assert.assertTrue(task2.isDone());
- Assert.assertTrue(task2.isCancelled());
-
- // but we do _not_ get a mutex from task3 as it does not run (is not interrupted)
- Assert.assertEquals(cancellations.availablePermits(), 0);
- }
-
- protected Task<String> monitorableTask(final String id) {
- return monitorableTask(null, id, null);
- }
- protected Task<String> monitorableTask(final Runnable pre, final String id, final Callable<String> post) {
- Task<String> t = Tasks.<String>builder().body(monitorableJob(pre, id, post)).build();
- monitorableTasksMap.put(id, t);
- return t;
- }
- protected Callable<String> monitorableJob(final String id) {
- return monitorableJob(null, id, null);
- }
- protected Callable<String> monitorableJob(final Runnable pre, final String id, final Callable<String> post) {
- monitorableJobSemaphoreMap.put(id, new Semaphore(0));
- return new Callable<String>() {
- @Override
- public String call() throws Exception {
- if (pre!=null) pre.run();
- // wait for semaphore
- if (!monitorableJobSemaphoreMap.get(id).tryAcquire(1, TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS))
- throw new IllegalStateException("timeout for "+id);
- synchronized (messages) {
- messages.add(id);
- messages.notifyAll();
- }
- if (post!=null) return post.call();
- return id;
- }
- };
- }
- protected void releaseMonitorableJob(final String id) {
- monitorableJobSemaphoreMap.get(id).release();
- }
- protected void waitForMessage(final String id) {
- CountdownTimer timer = CountdownTimer.newInstanceStarted(TIMEOUT);
- synchronized (messages) {
- while (!timer.isExpired()) {
- if (messages.contains(id)) return;
- timer.waitOnForExpiryUnchecked(messages);
- }
- }
- Assert.fail("Did not see message "+id);
- }
- protected void releaseAndWaitForMonitorableJob(final String id) {
- releaseMonitorableJob(id);
- waitForMessage(id);
- }
-
- @Test
- public void testChildrenRunConcurrentlyWithPrimary() {
- Task<String> t = Tasks.<String>builder().dynamic(true)
- .body(monitorableJob("main"))
- .add(monitorableTask("1")).add(monitorableTask("2")).build();
- ec.submit(t);
- releaseAndWaitForMonitorableJob("1");
- releaseAndWaitForMonitorableJob("main");
- Assert.assertFalse(t.blockUntilEnded(TINY_TIME));
- releaseMonitorableJob("2");
-
- Assert.assertTrue(t.blockUntilEnded(TIMEOUT));
- Assert.assertEquals(messages, MutableList.of("1", "main", "2"));
- Assert.assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) < TIMEOUT.toMilliseconds(), "took too long: "+stopwatch);
- Assert.assertFalse(t.isError());
- }
-
- protected static class FailRunnable implements Runnable {
- @Override public void run() { throw new RuntimeException("Planned exception for test"); }
- }
- protected static class FailCallable implements Callable<String> {
- @Override public String call() { throw new RuntimeException("Planned exception for test"); }
- }
-
- @Test
- public void testByDefaultChildrenFailureAbortsSecondaryFailsPrimaryButNotAbortsPrimary() {
- Task<String> t1 = monitorableTask(null, "1", new FailCallable());
- Task<String> t = Tasks.<String>builder().dynamic(true)
- .body(monitorableJob("main"))
- .add(t1).add(monitorableTask("2")).build();
- ec.submit(t);
- releaseAndWaitForMonitorableJob("1");
- Assert.assertFalse(t.blockUntilEnded(TINY_TIME));
- releaseMonitorableJob("main");
-
- Assert.assertTrue(t.blockUntilEnded(TIMEOUT));
- Assert.assertEquals(messages, MutableList.of("1", "main"));
- Assert.assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) < TIMEOUT.toMilliseconds(), "took too long: "+stopwatch);
- Assert.assertTrue(t.isError());
- Assert.assertTrue(t1.isError());
- }
-
- @Test
- public void testWhenSwallowingChildrenFailureDoesNotAbortSecondaryOrFailPrimary() {
- Task<String> t1 = monitorableTask(null, "1", new FailCallable());
- Task<String> t = Tasks.<String>builder().dynamic(true)
- .body(monitorableJob("main"))
- .add(t1).add(monitorableTask("2")).swallowChildrenFailures(true).build();
- ec.submit(t);
- releaseAndWaitForMonitorableJob("1");
- Assert.assertFalse(t.blockUntilEnded(TINY_TIME));
- releaseAndWaitForMonitorableJob("2");
- Assert.assertFalse(t.blockUntilEnded(TINY_TIME));
- releaseMonitorableJob("main");
- Assert.assertTrue(t.blockUntilEnded(TIMEOUT));
- Assert.assertEquals(messages, MutableList.of("1", "2", "main"));
- Assert.assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) < TIMEOUT.toMilliseconds(), "took too long: "+stopwatch);
- Assert.assertFalse(t.isError());
- Assert.assertTrue(t1.isError());
- }
-
- @Test
- public void testInessentialChildrenFailureDoesNotAbortSecondaryOrFailPrimary() {
- Task<String> t1 = monitorableTask(null, "1", new FailCallable());
- TaskTags.markInessential(t1);
- Task<String> t = Tasks.<String>builder().dynamic(true)
- .body(monitorableJob("main"))
- .add(t1).add(monitorableTask("2")).build();
- ec.submit(t);
- releaseAndWaitForMonitorableJob("1");
- Assert.assertFalse(t.blockUntilEnded(TINY_TIME));
- releaseAndWaitForMonitorableJob("2");
- Assert.assertFalse(t.blockUntilEnded(TINY_TIME));
- releaseMonitorableJob("main");
- Assert.assertTrue(t.blockUntilEnded(TIMEOUT));
- Assert.assertEquals(messages, MutableList.of("1", "2", "main"));
- Assert.assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) < TIMEOUT.toMilliseconds(), "took too long: "+stopwatch);
- Assert.assertFalse(t.isError());
- Assert.assertTrue(t1.isError());
- }
-
- @Test
- public void testTaskBuilderUsingAddVarargChildren() {
- Task<String> t = Tasks.<String>builder().dynamic(true)
- .body(monitorableJob("main"))
- .add(monitorableTask("1"), monitorableTask("2"))
- .build();
- ec.submit(t);
- releaseAndWaitForMonitorableJob("1");
- releaseAndWaitForMonitorableJob("2");
- releaseAndWaitForMonitorableJob("main");
-
- Assert.assertEquals(messages, MutableList.of("1", "2", "main"));
- }
-
- @Test
- public void testTaskBuilderUsingAddAllChildren() {
- Task<String> t = Tasks.<String>builder().dynamic(true)
- .body(monitorableJob("main"))
- .addAll(ImmutableList.of(monitorableTask("1"), monitorableTask("2")))
- .build();
- ec.submit(t);
- releaseAndWaitForMonitorableJob("1");
- releaseAndWaitForMonitorableJob("2");
- releaseAndWaitForMonitorableJob("main");
-
- Assert.assertEquals(messages, MutableList.of("1", "2", "main"));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/brooklyn/util/task/NonBasicTaskExecutionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/util/task/NonBasicTaskExecutionTest.java b/core/src/test/java/brooklyn/util/task/NonBasicTaskExecutionTest.java
deleted file mode 100644
index 82e3919..0000000
--- a/core/src/test/java/brooklyn/util/task/NonBasicTaskExecutionTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertSame;
-import static org.testng.Assert.assertTrue;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.brooklyn.api.management.Task;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import brooklyn.test.Asserts;
-import brooklyn.util.collections.MutableMap;
-
-/**
- * Test the operation of the {@link BasicTask} class.
- *
- * TODO clarify test purpose
- */
-public class NonBasicTaskExecutionTest {
- private static final Logger log = LoggerFactory.getLogger(NonBasicTaskExecutionTest.class);
-
- private static final int TIMEOUT_MS = 10*1000;
-
- public static class ConcreteForwardingTask<T> extends ForwardingTask<T> {
- private final TaskInternal<T> delegate;
-
- ConcreteForwardingTask(TaskInternal<T> delegate) {
- this.delegate = delegate;
- }
-
- @Override
- protected TaskInternal<T> delegate() {
- return delegate;
- }
- }
-
- private BasicExecutionManager em;
- private Map<Integer,String> data;
-
- @BeforeMethod(alwaysRun=true)
- public void setUp() throws Exception {
- em = new BasicExecutionManager("mycontext");
- data = Collections.synchronizedMap(new HashMap<Integer,String>());
- }
-
- @AfterMethod(alwaysRun=true)
- public void tearDown() throws Exception {
- if (em != null) em.shutdownNow();
- }
-
- @Test
- public void runSimpleTask() throws Exception {
- TaskInternal<Object> t = new ConcreteForwardingTask<Object>(new BasicTask<Object>(new Callable<Object>() {
- @Override public Object call() {
- return data.put(1, "b");
- }}));
- data.put(1, "a");
- Task<?> t2 = em.submit(MutableMap.of("tag", "A"), t);
- assertEquals("a", t.get());
- assertEquals("a", t2.get());
- assertSame(t, t2, "t="+t+"; t2="+t2);
- assertEquals("b", data.get(1));
- }
-
- @Test
- public void runBasicTaskWithWaits() throws Exception {
- final CountDownLatch signalStarted = new CountDownLatch(1);
- final CountDownLatch allowCompletion = new CountDownLatch(1);
- final TaskInternal<Object> t = new ConcreteForwardingTask<Object>(new BasicTask<Object>(new Callable<Object>() {
- @Override public Object call() throws Exception {
- Object result = data.put(1, "b");
- signalStarted.countDown();
- assertTrue(allowCompletion.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
- return result;
- }}));
- data.put(1, "a");
-
- Task<?> t2 = em.submit(MutableMap.of("tag", "A"), t);
- assertEquals(t, t2);
- assertFalse(t.isDone());
-
- assertTrue(signalStarted.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
- assertEquals("b", data.get(1));
- assertFalse(t.isDone());
-
- log.debug("runBasicTaskWithWaits, BasicTask status: {}", t.getStatusDetail(false));
-
- Asserts.succeedsEventually(new Runnable() {
- @Override public void run() {
- t.getStatusDetail(false).toLowerCase().contains("waiting");
- }});
- // "details="+t.getStatusDetail(false))
-
- allowCompletion.countDown();
- assertEquals("a", t.get());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/brooklyn/util/task/ScheduledExecutionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/util/task/ScheduledExecutionTest.java b/core/src/test/java/brooklyn/util/task/ScheduledExecutionTest.java
deleted file mode 100644
index 1ae65f2..0000000
--- a/core/src/test/java/brooklyn/util/task/ScheduledExecutionTest.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.brooklyn.api.management.Task;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import brooklyn.test.Asserts;
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.exceptions.RuntimeInterruptedException;
-import brooklyn.util.javalang.JavaClassNames;
-import brooklyn.util.time.Duration;
-import brooklyn.util.time.Time;
-
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Lists;
-
-@SuppressWarnings({"unchecked","rawtypes"})
-public class ScheduledExecutionTest {
-
- public static final Logger log = LoggerFactory.getLogger(ScheduledExecutionTest.class);
-
- @Test
- public void testScheduledTask() throws Exception {
- int PERIOD = 20;
- BasicExecutionManager m = new BasicExecutionManager("mycontextid");
- final AtomicInteger i = new AtomicInteger(0);
- ScheduledTask t = new ScheduledTask(MutableMap.of("delay", 2*PERIOD, "period", PERIOD, "maxIterations", 5), new Callable<Task<?>>() {
- public Task<?> call() throws Exception {
- return new BasicTask<Integer>(new Callable<Integer>() {
- public Integer call() {
- log.debug("task running: "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
- return i.incrementAndGet();
- }});
- }});
-
- log.info("submitting {} {}", t, t.getStatusDetail(false));
- m.submit(t);
- log.info("submitted {} {}", t, t.getStatusDetail(false));
- Integer interimResult = (Integer) t.get();
- log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
- assertTrue(i.get() > 0, "i="+i);
- t.blockUntilEnded();
- Integer finalResult = (Integer) t.get();
- log.info("ended ({}) {} {}", new Object[] {finalResult, t, t.getStatusDetail(false)});
- assertEquals(finalResult, (Integer)5);
- assertEquals(i.get(), 5);
- }
-
- /** like testScheduledTask but the loop is terminated by the task itself adjusting the period */
- @Test
- public void testScheduledTaskSelfEnding() throws Exception {
- int PERIOD = 20;
- BasicExecutionManager m = new BasicExecutionManager("mycontextid");
- final AtomicInteger i = new AtomicInteger(0);
- ScheduledTask t = new ScheduledTask(MutableMap.of("delay", 2*PERIOD, "period", PERIOD), new Callable<Task<?>>() {
- public Task<?> call() throws Exception {
- return new BasicTask<Integer>(new Callable<Integer>() {
- public Integer call() {
- ScheduledTask submitter = (ScheduledTask) ((BasicTask)Tasks.current()).getSubmittedByTask();
- if (i.get() >= 4) submitter.period = null;
- log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
- return i.incrementAndGet();
- }});
- }});
-
- log.info("submitting {} {}", t, t.getStatusDetail(false));
- m.submit(t);
- log.info("submitted {} {}", t, t.getStatusDetail(false));
- Integer interimResult = (Integer) t.get();
- log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
- assertTrue(i.get() > 0);
- t.blockUntilEnded();
- Integer finalResult = (Integer) t.get();
- log.info("ended ({}) {} {}", new Object[] {finalResult, t, t.getStatusDetail(false)});
- assertEquals(finalResult, (Integer)5);
- assertEquals(i.get(), 5);
- }
-
- @Test
- public void testScheduledTaskCancelEnding() throws Exception {
- Duration PERIOD = Duration.millis(20);
- BasicExecutionManager m = new BasicExecutionManager("mycontextid");
- final AtomicInteger i = new AtomicInteger();
- ScheduledTask t = new ScheduledTask(MutableMap.of("delay", PERIOD.times(2), "period", PERIOD), new Callable<Task<?>>() {
- public Task<?> call() throws Exception {
- return new BasicTask<Integer>(new Callable<Integer>() {
- public Integer call() {
- log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
- ScheduledTask submitter = (ScheduledTask) ((BasicTask)Tasks.current()).getSubmittedByTask();
- i.incrementAndGet();
- if (i.get() >= 5) submitter.cancel();
- return i.get();
- }});
- }});
-
- log.info(JavaClassNames.niceClassAndMethod()+" - submitting {} {}", t, t.getStatusDetail(false));
- m.submit(t);
- log.info("submitted {} {}", t, t.getStatusDetail(false));
- Integer interimResult = (Integer) t.get();
- log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
- assertTrue(i.get() > 0);
- t.blockUntilEnded();
-// int finalResult = t.get()
- log.info("ended ({}) {} {}", new Object[] {i, t, t.getStatusDetail(false)});
-// assertEquals(finalResult, 5)
- assertEquals(i.get(), 5);
- }
-
- @Test(groups="Integration")
- public void testScheduledTaskCancelOuter() throws Exception {
- final Duration PERIOD = Duration.millis(20);
- final Duration CYCLE_DELAY = Duration.ONE_SECOND;
- // this should be enough to start the next cycle, but not so much that the cycle ends;
- // and enough that when a task is interrupted it terminates within this period
- final Duration SMALL_FRACTION_OF_CYCLE_DELAY = PERIOD.add(CYCLE_DELAY.multiply(0.1));
-
- BasicExecutionManager m = new BasicExecutionManager("mycontextid");
- final AtomicInteger i = new AtomicInteger();
- ScheduledTask t = new ScheduledTask(MutableMap.of("delay", PERIOD.times(2), "period", PERIOD), new Callable<Task<?>>() {
- public Task<?> call() throws Exception {
- return new BasicTask<Integer>(new Callable<Integer>() {
- public Integer call() {
- log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
- Time.sleep(CYCLE_DELAY);
- i.incrementAndGet();
- return i.get();
- }});
- }});
-
- log.info(JavaClassNames.niceClassAndMethod()+" - submitting {} {}", t, t.getStatusDetail(false));
- m.submit(t);
- log.info("submitted {} {}", t, t.getStatusDetail(false));
- Integer interimResult = (Integer) t.get();
- log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
- assertEquals(i.get(), 1);
-
- Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY);
- assertEquals(t.get(), 2);
-
- Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY);
- Stopwatch timer = Stopwatch.createUnstarted();
- t.cancel(true);
- t.blockUntilEnded();
-// int finalResult = t.get()
- log.info("blocked until ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)});
- try {
- t.get();
- Assert.fail("Should have failed getting result of cancelled "+t);
- } catch (Exception e) {
- /* expected */
- }
- assertEquals(i.get(), 2);
- log.info("ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)});
- Assert.assertTrue(Duration.of(timer).isShorterThan(SMALL_FRACTION_OF_CYCLE_DELAY));
- }
-
- @Test(groups="Integration")
- public void testScheduledTaskCancelInterrupts() throws Exception {
- final Duration PERIOD = Duration.millis(20);
- final Duration CYCLE_DELAY = Duration.ONE_SECOND;
- // this should be enough to start the next cycle, but not so much that the cycle ends;
- // and enough that when a task is interrupted it terminates within this period
- final Duration SMALL_FRACTION_OF_CYCLE_DELAY = PERIOD.add(CYCLE_DELAY.multiply(0.1));
-
- BasicExecutionManager m = new BasicExecutionManager("mycontextid");
- final Semaphore interruptedSemaphore = new Semaphore(0);
- final AtomicInteger i = new AtomicInteger();
- ScheduledTask t = new ScheduledTask(MutableMap.of("delay", PERIOD.times(2), "period", PERIOD), new Callable<Task<?>>() {
- public Task<?> call() throws Exception {
- return new BasicTask<Integer>(new Callable<Integer>() {
- public Integer call() {
- try {
- log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
- Time.sleep(CYCLE_DELAY);
- i.incrementAndGet();
- return i.get();
- } catch (RuntimeInterruptedException e) {
- interruptedSemaphore.release();
- throw Exceptions.propagate(e);
- }
- }});
- }});
-
- log.info(JavaClassNames.niceClassAndMethod()+" - submitting {} {}", t, t.getStatusDetail(false));
- m.submit(t);
- log.info("submitted {} {}", t, t.getStatusDetail(false));
- Integer interimResult = (Integer) t.get();
- log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
- assertEquals(i.get(), 1);
-
- Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY);
- assertEquals(t.get(), 2);
-
- Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY);
- Stopwatch timer = Stopwatch.createUnstarted();
- t.cancel(true);
- t.blockUntilEnded();
-// int finalResult = t.get()
- log.info("blocked until ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)});
- try {
- t.get();
- Assert.fail("Should have failed getting result of cancelled "+t);
- } catch (Exception e) {
- /* expected */
- }
- assertEquals(i.get(), 2);
- Assert.assertTrue(interruptedSemaphore.tryAcquire(1, SMALL_FRACTION_OF_CYCLE_DELAY.toMilliseconds(), TimeUnit.MILLISECONDS), "child thread was not interrupted");
- log.info("ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)});
- Assert.assertTrue(Duration.of(timer).isShorterThan(SMALL_FRACTION_OF_CYCLE_DELAY));
- }
-
- @Test(groups="Integration")
- public void testScheduledTaskTakesLongerThanPeriod() throws Exception {
- final int PERIOD = 1;
- final int SLEEP_TIME = 100;
- final int EARLY_RETURN_GRACE = 10;
- BasicExecutionManager m = new BasicExecutionManager("mycontextid");
- final List<Long> execTimes = new CopyOnWriteArrayList<Long>();
-
- ScheduledTask t = new ScheduledTask(MutableMap.of("delay", PERIOD, "period", PERIOD), new Callable<Task<?>>() {
- public Task<?> call() throws Exception {
- return new BasicTask<Void>(new Runnable() {
- public void run() {
- execTimes.add(System.currentTimeMillis());
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- throw Exceptions.propagate(e);
- }
- }});
- }});
-
- m.submit(t);
-
- Asserts.succeedsEventually(new Runnable() {
- public void run() {
- assertTrue(execTimes.size() > 3, "size="+execTimes.size());
- }});
-
- List<Long> timeDiffs = Lists.newArrayList();
- long prevExecTime = -1;
- for (Long execTime : execTimes) {
- if (prevExecTime == -1) {
- prevExecTime = execTime;
- } else {
- timeDiffs.add(execTime - prevExecTime);
- prevExecTime = execTime;
- }
- }
-
- for (Long timeDiff : timeDiffs) {
- if (timeDiff < (SLEEP_TIME - EARLY_RETURN_GRACE)) fail("timeDiffs="+timeDiffs+"; execTimes="+execTimes);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/brooklyn/util/task/SingleThreadedSchedulerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/util/task/SingleThreadedSchedulerTest.java b/core/src/test/java/brooklyn/util/task/SingleThreadedSchedulerTest.java
deleted file mode 100644
index 265956d..0000000
--- a/core/src/test/java/brooklyn/util/task/SingleThreadedSchedulerTest.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.fail;
-
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import brooklyn.test.Asserts;
-import brooklyn.util.collections.MutableMap;
-
-import com.google.common.util.concurrent.Callables;
-
-public class SingleThreadedSchedulerTest {
-
- private static final Logger log = LoggerFactory.getLogger(SingleThreadedSchedulerTest.class);
-
- private BasicExecutionManager em;
-
- @BeforeMethod
- public void setUp() {
- em = new BasicExecutionManager("mycontextid");
- em.setTaskSchedulerForTag("category1", SingleThreadedScheduler.class);
- }
-
- @AfterMethod
- public void tearDown() {
- if (em != null) em.shutdownNow();
- }
-
- @Test
- public void testExecutesInOrder() throws Exception {
- final int NUM_TIMES = 1000;
- final List<Integer> result = new CopyOnWriteArrayList<Integer>();
- for (int i = 0; i < NUM_TIMES; i++) {
- final int counter = i;
- em.submit(MutableMap.of("tag", "category1"), new Runnable() {
- public void run() {
- result.add(counter);
- }});
- }
-
- Asserts.succeedsEventually(new Runnable() {
- @Override public void run() {
- assertEquals(result.size(), NUM_TIMES);
- }});
-
- for (int i = 0; i < NUM_TIMES; i++) {
- assertEquals(result.get(i), (Integer)i);
- }
- }
-
- @Test
- public void testLargeQueueDoesNotConsumeTooManyThreads() throws Exception {
- final int NUM_TIMES = 3000;
- final CountDownLatch latch = new CountDownLatch(1);
- BasicTask<Void> blockingTask = new BasicTask<Void>(newLatchAwaiter(latch));
- em.submit(MutableMap.of("tag", "category1"), blockingTask);
-
- final AtomicInteger counter = new AtomicInteger(0);
- for (int i = 0; i < NUM_TIMES; i++) {
- BasicTask<Void> t = new BasicTask<Void>(new Runnable() {
- public void run() {
- counter.incrementAndGet();
- }});
- em.submit(MutableMap.of("tag", "category1"), t);
- if (i % 500 == 0) log.info("Submitted "+i+" jobs...");
- }
-
- Thread.sleep(100); // give it more of a chance to create the threads before we let them execute
- latch.countDown();
-
- Asserts.succeedsEventually(new Runnable() {
- @Override public void run() {
- assertEquals(counter.get(), NUM_TIMES);
- }});
- }
-
- @Test
- public void testGetResultOfQueuedTaskBeforeItExecutes() throws Exception {
- final CountDownLatch latch = new CountDownLatch(1);
- em.submit(MutableMap.of("tag", "category1"), newLatchAwaiter(latch));
-
- BasicTask<Integer> t = new BasicTask<Integer>(Callables.returning(123));
- Future<Integer> future = em.submit(MutableMap.of("tag", "category1"), t);
-
- Thread thread = new Thread(new Runnable() {
- public void run() {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- latch.countDown();
- }});
- thread.start();
- assertEquals(future.get(), (Integer)123);
- }
-
- @Test
- public void testGetResultOfQueuedTaskBeforeItExecutesWithTimeout() throws Exception {
- final CountDownLatch latch = new CountDownLatch(1);
- em.submit(MutableMap.of("tag", "category1"), newLatchAwaiter(latch));
-
- BasicTask<Integer> t = new BasicTask<Integer>(Callables.returning(123));
- Future<Integer> future = em.submit(MutableMap.of("tag", "category1"), t);
-
- try {
- assertEquals(future.get(10, TimeUnit.MILLISECONDS), (Integer)123);
- fail();
- } catch (TimeoutException e) {
- // success
- }
- }
-
- @Test
- public void testCancelQueuedTaskBeforeItExecutes() throws Exception {
- final CountDownLatch latch = new CountDownLatch(1);
- em.submit(MutableMap.of("tag", "category1"), newLatchAwaiter(latch));
-
- final AtomicBoolean executed = new AtomicBoolean();
- BasicTask<?> t = new BasicTask<Void>(new Runnable() {
- public void run() {
- executed.set(true);
- }});
- Future<?> future = em.submit(MutableMap.of("tag", "category1"), t);
-
- future.cancel(true);
- latch.countDown();
- Thread.sleep(10);
- try {
- future.get();
- } catch (CancellationException e) {
- // success
- }
- assertFalse(executed.get());
- }
-
- @Test
- public void testGetResultOfQueuedTaskAfterItExecutes() throws Exception {
- final CountDownLatch latch = new CountDownLatch(1);
- em.submit(MutableMap.of("tag", "category1"), newLatchAwaiter(latch));
-
- BasicTask<Integer> t = new BasicTask<Integer>(Callables.returning(123));
- Future<Integer> future = em.submit(MutableMap.of("tag", "category1"), t);
-
- latch.countDown();
- assertEquals(future.get(), (Integer)123);
- }
-
- private Callable<Void> newLatchAwaiter(final CountDownLatch latch) {
- return new Callable<Void>() {
- public Void call() throws Exception {
- latch.await();
- return null;
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/brooklyn/util/task/TaskFinalizationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/util/task/TaskFinalizationTest.java b/core/src/test/java/brooklyn/util/task/TaskFinalizationTest.java
deleted file mode 100644
index 51750ca..0000000
--- a/core/src/test/java/brooklyn/util/task/TaskFinalizationTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.brooklyn.api.management.Task;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import brooklyn.util.time.Time;
-
-import com.google.common.base.Stopwatch;
-
-public class TaskFinalizationTest {
-
- private static final Logger log = LoggerFactory.getLogger(TaskFinalizationTest.class);
-
- // integration because it can take a while (and finalizers aren't even guaranteed)
- @Test(groups="Integration")
- public void testFinalizerInvoked() throws InterruptedException {
- BasicTask<?> t = new BasicTask<Void>(new Runnable() { public void run() { /* no op */ }});
- final Semaphore x = new Semaphore(0);
- t.setFinalizer(new BasicTask.TaskFinalizer() {
- public void onTaskFinalization(Task<?> t) {
- synchronized (x) {
- x.release();
- }
- }
- });
- t = null;
- Stopwatch watch = Stopwatch.createStarted();
- for (int i=0; i<30; i++) {
- System.gc(); System.gc();
- if (x.tryAcquire(1, TimeUnit.SECONDS)) {
- log.info("finalizer ran after "+Time.makeTimeStringRounded(watch));
- return;
- }
- }
- Assert.fail("finalizer did not run in time");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/brooklyn/util/task/TasksTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/util/task/TasksTest.java b/core/src/test/java/brooklyn/util/task/TasksTest.java
deleted file mode 100644
index 58ce24f..0000000
--- a/core/src/test/java/brooklyn/util/task/TasksTest.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import static brooklyn.event.basic.DependentConfiguration.attributeWhenReady;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-
-import org.apache.brooklyn.api.management.ExecutionContext;
-import org.apache.brooklyn.api.management.Task;
-import org.apache.brooklyn.test.entity.TestApplication;
-import org.apache.brooklyn.test.entity.TestEntity;
-import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import brooklyn.entity.BrooklynAppUnitTestSupport;
-import brooklyn.entity.basic.EntityFunctions;
-import brooklyn.util.guava.Functionals;
-import brooklyn.util.repeat.Repeater;
-import brooklyn.util.time.Duration;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.Callables;
-
-
-public class TasksTest extends BrooklynAppUnitTestSupport {
-
- private ExecutionContext executionContext;
-
- @BeforeMethod(alwaysRun=true)
- @Override
- public void setUp() throws Exception {
- super.setUp();
- executionContext = app.getExecutionContext();
- }
-
- @Test
- public void testResolveNull() throws Exception {
- assertResolvesValue(null, String.class, null);
- }
-
- @Test
- public void testResolveValueCastsToType() throws Exception {
- assertResolvesValue(123, String.class, "123");
- }
-
- @Test
- public void testResolvesAttributeWhenReady() throws Exception {
- app.setAttribute(TestApplication.MY_ATTRIBUTE, "myval");
- assertResolvesValue(attributeWhenReady(app, TestApplication.MY_ATTRIBUTE), String.class, "myval");
- }
-
- @Test
- public void testResolvesMapWithAttributeWhenReady() throws Exception {
- app.setAttribute(TestApplication.MY_ATTRIBUTE, "myval");
- Map<?,?> orig = ImmutableMap.of("mykey", attributeWhenReady(app, TestApplication.MY_ATTRIBUTE));
- Map<?,?> expected = ImmutableMap.of("mykey", "myval");
- assertResolvesValue(orig, String.class, expected);
- }
-
- @Test
- public void testResolvesSetWithAttributeWhenReady() throws Exception {
- app.setAttribute(TestApplication.MY_ATTRIBUTE, "myval");
- Set<?> orig = ImmutableSet.of(attributeWhenReady(app, TestApplication.MY_ATTRIBUTE));
- Set<?> expected = ImmutableSet.of("myval");
- assertResolvesValue(orig, String.class, expected);
- }
-
- @Test
- public void testResolvesMapOfMapsWithAttributeWhenReady() throws Exception {
- app.setAttribute(TestApplication.MY_ATTRIBUTE, "myval");
- Map<?,?> orig = ImmutableMap.of("mykey", ImmutableMap.of("mysubkey", attributeWhenReady(app, TestApplication.MY_ATTRIBUTE)));
- Map<?,?> expected = ImmutableMap.of("mykey", ImmutableMap.of("mysubkey", "myval"));
- assertResolvesValue(orig, String.class, expected);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testResolvesIterableOfMapsWithAttributeWhenReady() throws Exception {
- app.setAttribute(TestApplication.MY_ATTRIBUTE, "myval");
- // using Iterables.concat so that orig is of type FluentIterable rather than List etc
- Iterable<?> orig = Iterables.concat(ImmutableList.of(ImmutableMap.of("mykey", attributeWhenReady(app, TestApplication.MY_ATTRIBUTE))));
- Iterable<Map<?,?>> expected = ImmutableList.<Map<?,?>>of(ImmutableMap.of("mykey", "myval"));
- assertResolvesValue(orig, String.class, expected);
- }
-
- private void assertResolvesValue(Object actual, Class<?> type, Object expected) throws Exception {
- Object result = Tasks.resolveValue(actual, type, executionContext);
- assertEquals(result, expected);
- }
-
- @Test
- public void testErrorsResolvingPropagatesOrSwallowedAllCorrectly() throws Exception {
- app.setConfig(TestEntity.CONF_OBJECT, ValueResolverTest.newThrowTask(Duration.ZERO));
- Task<Object> t = Tasks.builder().body(Functionals.callable(EntityFunctions.config(TestEntity.CONF_OBJECT), app)).build();
- ValueResolver<Object> v = Tasks.resolving(t).as(Object.class).context(app.getExecutionContext());
-
- ValueResolverTest.assertThrowsOnMaybe(v);
- ValueResolverTest.assertThrowsOnGet(v);
-
- v.swallowExceptions();
- ValueResolverTest.assertMaybeIsAbsent(v);
- ValueResolverTest.assertThrowsOnGet(v);
-
- v.defaultValue("foo");
- ValueResolverTest.assertMaybeIsAbsent(v);
- assertEquals(v.clone().get(), "foo");
- assertResolvesValue(v, Object.class, "foo");
- }
-
- @Test
- public void testRepeater() throws Exception {
- Task<?> t;
-
- t = Tasks.requiring(Repeater.create().until(Callables.returning(true)).every(Duration.millis(1))).build();
- app.getExecutionContext().submit(t);
- t.get(Duration.TEN_SECONDS);
-
- t = Tasks.testing(Repeater.create().until(Callables.returning(true)).every(Duration.millis(1))).build();
- app.getExecutionContext().submit(t);
- Assert.assertEquals(t.get(Duration.TEN_SECONDS), true);
-
- t = Tasks.requiring(Repeater.create().until(Callables.returning(false)).limitIterationsTo(2).every(Duration.millis(1))).build();
- app.getExecutionContext().submit(t);
- try {
- t.get(Duration.TEN_SECONDS);
- Assert.fail("Should have failed");
- } catch (Exception e) {
- // expected
- }
-
- t = Tasks.testing(Repeater.create().until(Callables.returning(false)).limitIterationsTo(2).every(Duration.millis(1))).build();
- app.getExecutionContext().submit(t);
- Assert.assertEquals(t.get(Duration.TEN_SECONDS), false);
- }
-
- @Test
- public void testRepeaterDescription() throws Exception{
- final String description = "task description";
- Repeater repeater = Repeater.create(description)
- .repeat(Callables.returning(null))
- .every(Duration.ONE_MILLISECOND)
- .limitIterationsTo(1)
- .until(new Callable<Boolean>() {
- @Override
- public Boolean call() {
- TaskInternal<?> current = (TaskInternal<?>)Tasks.current();
- assertEquals(current.getBlockingDetails(), description);
- return true;
- }
- });
- Task<Boolean> t = Tasks.testing(repeater).build();
- app.getExecutionContext().submit(t);
- assertTrue(t.get(Duration.TEN_SECONDS));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/brooklyn/util/task/ValueResolverTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/util/task/ValueResolverTest.java b/core/src/test/java/brooklyn/util/task/ValueResolverTest.java
deleted file mode 100644
index d50ff54..0000000
--- a/core/src/test/java/brooklyn/util/task/ValueResolverTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import java.util.concurrent.Callable;
-
-import org.apache.brooklyn.api.management.ExecutionContext;
-import org.apache.brooklyn.api.management.Task;
-import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import brooklyn.entity.BrooklynAppUnitTestSupport;
-import brooklyn.util.guava.Maybe;
-import brooklyn.util.time.Duration;
-import brooklyn.util.time.Time;
-
-/**
- * see also {@link TasksTest} for more tests
- */
-@Test
-public class ValueResolverTest extends BrooklynAppUnitTestSupport {
-
- private ExecutionContext executionContext;
-
- @BeforeMethod(alwaysRun=true)
- @Override
- public void setUp() throws Exception {
- super.setUp();
- executionContext = app.getExecutionContext();
- }
-
- public static final Task<String> newSleepTask(final Duration timeout, final String result) {
- return Tasks.<String>builder().body(new Callable<String>() {
- public String call() {
- Time.sleep(timeout);
- return result;
- }}
- ).build();
- }
-
- public static final Task<String> newThrowTask(final Duration timeout) {
- return Tasks.<String>builder().body(new Callable<String>() {
- public String call() {
- Time.sleep(timeout);
- throw new IllegalStateException("intended, during tests");
- }}
- ).build();
- }
-
- public void testTimeoutZero() {
- Maybe<String> result = Tasks.resolving(newSleepTask(Duration.TEN_SECONDS, "foo")).as(String.class).context(executionContext).timeout(Duration.ZERO).getMaybe();
- Assert.assertFalse(result.isPresent());
- }
-
- public void testTimeoutBig() {
- Maybe<String> result = Tasks.resolving(newSleepTask(Duration.ZERO, "foo")).as(String.class).context(executionContext).timeout(Duration.TEN_SECONDS).getMaybe();
- Assert.assertEquals(result.get(), "foo");
- }
-
- public void testNoExecutionContextOnCompleted() {
- Task<String> t = newSleepTask(Duration.ZERO, "foo");
- executionContext.submit(t).getUnchecked();
- Maybe<String> result = Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe();
- Assert.assertEquals(result.get(), "foo");
- }
-
- public static Throwable assertThrowsOnMaybe(ValueResolver<?> result) {
- try {
- result = result.clone();
- result.getMaybe();
- Assert.fail("should have thrown");
- return null;
- } catch (Exception e) { return e; }
- }
- public static Throwable assertThrowsOnGet(ValueResolver<?> result) {
- result = result.clone();
- try {
- result.get();
- Assert.fail("should have thrown");
- return null;
- } catch (Exception e) { return e; }
- }
- public static <T> Maybe<T> assertMaybeIsAbsent(ValueResolver<T> result) {
- result = result.clone();
- Maybe<T> maybe = result.getMaybe();
- Assert.assertFalse(maybe.isPresent());
- return maybe;
- }
-
- public void testSwallowError() {
- ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(executionContext).swallowExceptions();
- assertMaybeIsAbsent(result);
- assertThrowsOnGet(result);
- }
-
-
- public void testDontSwallowError() {
- ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(executionContext);
- assertThrowsOnMaybe(result);
- assertThrowsOnGet(result);
- }
-
- public void testDefaultWhenSwallowError() {
- ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(executionContext).swallowExceptions().defaultValue("foo");
- assertMaybeIsAbsent(result);
- Assert.assertEquals(result.get(), "foo");
- }
-
- public void testDefaultBeforeDelayAndError() {
- ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.TEN_SECONDS)).as(String.class).context(executionContext).timeout(Duration.ZERO).defaultValue("foo");
- assertMaybeIsAbsent(result);
- Assert.assertEquals(result.get(), "foo");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/brooklyn/util/task/ssh/SshTasksTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/util/task/ssh/SshTasksTest.java b/core/src/test/java/brooklyn/util/task/ssh/SshTasksTest.java
deleted file mode 100644
index 578164f..0000000
--- a/core/src/test/java/brooklyn/util/task/ssh/SshTasksTest.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task.ssh;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.brooklyn.api.location.LocationSpec;
-import org.apache.brooklyn.api.management.ManagementContext;
-import org.apache.brooklyn.core.management.internal.LocalManagementContext;
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import brooklyn.entity.basic.BrooklynConfigKeys;
-import brooklyn.entity.basic.Entities;
-
-import org.apache.brooklyn.location.basic.LocalhostMachineProvisioningLocation;
-import org.apache.brooklyn.location.basic.SshMachineLocation;
-
-import brooklyn.util.net.Urls;
-import brooklyn.util.os.Os;
-import brooklyn.util.ssh.BashCommandsIntegrationTest;
-import brooklyn.util.task.system.ProcessTaskFactory;
-import brooklyn.util.task.system.ProcessTaskWrapper;
-
-/**
- * Some tests for {@link SshTasks}. Note more tests in {@link BashCommandsIntegrationTest},
- * {@link SshEffectorTasksTest}, and {@link SoftwareEffectorTest}.
- */
-public class SshTasksTest {
-
- private static final Logger log = LoggerFactory.getLogger(SshTasksTest.class);
-
- ManagementContext mgmt;
- SshMachineLocation host;
- File tempDir;
-
- boolean failureExpected;
-
- @BeforeMethod(alwaysRun=true)
- public void setup() throws Exception {
- mgmt = new LocalManagementContext();
-
- LocalhostMachineProvisioningLocation lhc = mgmt.getLocationManager().createLocation(LocationSpec.create(LocalhostMachineProvisioningLocation.class));
- host = lhc.obtain();
- clearExpectedFailure();
- tempDir = Os.newTempDir(getClass());
- }
-
- @AfterMethod(alwaysRun=true)
- public void tearDown() throws Exception {
- if (mgmt != null) Entities.destroyAll(mgmt);
- mgmt = null;
- tempDir = Os.deleteRecursively(tempDir).asNullOrThrowing();
- checkExpectedFailure();
- }
-
- protected void checkExpectedFailure() {
- if (failureExpected) {
- clearExpectedFailure();
- Assert.fail("Test should have thrown an exception but it did not.");
- }
- }
-
- protected void clearExpectedFailure() {
- failureExpected = false;
- }
-
- protected void setExpectingFailure() {
- failureExpected = true;
- }
-
-
- protected <T> ProcessTaskWrapper<T> submit(final ProcessTaskFactory<T> tf) {
- tf.machine(host);
- ProcessTaskWrapper<T> t = tf.newTask();
- mgmt.getExecutionManager().submit(t);
- return t;
- }
-
- protected SshPutTaskWrapper submit(final SshPutTaskFactory tf) {
- SshPutTaskWrapper t = tf.newTask();
- mgmt.getExecutionManager().submit(t);
- return t;
- }
-
- @Test(groups="Integration")
- public void testSshEchoHello() {
- ProcessTaskWrapper<Integer> t = submit(SshTasks.newSshExecTaskFactory(host, "sleep 1 ; echo hello world"));
- Assert.assertFalse(t.isDone());
- Assert.assertEquals(t.get(), (Integer)0);
- Assert.assertEquals(t.getTask().getUnchecked(), (Integer)0);
- Assert.assertEquals(t.getStdout().trim(), "hello world");
- }
-
- @Test(groups="Integration")
- public void testCopyTo() throws IOException {
- String fn = Urls.mergePaths(tempDir.getPath(), "f1");
- SshPutTaskWrapper t = submit(SshTasks.newSshPutTaskFactory(host, fn).contents("hello world"));
- t.block();
- Assert.assertEquals(FileUtils.readFileToString(new File(fn)), "hello world");
- // and make sure this doesn't throw
- Assert.assertTrue(t.isDone());
- Assert.assertTrue(t.isSuccessful());
- Assert.assertEquals(t.get(), null);
- Assert.assertEquals(t.getExitCode(), (Integer)0);
- }
-
- @Test(groups="Integration")
- public void testCopyToFailBadSubdir() throws IOException {
- String fn = Urls.mergePaths(tempDir.getPath(), "non-existent-subdir/file");
- SshPutTaskWrapper t = submit(SshTasks.newSshPutTaskFactory(host, fn).contents("hello world"));
- //this doesn't fail
- t.block();
- Assert.assertTrue(t.isDone());
- setExpectingFailure();
- try {
- // but this does
- t.get();
- } catch (Exception e) {
- log.info("The error if file cannot be written is: "+e);
- clearExpectedFailure();
- }
- checkExpectedFailure();
- // and the results indicate failure
- Assert.assertFalse(t.isSuccessful());
- Assert.assertNotNull(t.getException());
- Assert.assertNotEquals(t.getExitCode(), (Integer)0);
- }
-
- @Test(groups="Integration")
- public void testCopyToFailBadSubdirAllow() throws IOException {
- String fn = Urls.mergePaths(tempDir.getPath(), "non-existent-subdir/file");
- SshPutTaskWrapper t = submit(SshTasks.newSshPutTaskFactory(host, fn).contents("hello world").allowFailure());
- //this doesn't fail
- t.block();
- Assert.assertTrue(t.isDone());
- // and this doesn't fail either
- Assert.assertEquals(t.get(), null);
- // but it's not successful
- Assert.assertNotNull(t.getException());
- Assert.assertFalse(t.isSuccessful());
- // exit code probably null, but won't be zero
- Assert.assertNotEquals(t.getExitCode(), (Integer)0);
- }
-
- @Test(groups="Integration")
- public void testCopyToFailBadSubdirCreate() throws IOException {
- String fn = Urls.mergePaths(tempDir.getPath(), "non-existent-subdir-to-create/file");
- SshPutTaskWrapper t = submit(SshTasks.newSshPutTaskFactory(host, fn).contents("hello world").createDirectory());
- t.block();
- // directory should be created, and file readable now
- Assert.assertEquals(FileUtils.readFileToString(new File(fn)), "hello world");
- Assert.assertEquals(t.getExitCode(), (Integer)0);
- }
-
- @Test(groups="Integration")
- public void testSshFetch() throws IOException {
- String fn = Urls.mergePaths(tempDir.getPath(), "f2");
- FileUtils.write(new File(fn), "hello fetched world");
-
- SshFetchTaskFactory tf = SshTasks.newSshFetchTaskFactory(host, fn);
- SshFetchTaskWrapper t = tf.newTask();
- mgmt.getExecutionManager().submit(t);
-
- t.block();
- Assert.assertTrue(t.isDone());
- Assert.assertEquals(t.get(), "hello fetched world");
- Assert.assertEquals(t.getBytes(), "hello fetched world".getBytes());
- }
-
- @Test(groups="Integration")
- public void testSshWithHeaderProperty() {
- host.setConfig(BrooklynConfigKeys.SSH_CONFIG_SCRIPT_HEADER, "#!/bin/bash -e\necho foo\n");
- ProcessTaskWrapper<Integer> t = submit(SshTasks.newSshExecTaskFactory(host, "echo bar"));
- Assert.assertTrue(t.block().getStdout().trim().matches("foo\\s+bar"), "mismatched output was: "+t.getStdout());
- }
-
- @Test(groups="Integration")
- public void testSshIgnoringHeaderProperty() {
- host.setConfig(BrooklynConfigKeys.SSH_CONFIG_SCRIPT_HEADER, "#!/bin/bash -e\necho foo\n");
- ProcessTaskWrapper<Integer> t = submit(SshTasks.newSshExecTaskFactory(host, false, "echo bar"));
- Assert.assertTrue(t.block().getStdout().trim().matches("bar"), "mismatched output was: "+t.getStdout());
- }
-
-}