You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2014/07/01 12:53:51 UTC
[2/4] git commit: Covert perf tests from groovy to java
Covert perf tests from groovy to java
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/b1e27d8b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/b1e27d8b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/b1e27d8b
Branch: refs/heads/master
Commit: b1e27d8b1cd9d0745aa7a4c04c51c56de37a9297
Parents: 870a1b8
Author: Aled Sage <al...@gmail.com>
Authored: Tue Jun 24 08:55:56 2014 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Tue Jul 1 11:06:33 2014 +0100
----------------------------------------------------------------------
.../EntityPerformanceLongevityTest.groovy | 23 ---
.../EntityPerformanceLongevityTest.java | 17 ++
.../qa/performance/EntityPerformanceTest.java | 17 --
.../FilePersistencePerformanceTest.java | 4 -
.../JavaYardStickPerformanceTest.java | 4 -
.../SubscriptionPerformanceTest.groovy | 127 --------------
.../SubscriptionPerformanceTest.java | 150 +++++++++++++++++
.../qa/performance/TaskPerformanceTest.groovy | 121 --------------
.../qa/performance/TaskPerformanceTest.java | 164 +++++++++++++++++++
9 files changed, 331 insertions(+), 296 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/b1e27d8b/core/src/test/java/brooklyn/qa/performance/EntityPerformanceLongevityTest.groovy
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/qa/performance/EntityPerformanceLongevityTest.groovy b/core/src/test/java/brooklyn/qa/performance/EntityPerformanceLongevityTest.groovy
deleted file mode 100644
index 08a009b..0000000
--- a/core/src/test/java/brooklyn/qa/performance/EntityPerformanceLongevityTest.groovy
+++ /dev/null
@@ -1,23 +0,0 @@
-package brooklyn.qa.performance
-
-import static brooklyn.test.TestUtils.*
-import static org.testng.Assert.*
-
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
-
-import brooklyn.test.entity.TestEntity
-
-
-public class EntityPerformanceLongevityTest extends EntityPerformanceTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(EntityPerformanceLongevityTest.class)
-
- // TODO enable this to some big number to see what happens when things run for a long time.
- // e.g. will we eventually get OOME when storing all tasks relating to effector calls?
-
-// protected int numIterations() {
-// return 1000000
-// }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/b1e27d8b/core/src/test/java/brooklyn/qa/performance/EntityPerformanceLongevityTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/qa/performance/EntityPerformanceLongevityTest.java b/core/src/test/java/brooklyn/qa/performance/EntityPerformanceLongevityTest.java
new file mode 100644
index 0000000..5bb1279
--- /dev/null
+++ b/core/src/test/java/brooklyn/qa/performance/EntityPerformanceLongevityTest.java
@@ -0,0 +1,17 @@
+package brooklyn.qa.performance;
+
+import org.testng.annotations.Test;
+
+
+@Test(groups={"Integration", "Acceptance"})
+public class EntityPerformanceLongevityTest extends EntityPerformanceTest {
+
+ // TODO enable this to some big number to see what happens when things run for a long time.
+ // e.g. will we eventually get OOME when storing all tasks relating to effector calls?
+
+// @Override
+// protected int numIterations() {
+// return 1000000;
+// }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/b1e27d8b/core/src/test/java/brooklyn/qa/performance/EntityPerformanceTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/qa/performance/EntityPerformanceTest.java b/core/src/test/java/brooklyn/qa/performance/EntityPerformanceTest.java
index 74ab04b..6a3759a 100644
--- a/core/src/test/java/brooklyn/qa/performance/EntityPerformanceTest.java
+++ b/core/src/test/java/brooklyn/qa/performance/EntityPerformanceTest.java
@@ -5,8 +5,6 @@ import static org.testng.Assert.assertTrue;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -25,8 +23,6 @@ import com.google.common.collect.Lists;
public class EntityPerformanceTest extends AbstractPerformanceTest {
- private static final Logger LOG = LoggerFactory.getLogger(EntityPerformanceTest.class);
-
private static final long TIMEOUT_MS = 10*1000;
TestEntity entity;
@@ -51,19 +47,6 @@ public class EntityPerformanceTest extends AbstractPerformanceTest {
}
@Test(groups={"Integration", "Acceptance"})
- public void testGroovyNoopToEnsureTestFrameworkIsVeryFast() {
- int numIterations = numIterations();
- double minRatePerSec = 100000 * PERFORMANCE_EXPECTATION;
- final AtomicInteger i = new AtomicInteger();
-
- measureAndAssert("noop-groovy", numIterations, minRatePerSec, new Runnable() {
- public void run() {
- i.incrementAndGet();
- }});
- assertTrue(i.get() >= numIterations, "i="+i);
- }
-
- @Test(groups={"Integration", "Acceptance"})
public void testUpdateAttributeWhenNoListeners() {
int numIterations = numIterations();
double minRatePerSec = 1000 * PERFORMANCE_EXPECTATION;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/b1e27d8b/core/src/test/java/brooklyn/qa/performance/FilePersistencePerformanceTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/qa/performance/FilePersistencePerformanceTest.java b/core/src/test/java/brooklyn/qa/performance/FilePersistencePerformanceTest.java
index eaf81ae..65efdb1 100644
--- a/core/src/test/java/brooklyn/qa/performance/FilePersistencePerformanceTest.java
+++ b/core/src/test/java/brooklyn/qa/performance/FilePersistencePerformanceTest.java
@@ -5,8 +5,6 @@ import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -23,8 +21,6 @@ import com.google.common.io.Files;
public class FilePersistencePerformanceTest extends AbstractPerformanceTest {
- private static final Logger LOG = LoggerFactory.getLogger(FilePersistencePerformanceTest.class);
-
File file;
FileBasedStoreObjectAccessor fileAccessor;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/b1e27d8b/core/src/test/java/brooklyn/qa/performance/JavaYardStickPerformanceTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/qa/performance/JavaYardStickPerformanceTest.java b/core/src/test/java/brooklyn/qa/performance/JavaYardStickPerformanceTest.java
index 5954b8a..02f7073 100644
--- a/core/src/test/java/brooklyn/qa/performance/JavaYardStickPerformanceTest.java
+++ b/core/src/test/java/brooklyn/qa/performance/JavaYardStickPerformanceTest.java
@@ -6,8 +6,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -16,8 +14,6 @@ import com.google.common.base.Throwables;
public class JavaYardStickPerformanceTest extends AbstractPerformanceTest {
- private static final Logger LOG = LoggerFactory.getLogger(JavaYardStickPerformanceTest.class);
-
protected static final long TIMEOUT_MS = 10*1000;
private ExecutorService executor;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/b1e27d8b/core/src/test/java/brooklyn/qa/performance/SubscriptionPerformanceTest.groovy
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/qa/performance/SubscriptionPerformanceTest.groovy b/core/src/test/java/brooklyn/qa/performance/SubscriptionPerformanceTest.groovy
deleted file mode 100644
index 5cb2a07..0000000
--- a/core/src/test/java/brooklyn/qa/performance/SubscriptionPerformanceTest.groovy
+++ /dev/null
@@ -1,127 +0,0 @@
-package brooklyn.qa.performance
-
-import static brooklyn.test.TestUtils.*
-import static org.testng.Assert.*
-
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicInteger
-
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
-import org.testng.annotations.BeforeMethod
-import org.testng.annotations.Test
-
-import brooklyn.entity.proxying.EntitySpec
-import brooklyn.event.SensorEventListener
-import brooklyn.location.basic.SimulatedLocation
-import brooklyn.management.SubscriptionManager
-import brooklyn.test.entity.TestApplication
-import brooklyn.test.entity.TestEntity
-
-public class SubscriptionPerformanceTest extends AbstractPerformanceTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(SubscriptionPerformanceTest.class)
-
- private static final long LONG_TIMEOUT_MS = 30*1000
- private static final int NUM_ITERATIONS = 10000
-
- TestEntity entity
- List<TestEntity> entities
- SubscriptionManager subscriptionManager
-
- @BeforeMethod(alwaysRun=true)
- public void setUp() {
- super.setUp()
-
- entities = []
- for (int i = 0; i < 10; i++) {
- entities += app.createAndManageChild(EntitySpec.create(TestEntity.class));
- }
- entity = entities[0]
- app.start([loc])
-
- subscriptionManager = app.managementContext.subscriptionManager
- }
-
- @Test(groups=["Integration", "Acceptance"])
- public void testManyPublishedOneSubscriber() {
- int numSubscribers = 1
- int numIterations = NUM_ITERATIONS
- double minRatePerSec = 100 * PERFORMANCE_EXPECTATION; // i.e. 100*10 events delivered per sec
- int iter = 0
- int expectedCount = numIterations*numSubscribers
-
- AtomicInteger listenerCount = new AtomicInteger()
- CountDownLatch completionLatch = new CountDownLatch(1)
-
- for (int i = 0; i < numSubscribers; i++) {
- subscriptionManager.subscribe([subscriber:i], entity, TestEntity.SEQUENCE,
- {
- int count = listenerCount.incrementAndGet()
- if (count >= expectedCount) completionLatch.countDown()
- } as SensorEventListener)
- }
-
- measureAndAssert("updateAttributeWithManyPublishedOneSubscriber", numIterations, minRatePerSec,
- { entity.setAttribute(TestEntity.SEQUENCE, (iter++)) },
- { completionLatch.await(LONG_TIMEOUT_MS, TimeUnit.MILLISECONDS); assertTrue(completionLatch.getCount() <= 0) })
- }
-
- // TODO but surely parallel should be much faster?!
- @Test(groups=["Integration", "Acceptance"])
- public void testManyListenersForSensorEvent() {
- int numSubscribers = 10
- int numIterations = NUM_ITERATIONS
- double minRatePerSec = 100 * PERFORMANCE_EXPECTATION; // i.e. 100*10 events delivered per sec
- int iter = 0
- int expectedCount = numIterations*numSubscribers
-
- AtomicInteger listenerCount = new AtomicInteger()
- CountDownLatch completionLatch = new CountDownLatch(1)
-
- for (int i = 0; i < numSubscribers; i++) {
- subscriptionManager.subscribe([subscriber:i], entity, TestEntity.SEQUENCE,
- {
- int count = listenerCount.incrementAndGet()
- if (count >= expectedCount) completionLatch.countDown()
- } as SensorEventListener)
- }
-
- measureAndAssert("updateAttributeWithManyListeners", numIterations, minRatePerSec,
- { entity.setAttribute(TestEntity.SEQUENCE, (iter++)) },
- { completionLatch.await(LONG_TIMEOUT_MS, TimeUnit.MILLISECONDS); assertTrue(completionLatch.getCount() <= 0) })
- }
-
- @Test(groups=["Integration", "Acceptance"])
- public void testUpdateAttributeWithNoListenersButManyUnrelatedListeners() {
- int numUnrelatedSubscribers = 1000
- int numIterations = NUM_ITERATIONS
- double minRatePerSec = 1000 * PERFORMANCE_EXPECTATION;
- int iter = 0
- int lastVal = 0
- Exception exception
-
- for (int i = 0; i < (numUnrelatedSubscribers/2); i++) {
- subscriptionManager.subscribe([subscriber:i], entities[1], TestEntity.SEQUENCE,
- {
- exception = new RuntimeException("Unrelated subscriber called with $it")
- throw exception
- } as SensorEventListener)
- subscriptionManager.subscribe([subscriber:i], entity, TestEntity.MY_NOTIF,
- {
- exception = new RuntimeException("Unrelated subscriber called with $it")
- throw exception
- } as SensorEventListener)
- }
-
- measureAndAssert("updateAttributeWithUnrelatedListeners", numIterations, minRatePerSec) {
- entity.setAttribute(TestEntity.SEQUENCE, (++iter))
- }
-
- if (exception != null) {
- throw exception
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/b1e27d8b/core/src/test/java/brooklyn/qa/performance/SubscriptionPerformanceTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/qa/performance/SubscriptionPerformanceTest.java b/core/src/test/java/brooklyn/qa/performance/SubscriptionPerformanceTest.java
new file mode 100644
index 0000000..4b7752f
--- /dev/null
+++ b/core/src/test/java/brooklyn/qa/performance/SubscriptionPerformanceTest.java
@@ -0,0 +1,150 @@
+package brooklyn.qa.performance;
+
+import static org.testng.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.event.SensorEvent;
+import brooklyn.event.SensorEventListener;
+import brooklyn.management.SubscriptionManager;
+import brooklyn.test.entity.TestEntity;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.exceptions.Exceptions;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+public class SubscriptionPerformanceTest extends AbstractPerformanceTest {
+
+ private static final long LONG_TIMEOUT_MS = 30*1000;
+ private static final int NUM_ITERATIONS = 10000;
+
+ TestEntity entity;
+ List<TestEntity> entities;
+ SubscriptionManager subscriptionManager;
+
+ @BeforeMethod(alwaysRun=true)
+ public void setUp() throws Exception {
+ super.setUp();
+
+ entities = Lists.newArrayList();
+ for (int i = 0; i < 10; i++) {
+ entities.add(app.createAndManageChild(EntitySpec.create(TestEntity.class)));
+ }
+ entity = entities.get(0);
+ app.start(ImmutableList.of(loc));
+
+ subscriptionManager = app.getManagementContext().getSubscriptionManager();
+ }
+
+ @Test(groups={"Integration", "Acceptance"})
+ public void testManyPublishedOneSubscriber() throws Exception {
+ int numSubscribers = 1;
+ int numIterations = NUM_ITERATIONS;
+ double minRatePerSec = 100 * PERFORMANCE_EXPECTATION; // i.e. 100*10 events delivered per sec
+ final AtomicInteger iter = new AtomicInteger();
+ final int expectedCount = numIterations*numSubscribers;
+
+ final AtomicInteger listenerCount = new AtomicInteger();
+ final CountDownLatch completionLatch = new CountDownLatch(1);
+
+ for (int i = 0; i < numSubscribers; i++) {
+ subscriptionManager.subscribe(MutableMap.<String, Object>of("subscriber", i), entity, TestEntity.SEQUENCE, new SensorEventListener<Integer>() {
+ public void onEvent(SensorEvent<Integer> event) {
+ int count = listenerCount.incrementAndGet();
+ if (count >= expectedCount) completionLatch.countDown();
+ }});
+ }
+
+ measureAndAssert("updateAttributeWithManyPublishedOneSubscriber", numIterations, minRatePerSec,
+ new Runnable() {
+ public void run() {
+ entity.setAttribute(TestEntity.SEQUENCE, (iter.getAndIncrement()));
+ }
+ },
+ new Runnable() {
+ public void run() {
+ try {
+ completionLatch.await(LONG_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ }
+ assertTrue(completionLatch.getCount() <= 0);
+ }
+ });
+ }
+
+ @Test(groups={"Integration", "Acceptance"})
+ public void testManyListenersForSensorEvent() throws Exception {
+ int numSubscribers = 10;
+ int numIterations = NUM_ITERATIONS;
+ double minRatePerSec = 100 * PERFORMANCE_EXPECTATION; // i.e. 100*10 events delivered per sec
+ final AtomicInteger iter = new AtomicInteger();
+ final int expectedCount = numIterations*numSubscribers;
+
+ final AtomicInteger listenerCount = new AtomicInteger();
+ final CountDownLatch completionLatch = new CountDownLatch(1);
+
+ for (int i = 0; i < numSubscribers; i++) {
+ subscriptionManager.subscribe(MutableMap.<String, Object>of("subscriber", i), entity, TestEntity.SEQUENCE, new SensorEventListener<Integer>() {
+ public void onEvent(SensorEvent<Integer> event) {
+ int count = listenerCount.incrementAndGet();
+ if (count >= expectedCount) completionLatch.countDown();
+ }});
+ }
+
+ measureAndAssert("updateAttributeWithManyListeners", numIterations, minRatePerSec,
+ new Runnable() {
+ @Override public void run() {
+ entity.setAttribute(TestEntity.SEQUENCE, (iter.getAndIncrement()));
+ }},
+ new Runnable() {
+ public void run() {
+ try {
+ completionLatch.await(LONG_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ }
+ assertTrue(completionLatch.getCount() <= 0);
+ }});
+ }
+
+ @Test(groups={"Integration", "Acceptance"})
+ public void testUpdateAttributeWithNoListenersButManyUnrelatedListeners() throws Exception {
+ int numUnrelatedSubscribers = 1000;
+ int numIterations = NUM_ITERATIONS;
+ double minRatePerSec = 1000 * PERFORMANCE_EXPECTATION;
+ final AtomicInteger iter = new AtomicInteger();
+ final AtomicReference<RuntimeException> exception = new AtomicReference<RuntimeException>();
+
+ for (int i = 0; i < (numUnrelatedSubscribers/2); i++) {
+ subscriptionManager.subscribe(MutableMap.<String, Object>of("subscriber", i), entities.get(1), TestEntity.SEQUENCE, new SensorEventListener<Integer>() {
+ public void onEvent(SensorEvent<Integer> event) {
+ exception.set(new RuntimeException("Unrelated subscriber called with "+event));
+ throw exception.get();
+ }});
+ subscriptionManager.subscribe(MutableMap.<String, Object>of("subscriber", i), entity, TestEntity.MY_NOTIF, new SensorEventListener<Integer>() {
+ public void onEvent(SensorEvent<Integer> event) {
+ exception.set(new RuntimeException("Unrelated subscriber called with "+event));
+ throw exception.get();
+ }});
+ }
+
+ measureAndAssert("updateAttributeWithUnrelatedListeners", numIterations, minRatePerSec, new Runnable() {
+ @Override public void run() {
+ entity.setAttribute(TestEntity.SEQUENCE, (iter.incrementAndGet()));
+ }});
+
+ if (exception.get() != null) {
+ throw exception.get();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/b1e27d8b/core/src/test/java/brooklyn/qa/performance/TaskPerformanceTest.groovy
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/qa/performance/TaskPerformanceTest.groovy b/core/src/test/java/brooklyn/qa/performance/TaskPerformanceTest.groovy
deleted file mode 100644
index 8d0d2ae..0000000
--- a/core/src/test/java/brooklyn/qa/performance/TaskPerformanceTest.groovy
+++ /dev/null
@@ -1,121 +0,0 @@
-package brooklyn.qa.performance
-
-import brooklyn.management.internal.LocalManagementContext
-import org.testng.annotations.AfterMethod
-
-import static brooklyn.test.TestUtils.*
-import static org.testng.Assert.*
-
-import java.util.concurrent.CopyOnWriteArrayList
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicInteger
-
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
-import org.testng.annotations.BeforeMethod
-import org.testng.annotations.Test
-
-import brooklyn.util.task.BasicExecutionManager
-import brooklyn.util.task.SingleThreadedScheduler
-
-public class TaskPerformanceTest extends AbstractPerformanceTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(TaskPerformanceTest.class)
-
- private static final long LONG_TIMEOUT_MS = 30*1000
-
- BasicExecutionManager executionManager
-
- @BeforeMethod(alwaysRun=true)
- public void setUp() {
- super.setUp()
-
- app.start([loc])
-
- executionManager = app.managementContext.executionManager
- }
-
- public static final int numIterations = 200000;
-
- @Test(groups=["Integration", "Acceptance"])
- public void testExecuteSimplestRunnable() {
- double minRatePerSec = 1000 * PERFORMANCE_EXPECTATION;
-
- final AtomicInteger counter = new AtomicInteger();
- final CountDownLatch completionLatch = new CountDownLatch(1)
-
- Runnable work = new Runnable() { public void run() {
- int val = counter.incrementAndGet()
- if (val >= numIterations) completionLatch.countDown()
- }}
-
- measureAndAssert("executeSimplestRunnable", numIterations, minRatePerSec,
- { executionManager.submit(work) },
- { completionLatch.await(LONG_TIMEOUT_MS, TimeUnit.MILLISECONDS); assertTrue(completionLatch.getCount() <= 0) })
- }
-
- @Test(groups=["Integration", "Acceptance"])
- public void testExecuteRunnableWithTags() {
- double minRatePerSec = 1000 * PERFORMANCE_EXPECTATION;
-
- final AtomicInteger counter = new AtomicInteger();
- final CountDownLatch completionLatch = new CountDownLatch(1)
-
- Runnable work = new Runnable() { public void run() {
- int val = counter.incrementAndGet()
- if (val >= numIterations) completionLatch.countDown()
- }}
-
- Map flags = [tags:["a","b"]]
-
- measureAndAssert("testExecuteRunnableWithTags", numIterations, minRatePerSec,
- { executionManager.submit(flags, work) },
- { completionLatch.await(LONG_TIMEOUT_MS, TimeUnit.MILLISECONDS); assertTrue(completionLatch.getCount() <= 0) })
- }
-
- @Test(groups=["Integration", "Acceptance"])
- public void testExecuteWithSingleThreadedScheduler() {
- double minRatePerSec = 1000 * PERFORMANCE_EXPECTATION;
-
- executionManager.setTaskSchedulerForTag("singlethreaded", SingleThreadedScheduler.class);
-
- final AtomicInteger concurrentCallCount = new AtomicInteger();
- final AtomicInteger submitCount = new AtomicInteger();
- final AtomicInteger counter = new AtomicInteger();
- final CountDownLatch completionLatch = new CountDownLatch(1)
- final List<Exception> exceptions = new CopyOnWriteArrayList()
-
- Runnable work = new Runnable() { public void run() {
- int numConcurrentCalls = concurrentCallCount.incrementAndGet()
- try {
- if (numConcurrentCalls > 1) throw new IllegalStateException("numConcurrentCalls=$numConcurrentCalls")
- int val = counter.incrementAndGet()
- if (val >= numIterations) completionLatch.countDown()
- } catch (Exception e) {
- exceptions.add(e)
- LOG.warn("Exception in runnable of testExecuteWithSingleThreadedScheduler", e)
- throw e
- } finally {
- concurrentCallCount.decrementAndGet()
- }
- }}
-
- measureAndAssert("testExecuteWithSingleThreadedScheduler", numIterations, minRatePerSec,
- {
- while (submitCount.get() > counter.get() + 5000) {
- LOG.info("delaying because ${submitCount.get()} submitted and only ${counter.get()} run")
- Thread.sleep(500);
- }
- executionManager.submit([tags:["singlethreaded"]], work); submitCount.incrementAndGet(); },
- { completionLatch.await(LONG_TIMEOUT_MS, TimeUnit.MILLISECONDS); assertTrue(completionLatch.getCount() <= 0) })
-
- if (exceptions.size() > 0) throw exceptions.get(0)
- }
-
- public static void main(String[] args) {
- def t = new TaskPerformanceTest();
- t.setUp();
- t.testExecuteWithSingleThreadedScheduler();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/b1e27d8b/core/src/test/java/brooklyn/qa/performance/TaskPerformanceTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/qa/performance/TaskPerformanceTest.java b/core/src/test/java/brooklyn/qa/performance/TaskPerformanceTest.java
new file mode 100644
index 0000000..2b549cc
--- /dev/null
+++ b/core/src/test/java/brooklyn/qa/performance/TaskPerformanceTest.java
@@ -0,0 +1,164 @@
+package brooklyn.qa.performance;
+
+import static org.testng.Assert.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.task.BasicExecutionManager;
+import brooklyn.util.task.SingleThreadedScheduler;
+import brooklyn.util.time.Time;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+public class TaskPerformanceTest extends AbstractPerformanceTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TaskPerformanceTest.class);
+
+ private static final long LONG_TIMEOUT_MS = 30*1000;
+
+ BasicExecutionManager executionManager;
+
+ @BeforeMethod(alwaysRun=true)
+ public void setUp() throws Exception {
+ super.setUp();
+
+ app.start(ImmutableList.of(loc));
+
+ executionManager = (BasicExecutionManager) app.getManagementContext().getExecutionManager();
+ }
+
+ public static final int numIterations = 200000;
+
+ @Test(groups={"Integration", "Acceptance"})
+ public void testExecuteSimplestRunnable() {
+ double minRatePerSec = 1000 * PERFORMANCE_EXPECTATION;
+
+ final AtomicInteger counter = new AtomicInteger();
+ final CountDownLatch completionLatch = new CountDownLatch(1);
+
+ final Runnable work = new Runnable() {
+ public void run() {
+ int val = counter.incrementAndGet();
+ if (val >= numIterations) completionLatch.countDown();
+ }};
+
+ measureAndAssert("executeSimplestRunnable", numIterations, minRatePerSec,
+ new Runnable() {
+ public void run() {
+ executionManager.submit(work);
+ }},
+ new Runnable() {
+ public void run() {
+ try {
+ completionLatch.await(LONG_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ }
+ assertTrue(completionLatch.getCount() <= 0);
+ }});
+ }
+
+ @Test(groups={"Integration", "Acceptance"})
+ public void testExecuteRunnableWithTags() {
+ double minRatePerSec = 1000 * PERFORMANCE_EXPECTATION;
+
+ final AtomicInteger counter = new AtomicInteger();
+ final CountDownLatch completionLatch = new CountDownLatch(1);
+
+ final Runnable work = new Runnable() { public void run() {
+ int val = counter.incrementAndGet();
+ if (val >= numIterations) completionLatch.countDown();
+ }
+ };
+
+ final Map<String, ?> flags = MutableMap.of("tags", ImmutableList.of("a","b"));
+
+ measureAndAssert("testExecuteRunnableWithTags", numIterations, minRatePerSec,
+ new Runnable() {
+ public void run() {
+ executionManager.submit(flags, work);
+ }},
+ new Runnable() {
+ public void run() {
+ try {
+ completionLatch.await(LONG_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ }
+ assertTrue(completionLatch.getCount() <= 0);
+ }});
+ }
+
+ @Test(groups={"Integration", "Acceptance"})
+ public void testExecuteWithSingleThreadedScheduler() throws Exception {
+ double minRatePerSec = 1000 * PERFORMANCE_EXPECTATION;
+
+ executionManager.setTaskSchedulerForTag("singlethreaded", SingleThreadedScheduler.class);
+
+ final AtomicInteger concurrentCallCount = new AtomicInteger();
+ final AtomicInteger submitCount = new AtomicInteger();
+ final AtomicInteger counter = new AtomicInteger();
+ final CountDownLatch completionLatch = new CountDownLatch(1);
+ final List<Exception> exceptions = Lists.newCopyOnWriteArrayList();
+
+ final Runnable work = new Runnable() { public void run() {
+ int numConcurrentCalls = concurrentCallCount.incrementAndGet();
+ try {
+ if (numConcurrentCalls > 1) throw new IllegalStateException("numConcurrentCalls="+numConcurrentCalls);
+ int val = counter.incrementAndGet();
+ if (val >= numIterations) completionLatch.countDown();
+ } catch (Exception e) {
+ exceptions.add(e);
+ LOG.warn("Exception in runnable of testExecuteWithSingleThreadedScheduler", e);
+ throw Exceptions.propagate(e);
+ } finally {
+ concurrentCallCount.decrementAndGet();
+ }
+ }
+ };
+
+ measureAndAssert("testExecuteWithSingleThreadedScheduler", numIterations, minRatePerSec,
+ new Runnable() {
+ public void run() {
+ while (submitCount.get() > counter.get() + 5000) {
+ LOG.info("delaying because "+submitCount.get()+" submitted and only "+counter.get()+" run");
+ Time.sleep(500);
+ }
+ executionManager.submit(MutableMap.of("tags", ImmutableList.of("singlethreaded")), work);
+ submitCount.incrementAndGet();
+ }},
+ new Runnable() {
+ public void run() {
+ try {
+ completionLatch.await(LONG_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ }
+ assertTrue(completionLatch.getCount() <= 0);
+ }});
+
+ if (exceptions.size() > 0) throw exceptions.get(0);
+ }
+
+ public static void main(String[] args) throws Exception {
+ TaskPerformanceTest t = new TaskPerformanceTest();
+ t.setUp();
+ try {
+ t.testExecuteWithSingleThreadedScheduler();
+ } finally {
+ t.tearDown();
+ }
+ }
+}