You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by ha...@apache.org on 2015/08/17 21:17:42 UTC
[11/42] incubator-brooklyn git commit: [BROOKLYN-162] Refactor
package in ./core/util
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/osgi/OsgisTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/osgi/OsgisTest.java b/core/src/test/java/org/apache/brooklyn/core/util/osgi/OsgisTest.java
new file mode 100644
index 0000000..cdaf433
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/osgi/OsgisTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.osgi;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+
+import org.apache.brooklyn.core.util.osgi.Osgis;
+import org.apache.brooklyn.core.util.osgi.Osgis.VersionedName;
+import org.osgi.framework.Version;
+import org.testng.annotations.Test;
+
+public class OsgisTest {
+
+ @Test
+ public void testParseOsgiIdentifier() throws Exception {
+ assertEquals(Osgis.parseOsgiIdentifier("a.b").get(), new VersionedName("a.b", null));
+ assertEquals(Osgis.parseOsgiIdentifier("a.b:0.1.2").get(), new VersionedName("a.b", Version.parseVersion("0.1.2")));
+ assertEquals(Osgis.parseOsgiIdentifier("a.b:0.0.0.SNAPSHOT").get(), new VersionedName("a.b", Version.parseVersion("0.0.0.SNAPSHOT")));
+ assertFalse(Osgis.parseOsgiIdentifier("a.b:0.notanumber.2").isPresent()); // invalid version
+ assertFalse(Osgis.parseOsgiIdentifier("a.b:0.1.2:3.4.5").isPresent()); // too many colons
+ assertFalse(Osgis.parseOsgiIdentifier("a.b:0.0.0_SNAPSHOT").isPresent()); // invalid version
+ assertFalse(Osgis.parseOsgiIdentifier("").isPresent());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/ssh/BashCommandsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/ssh/BashCommandsIntegrationTest.java b/core/src/test/java/org/apache/brooklyn/core/util/ssh/BashCommandsIntegrationTest.java
new file mode 100644
index 0000000..77f91f6
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/ssh/BashCommandsIntegrationTest.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.ssh;
+
+import static brooklyn.util.ssh.BashCommands.sudo;
+import static java.lang.String.format;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.brooklyn.api.management.ManagementContext;
+import org.apache.brooklyn.core.util.task.BasicExecutionContext;
+import org.apache.brooklyn.core.util.task.ssh.SshTasks;
+import org.apache.brooklyn.core.util.task.system.ProcessTaskWrapper;
+import org.apache.brooklyn.test.entity.LocalManagementContextForTests;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.Entities;
+
+import org.apache.brooklyn.location.basic.LocalhostMachineProvisioningLocation;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+
+import brooklyn.util.javalang.JavaClassNames;
+import brooklyn.util.net.Networking;
+import brooklyn.util.os.Os;
+import brooklyn.util.ssh.BashCommands;
+import brooklyn.util.text.Identifiers;
+import brooklyn.util.text.Strings;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
+
+public class BashCommandsIntegrationTest {
+
+ private static final Logger log = LoggerFactory.getLogger(BashCommandsIntegrationTest.class);
+
+ private ManagementContext mgmt;
+ private BasicExecutionContext exec;
+
+ private File destFile;
+ private File sourceNonExistantFile;
+ private File sourceFile1;
+ private File sourceFile2;
+ private String sourceNonExistantFileUrl;
+ private String sourceFileUrl1;
+ private String sourceFileUrl2;
+ private SshMachineLocation loc;
+
+ private String localRepoFilename = "localrepofile.txt";
+ private File localRepoBasePath;
+ private File localRepoEntityBasePath;
+ private String localRepoEntityVersionPath;
+ private File localRepoEntityFile;
+
+ @BeforeMethod(alwaysRun=true)
+ public void setUp() throws Exception {
+ mgmt = new LocalManagementContextForTests();
+ exec = new BasicExecutionContext(mgmt.getExecutionManager());
+
+ destFile = Os.newTempFile(getClass(), "commoncommands-test-dest.txt");
+
+ sourceNonExistantFile = new File("/this/does/not/exist/ERQBETJJIG1234");
+ sourceNonExistantFileUrl = sourceNonExistantFile.toURI().toString();
+
+ sourceFile1 = Os.newTempFile(getClass(), "commoncommands-test.txt");
+ sourceFileUrl1 = sourceFile1.toURI().toString();
+ Files.write("mysource1".getBytes(), sourceFile1);
+
+ sourceFile2 = Os.newTempFile(getClass(), "commoncommands-test2.txt");
+ sourceFileUrl2 = sourceFile2.toURI().toString();
+ Files.write("mysource2".getBytes(), sourceFile2);
+
+ localRepoEntityVersionPath = JavaClassNames.simpleClassName(this)+"-test-dest-"+Identifiers.makeRandomId(8);
+ localRepoBasePath = new File(format("%s/.brooklyn/repository", System.getProperty("user.home")));
+ localRepoEntityBasePath = new File(localRepoBasePath, localRepoEntityVersionPath);
+ localRepoEntityFile = new File(localRepoEntityBasePath, localRepoFilename);
+ localRepoEntityBasePath.mkdirs();
+ Files.write("mylocal1".getBytes(), localRepoEntityFile);
+
+ loc = mgmt.getLocationManager().createLocation(LocalhostMachineProvisioningLocation.spec()).obtain();
+ }
+
+ /**
+  * Deletes the temp files and the local-repository directory created in setUp, then closes
+  * the ssh location and destroys the management context.
+  */
+ @AfterMethod(alwaysRun=true)
+ public void tearDown() throws Exception {
+     try {
+         if (sourceFile1 != null) sourceFile1.delete();
+         if (sourceFile2 != null) sourceFile2.delete();
+         if (destFile != null) destFile.delete();
+         if (localRepoEntityFile != null) localRepoEntityFile.delete();
+         if (localRepoEntityBasePath != null) FileUtils.deleteDirectory(localRepoEntityBasePath);
+     } finally {
+         // file cleanup can throw (deleteDirectory declares IOException); the location and
+         // management context must still be released so they do not leak into later tests
+         try {
+             if (loc != null) loc.close();
+         } finally {
+             if (mgmt != null) Entities.destroyAll(mgmt);
+         }
+     }
+ }
+
+ /** Runs {@code whoami} through sudo on the ssh location and checks that stdout mentions root. */
+ @Test(groups="Integration")
+ public void testSudo() throws Exception {
+     ByteArrayOutputStream capturedOut = new ByteArrayOutputStream();
+     ByteArrayOutputStream capturedErr = new ByteArrayOutputStream();
+     int exitcode = loc.execCommands(
+             ImmutableMap.of("out", capturedOut, "err", capturedErr),
+             "test",
+             ImmutableList.of(sudo("whoami")));
+
+     String outstr = capturedOut.toString();
+     String errstr = capturedErr.toString();
+     String diagnostics = "out="+outstr+"; err="+errstr;
+
+     assertEquals(exitcode, 0, diagnostics);
+     assertTrue(outstr.contains("root"), diagnostics);
+ }
+
+ /**
+  * Downloads a single local file URL to {@code destFile} using the generated bash commands
+  * and checks the contents that arrive.
+  */
+ @Test(groups="Integration")
+ public void testDownloadUrl() throws Exception {
+     List<String> cmds = BashCommands.commandsToDownloadUrlsAs(
+             ImmutableList.of(sourceFileUrl1),
+             destFile.getAbsolutePath());
+     int exitcode = loc.execCommands("test", cmds);
+
+     // TestNG's assertEquals takes (actual, expected)
+     assertEquals(exitcode, 0);
+     assertEquals(Files.readLines(destFile, Charsets.UTF_8), ImmutableList.of("mysource1"));
+ }
+
+ /**
+  * Offers a non-existent URL followed by two valid ones and checks that the destination
+  * ends up with the contents of the first valid source.
+  */
+ @Test(groups="Integration")
+ public void testDownloadFirstSuccessfulFile() throws Exception {
+     List<String> cmds = BashCommands.commandsToDownloadUrlsAs(
+             ImmutableList.of(sourceNonExistantFileUrl, sourceFileUrl1, sourceFileUrl2),
+             destFile.getAbsolutePath());
+     int exitcode = loc.execCommands("test", cmds);
+
+     // TestNG's assertEquals takes (actual, expected)
+     assertEquals(exitcode, 0);
+     assertEquals(Files.readLines(destFile, Charsets.UTF_8), ImmutableList.of("mysource1"));
+ }
+
+ @Test(groups="Integration")
+ public void testDownloadToStdout() throws Exception {
+ ProcessTaskWrapper<String> t = SshTasks.newSshExecTaskFactory(loc,
+ "cd "+destFile.getParentFile().getAbsolutePath(),
+ BashCommands.downloadToStdout(Arrays.asList(sourceFileUrl1))+" | sed s/my/your/")
+ .requiringZeroAndReturningStdout().newTask();
+
+ String result = exec.submit(t).get();
+ assertTrue(result.trim().equals("yoursource1"), "Wrong contents of stdout download: "+result);
+ }
+
+ @Test(groups="Integration")
+ public void testAlternativesWhereFirstSucceeds() throws Exception {
+ ProcessTaskWrapper<Integer> t = SshTasks.newSshExecTaskFactory(loc)
+ .add(BashCommands.alternatives(Arrays.asList("echo first", "exit 88")))
+ .newTask();
+
+ Integer returnCode = exec.submit(t).get();
+ String stdout = t.getStdout();
+ String stderr = t.getStderr();
+ log.info("alternatives for good first command gave: "+returnCode+"; err="+stderr+"; out="+stdout);
+ assertTrue(stdout.contains("first"), "errcode="+returnCode+"; stdout="+stdout+"; stderr="+stderr);
+ assertEquals(returnCode, (Integer)0);
+ }
+
+ /**
+  * Passes alternatives() a non-existent command followed by {@code exit 88} and checks
+  * that the task reports exit code 88.
+  */
+ @Test(groups="Integration")
+ public void testAlternatives() throws Exception {
+     List<String> candidates = Arrays.asList("asdfj_no_such_command_1", "exit 88");
+     ProcessTaskWrapper<Integer> task = SshTasks.newSshExecTaskFactory(loc)
+             .add(BashCommands.alternatives(candidates))
+             .newTask();
+
+     Integer returnCode = exec.submit(task).get();
+     String stderrCopy = new String(task.getStderr());
+     String stdoutCopy = new String(task.getStdout());
+     log.info("alternatives for bad commands gave: "+returnCode+"; err="+stderrCopy+"; out="+stdoutCopy);
+     assertEquals(returnCode, (Integer)88);
+ }
+
+ @Test(groups="Integration")
+ public void testRequireTestHandlesFailure() throws Exception {
+ ProcessTaskWrapper<?> t = SshTasks.newSshExecTaskFactory(loc)
+ .add(BashCommands.requireTest("-f "+sourceNonExistantFile.getPath(),
+ "The requested file does not exist")).newTask();
+
+ exec.submit(t).get();
+ assertNotEquals(t.getExitCode(), (Integer)0);
+ assertTrue(t.getStderr().contains("The requested file"), "Expected message in: "+t.getStderr());
+ assertTrue(t.getStdout().contains("The requested file"), "Expected message in: "+t.getStdout());
+ }
+
+ @Test(groups="Integration")
+ public void testRequireTestHandlesSuccess() throws Exception {
+ ProcessTaskWrapper<?> t = SshTasks.newSshExecTaskFactory(loc)
+ .add(BashCommands.requireTest("-f "+sourceFile1.getPath(),
+ "The requested file does not exist")).newTask();
+
+ exec.submit(t).get();
+ assertEquals(t.getExitCode(), (Integer)0);
+ assertTrue(t.getStderr().equals(""), "Expected no stderr messages, but got: "+t.getStderr());
+ }
+
+ @Test(groups="Integration")
+ public void testRequireFileHandlesFailure() throws Exception {
+ ProcessTaskWrapper<?> t = SshTasks.newSshExecTaskFactory(loc)
+ .add(BashCommands.requireFile(sourceNonExistantFile.getPath())).newTask();
+
+ exec.submit(t).get();
+ assertNotEquals(t.getExitCode(), (Integer)0);
+ assertTrue(t.getStderr().contains("required file"), "Expected message in: "+t.getStderr());
+ assertTrue(t.getStderr().contains(sourceNonExistantFile.getPath()), "Expected message in: "+t.getStderr());
+ assertTrue(t.getStdout().contains("required file"), "Expected message in: "+t.getStdout());
+ assertTrue(t.getStdout().contains(sourceNonExistantFile.getPath()), "Expected message in: "+t.getStdout());
+ }
+
+ @Test(groups="Integration")
+ public void testRequireFileHandlesSuccess() throws Exception {
+ ProcessTaskWrapper<?> t = SshTasks.newSshExecTaskFactory(loc)
+ .add(BashCommands.requireFile(sourceFile1.getPath())).newTask();
+
+ exec.submit(t).get();
+ assertEquals(t.getExitCode(), (Integer)0);
+ assertTrue(t.getStderr().equals(""), "Expected no stderr messages, but got: "+t.getStderr());
+ }
+
+ /**
+  * Puts a failing requireTest ahead of an echo command and checks that the failure message
+  * is reported while the echo's output never appears on stdout.
+  */
+ @Test(groups="Integration")
+ public void testRequireFailureExitsImmediately() throws Exception {
+     ProcessTaskWrapper<?> t = SshTasks.newSshExecTaskFactory(loc)
+             .add(BashCommands.requireTest("-f "+sourceNonExistantFile.getPath(),
+                     "The requested file does not exist"))
+             .add("echo shouldnae come here").newTask();
+
+     exec.submit(t).get();
+     assertNotEquals(t.getExitCode(), (Integer)0);
+     assertTrue(t.getStderr().contains("The requested file"), "Expected message in: "+t.getStderr());
+     assertTrue(t.getStdout().contains("The requested file"), "Expected message in: "+t.getStdout());
+     // negative check: on failure the report must say the text was NOT supposed to be there
+     assertFalse(t.getStdout().contains("shouldnae"), "Unexpected output from command after failed requirement: "+t.getStdout());
+ }
+
+ @Test(groups="Integration")
+ public void testPipeMultiline() throws Exception {
+ String output = execRequiringZeroAndReturningStdout(loc,
+ BashCommands.pipeTextTo("hello world\n"+"and goodbye\n", "wc")).get();
+
+ assertEquals(Strings.replaceAllRegex(output, "\\s+", " ").trim(), "3 4 25");
+ }
+
+ @Test(groups="Integration")
+ public void testWaitForFileContentsWhenAbortingOnFail() throws Exception {
+ String fileContent = "mycontents";
+ String cmd = BashCommands.waitForFileContents(destFile.getAbsolutePath(), fileContent, Duration.ONE_SECOND, true);
+
+ int exitcode = loc.execCommands("test", ImmutableList.of(cmd));
+ assertEquals(exitcode, 1);
+
+ Files.write(fileContent, destFile, Charsets.UTF_8);
+ int exitcode2 = loc.execCommands("test", ImmutableList.of(cmd));
+ assertEquals(exitcode2, 0);
+ }
+
+ @Test(groups="Integration")
+ public void testWaitForFileContentsWhenNotAbortingOnFail() throws Exception {
+ String fileContent = "mycontents";
+ String cmd = BashCommands.waitForFileContents(destFile.getAbsolutePath(), fileContent, Duration.ONE_SECOND, false);
+
+ String output = execRequiringZeroAndReturningStdout(loc, cmd).get();
+ assertTrue(output.contains("Couldn't find"), "output="+output);
+
+ Files.write(fileContent, destFile, Charsets.UTF_8);
+ String output2 = execRequiringZeroAndReturningStdout(loc, cmd).get();
+ assertFalse(output2.contains("Couldn't find"), "output="+output2);
+ }
+
+ /**
+  * Starts a wait (thirty-second limit) for contents that are not yet in {@code destFile},
+  * writes them after a five-second pause, and checks that the wait finishes without the
+  * "Couldn't find" message.
+  */
+ @Test(groups="Integration")
+ public void testWaitForFileContentsWhenContentsAppearAfterStart() throws Exception {
+     String fileContent = "mycontents";
+
+     String cmd = BashCommands.waitForFileContents(destFile.getAbsolutePath(), fileContent, Duration.THIRTY_SECONDS, false);
+     // execRequiringZeroAndReturningStdout already submits the task to exec,
+     // so it is not submitted a second time here
+     ProcessTaskWrapper<String> t = execRequiringZeroAndReturningStdout(loc, cmd);
+
+     // sleep for long enough to ensure the ssh command is definitely executing
+     Thread.sleep(5*1000);
+     assertFalse(t.isDone());
+
+     Files.write(fileContent, destFile, Charsets.UTF_8);
+     String output = t.get();
+     assertFalse(output.contains("Couldn't find"), "output="+output);
+ }
+
+ @Test(groups="Integration", dependsOnMethods="testSudo")
+ public void testWaitForPortFreeWhenAbortingOnTimeout() throws Exception {
+ ServerSocket serverSocket = openServerSocket();
+ try {
+ int port = serverSocket.getLocalPort();
+ String cmd = BashCommands.waitForPortFree(port, Duration.ONE_SECOND, true);
+
+ int exitcode = loc.execCommands("test", ImmutableList.of(cmd));
+ assertEquals(exitcode, 1);
+
+ serverSocket.close();
+ assertTrue(Networking.isPortAvailable(port));
+ int exitcode2 = loc.execCommands("test", ImmutableList.of(cmd));
+ assertEquals(exitcode2, 0);
+ } finally {
+ serverSocket.close();
+ }
+ }
+
+ @Test(groups="Integration", dependsOnMethods="testSudo")
+ public void testWaitForPortFreeWhenNotAbortingOnTimeout() throws Exception {
+ ServerSocket serverSocket = openServerSocket();
+ try {
+ int port = serverSocket.getLocalPort();
+ String cmd = BashCommands.waitForPortFree(port, Duration.ONE_SECOND, false);
+
+ String output = execRequiringZeroAndReturningStdout(loc, cmd).get();
+ assertTrue(output.contains(port+" still in use"), "output="+output);
+
+ serverSocket.close();
+ assertTrue(Networking.isPortAvailable(port));
+ String output2 = execRequiringZeroAndReturningStdout(loc, cmd).get();
+ assertFalse(output2.contains("still in use"), "output="+output2);
+ } finally {
+ serverSocket.close();
+ }
+ }
+
+ /**
+  * Starts a wait (thirty-second limit) for a port that is currently bound, closes the socket
+  * after a five-second pause, and checks that the wait finishes without a "still in use" message.
+  */
+ @Test(groups="Integration", dependsOnMethods="testSudo")
+ public void testWaitForPortFreeWhenFreedAfterStart() throws Exception {
+     ServerSocket serverSocket = openServerSocket();
+     try {
+         int port = serverSocket.getLocalPort();
+
+         String cmd = BashCommands.waitForPortFree(port, Duration.THIRTY_SECONDS, false);
+         // execRequiringZeroAndReturningStdout already submits the task to exec,
+         // so it is not submitted a second time here
+         ProcessTaskWrapper<String> t = execRequiringZeroAndReturningStdout(loc, cmd);
+
+         // sleep for long enough to ensure the ssh command is definitely executing
+         Thread.sleep(5*1000);
+         assertFalse(t.isDone());
+
+         serverSocket.close();
+         assertTrue(Networking.isPortAvailable(port));
+         String output = t.get();
+         assertFalse(output.contains("still in use"), "output="+output);
+     } finally {
+         // closing again is harmless if the socket was already closed above
+         serverSocket.close();
+     }
+ }
+
+
+ // Disabled by default because of the risk of overwriting /etc/hosts in a really bad way if this doesn't work properly!
+ // To use as a manual, visual-inspection test, consider first manually creating /etc/hostname and /etc/sysconfig/network
+ // so that the machine looks like both Debian/Ubuntu and CentOS/RHEL.
+ @Test(groups={"Integration"}, enabled=false)
+ public void testSetHostnameUnqualified() throws Exception {
+ runSetHostname("br-"+Identifiers.makeRandomId(8).toLowerCase(), null, false);
+ }
+
+ @Test(groups={"Integration"}, enabled=false)
+ public void testSetHostnameQualified() throws Exception {
+ runSetHostname("br-"+Identifiers.makeRandomId(8).toLowerCase()+".brooklyn.incubator.apache.org", null, false);
+ }
+
+ @Test(groups={"Integration"}, enabled=false)
+ public void testSetHostnameNullDomain() throws Exception {
+ runSetHostname("br-"+Identifiers.makeRandomId(8).toLowerCase(), null, true);
+ }
+
+ @Test(groups={"Integration"}, enabled=false)
+ public void testSetHostnameNonNullDomain() throws Exception {
+ runSetHostname("br-"+Identifiers.makeRandomId(8).toLowerCase(), "brooklyn.incubator.apache.org", true);
+ }
+
+ protected void runSetHostname(String newHostname, String newDomain, boolean includeDomain) throws Exception {
+ String fqdn = (includeDomain && Strings.isNonBlank(newDomain)) ? newHostname + "." + newDomain : newHostname;
+
+ LocalManagementContextForTests mgmt = new LocalManagementContextForTests();
+ SshMachineLocation loc = mgmt.getLocationManager().createLocation(LocalhostMachineProvisioningLocation.spec()).obtain();
+
+ execRequiringZeroAndReturningStdout(loc, sudo("cp /etc/hosts /etc/hosts-orig-testSetHostname")).get();
+ execRequiringZeroAndReturningStdout(loc, BashCommands.ifFileExistsElse0("/etc/hostname", sudo("cp /etc/hostname /etc/hostname-orig-testSetHostname"))).get();
+ execRequiringZeroAndReturningStdout(loc, BashCommands.ifFileExistsElse0("/etc/sysconfig/network", sudo("cp /etc/sysconfig/network /etc/sysconfig/network-orig-testSetHostname"))).get();
+
+ String origHostname = getHostnameNoArgs(loc);
+ assertTrue(Strings.isNonBlank(origHostname));
+
+ try {
+ List<String> cmd = (includeDomain) ? BashCommands.setHostname(newHostname, newDomain) : BashCommands.setHostname(newHostname);
+ execRequiringZeroAndReturningStdout(loc, cmd).get();
+
+ String actualHostnameUnqualified = getHostnameUnqualified(loc);
+ String actualHostnameFullyQualified = getHostnameFullyQualified(loc);
+
+ // TODO On OS X at least, we aren't actually setting the domain name; we're just letting
+ // the user pass in what the domain name is. We do add this properly to /etc/hosts
+ // (e.g. first line is "127.0.0.1 br-g4x5wgx8.brooklyn.incubator.apache.org br-g4x5wgx8 localhost")
+ // but subsequent calls to `hostname -f` returns the unqualified. Similarly, `domainname`
+ // returns blank. Therefore we can't assert that it equals our expected val (because we just made
+ // it up - "brooklyn.incubator.apache.org").
+ // assertEquals(actualHostnameFullyQualified, fqdn);
+ assertEquals(actualHostnameUnqualified, Strings.getFragmentBetween(newHostname, null, "."));
+ execRequiringZeroAndReturningStdout(loc, "ping -c1 -n -q "+actualHostnameUnqualified).get();
+ execRequiringZeroAndReturningStdout(loc, "ping -c1 -n -q "+actualHostnameFullyQualified).get();
+
+ String result = execRequiringZeroAndReturningStdout(loc, "grep -n "+fqdn+" /etc/hosts").get();
+ assertTrue(result.contains("localhost"), "line="+result);
+ log.info("result="+result);
+
+ } finally {
+ execRequiringZeroAndReturningStdout(loc, sudo("cp /etc/hosts-orig-testSetHostname /etc/hosts")).get();
+ execRequiringZeroAndReturningStdout(loc, BashCommands.ifFileExistsElse0("/etc/hostname-orig-testSetHostname", sudo("cp /etc/hostname-orig-testSetHostname /etc/hostname"))).get();
+ execRequiringZeroAndReturningStdout(loc, BashCommands.ifFileExistsElse0("/etc/sysconfig/network-orig-testSetHostname", sudo("cp /etc/sysconfig/network-orig-testSetHostname /etc/sysconfig/network"))).get();
+ execRequiringZeroAndReturningStdout(loc, sudo("hostname "+origHostname)).get();
+ }
+ }
+
+ // Marked disabled because not safe to run on your normal machine! It modifies /etc/hosts, which is dangerous if things go wrong!
+ @Test(groups={"Integration"}, enabled=false)
+ public void testModifyEtcHosts() throws Exception {
+ LocalManagementContextForTests mgmt = new LocalManagementContextForTests();
+ SshMachineLocation loc = mgmt.getLocationManager().createLocation(LocalhostMachineProvisioningLocation.spec()).obtain();
+
+ execRequiringZeroAndReturningStdout(loc, sudo("cp /etc/hosts /etc/hosts-orig-testModifyEtcHosts")).get();
+ int numLinesOrig = Integer.parseInt(execRequiringZeroAndReturningStdout(loc, "wc -l /etc/hosts").get().trim().split("\\s")[0]);
+
+ try {
+ String cmd = BashCommands.prependToEtcHosts("1.2.3.4", "myhostnamefor1234.at.start", "myhostnamefor1234b");
+ execRequiringZeroAndReturningStdout(loc, cmd).get();
+
+ String cmd2 = BashCommands.appendToEtcHosts("5.6.7.8", "myhostnamefor5678.at.end", "myhostnamefor5678");
+ execRequiringZeroAndReturningStdout(loc, cmd2).get();
+
+ String grepFirst = execRequiringZeroAndReturningStdout(loc, "grep -n myhostnamefor1234 /etc/hosts").get();
+ String grepLast = execRequiringZeroAndReturningStdout(loc, "grep -n myhostnamefor5678 /etc/hosts").get();
+ int numLinesAfter = Integer.parseInt(execRequiringZeroAndReturningStdout(loc, "wc -l /etc/hosts").get().trim().split("\\s")[0]);
+ log.info("result: numLinesBefore="+numLinesOrig+"; numLinesAfter="+numLinesAfter+"; first="+grepFirst+"; last="+grepLast);
+
+ assertTrue(grepFirst.startsWith("1:") && grepFirst.contains("1.2.3.4 myhostnamefor1234.at.start myhostnamefor1234"), "first="+grepFirst);
+ assertTrue(grepLast.startsWith((numLinesOrig+2)+":") && grepLast.contains("5.6.7.8 myhostnamefor5678.at.end myhostnamefor5678"), "last="+grepLast);
+ assertEquals(numLinesOrig + 2, numLinesAfter, "lines orig="+numLinesOrig+", after="+numLinesAfter);
+ } finally {
+ execRequiringZeroAndReturningStdout(loc, sudo("cp /etc/hosts-orig-testModifyEtcHosts /etc/hosts")).get();
+ }
+ }
+
+ private String getHostnameNoArgs(SshMachineLocation machine) {
+ String hostnameStdout = execRequiringZeroAndReturningStdout(machine, "echo FOREMARKER; hostname; echo AFTMARKER").get();
+ return Strings.getFragmentBetween(hostnameStdout, "FOREMARKER", "AFTMARKER").trim();
+ }
+
+ private String getHostnameUnqualified(SshMachineLocation machine) {
+ String hostnameStdout = execRequiringZeroAndReturningStdout(machine, "echo FOREMARKER; hostname -s 2> /dev/null || hostname; echo AFTMARKER").get();
+ return Strings.getFragmentBetween(hostnameStdout, "FOREMARKER", "AFTMARKER").trim();
+ }
+
+ private String getHostnameFullyQualified(SshMachineLocation machine) {
+ String hostnameStdout = execRequiringZeroAndReturningStdout(machine, "echo FOREMARKER; hostname --fqdn 2> /dev/null || hostname -f; echo AFTMARKER").get();
+ return Strings.getFragmentBetween(hostnameStdout, "FOREMARKER", "AFTMARKER").trim();
+ }
+
+ private ProcessTaskWrapper<String> execRequiringZeroAndReturningStdout(SshMachineLocation loc, Collection<String> cmds) {
+ return execRequiringZeroAndReturningStdout(loc, cmds.toArray(new String[cmds.size()]));
+ }
+
+ /**
+  * Creates an ssh task for {@code cmds} that requires a zero exit code and returns stdout,
+  * submits it to {@code exec}, and returns the task (already submitted) to the caller.
+  */
+ private ProcessTaskWrapper<String> execRequiringZeroAndReturningStdout(SshMachineLocation loc, String... cmds) {
+     ProcessTaskWrapper<String> submitted = SshTasks.newSshExecTaskFactory(loc, cmds)
+             .requiringZeroAndReturningStdout()
+             .newTask();
+     exec.submit(submitted);
+     return submitted;
+ }
+
+ private ServerSocket openServerSocket() {
+ int lowerBound = 40000;
+ int upperBound = 40100;
+ for (int i = lowerBound; i < upperBound; i++) {
+ try {
+ return new ServerSocket(i);
+ } catch (IOException e) {
+ // try next number
+ }
+ }
+ throw new IllegalStateException("No ports available in range "+lowerBound+" to "+upperBound);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTaskExecutionPerformanceTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTaskExecutionPerformanceTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTaskExecutionPerformanceTest.java
new file mode 100644
index 0000000..71d2586
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTaskExecutionPerformanceTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.core.util.task.BasicExecutionManager;
+import org.apache.brooklyn.core.util.task.BasicTask;
+import org.apache.brooklyn.core.util.task.ScheduledTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.util.collections.MutableMap;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Callables;
+
+/**
+ * Test the operation of the {@link BasicTask} class.
+ *
+ * TODO clarify test purpose
+ */
+public class BasicTaskExecutionPerformanceTest {
+ private static final Logger log = LoggerFactory.getLogger(BasicTaskExecutionPerformanceTest.class);
+
+ private static final int TIMEOUT_MS = 10*1000;
+
+ private BasicExecutionManager em;
+
+ public static final int MAX_OVERHEAD_MS = 1500; // was 750ms but saw 1.3s on buildhive
+ public static final int EARLY_RETURN_GRACE = 25; // saw 13ms early return on jenkins!
+
+ @BeforeMethod(alwaysRun=true)
+ public void setUp() throws Exception {
+ em = new BasicExecutionManager("mycontext");
+ }
+
+ @AfterMethod(alwaysRun=true)
+ public void tearDown() throws Exception {
+ if (em != null) em.shutdownNow();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testScheduledTaskExecutedAfterDelay() throws Exception {
+ int delay = 100;
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ Callable<Task<?>> taskFactory = new Callable<Task<?>>() {
+ @Override public Task<?> call() {
+ return new BasicTask<Void>(new Runnable() {
+ @Override public void run() {
+ latch.countDown();
+ }});
+ }};
+ ScheduledTask t = new ScheduledTask(taskFactory).delay(delay);
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ em.submit(t);
+
+ assertTrue(latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
+ long actualDelay = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+
+ assertTrue(actualDelay > (delay-EARLY_RETURN_GRACE), "actualDelay="+actualDelay+"; delay="+delay);
+ assertTrue(actualDelay < (delay+MAX_OVERHEAD_MS), "actualDelay="+actualDelay+"; delay="+delay);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testScheduledTaskExecutedAtRegularPeriod() throws Exception {
+ final int period = 100;
+ final int numTimestamps = 4;
+ final CountDownLatch latch = new CountDownLatch(1);
+ final List<Long> timestamps = Collections.synchronizedList(Lists.<Long>newArrayList());
+ final Stopwatch stopwatch = Stopwatch.createStarted();
+
+ Callable<Task<?>> taskFactory = new Callable<Task<?>>() {
+ @Override public Task<?> call() {
+ return new BasicTask<Void>(new Runnable() {
+ @Override public void run() {
+ timestamps.add(stopwatch.elapsed(TimeUnit.MILLISECONDS));
+ if (timestamps.size() >= numTimestamps) latch.countDown();
+ }});
+ }};
+ ScheduledTask t = new ScheduledTask(taskFactory).delay(1).period(period);
+ em.submit(t);
+
+ assertTrue(latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
+
+ synchronized (timestamps) {
+ long prev = timestamps.get(0);
+ for (long timestamp : timestamps.subList(1, timestamps.size())) {
+ assertTrue(timestamp > prev+period-EARLY_RETURN_GRACE, "timestamps="+timestamps);
+ assertTrue(timestamp < prev+period+MAX_OVERHEAD_MS, "timestamps="+timestamps);
+ prev = timestamp;
+ }
+ }
+ }
+
+ /**
+  * Submits a repeating scheduled task, cancels it straight away, and checks that the number
+  * of recorded executions barely grows during the following {@code checkPeriod} sleep.
+  */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testCanCancelScheduledTask() throws Exception {
+ // NOTE(review): period is presumably in milliseconds - confirm against ScheduledTask.period
+ final int period = 1;
+ // how long (ms, passed to Thread.sleep) to keep watching for executions after the cancel
+ final long checkPeriod = 250;
+ final List<Long> timestamps = Collections.synchronizedList(Lists.<Long>newArrayList());
+
+ // each scheduled run creates a fresh task that records the wall-clock time at which it ran
+ Callable<Task<?>> taskFactory = new Callable<Task<?>>() {
+ @Override public Task<?> call() {
+ return new BasicTask<Void>(new Runnable() {
+ @Override public void run() {
+ timestamps.add(System.currentTimeMillis());
+ }});
+ }};
+ ScheduledTask t = new ScheduledTask(taskFactory).period(period);
+ em.submit(t);
+
+ t.cancel();
+ long cancelTime = System.currentTimeMillis();
+ int countImmediatelyAfterCancel = timestamps.size();
+ Thread.sleep(checkPeriod);
+ int countWellAfterCancel = timestamps.size();
+
+ // should have at most 1 more execution after cancel
+ // NOTE(review): the assertion below actually tolerates up to 2 further executions, not 1 -
+ // confirm which bound is intended and align the comment or the check
+ log.info("testCanCancelScheduledTask saw "+countImmediatelyAfterCancel+" then cancel then "+countWellAfterCancel+" total");
+ assertTrue(countWellAfterCancel - countImmediatelyAfterCancel <= 2, "timestamps="+timestamps+"; cancelTime="+cancelTime);
+ }
+
+ // Regression guard: when a CopyOnWriteArraySet was used, submitting new tasks was terribly slow
+ // and degraded significantly as the number of previously executed tasks increased
+ // (e.g. 9s for first 1000; 26s for next 1000; 42s for next 1000).
+ @Test
+ public void testExecutionManagerPerformance() throws Exception {
+     // The task count is scaled by CPU count and capped at 1000: a fixed 1000 ran out of virtual
+     // memory through excessive thread creation on machines that could not execute the threads quickly.
+     final int NUM_TASKS = Math.min(500 * Runtime.getRuntime().availableProcessors(), 1000);
+     final int NUM_TIMES = 10;
+     final int MAX_ACCEPTABLE_TIME = 7500; // saw 5601ms on buildhive
+
+     // the warm-up run is timed for the failure message only; it is not held to the limit
+     long tWarmup = execTasksAndWaitForDone(NUM_TASKS, ImmutableList.of("A"));
+
+     List<Long> times = Lists.newArrayList();
+     for (int round = 0; round < NUM_TIMES; round++) {
+         long elapsed = execTasksAndWaitForDone(NUM_TASKS, ImmutableList.of("A"));
+         times.add(elapsed);
+     }
+
+     // first measured run, if any, that exceeded the acceptable time
+     Long toobig = null;
+     for (Long elapsed : times) {
+         if (elapsed > MAX_ACCEPTABLE_TIME) {
+             toobig = elapsed;
+             break;
+         }
+     }
+     assertNull(toobig, "warmup="+tWarmup+"; times="+times);
+ }
+
+ /**
+  * Submits {@code numTasks} no-op tasks carrying the given tags to the execution manager and
+  * blocks until every one of them has completed.
+  *
+  * @param numTasks number of tasks to submit
+  * @param tags value passed as the "tags" flag on each submission
+  * @return elapsed milliseconds from just before the first submission until the last task completed
+  */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private long execTasksAndWaitForDone(int numTasks, List<?> tags) throws Exception {
+     List<Task<?>> tasks = Lists.newArrayList();
+     long startTimestamp = System.currentTimeMillis();
+     // start at 0 so that exactly numTasks tasks are submitted, matching the count logged below
+     for (int i = 0; i < numTasks; i++) {
+         Task<?> t = new BasicTask(Callables.returning(null)); // no-op
+         em.submit(MutableMap.of("tags", tags), t);
+         tasks.add(t);
+     }
+     long submittedTimestamp = System.currentTimeMillis();
+
+     for (Task t : tasks) {
+         t.get();
+     }
+     long endTimestamp = System.currentTimeMillis();
+     long submitTime = submittedTimestamp - startTimestamp;
+     long totalTime = endTimestamp - startTimestamp;
+
+     log.info("Executed {} tasks; {}ms total; {}ms to submit", new Object[] {numTasks, totalTime, submitTime});
+
+     return totalTime;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTaskExecutionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTaskExecutionTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTaskExecutionTest.java
new file mode 100644
index 0000000..c730738
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTaskExecutionTest.java
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.core.util.task.BasicExecutionManager;
+import org.apache.brooklyn.core.util.task.BasicTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.test.Asserts;
+import brooklyn.util.collections.MutableMap;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Callables;
+
+/**
+ * Test the operation of the {@link BasicTask} class.
+ *
+ * TODO clarify test purpose
+ */
+public class BasicTaskExecutionTest {
+ private static final Logger log = LoggerFactory.getLogger(BasicTaskExecutionTest.class);
+
+ private static final int TIMEOUT_MS = 10*1000;
+
+ private BasicExecutionManager em;
+ private Map<Object, Object> data;
+
+ /** Creates a fresh execution manager and an empty synchronized data map before every test method. */
+ @BeforeMethod(alwaysRun=true)
+ public void setUp() {
+ em = new BasicExecutionManager("mycontext");
+ // a newly created map is already empty, so no explicit clear() is needed
+ data = Collections.synchronizedMap(new HashMap<Object, Object>());
+ }
+
+ /** Shuts down the execution manager and empties the data map, when they exist. */
+ @AfterMethod(alwaysRun=true)
+ public void tearDown() throws Exception {
+ if (em != null) {
+ em.shutdownNow();
+ }
+ if (data != null) {
+ data.clear();
+ }
+ }
+
+ /** A submitted {@link BasicTask} yields the callable's result from both the task and the handle returned by submit. */
+ @Test
+ public void runSimpleBasicTask() throws Exception {
+ BasicTask<Object> t = new BasicTask<Object>(newPutCallable(1, "b"));
+ data.put(1, "a");
+ Task<Object> t2 = em.submit(MutableMap.of("tag", "A"), t);
+ // TestNG's assertEquals takes (actual, expected); the put callable returns the previous value "a"
+ assertEquals(t.get(), "a");
+ assertEquals(t2.get(), "a");
+ assertEquals(data.get(1), "b");
+ }
+
+ /** A submitted runnable runs and its task yields a null result. */
+ @Test
+ public void runSimpleRunnable() throws Exception {
+ data.put(1, "a");
+ Task<?> t = em.submit(MutableMap.of("tag", "A"), newPutRunnable(1, "b"));
+ assertNull(t.get());
+ // TestNG's assertEquals takes (actual, expected)
+ assertEquals(data.get(1), "b");
+ }
+
+ /** A submitted callable runs and its task yields the callable's return value. */
+ @Test
+ public void runSimpleCallable() throws Exception {
+ data.put(1, "a");
+ Task<?> t = em.submit(MutableMap.of("tag", "A"), newPutCallable(1, "b"));
+ // TestNG's assertEquals takes (actual, expected); the put callable returns the previous value "a"
+ assertEquals(t.get(), "a");
+ assertEquals(data.get(1), "b");
+ }
+
+ /**
+ * Runs a task that blocks on a latch part-way through its body. While it is blocked the test checks
+ * that it is not done and that its status detail eventually mentions "waiting"; the task is then
+ * released and its result checked.
+ */
+ @Test
+ public void runBasicTaskWithWaits() throws Exception {
+ final CountDownLatch signalStarted = new CountDownLatch(1);
+ final CountDownLatch allowCompletion = new CountDownLatch(1);
+ final BasicTask<Object> t = new BasicTask<Object>(new Callable<Object>() {
+ public Object call() throws Exception {
+ Object result = data.put(1, "b");
+ signalStarted.countDown();
+ // block until the test thread permits completion; a timeout fails the assertion inside the task
+ assertTrue(allowCompletion.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
+ return result;
+ }});
+ data.put(1, "a");
+
+ Task<?> t2 = em.submit(MutableMap.of("tag", "A"), t);
+ assertEquals(t, t2);
+ assertFalse(t.isDone());
+
+ // once the task has signalled, its put of "b" has already happened
+ assertTrue(signalStarted.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
+ assertEquals("b", data.get(1));
+ assertFalse(t.isDone());
+
+ log.debug("runBasicTaskWithWaits, BasicTask status: {}", t.getStatusDetail(false));
+
+ // retried because the task may not yet have reached the await when first inspected
+ Asserts.succeedsEventually(new Runnable() {
+ public void run() {
+ String status = t.getStatusDetail(false);
+ assertTrue(status != null && status.toLowerCase().contains("waiting"), "status="+status);
+ }});
+
+ allowCompletion.countDown();
+ // the task returns the value that was in the map before its put
+ assertEquals("a", t.get());
+ }
+
+ /**
+ * Submits four increment tasks under two tags and checks the sum of their results and the final counter.
+ * Each task returns the counter value it saw before incrementing (1, 2, 3, 4).
+ */
+ @Test
+ public void runMultipleBasicTasks() throws Exception {
+ data.put(1, 1);
+ // use the manager created in setUp() so that tearDown() shuts it down; a locally created
+ // manager would shadow the field and never be shut down
+ for (int i = 0; i < 2; i++) {
+ em.submit(MutableMap.of("tag", "A"), new BasicTask<Integer>(newIncrementCallable(1)));
+ em.submit(MutableMap.of("tag", "B"), new BasicTask<Integer>(newIncrementCallable(1)));
+ }
+ int total = 0;
+ for (Object tag : em.getTaskTags()) {
+ log.debug("tag {}", tag);
+ for (Task<?> task : em.getTasksWithTag(tag)) {
+ log.debug("BasicTask {}, has {}", task, task.get());
+ total += (Integer)task.get();
+ }
+ }
+ // TestNG's assertEquals takes (actual, expected)
+ assertEquals(total, 10);
+ //now that all have completed:
+ assertEquals(data.get(1), 5);
+ }
+
+ /** Submits four increment tasks with overlapping tag sets and verifies the any-tag / all-tags lookups. */
+ @Test
+ public void runMultipleBasicTasksMultipleTags() throws Exception {
+ data.put(1, 1);
+ Collection<Task<Integer>> submitted = Lists.newArrayList();
+ submitted.add(em.submit(MutableMap.of("tag", "A"), new BasicTask<Integer>(newIncrementCallable(1))));
+ submitted.add(em.submit(MutableMap.of("tags", ImmutableList.of("A","B")), new BasicTask<Integer>(newIncrementCallable(1))));
+ submitted.add(em.submit(MutableMap.of("tags", ImmutableList.of("B","C")), new BasicTask<Integer>(newIncrementCallable(1))));
+ submitted.add(em.submit(MutableMap.of("tags", ImmutableList.of("D")), new BasicTask<Integer>(newIncrementCallable(1))));
+
+ int sum = 0;
+ for (Task<Integer> task : submitted) {
+ log.debug("BasicTask {}, has {}", task, task.get());
+ sum += task.get();
+ }
+ assertEquals(10, sum);
+
+ // every task has completed by now, so the counter has been incremented four times
+ assertEquals(data.get(1), 5);
+
+ // lookups involving only tag A
+ assertEquals(em.getTasksWithTag("A").size(), 2);
+ assertEquals(em.getTasksWithAnyTag(ImmutableList.of("A")).size(), 2);
+ assertEquals(em.getTasksWithAllTags(ImmutableList.of("A")).size(), 2);
+
+ // lookups combining several tags
+ assertEquals(em.getTasksWithAnyTag(ImmutableList.of("A", "B")).size(), 3);
+ assertEquals(em.getTasksWithAllTags(ImmutableList.of("A", "B")).size(), 1);
+ assertEquals(em.getTasksWithAllTags(ImmutableList.of("B", "C")).size(), 1);
+ assertEquals(em.getTasksWithAnyTag(ImmutableList.of("A", "D")).size(), 3);
+ }
+
+ /** A submitted task can be looked up again through its id. */
+ @Test
+ public void testGetTaskById() throws Exception {
+ Task<?> noop = new BasicTask<Void>(newNoop());
+ em.submit(MutableMap.of("tag", "A"), noop);
+ assertEquals(em.getTask(noop.getId()), noop);
+ }
+
+ /** A completed task tagged "A" is returned by every tag query that includes "A". */
+ @Test
+ public void testRetrievingTasksWithTagsReturnsExpectedTask() throws Exception {
+ Task<?> tagged = new BasicTask<Void>(newNoop());
+ em.submit(MutableMap.of("tag", "A"), tagged);
+ tagged.get();
+
+ assertEquals(em.getTasksWithTag("A"), ImmutableList.of(tagged));
+ assertEquals(em.getTasksWithAnyTag(ImmutableList.of("A")), ImmutableList.of(tagged));
+ assertEquals(em.getTasksWithAnyTag(ImmutableList.of("A", "B")), ImmutableList.of(tagged));
+ assertEquals(em.getTasksWithAllTags(ImmutableList.of("A")), ImmutableList.of(tagged));
+ }
+
+ /** A task tagged only "A" is not returned by queries that need tag "B". */
+ @Test
+ public void testRetrievingTasksWithTagsExcludesNonMatchingTasks() throws Exception {
+ Task<?> taggedA = new BasicTask<Void>(newNoop());
+ em.submit(MutableMap.of("tag", "A"), taggedA);
+ taggedA.get();
+
+ assertEquals(em.getTasksWithTag("B"), ImmutableSet.of());
+ assertEquals(em.getTasksWithAnyTag(ImmutableList.of("B")), ImmutableSet.of());
+ assertEquals(em.getTasksWithAllTags(ImmutableList.of("A", "B")), ImmutableSet.of());
+ }
+
+ /** A task carrying both "A" and "B" is found through either tag, alone or combined. */
+ @Test
+ public void testRetrievingTasksWithMultipleTags() throws Exception {
+ Task<?> both = new BasicTask<Void>(newNoop());
+ em.submit(MutableMap.of("tags", ImmutableList.of("A", "B")), both);
+ both.get();
+
+ // single-tag lookup
+ assertEquals(em.getTasksWithTag("A"), ImmutableList.of(both));
+ assertEquals(em.getTasksWithTag("B"), ImmutableList.of(both));
+ // any-tag lookup
+ assertEquals(em.getTasksWithAnyTag(ImmutableList.of("A")), ImmutableList.of(both));
+ assertEquals(em.getTasksWithAnyTag(ImmutableList.of("B")), ImmutableList.of(both));
+ assertEquals(em.getTasksWithAnyTag(ImmutableList.of("A", "B")), ImmutableList.of(both));
+ // all-tags lookup
+ assertEquals(em.getTasksWithAllTags(ImmutableList.of("A", "B")), ImmutableList.of(both));
+ assertEquals(em.getTasksWithAllTags(ImmutableList.of("A")), ImmutableList.of(both));
+ assertEquals(em.getTasksWithAllTags(ImmutableList.of("B")), ImmutableList.of(both));
+ }
+
+ // ENGR-1796: if nothing matched first tag, then returned whatever matched second tag!
+ /** An all-tags query whose first tag matches nothing must return nothing, even if a later tag matches. */
+ @Test
+ public void testRetrievingTasksWithAllTagsWhenFirstNotMatched() throws Exception {
+ Task<?> taggedA = new BasicTask<Void>(newNoop());
+ em.submit(MutableMap.of("tags", ImmutableList.of("A")), taggedA);
+ taggedA.get();
+
+ assertEquals(em.getTasksWithAllTags(ImmutableList.of("not_there","A")), ImmutableSet.of());
+ }
+
+ /** Tag queries must return a task that is still running, not only completed ones. */
+ @Test
+ public void testRetrievedTasksIncludesTasksInProgress() throws Exception {
+ final CountDownLatch runningLatch = new CountDownLatch(1);
+ final CountDownLatch finishLatch = new CountDownLatch(1);
+ Task<Void> t = new BasicTask<Void>(new Callable<Void>() {
+ public Void call() throws Exception {
+ runningLatch.countDown();
+ finishLatch.await();
+ return null;
+ }});
+ em.submit(MutableMap.of("tags", ImmutableList.of("A")), t);
+
+ try {
+ // fail here if the task never started, instead of silently carrying on after the timeout
+ assertTrue(runningLatch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
+
+ assertEquals(em.getTasksWithTag("A"), ImmutableList.of(t));
+ } finally {
+ // always let the task body finish
+ finishLatch.countDown();
+ }
+ }
+
+ /**
+ * Cancels a task before it is submitted, then submits it anyway: the task must report
+ * cancelled/done/error both before and after submission, and get() must throw CancellationException.
+ */
+ @Test
+ public void cancelBeforeRun() throws Exception {
+ final CountDownLatch blockForever = new CountDownLatch(1);
+
+ BasicTask<Integer> t = new BasicTask<Integer>(new Callable<Integer>() {
+ public Integer call() throws Exception {
+ // this latch is never counted down by the test
+ blockForever.await(); return 42;
+ }});
+ t.cancel(true);
+ assertTrue(t.isCancelled());
+ assertTrue(t.isDone());
+ assertTrue(t.isError());
+ // submitting an already-cancelled task
+ em.submit(MutableMap.of("tag", "A"), t);
+ try {
+ t.get();
+ fail("get should have failed due to cancel");
+ } catch (CancellationException e) {
+ // expected
+ }
+ assertTrue(t.isCancelled());
+ assertTrue(t.isDone());
+ assertTrue(t.isError());
+
+ log.debug("cancelBeforeRun status: {}", t.getStatusDetail(false));
+ assertTrue(t.getStatusDetail(false).toLowerCase().contains("cancel"));
+ }
+
+ /**
+ * Cancels a task while its body is blocked: the task must then report cancelled/error,
+ * get() must throw CancellationException, and the task must be done afterwards.
+ */
+ @Test
+ public void cancelDuringRun() throws Exception {
+ final CountDownLatch signalStarted = new CountDownLatch(1);
+ final CountDownLatch blockForever = new CountDownLatch(1);
+
+ BasicTask<Integer> t = new BasicTask<Integer>(new Callable<Integer>() {
+ public Integer call() throws Exception {
+ // the body holds the monitor on data while it is blocked
+ synchronized (data) {
+ signalStarted.countDown();
+ // this latch is never counted down by the test
+ blockForever.await();
+ }
+ return 42;
+ }});
+ em.submit(MutableMap.of("tag", "A"), t);
+ assertFalse(t.isCancelled());
+ assertFalse(t.isDone());
+ assertFalse(t.isError());
+
+ // wait until the body is definitely running before cancelling
+ assertTrue(signalStarted.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
+ t.cancel(true);
+
+ assertTrue(t.isCancelled());
+ assertTrue(t.isError());
+ try {
+ t.get();
+ fail("get should have failed due to cancel");
+ } catch (CancellationException e) {
+ // expected
+ }
+ assertTrue(t.isCancelled());
+ assertTrue(t.isDone());
+ assertTrue(t.isError());
+ }
+
+ @Test
+ public void cancelAfterRun() throws Exception {
+ BasicTask<Integer> t = new BasicTask<Integer>(Callables.returning(42));
+ em.submit(MutableMap.of("tag", "A"), t);
+
+ assertEquals(t.get(), (Integer)42);
+ t.cancel(true);
+ assertFalse(t.isCancelled());
+ assertFalse(t.isError());
+ assertTrue(t.isDone());
+ }
+
+ @Test
+ public void errorDuringRun() throws Exception {
+ BasicTask<Void> t = new BasicTask<Void>(new Callable<Void>() {
+ public Void call() throws Exception {
+ throw new IllegalStateException("Simulating failure in errorDuringRun");
+ }});
+
+ em.submit(MutableMap.of("tag", "A"), t);
+
+ try {
+ t.get();
+ fail("get should have failed due to error");
+ } catch (Exception eo) {
+ Throwable e = Throwables.getRootCause(eo);
+ assertEquals("Simulating failure in errorDuringRun", e.getMessage());
+ }
+
+ assertFalse(t.isCancelled());
+ assertTrue(t.isError());
+ assertTrue(t.isDone());
+
+ log.debug("errorDuringRun status: {}", t.getStatusDetail(false));
+ assertTrue(t.getStatusDetail(false).contains("Simulating failure in errorDuringRun"), "details="+t.getStatusDetail(false));
+ }
+
+ /**
+ * Checks the bookkeeping fields of a task (submitter, submit/start/end times, internal future)
+ * before submission, while its body is running, and after completion.
+ */
+ @Test
+ public void fieldsSetForSimpleBasicTask() throws Exception {
+ final CountDownLatch signalStarted = new CountDownLatch(1);
+ final CountDownLatch allowCompletion = new CountDownLatch(1);
+
+ BasicTask<Integer> t = new BasicTask<Integer>(new Callable<Integer>() {
+ public Integer call() throws Exception {
+ signalStarted.countDown();
+ allowCompletion.await();
+ return 42;
+ }});
+ // before submission: no submitter, no submit time, no future
+ assertEquals(null, t.getSubmittedByTask());
+ assertEquals(-1, t.submitTimeUtc);
+ assertNull(t.getInternalFuture());
+
+ em.submit(MutableMap.of("tag", "A"), t);
+ assertTrue(signalStarted.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
+
+ // while the body is blocked: submit and start times are set, end time is still -1
+ assertTrue(t.submitTimeUtc > 0);
+ assertTrue(t.startTimeUtc >= t.submitTimeUtc);
+ assertNotNull(t.getInternalFuture());
+ assertEquals(-1, t.endTimeUtc);
+ assertEquals(false, t.isCancelled());
+
+ // release the body and check the end time once the result is available
+ allowCompletion.countDown();
+ assertEquals(t.get(), (Integer)42);
+ assertTrue(t.endTimeUtc >= t.startTimeUtc);
+
+ log.debug("BasicTask duration (millis): {}", (t.endTimeUtc - t.submitTimeUtc));
+ }
+
+ /**
+ * Has task A (tag "A") submit a second task B (tag "B") from inside its body, then checks that
+ * B records A as its submitter and that A's own submitter is null.
+ */
+ @Test
+ public void fieldsSetForBasicTaskSubmittedBasicTask() throws Exception {
+ //submitted BasicTask B is started by A, and waits for A to complete
+ BasicTask<Integer> t = new BasicTask<Integer>(MutableMap.of("displayName", "sample", "description", "some descr"), new Callable<Integer>() {
+ public Integer call() throws Exception {
+ em.submit(MutableMap.of("tag", "B"), new Callable<Integer>() {
+ public Integer call() throws Exception {
+ // B reads A's result, so it blocks until A has returned 45
+ assertEquals(45, em.getTasksWithTag("A").iterator().next().get());
+ return 46;
+ }});
+ return 45;
+ }});
+ em.submit(MutableMap.of("tag", "A"), t);
+
+ t.blockUntilEnded();
+
+// assertEquals(em.getAllTasks().size(), 2
+
+ BasicTask<?> tb = (BasicTask<?>) em.getTasksWithTag("B").iterator().next();
+ assertEquals( 46, tb.get() );
+ assertEquals( t, em.getTasksWithTag("A").iterator().next() );
+ assertNull( t.getSubmittedByTask() );
+
+ // B's submitter must be the outer task, carrying the flags given at construction
+ BasicTask<?> submitter = (BasicTask<?>) tb.getSubmittedByTask();
+ assertNotNull(submitter);
+ assertEquals("sample", submitter.displayName);
+ assertEquals("some descr", submitter.description);
+ assertEquals(t, submitter);
+
+ // A was submitted, and ended, no later than B
+ assertTrue(submitter.submitTimeUtc <= tb.submitTimeUtc);
+ assertTrue(submitter.endTimeUtc <= tb.endTimeUtc);
+
+ log.debug("BasicTask {} was submitted by {}", tb, submitter);
+ }
+
+ private Callable<Object> newPutCallable(final Object key, final Object val) {
+ return new Callable<Object>() {
+ public Object call() {
+ return data.put(key, val);
+ }
+ };
+ }
+
+ private Callable<Integer> newIncrementCallable(final Object key) {
+ return new Callable<Integer>() {
+ public Integer call() {
+ synchronized (data) {
+ return (Integer) data.put(key, (Integer)data.get(key) + 1);
+ }
+ }
+ };
+ }
+
+ /** Returns a runnable that stores {@code val} under {@code key} in {@code data}. */
+ private Runnable newPutRunnable(final Object key, final Object val) {
+ Runnable putter = new Runnable() {
+ @Override public void run() {
+ data.put(key, val);
+ }
+ };
+ return putter;
+ }
+
+ /** Returns a runnable whose body does nothing. */
+ private Runnable newNoop() {
+ Runnable nothing = new Runnable() {
+ @Override public void run() {
+ // intentionally empty
+ }
+ };
+ return nothing;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTasksFutureTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTasksFutureTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTasksFutureTest.java
new file mode 100644
index 0000000..020a98c
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/task/BasicTasksFutureTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.core.util.task.BasicExecutionContext;
+import org.apache.brooklyn.core.util.task.BasicExecutionManager;
+import org.apache.brooklyn.core.util.task.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Stopwatch;
+
+public class BasicTasksFutureTest {
+
+ private static final Logger log = LoggerFactory.getLogger(BasicTasksFutureTest.class);
+
+ private BasicExecutionManager em;
+ private BasicExecutionContext ec;
+ private Map<Object,Object> data;
+ private ExecutorService ex;
+ private Semaphore started;
+ private Semaphore waitInTask;
+ private Semaphore cancelledWhileSleeping;
+
+ /** Creates fresh semaphores, data map, listener executor and execution manager/context before each test. */
+ @BeforeMethod(alwaysRun=true)
+ public void setUp() {
+ // coordination primitives all start with zero permits
+ started = new Semaphore(0);
+ waitInTask = new Semaphore(0);
+ cancelledWhileSleeping = new Semaphore(0);
+
+ data = Collections.synchronizedMap(new LinkedHashMap<Object,Object>());
+ ex = Executors.newCachedThreadPool();
+
+ em = new BasicExecutionManager("mycontext");
+ ec = new BasicExecutionContext(em);
+ }
+
+ /** Shuts down the execution manager and the listener executor, when they exist. */
+ @AfterMethod(alwaysRun=true)
+ public void tearDown() throws Exception {
+ if (em != null) {
+ em.shutdownNow();
+ }
+ if (ex != null) {
+ ex.shutdownNow();
+ }
+ }
+
+ /**
+ * Exercises timed blocking/getting on a task before submission, while it is running and after it
+ * completes, and checks that listeners added at each of those three stages are all notified.
+ */
+ @Test
+ public void testBlockAndGetWithTimeoutsAndListenableFuture() throws InterruptedException {
+ Task<String> t = waitForSemaphore(Duration.FIVE_SECONDS, true, "x");
+
+ // not yet submitted: timed blocks return false and timed gets throw
+ Assert.assertFalse(t.blockUntilEnded(Duration.millis(1)));
+ Assert.assertFalse(t.blockUntilEnded(Duration.ZERO));
+ // stays false for the whole test provided every timed get below throws
+ boolean didNotThrow = false;
+
+ try { t.getUnchecked(Duration.millis(1)); didNotThrow = true; }
+ catch (Exception e) { /* expected */ }
+ Assert.assertFalse(didNotThrow);
+
+ try { t.getUnchecked(Duration.ZERO); didNotThrow = true; }
+ catch (Exception e) { /* expected */ }
+ Assert.assertFalse(didNotThrow);
+
+ addFutureListener(t, "before");
+ ec.submit(t);
+
+ // submitted but still waiting on the semaphore: same expectations as above
+ Assert.assertFalse(t.blockUntilEnded(Duration.millis(1)));
+ Assert.assertFalse(t.blockUntilEnded(Duration.ZERO));
+
+ try { t.getUnchecked(Duration.millis(1)); didNotThrow = true; }
+ catch (Exception e) { /* expected */ }
+ Assert.assertFalse(didNotThrow);
+
+ try { t.getUnchecked(Duration.ZERO); didNotThrow = true; }
+ catch (Exception e) { /* expected */ }
+ Assert.assertFalse(didNotThrow);
+
+ addFutureListener(t, "during");
+
+ // the listeners synchronize on data, so holding its monitor here keeps them from recording anything yet
+ synchronized (data) {
+ // now let it finish
+ waitInTask.release();
+ Assert.assertTrue(t.blockUntilEnded(Duration.TEN_SECONDS));
+
+ Assert.assertEquals(t.getUnchecked(Duration.millis(1)), "x");
+ Assert.assertEquals(t.getUnchecked(Duration.ZERO), "x");
+
+ Assert.assertNull(data.get("before"));
+ Assert.assertNull(data.get("during"));
+ // can't set the data(above) until we release the lock (in assert call below)
+ assertSoonGetsData("before");
+ assertSoonGetsData("during");
+ }
+
+ // and see that a listener added late also runs
+ synchronized (data) {
+ addFutureListener(t, "after");
+ Assert.assertNull(data.get("after"));
+ assertSoonGetsData("after");
+ }
+ }
+
+ /**
+ * Registers a listener on {@code t}, run on {@code ex}, which takes the monitor on {@code data},
+ * wakes any waiters and records {@code true} under {@code key}.
+ */
+ private void addFutureListener(Task<String> t, final String key) {
+ Runnable recordNotification = new Runnable() {
+ @Override public void run() {
+ synchronized (data) {
+ log.info("notifying for "+key);
+ data.notifyAll();
+ data.put(key, true);
+ }
+ }
+ };
+ t.addListener(recordNotification, ex);
+ }
+
+ /**
+ * Waits, in up to ten one-second waits on {@code data}, for {@code true} to appear under {@code key},
+ * failing the test otherwise. Because it calls {@code data.wait(..)}, the caller must hold the monitor on {@code data}.
+ */
+ private void assertSoonGetsData(String key) throws InterruptedException {
+ int attemptsLeft = 10;
+ while (attemptsLeft-- > 0) {
+ boolean present = Boolean.TRUE.equals(data.get(key));
+ if (present) {
+ log.info("got data for "+key);
+ return;
+ }
+ data.wait(Duration.ONE_SECOND.toMilliseconds());
+ }
+ Assert.fail("did not get data for '"+key+"' in time");
+ }
+
+ /**
+ * Builds (without submitting) a task which releases {@code started}, then tries to acquire
+ * {@code waitInTask} for up to {@code time} before returning {@code result}.
+ *
+ * @param time maximum time to wait for a {@code waitInTask} permit
+ * @param requireSemaphore if true, the task calls {@code Assert.fail} when no permit is acquired in time
+ * @param result value the task returns
+ * @return the built task
+ */
+ private <T> Task<T> waitForSemaphore(final Duration time, final boolean requireSemaphore, final T result) {
+ return Tasks.<T>builder().body(new Callable<T>() {
+ public T call() {
+ try {
+ // signal that the body has begun
+ started.release();
+ log.info("waiting up to "+time+" to acquire before returning "+result);
+ if (!waitInTask.tryAcquire(time.toMilliseconds(), TimeUnit.MILLISECONDS)) {
+ log.info("did not get semaphore");
+ if (requireSemaphore) Assert.fail("task did not get semaphore");
+ } else {
+ log.info("got semaphore");
+ }
+ } catch (Exception e) {
+ // any exception while waiting is recorded via cancelledWhileSleeping and then rethrown
+ log.info("cancelled before returning "+result);
+ cancelledWhileSleeping.release();
+ throw Exceptions.propagate(e);
+ }
+ log.info("task returning "+result);
+ return result;
+ }
+ }).build();
+ }
+
+ @Test
+ public void testCancelAfterStartTriggersListenableFuture() throws Exception {
+ doTestCancelTriggersListenableFuture(Duration.millis(50));
+ }
+ @Test
+ public void testCancelImmediateTriggersListenableFuture() throws Exception {
+ // if cancel fires after submit but before it passes to the executor,
+ // that needs handling separately; this doesn't guarantee this code path,
+ // but it happens sometimes (and it should be handled)
+ doTestCancelTriggersListenableFuture(Duration.ZERO);
+ }
+ /**
+ * Submits a semaphore-waiting task, cancels it after {@code delay}, and checks that listeners added
+ * before submission, after submission and after cancellation are all notified, that the task is
+ * done/cancelled, and that all of this took less than five seconds.
+ *
+ * @param delay how long to sleep between submitting the task and cancelling it
+ */
+ public void doTestCancelTriggersListenableFuture(Duration delay) throws Exception {
+ Task<String> t = waitForSemaphore(Duration.TEN_SECONDS, true, "x");
+ addFutureListener(t, "before");
+
+ Stopwatch watch = Stopwatch.createStarted();
+ ec.submit(t);
+
+ addFutureListener(t, "during");
+
+ log.info("test cancelling "+t+" ("+t.getClass()+") after "+delay);
+ // NB: two different code paths (callers to this method) for notifying futures
+ // depending whether task is started
+ Time.sleep(delay);
+
+ // assertSoonGetsData waits on data, so the monitor must be held here
+ synchronized (data) {
+ t.cancel(true);
+
+ assertSoonGetsData("before");
+ assertSoonGetsData("during");
+
+ addFutureListener(t, "after");
+ Assert.assertNull(data.get("after"));
+ assertSoonGetsData("after");
+ }
+
+ Assert.assertTrue(t.isDone());
+ Assert.assertTrue(t.isCancelled());
+ try {
+ t.get();
+ Assert.fail("should have thrown CancellationException");
+ } catch (CancellationException e) { /* expected */ }
+
+ Assert.assertTrue(watch.elapsed(TimeUnit.MILLISECONDS) < Duration.FIVE_SECONDS.toMilliseconds(),
+ Time.makeTimeStringRounded(watch.elapsed(TimeUnit.MILLISECONDS))+" is too long; should have cancelled very quickly");
+
+ // a permit on started means the task body actually began running
+ if (started.tryAcquire())
+ // if the task is begun, this should get released
+ Assert.assertTrue(cancelledWhileSleeping.tryAcquire(5, TimeUnit.SECONDS));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/CompoundTaskExecutionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/CompoundTaskExecutionTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/CompoundTaskExecutionTest.java
new file mode 100644
index 0000000..89bde95
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/util/task/CompoundTaskExecutionTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Semaphore;
+
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.core.util.task.BasicExecutionContext;
+import org.apache.brooklyn.core.util.task.BasicExecutionManager;
+import org.apache.brooklyn.core.util.task.BasicTask;
+import org.apache.brooklyn.core.util.task.CompoundTask;
+import org.apache.brooklyn.core.util.task.ParallelTask;
+import org.apache.brooklyn.core.util.task.SequentialTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import org.testng.collections.Lists;
+
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Test the operation of the {@link CompoundTask} class.
+ */
+public class CompoundTaskExecutionTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CompoundTaskExecutionTest.class);
+
+ BasicExecutionManager em;
+ BasicExecutionContext ec;
+
+ @BeforeClass
+ public void setup() {
+ em = new BasicExecutionManager("mycontext");
+ ec = new BasicExecutionContext(em);
+ }
+
+ /** Shuts down the shared execution manager, if present, and drops the reference to it. */
+ @AfterClass
+ public void teardown() {
+ if (em != null) {
+ em.shutdownNow();
+ }
+ em = null;
+ }
+
+ /** Creates an unsubmitted task whose body returns {@code val}. */
+ private BasicTask<String> taskReturning(final String val) {
+ Callable<String> constant = new Callable<String>() {
+ @Override public String call() {
+ return val;
+ }
+ };
+ return new BasicTask<String>(constant);
+ }
+
+ /** Creates an unsubmitted task whose body sleeps for {@code pauseTime} and then returns {@code val}. */
+ private BasicTask<String> slowTaskReturning(final String val, final Duration pauseTime) {
+ Callable<String> delayed = new Callable<String>() {
+ @Override public String call() {
+ Time.sleep(pauseTime);
+ return val;
+ }
+ };
+ return new BasicTask<String>(delayed);
+ }
+
+
+ /** A sequential task returns its children's results in the order the children were given. */
+ @Test
+ public void runSequenceTask() throws Exception {
+ Task<List<String>> sequence = ec.submit(new SequentialTask<String>(
+ taskReturning("a"), taskReturning("b"), taskReturning("c"), taskReturning("d")));
+ assertEquals(sequence.get(), ImmutableList.of("a", "b", "c", "d"));
+ }
+
+ /**
+ * When the middle child of a sequential task throws, the sequence and that child end in error,
+ * the earlier child ends normally, and the later child is never begun.
+ */
+ @Test
+ public void testSequentialTaskFailsWhenIntermediateTaskThrowsException() throws Exception {
+ BasicTask<String> t1 = taskReturning("a");
+ BasicTask<String> t2 = new BasicTask<String>(new Callable<String>() {
+ @Override public String call() throws Exception {
+ throw new IllegalArgumentException("forced exception");
+ }
+ });
+ BasicTask<String> t3 = taskReturning("c");
+ SequentialTask<String> task = new SequentialTask<String>(t1, t2, t3);
+ Task<List<String>> tSequence = ec.submit(task);
+
+ try {
+ tSequence.get();
+ fail("t2 should have thrown an exception");
+ } catch (Exception e) { /* expected: get() on the sequence must throw */ }
+
+ assertTrue(task.isDone());
+ assertTrue(task.isError());
+ assertTrue(t1.isDone());
+ assertFalse(t1.isError());
+ assertTrue(t2.isDone());
+ assertTrue(t2.isError());
+ // t3 not run because of t2 exception
+ assertFalse(t3.isDone());
+ assertFalse(t3.isBegun());
+ }
+
+ /**
+ * When one child of a parallel task throws, the parallel task and that child end in error,
+ * while the other children (including a slower one) are still expected to run to normal completion.
+ */
+ @Test
+ public void testParallelTaskFailsWhenIntermediateTaskThrowsException() throws Exception {
+ // differs from test above of SequentialTask in that expect t3 to be executed,
+ // despite t2 failing.
+ // TODO Do we expect tSequence.get() to block for everything to either fail or complete,
+ // and then to throw exception? Currently it does *not* do that so test was previously failing.
+
+ BasicTask<String> t1 = taskReturning("a");
+ BasicTask<String> t2 = new BasicTask<String>(new Callable<String>() {
+ @Override public String call() throws Exception {
+ throw new IllegalArgumentException("forced exception");
+ }
+ });
+ // t3 takes 100ms, so it is still running when t2 fails
+ BasicTask<String> t3 = slowTaskReturning("c", Duration.millis(100));
+ ParallelTask<String> task = new ParallelTask<String>(t1, t2, t3);
+ Task<List<String>> tSequence = ec.submit(task);
+
+ try {
+ tSequence.get();
+ fail("t2 should have thrown an exception");
+ } catch (Exception e) { /* expected: get() on the parallel task must throw */ }
+
+ assertTrue(task.isDone());
+ assertTrue(task.isError());
+ assertTrue(t1.isDone());
+ assertFalse(t1.isError());
+ assertTrue(t2.isDone());
+ assertTrue(t2.isError());
+ assertTrue(t3.isBegun());
+ assertTrue(t3.isDone());
+ assertFalse(t3.isError());
+ }
+
+ /** A parallel task returns every child's result; the comparison ignores order. */
+ @Test
+ public void runParallelTask() throws Exception {
+ Task<List<String>> parallel = ec.submit(new ParallelTask<String>(
+ taskReturning("d"), taskReturning("b"), taskReturning("a"), taskReturning("c")));
+ HashSet<String> results = new HashSet<String>(parallel.get());
+ assertEquals(results, ImmutableSet.of("a", "b", "c", "d"));
+ }
+
+ /**
+ * Runs a parallel task in which one child blocks on a semaphore: the other children complete,
+ * the parallel task's get() keeps blocking until the semaphore is released, and then all results are returned.
+ */
+ @Test
+ public void runParallelTaskWithDelay() throws Exception {
+ final Semaphore locker = new Semaphore(0);
+ BasicTask<String> t1 = new BasicTask<String>(new Callable<String>() {
+ @Override public String call() {
+ try {
+ // blocks until the test thread releases a permit
+ locker.acquire();
+ } catch (InterruptedException e) {
+ throw Throwables.propagate(e);
+ }
+ return "a";
+ }
+ });
+ BasicTask<String> t2 = taskReturning("b");
+ BasicTask<String> t3 = taskReturning("c");
+ BasicTask<String> t4 = taskReturning("d");
+ final Task<List<String>> tSequence = ec.submit(new ParallelTask<String>(t4, t2, t1, t3));
+
+ // the unblocked children finish while t1, and hence the parallel task, is still pending
+ assertEquals(ImmutableSet.of(t2.get(), t3.get(), t4.get()), ImmutableSet.of("b", "c", "d"));
+ assertFalse(t1.isDone());
+ assertFalse(tSequence.isDone());
+
+ // get blocks until tasks have completed
+ Thread t = new Thread() {
+ @Override public void run() {
+ try {
+ tSequence.get();
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ // tells the test thread (final acquire below) that get() has returned
+ locker.release();
+ }
+ };
+ t.start();
+ Thread.sleep(30);
+ assertTrue(t.isAlive());
+
+ // let t1 proceed
+ locker.release();
+
+ assertEquals(new HashSet<String>(tSequence.get()), ImmutableSet.of("a", "b", "c", "d"));
+ assertTrue(t1.isDone());
+ assertTrue(tSequence.isDone());
+
+ // wait for the helper thread's release
+ locker.acquire();
+ }
+
+ /**
+ * Runs two sequential tasks in parallel and checks that, however the two interleave,
+ * each sequence's own entries were appended in order.
+ */
+ @Test
+ public void testComplexOrdering() throws Exception {
+ List<String> events = new CopyOnWriteArrayList<String>();
+ SequentialTask<String> taskA = new SequentialTask<String>(
+ appendAfterDelay(events, "a1"), appendAfterDelay(events, "a2"), appendAfterDelay(events, "a3"), appendAfterDelay(events, "a4"));
+ SequentialTask<String> taskB = new SequentialTask<String>(
+ appendAfterDelay(events, "b1"), appendAfterDelay(events, "b2"), appendAfterDelay(events, "b3"), appendAfterDelay(events, "b4"));
+ Task<List<String>> both = ec.submit(new ParallelTask<String>(taskA, taskB));
+ both.get();
+
+ LOG.debug("Tasks happened in order: {}", events);
+ assertEquals(events.size(), 8);
+ assertEquals(new HashSet<String>(events), ImmutableSet.of("a1", "a2", "a3", "a4", "b1", "b2", "b3", "b4"));
+
+ // split the interleaved log by sequence; each part must be in order
+ List<String> aEvents = Lists.newArrayList();
+ List<String> bEvents = Lists.newArrayList();
+ for (String event : events) {
+ if (event.charAt(0) == 'a') {
+ aEvents.add(event);
+ } else {
+ bEvents.add(event);
+ }
+ }
+ assertEquals(aEvents, ImmutableList.of("a1", "a2", "a3", "a4"));
+ assertEquals(bEvents, ImmutableList.of("b1", "b2", "b3", "b4"));
+ }
+
+ private BasicTask<String> appendAfterDelay(final List<String> list, final String value) {
+ return new BasicTask<String>(new Callable<String>() {
+ @Override public String call() {
+ try {
+ Thread.sleep((int) (100 * Math.random()));
+ } catch (InterruptedException e) {
+ throw Throwables.propagate(e);
+ }
+ LOG.debug("running {}", value);
+ list.add(value);
+ return value;
+ }
+ });
+ }
+
+}