You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2014/11/03 13:27:03 UTC
[1/3] git commit: Fix ServiceStateLogic updating map
Repository: incubator-brooklyn
Updated Branches:
refs/heads/master e83025110 -> aa20ae71b
Fix ServiceStateLogic updating map
- Don’t modify the existing attribute value. Instead take a copy
and modify that.
- Also fixes (and tidies) assertions in
brooklyn.rest.client.ApplicationResourceIntegrationTest
- BrooklynRestResourceUtils: don’t propagate to the top-level app
the service_up, service_not_up_indicators, etc.
These are populated automatic by app.initEnrichers, which looks
at the child entities.
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/86aa90bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/86aa90bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/86aa90bd
Branch: refs/heads/master
Commit: 86aa90bd1f8cd1c261cac3f376feb489b2f07015
Parents: 2b4e832
Author: Aled Sage <al...@gmail.com>
Authored: Fri Oct 31 23:07:02 2014 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Mon Nov 3 10:05:23 2014 +0000
----------------------------------------------------------------------
.../entity/basic/ServiceStateLogic.java | 72 +++++++++++++-------
.../ApplicationResourceIntegrationTest.java | 29 ++++----
.../rest/util/BrooklynRestResourceUtils.java | 5 +-
3 files changed, 64 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/86aa90bd/core/src/main/java/brooklyn/entity/basic/ServiceStateLogic.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/basic/ServiceStateLogic.java b/core/src/main/java/brooklyn/entity/basic/ServiceStateLogic.java
index b54e732..2d3577b 100644
--- a/core/src/main/java/brooklyn/entity/basic/ServiceStateLogic.java
+++ b/core/src/main/java/brooklyn/entity/basic/ServiceStateLogic.java
@@ -90,31 +90,51 @@ public class ServiceStateLogic {
}
/** update the given key in the given map sensor */
- public static <TKey,TVal> void updateMapSensorEntry(EntityLocal entity, AttributeSensor<Map<TKey,TVal>> sensor, TKey key, TVal v) {
- Map<TKey, TVal> map = entity.getAttribute(sensor);
-
- boolean created = (map==null);
- if (created) map = MutableMap.of();
+ public static <TKey,TVal> void updateMapSensorEntry(EntityLocal entity, AttributeSensor<Map<TKey,TVal>> sensor, final TKey key, final TVal v) {
+ /*
+ * Important to *not* modify the existing attribute value; must make a copy, modify that, and publish.
+ * This is because a Propagator enricher will set this same value on another entity. There was very
+ * strange behaviour when this was done for a SERVICE_UP_INDICATORS sensor - the updates done here
+ * applied to the attribute of both entities!
+ *
+ * Need to do this update atomically (i.e. sequentially) because there is no threading control for
+ * what is calling updateMapSensorEntity. It is called directly on start, on initialising enrichers,
+ * and in event listeners. These calls could be concurrent.
+ */
+ Function<Map<TKey,TVal>, Maybe<Map<TKey,TVal>>> modifier = new Function<Map<TKey,TVal>, Maybe<Map<TKey,TVal>>>() {
+ @Override public Maybe<Map<TKey, TVal>> apply(Map<TKey, TVal> map) {
+ boolean created = (map==null);
+ if (created) map = MutableMap.of();
- boolean changed;
- if (v == Entities.REMOVE) {
- changed = map.containsKey(key);
- if (changed)
- map.remove(key);
- } else {
- TVal oldV = map.get(key);
- if (oldV==null)
- changed = (v!=null || !map.containsKey(key));
- else
- changed = !oldV.equals(v);
- if (changed)
- map.put(key, (TVal)v);
- }
- if (changed || created) {
- if (!Entities.isNoLongerManaged(entity)) {
- // TODO synchronize; then emit a copy to prevent CME's e.g. UrlMappingTest
- entity.setAttribute(sensor, map);
+ boolean changed;
+ if (v == Entities.REMOVE) {
+ changed = map.containsKey(key);
+ if (changed) {
+ map = MutableMap.copyOf(map);
+ map.remove(key);
+ }
+ } else {
+ TVal oldV = map.get(key);
+ if (oldV==null) {
+ changed = (v!=null || !map.containsKey(key));
+ } else {
+ changed = !oldV.equals(v);
+ }
+ if (changed) {
+ map = MutableMap.copyOf(map);
+ map.put(key, (TVal)v);
+ }
+ }
+ if (changed || created) {
+ return Maybe.of(map);
+ } else {
+ return Maybe.absent();
+ }
}
+ };
+
+ if (!Entities.isNoLongerManaged(entity)) {
+ entity.modifyAttribute(sensor, modifier);
}
}
@@ -424,11 +444,13 @@ public class ServiceStateLogic {
}
// override superclass to publish multiple sensors
- if (getConfig(DERIVE_SERVICE_PROBLEMS))
+ if (getConfig(DERIVE_SERVICE_PROBLEMS)) {
updateMapSensor(SERVICE_PROBLEMS, computeServiceProblems());
+ }
- if (getConfig(DERIVE_SERVICE_NOT_UP))
+ if (getConfig(DERIVE_SERVICE_NOT_UP)) {
updateMapSensor(SERVICE_NOT_UP_INDICATORS, computeServiceNotUp());
+ }
}
protected Object computeServiceNotUp() {
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/86aa90bd/usage/rest-client/src/test/java/brooklyn/rest/client/ApplicationResourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/usage/rest-client/src/test/java/brooklyn/rest/client/ApplicationResourceIntegrationTest.java b/usage/rest-client/src/test/java/brooklyn/rest/client/ApplicationResourceIntegrationTest.java
index 15bde52..e303571 100644
--- a/usage/rest-client/src/test/java/brooklyn/rest/client/ApplicationResourceIntegrationTest.java
+++ b/usage/rest-client/src/test/java/brooklyn/rest/client/ApplicationResourceIntegrationTest.java
@@ -62,6 +62,8 @@ public class ApplicationResourceIntegrationTest {
private static final Logger log = LoggerFactory.getLogger(ApplicationResourceIntegrationTest.class);
+ private static final Duration LONG_WAIT = Duration.minutes(10);
+
private final String redisSpec = "{\"name\": \"redis-app\", \"type\": \"brooklyn.entity.nosql.redis.RedisStore\", \"locations\": [ \"localhost\"]}";
private final ApplicationSpec legacyRedisSpec = ApplicationSpec.builder().name("redis-legacy-app")
@@ -117,11 +119,7 @@ public class ApplicationResourceIntegrationTest {
assertEquals(response.getStatus(), 201);
assertEquals(getManagementContext().getApplications().size(), 1);
final String entityId = getManagementContext().getApplications().iterator().next().getChildren().iterator().next().getId();
- Asserts.succeedsEventually(ImmutableMap.of("timeout", Duration.minutes(10)), new Runnable() {
- public void run() {
- Object status = api.getSensorApi().get("redis-app", entityId, "service.state", false);
- assertEquals(status, Lifecycle.RUNNING.toString());
- }});
+ assertServiceStateEventually("redis-app", entityId, Lifecycle.RUNNING, LONG_WAIT);
}
@Test(groups = "Integration", dependsOnMethods = "testDeployRedisApplication")
@@ -130,11 +128,8 @@ public class ApplicationResourceIntegrationTest {
Response response = api.getApplicationApi().create(legacyRedisSpec);
assertEquals(response.getStatus(), 201);
assertEquals(getManagementContext().getApplications().size(), 2);
- Asserts.succeedsEventually(ImmutableMap.of("timeout", Duration.minutes(10)), new Runnable() {
- public void run() {
- Object status = api.getSensorApi().get("redis-legacy-app", "redis-ent", "service.state", false);
- assertEquals(status, Lifecycle.RUNNING.toString());
- }});
+ assertServiceStateEventually("redis-legacy-app", "redis-ent", Lifecycle.RUNNING, LONG_WAIT);
+
// Tear the app down so it doesn't interfere with other tests
Response deleteResponse = api.getApplicationApi().delete("redis-legacy-app");
assertEquals(deleteResponse.getStatus(), 202);
@@ -167,12 +162,7 @@ public class ApplicationResourceIntegrationTest {
Response response = api.getEffectorApi().invoke("redis-app", entityId, "stop", "5000", ImmutableMap.<String, Object>of());
assertEquals(response.getStatus(), Response.Status.ACCEPTED.getStatusCode());
-
- Asserts.succeedsEventually(ImmutableMap.of("timeout", Duration.minutes(10)), new Runnable() {
- public void run() {
- Object status = api.getSensorApi().get("redis-app", entityId, "service.state", false);
- assertEquals(status, Lifecycle.STOPPED.toString());
- }});
+ assertServiceStateEventually("redis-app", entityId, Lifecycle.STOPPED, LONG_WAIT);
}
@Test(groups = "Integration", dependsOnMethods = "testTriggerRedisStopEffector")
@@ -201,4 +191,11 @@ public class ApplicationResourceIntegrationTest {
assertEquals(getManagementContext().getApplications().size(), size - 1);
}
+ private void assertServiceStateEventually(final String app, final String entity, final Lifecycle state, Duration timeout) {
+ Asserts.succeedsEventually(ImmutableMap.of("timeout", timeout), new Runnable() {
+ public void run() {
+ Object status = api.getSensorApi().get(app, entity, "service.state", false);
+ assertTrue(state.toString().equalsIgnoreCase(status.toString()), "status="+status);
+ }});
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/86aa90bd/usage/rest-server/src/main/java/brooklyn/rest/util/BrooklynRestResourceUtils.java
----------------------------------------------------------------------
diff --git a/usage/rest-server/src/main/java/brooklyn/rest/util/BrooklynRestResourceUtils.java b/usage/rest-server/src/main/java/brooklyn/rest/util/BrooklynRestResourceUtils.java
index 7c86d6a..52f9df8 100644
--- a/usage/rest-server/src/main/java/brooklyn/rest/util/BrooklynRestResourceUtils.java
+++ b/usage/rest-server/src/main/java/brooklyn/rest/util/BrooklynRestResourceUtils.java
@@ -47,6 +47,7 @@ import brooklyn.entity.Application;
import brooklyn.entity.Entity;
import brooklyn.entity.basic.AbstractEntity;
import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.Attributes;
import brooklyn.entity.basic.BasicApplication;
import brooklyn.entity.basic.Entities;
import brooklyn.entity.basic.EntityInternal;
@@ -273,7 +274,9 @@ public class BrooklynRestResourceUtils {
Entity soleChild = mgmt.getEntityManager().createEntity(toCoreEntitySpec(eclazz, name, configO));
instance.addChild(soleChild);
instance.addEnricher(Enrichers.builder()
- .propagatingAll()
+ .propagatingAllBut(Attributes.SERVICE_UP, Attributes.SERVICE_NOT_UP_INDICATORS,
+ Attributes.SERVICE_STATE_ACTUAL, Attributes.SERVICE_STATE_EXPECTED,
+ Attributes.SERVICE_PROBLEMS)
.from(soleChild)
.build());
[3/3] git commit: This closes #290
Posted by al...@apache.org.
This closes #290
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/aa20ae71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/aa20ae71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/aa20ae71
Branch: refs/heads/master
Commit: aa20ae71bb2307a5a3926eac12bac6fbe4bfcf7a
Parents: e830251 86aa90b
Author: Aled Sage <al...@gmail.com>
Authored: Mon Nov 3 12:26:49 2014 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Mon Nov 3 12:26:49 2014 +0000
----------------------------------------------------------------------
.../java/brooklyn/entity/basic/EntityLocal.java | 22 +++-
.../brooklyn/entity/basic/AbstractEntity.java | 25 ++++
.../entity/basic/ServiceStateLogic.java | 72 +++++++----
.../java/brooklyn/event/basic/AttributeMap.java | 22 +++-
.../brooklyn/entity/basic/AttributeMapTest.java | 127 ++++++++++++++-----
.../ApplicationResourceIntegrationTest.java | 29 ++---
.../rest/util/BrooklynRestResourceUtils.java | 5 +-
7 files changed, 223 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
[2/3] git commit: Add EntityLocal.modifyAttribute(AttributeSensor,
Function)
Posted by al...@apache.org.
Add EntityLocal.modifyAttribute(AttributeSensor, Function)
- For atomic (sequential) updates to an attribute, where the new
value is computed from the old value.
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/2b4e8323
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/2b4e8323
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/2b4e8323
Branch: refs/heads/master
Commit: 2b4e832357de3341f60a4eaaa4d05c7c97db4271
Parents: e830251
Author: Aled Sage <al...@gmail.com>
Authored: Fri Oct 31 22:57:14 2014 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Mon Nov 3 10:05:23 2014 +0000
----------------------------------------------------------------------
.../java/brooklyn/entity/basic/EntityLocal.java | 22 +++-
.../brooklyn/entity/basic/AbstractEntity.java | 25 ++++
.../java/brooklyn/event/basic/AttributeMap.java | 22 +++-
.../brooklyn/entity/basic/AttributeMapTest.java | 127 ++++++++++++++-----
4 files changed, 159 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2b4e8323/api/src/main/java/brooklyn/entity/basic/EntityLocal.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/brooklyn/entity/basic/EntityLocal.java b/api/src/main/java/brooklyn/entity/basic/EntityLocal.java
index 3405443..772161c 100644
--- a/api/src/main/java/brooklyn/entity/basic/EntityLocal.java
+++ b/api/src/main/java/brooklyn/entity/basic/EntityLocal.java
@@ -33,8 +33,10 @@ import brooklyn.management.SubscriptionContext;
import brooklyn.management.SubscriptionHandle;
import brooklyn.management.SubscriptionManager;
import brooklyn.management.Task;
+import brooklyn.util.guava.Maybe;
import com.google.common.annotations.Beta;
+import com.google.common.base.Function;
/**
* Extended Entity interface for use in places where the caller should have certain privileges,
@@ -66,14 +68,30 @@ public interface EntityLocal extends Entity, Configurable {
<T> T setConfig(HasConfigKey<T> key, Task<T> val);
/**
- * Sets the {@link Sensor} data for the given attribute to the specified value.
+ * Sets the {@link AttributeSensor} data for the given attribute to the specified value.
*
* This can be used to "enrich" the entity, such as adding aggregated information,
* rolling averages, etc.
*
* @return the old value for the attribute (possibly {@code null})
*/
- <T> T setAttribute(AttributeSensor<T> sensor, T val);
+ <T> T setAttribute(AttributeSensor<T> attribute, T val);
+
+ /**
+ * Atomically modifies the {@link AttributeSensor}, ensuring that only one modification is done
+ * at a time.
+ *
+ * If the modifier returns {@link Maybe#absent()} then the attribute will be
+ * left unmodified, and the existing value will be returned.
+ *
+ * For details of the synchronization model used to achieve this, refer to the underlying
+ * attribute store (e.g. AttributeMap).
+ *
+ * @return the old value for the attribute (possibly {@code null})
+ * @since 0.7.0-M2
+ */
+ @Beta
+ <T> T modifyAttribute(AttributeSensor<T> attribute, Function<? super T, Maybe<T>> modifier);
/**
* @deprecated in 0.5; use {@link #getConfig(ConfigKey)}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2b4e8323/core/src/main/java/brooklyn/entity/basic/AbstractEntity.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/basic/AbstractEntity.java b/core/src/main/java/brooklyn/entity/basic/AbstractEntity.java
index 00b58b7..e66bdef 100644
--- a/core/src/main/java/brooklyn/entity/basic/AbstractEntity.java
+++ b/core/src/main/java/brooklyn/entity/basic/AbstractEntity.java
@@ -92,6 +92,7 @@ import brooklyn.util.task.DeferredSupplier;
import brooklyn.util.text.Strings;
import com.google.common.annotations.Beta;
+import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Objects.ToStringHelper;
import com.google.common.collect.ImmutableList;
@@ -829,6 +830,30 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E
return result;
}
+ @Beta
+ @Override
+ public <T> T modifyAttribute(AttributeSensor<T> attribute, Function<? super T, Maybe<T>> modifier) {
+ if (LOG.isTraceEnabled())
+ LOG.trace(""+this+" modifyAttribute "+attribute+" "+modifier);
+
+ if (Boolean.TRUE.equals(getManagementSupport().isReadOnlyRaw())) {
+ if (WARNED_READ_ONLY_ATTRIBUTES.add(attribute.getName())) {
+ LOG.warn(""+this+" modifying "+attribute+" = "+modifier+" in read only mode; will have no effect (future messages for this sensor logged at trace)");
+ } else if (LOG.isTraceEnabled()) {
+ LOG.trace(""+this+" setting "+attribute+" = "+modifier+" in read only mode; will have no effect");
+ }
+ }
+ T result = attributesInternal.modify(attribute, modifier);
+ if (result == null) {
+ // could be this is a new sensor
+ entityType.addSensorIfAbsent(attribute);
+ }
+
+ // TODO Conditionally set onAttributeChanged, only if was modified
+ getManagementSupport().getEntityChangeListener().onAttributeChanged(attribute);
+ return result;
+ }
+
@Override
public void removeAttribute(AttributeSensor<?> attribute) {
if (LOG.isTraceEnabled())
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2b4e8323/core/src/main/java/brooklyn/event/basic/AttributeMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/event/basic/AttributeMap.java b/core/src/main/java/brooklyn/event/basic/AttributeMap.java
index ca7322d..08f1f91 100644
--- a/core/src/main/java/brooklyn/event/basic/AttributeMap.java
+++ b/core/src/main/java/brooklyn/event/basic/AttributeMap.java
@@ -31,7 +31,9 @@ import brooklyn.entity.Entity;
import brooklyn.entity.basic.AbstractEntity;
import brooklyn.event.AttributeSensor;
import brooklyn.util.flags.TypeCoercions;
+import brooklyn.util.guava.Maybe;
+import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
@@ -53,7 +55,7 @@ public final class AttributeMap implements Serializable {
private final AbstractEntity entity;
- // Note that we synchronize on the top-level map, to handle concurrent updates and and gets (ENGR-2111)
+ // Assumed to be something like a ConcurrentMap passed in.
private final Map<Collection<String>, Object> values;
/**
@@ -132,6 +134,24 @@ public final class AttributeMap implements Serializable {
return (isNull(oldValue)) ? null : oldValue;
}
+ /**
+ * Where atomicity is desired, the methods in this class synchronize on the {@link #values} map.
+ */
+ public <T> T modify(AttributeSensor<T> attribute, Function<? super T, Maybe<T>> modifier) {
+ synchronized (values) {
+ T oldValue = getValue(attribute);
+ Maybe<? extends T> newValue = modifier.apply(oldValue);
+
+ if (newValue.isPresent()) {
+ if (log.isTraceEnabled()) log.trace("modified attribute {} to {} (was {}) on {}", new Object[] {attribute.getName(), newValue, oldValue, entity});
+ return update(attribute, newValue.get());
+ } else {
+ if (log.isTraceEnabled()) log.trace("modified attribute {} unchanged; not emitting on {}", new Object[] {attribute.getName(), newValue, this});
+ return oldValue;
+ }
+ }
+ }
+
public void remove(AttributeSensor<?> attribute) {
if (log.isDebugEnabled()) {
log.debug("removing attribute {} on {}", attribute.getName(), entity);
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2b4e8323/core/src/test/java/brooklyn/entity/basic/AttributeMapTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/basic/AttributeMapTest.java b/core/src/test/java/brooklyn/entity/basic/AttributeMapTest.java
index e3d992d..3224041 100644
--- a/core/src/test/java/brooklyn/entity/basic/AttributeMapTest.java
+++ b/core/src/test/java/brooklyn/entity/basic/AttributeMapTest.java
@@ -19,6 +19,7 @@
package brooklyn.entity.basic;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
import java.util.Collection;
import java.util.Collections;
@@ -34,75 +35,73 @@ import org.testng.annotations.Test;
import brooklyn.entity.Application;
import brooklyn.event.AttributeSensor;
+import brooklyn.event.SensorEvent;
+import brooklyn.event.SensorEventListener;
import brooklyn.event.basic.AttributeMap;
import brooklyn.event.basic.Sensors;
+import brooklyn.test.Asserts;
import brooklyn.test.entity.TestApplication;
import brooklyn.test.entity.TestEntityImpl;
import brooklyn.util.collections.MutableMap;
+import brooklyn.util.guava.Maybe;
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
public class AttributeMapTest {
Application app;
+ TestEntityImpl entity;
AttributeMap map;
-
+ ExecutorService executor;
+
@BeforeMethod(alwaysRun=true)
public void setUp() {
app = TestApplication.Factory.newManagedInstanceForTests();
- TestEntityImpl e = new TestEntityImpl(app);
- map = new AttributeMap(e, Collections.synchronizedMap(MutableMap.<Collection<String>,Object>of()));
- Entities.startManagement(app);
+ entity = new TestEntityImpl(app);
+ map = new AttributeMap(entity, Collections.synchronizedMap(MutableMap.<Collection<String>,Object>of()));
+ Entities.manage(entity);
+ executor = Executors.newCachedThreadPool();
}
@AfterMethod(alwaysRun=true)
public void tearDown() {
+ if (executor != null) executor.shutdownNow();
if (app != null) Entities.destroyAll(app.getManagementContext());
}
// See ENGR-2111
@Test
public void testConcurrentUpdatesDoNotCauseConcurrentModificationException() throws Exception {
- ExecutorService executor = Executors.newCachedThreadPool();
List<Future<?>> futures = Lists.newArrayList();
- try {
- for (int i = 0; i < 1000; i++) {
- final AttributeSensor<String> nextSensor = Sensors.newStringSensor("attributeMapTest.exampleSensor"+i, "");
- Future<?> future = executor.submit(newUpdateMapRunnable(map, nextSensor, "a"));
- futures.add(future);
- }
-
- for (Future<?> future : futures) {
- future.get();
- }
-
- } finally {
- executor.shutdownNow();
+ for (int i = 0; i < 1000; i++) {
+ final AttributeSensor<String> nextSensor = Sensors.newStringSensor("attributeMapTest.exampleSensor"+i, "");
+ Future<?> future = executor.submit(newUpdateMapRunnable(map, nextSensor, "a"));
+ futures.add(future);
+ }
+
+ for (Future<?> future : futures) {
+ future.get();
}
}
@Test
public void testConcurrentUpdatesAndGetsDoNotCauseConcurrentModificationException() throws Exception {
- ExecutorService executor = Executors.newCachedThreadPool();
List<Future<?>> futures = Lists.newArrayList();
- try {
- for (int i = 0; i < 1000; i++) {
- final AttributeSensor<String> nextSensor = Sensors.newStringSensor("attributeMapTest.exampleSensor"+i, "");
- Future<?> future = executor.submit(newUpdateMapRunnable(map, nextSensor, "a"));
- Future<?> future2 = executor.submit(newGetAttributeCallable(map, nextSensor));
- futures.add(future);
- futures.add(future2);
- }
+ for (int i = 0; i < 1000; i++) {
+ final AttributeSensor<String> nextSensor = Sensors.newStringSensor("attributeMapTest.exampleSensor"+i, "");
+ Future<?> future = executor.submit(newUpdateMapRunnable(map, nextSensor, "a"));
+ Future<?> future2 = executor.submit(newGetAttributeCallable(map, nextSensor));
+ futures.add(future);
+ futures.add(future2);
+ }
- for (Future<?> future : futures) {
- future.get();
- }
-
- } finally {
- executor.shutdownNow();
+ for (Future<?> future : futures) {
+ future.get();
}
}
@@ -147,7 +146,7 @@ public class AttributeMapTest {
assertEquals(map.getValue(childSensor), "childValue");
assertEquals(map.getValue(sensor), "parentValue");
}
-
+
@Test
public void testCanStoreChildThenParentSensor() throws Exception {
AttributeSensor<String> sensor = Sensors.newStringSensor("a", "");
@@ -160,6 +159,46 @@ public class AttributeMapTest {
assertEquals(map.getValue(sensor), "parentValue");
}
+ @Test
+ public void testConcurrentModifyAttributeCalls() throws Exception {
+ AttributeSensor<Integer> sensor = Sensors.newIntegerSensor("a", "");
+
+ Function<Integer, Maybe<Integer>> modifier = new Function<Integer, Maybe<Integer>>() {
+ @Override public Maybe<Integer> apply(Integer input) {
+ return Maybe.of((input == null) ? 1 : input + 1);
+ }
+ };
+
+ List<Future<?>> futures = Lists.newArrayList();
+
+ for (int i = 0; i < 1000; i++) {
+ Future<?> future = executor.submit(newModifyAttributeCallable(map, sensor, modifier));
+ futures.add(future);
+ }
+
+ for (Future<?> future : futures) {
+ future.get();
+ }
+
+ assertEquals(map.getValue(sensor), Integer.valueOf(1000));
+ }
+
+ @Test
+ public void testModifyAttributeReturningAbsentDoesNotEmit() throws Exception {
+ AttributeSensor<Integer> sensor = Sensors.newIntegerSensor("a", "");
+ AttributeSensor<Integer> childSensor = Sensors.newIntegerSensor("a.b", "");
+
+ final RecordingSensorEventListener listener = new RecordingSensorEventListener();
+ entity.subscribe(entity, sensor, listener);
+
+ map.modify(childSensor, Functions.constant(Maybe.<Integer>absent()));
+
+ Asserts.succeedsContinually(new Runnable() {
+ @Override public void run() {
+ assertTrue(listener.getEvents().isEmpty(), "events="+listener.getEvents());
+ }});
+ }
+
protected <T> Runnable newUpdateMapRunnable(final AttributeMap map, final AttributeSensor<T> attribute, final T val) {
return new Runnable() {
@Override public void run() {
@@ -175,4 +214,24 @@ public class AttributeMapTest {
}
};
}
+
+ protected <T> Callable<T> newModifyAttributeCallable(final AttributeMap map, final AttributeSensor<T> attribute, final Function<? super T, Maybe<T>> modifier) {
+ return new Callable<T>() {
+ @Override public T call() {
+ return map.modify(attribute, modifier);
+ }
+ };
+ }
+
+ public static class RecordingSensorEventListener implements SensorEventListener<Object> {
+ private List<SensorEvent<Object>> events = Collections.synchronizedList(Lists.<SensorEvent<Object>>newArrayList());
+
+ @Override public void onEvent(SensorEvent<Object> event) {
+ events.add(event);
+ }
+
+ public List<SensorEvent<Object>> getEvents() {
+ return ImmutableList.copyOf(events);
+ }
+ }
}