You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2014/11/03 13:27:03 UTC

[1/3] git commit: Fix ServiceStateLogic updating map

Repository: incubator-brooklyn
Updated Branches:
  refs/heads/master e83025110 -> aa20ae71b


Fix ServiceStateLogic updating map

- Don’t modify the existing attribute value. Instead take a copy
  and modify that.
- Also fixes (and tidies) assertions in
  brooklyn.rest.client.ApplicationResourceIntegrationTest
- BrooklynRestResourceUtils: don’t propagate to the top-level app
  the service_up, service_not_up_indicators, etc.
  These are populated automatic by app.initEnrichers, which looks
  at the child entities.


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/86aa90bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/86aa90bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/86aa90bd

Branch: refs/heads/master
Commit: 86aa90bd1f8cd1c261cac3f376feb489b2f07015
Parents: 2b4e832
Author: Aled Sage <al...@gmail.com>
Authored: Fri Oct 31 23:07:02 2014 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Mon Nov 3 10:05:23 2014 +0000

----------------------------------------------------------------------
 .../entity/basic/ServiceStateLogic.java         | 72 +++++++++++++-------
 .../ApplicationResourceIntegrationTest.java     | 29 ++++----
 .../rest/util/BrooklynRestResourceUtils.java    |  5 +-
 3 files changed, 64 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/86aa90bd/core/src/main/java/brooklyn/entity/basic/ServiceStateLogic.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/basic/ServiceStateLogic.java b/core/src/main/java/brooklyn/entity/basic/ServiceStateLogic.java
index b54e732..2d3577b 100644
--- a/core/src/main/java/brooklyn/entity/basic/ServiceStateLogic.java
+++ b/core/src/main/java/brooklyn/entity/basic/ServiceStateLogic.java
@@ -90,31 +90,51 @@ public class ServiceStateLogic {
     }
 
     /** update the given key in the given map sensor */
-    public static <TKey,TVal> void updateMapSensorEntry(EntityLocal entity, AttributeSensor<Map<TKey,TVal>> sensor, TKey key, TVal v) {
-        Map<TKey, TVal> map = entity.getAttribute(sensor);
-
-        boolean created = (map==null);
-        if (created) map = MutableMap.of();
+    public static <TKey,TVal> void updateMapSensorEntry(EntityLocal entity, AttributeSensor<Map<TKey,TVal>> sensor, final TKey key, final TVal v) {
+        /*
+         * Important to *not* modify the existing attribute value; must make a copy, modify that, and publish.
+         * This is because a Propagator enricher will set this same value on another entity. There was very
+         * strange behaviour when this was done for a SERVICE_UP_INDICATORS sensor - the updates done here 
+         * applied to the attribute of both entities!
+         * 
+         * Need to do this update atomically (i.e. sequentially) because there is no threading control for
+         * what is calling updateMapSensorEntity. It is called directly on start, on initialising enrichers,
+         * and in event listeners. These calls could be concurrent.
+         */
+        Function<Map<TKey,TVal>, Maybe<Map<TKey,TVal>>> modifier = new Function<Map<TKey,TVal>, Maybe<Map<TKey,TVal>>>() {
+            @Override public Maybe<Map<TKey, TVal>> apply(Map<TKey, TVal> map) {
+                boolean created = (map==null);
+                if (created) map = MutableMap.of();
                 
-        boolean changed;
-        if (v == Entities.REMOVE) {
-            changed = map.containsKey(key);
-            if (changed)
-                map.remove(key);
-        } else {
-            TVal oldV = map.get(key);
-            if (oldV==null)
-                changed = (v!=null || !map.containsKey(key));
-            else
-                changed = !oldV.equals(v);
-            if (changed)
-                map.put(key, (TVal)v);
-        }
-        if (changed || created) {
-            if (!Entities.isNoLongerManaged(entity)) { 
-                // TODO synchronize; then emit a copy to prevent CME's e.g. UrlMappingTest
-                entity.setAttribute(sensor, map);
+                boolean changed;
+                if (v == Entities.REMOVE) {
+                    changed = map.containsKey(key);
+                    if (changed) {
+                        map = MutableMap.copyOf(map);
+                        map.remove(key);
+                    }
+                } else {
+                    TVal oldV = map.get(key);
+                    if (oldV==null) {
+                        changed = (v!=null || !map.containsKey(key));
+                    } else {
+                        changed = !oldV.equals(v);
+                    }
+                    if (changed) {
+                        map = MutableMap.copyOf(map);
+                        map.put(key, (TVal)v);
+                    }
+                }
+                if (changed || created) {
+                    return Maybe.of(map);
+                } else {
+                    return Maybe.absent();
+                }
             }
+        };
+        
+        if (!Entities.isNoLongerManaged(entity)) { 
+            entity.modifyAttribute(sensor, modifier);
         }
     }
     
@@ -424,11 +444,13 @@ public class ServiceStateLogic {
             }
 
             // override superclass to publish multiple sensors
-            if (getConfig(DERIVE_SERVICE_PROBLEMS))
+            if (getConfig(DERIVE_SERVICE_PROBLEMS)) {
                 updateMapSensor(SERVICE_PROBLEMS, computeServiceProblems());
+            }
 
-            if (getConfig(DERIVE_SERVICE_NOT_UP))
+            if (getConfig(DERIVE_SERVICE_NOT_UP)) {
                 updateMapSensor(SERVICE_NOT_UP_INDICATORS, computeServiceNotUp());
+            }
         }
 
         protected Object computeServiceNotUp() {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/86aa90bd/usage/rest-client/src/test/java/brooklyn/rest/client/ApplicationResourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/usage/rest-client/src/test/java/brooklyn/rest/client/ApplicationResourceIntegrationTest.java b/usage/rest-client/src/test/java/brooklyn/rest/client/ApplicationResourceIntegrationTest.java
index 15bde52..e303571 100644
--- a/usage/rest-client/src/test/java/brooklyn/rest/client/ApplicationResourceIntegrationTest.java
+++ b/usage/rest-client/src/test/java/brooklyn/rest/client/ApplicationResourceIntegrationTest.java
@@ -62,6 +62,8 @@ public class ApplicationResourceIntegrationTest {
 
     private static final Logger log = LoggerFactory.getLogger(ApplicationResourceIntegrationTest.class);
 
+    private static final Duration LONG_WAIT = Duration.minutes(10);
+    
     private final String redisSpec = "{\"name\": \"redis-app\", \"type\": \"brooklyn.entity.nosql.redis.RedisStore\", \"locations\": [ \"localhost\"]}";
     
     private final ApplicationSpec legacyRedisSpec = ApplicationSpec.builder().name("redis-legacy-app")
@@ -117,11 +119,7 @@ public class ApplicationResourceIntegrationTest {
         assertEquals(response.getStatus(), 201);
         assertEquals(getManagementContext().getApplications().size(), 1);
         final String entityId = getManagementContext().getApplications().iterator().next().getChildren().iterator().next().getId();
-        Asserts.succeedsEventually(ImmutableMap.of("timeout", Duration.minutes(10)), new Runnable() {
-            public void run() {
-                Object status = api.getSensorApi().get("redis-app", entityId, "service.state", false);
-                assertEquals(status, Lifecycle.RUNNING.toString());
-            }});
+        assertServiceStateEventually("redis-app", entityId, Lifecycle.RUNNING, LONG_WAIT);
     }
     
     @Test(groups = "Integration", dependsOnMethods = "testDeployRedisApplication")
@@ -130,11 +128,8 @@ public class ApplicationResourceIntegrationTest {
         Response response = api.getApplicationApi().create(legacyRedisSpec);
         assertEquals(response.getStatus(), 201);
         assertEquals(getManagementContext().getApplications().size(), 2);
-        Asserts.succeedsEventually(ImmutableMap.of("timeout", Duration.minutes(10)), new Runnable() {
-            public void run() {
-                Object status = api.getSensorApi().get("redis-legacy-app", "redis-ent", "service.state", false);
-                assertEquals(status, Lifecycle.RUNNING.toString());
-            }});
+        assertServiceStateEventually("redis-legacy-app", "redis-ent", Lifecycle.RUNNING, LONG_WAIT);
+        
         // Tear the app down so it doesn't interfere with other tests 
         Response deleteResponse = api.getApplicationApi().delete("redis-legacy-app");
         assertEquals(deleteResponse.getStatus(), 202);
@@ -167,12 +162,7 @@ public class ApplicationResourceIntegrationTest {
         Response response = api.getEffectorApi().invoke("redis-app", entityId, "stop", "5000", ImmutableMap.<String, Object>of());
 
         assertEquals(response.getStatus(), Response.Status.ACCEPTED.getStatusCode());
-
-        Asserts.succeedsEventually(ImmutableMap.of("timeout", Duration.minutes(10)), new Runnable() {
-            public void run() {
-                Object status = api.getSensorApi().get("redis-app", entityId, "service.state", false);
-                assertEquals(status, Lifecycle.STOPPED.toString());
-            }});
+        assertServiceStateEventually("redis-app", entityId, Lifecycle.STOPPED, LONG_WAIT);
     }
 
     @Test(groups = "Integration", dependsOnMethods = "testTriggerRedisStopEffector")
@@ -201,4 +191,11 @@ public class ApplicationResourceIntegrationTest {
         assertEquals(getManagementContext().getApplications().size(), size - 1);
     }
 
+    private void assertServiceStateEventually(final String app, final String entity, final Lifecycle state, Duration timeout) {
+        Asserts.succeedsEventually(ImmutableMap.of("timeout", timeout), new Runnable() {
+            public void run() {
+                Object status = api.getSensorApi().get(app, entity, "service.state", false);
+                assertTrue(state.toString().equalsIgnoreCase(status.toString()), "status="+status);
+            }});
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/86aa90bd/usage/rest-server/src/main/java/brooklyn/rest/util/BrooklynRestResourceUtils.java
----------------------------------------------------------------------
diff --git a/usage/rest-server/src/main/java/brooklyn/rest/util/BrooklynRestResourceUtils.java b/usage/rest-server/src/main/java/brooklyn/rest/util/BrooklynRestResourceUtils.java
index 7c86d6a..52f9df8 100644
--- a/usage/rest-server/src/main/java/brooklyn/rest/util/BrooklynRestResourceUtils.java
+++ b/usage/rest-server/src/main/java/brooklyn/rest/util/BrooklynRestResourceUtils.java
@@ -47,6 +47,7 @@ import brooklyn.entity.Application;
 import brooklyn.entity.Entity;
 import brooklyn.entity.basic.AbstractEntity;
 import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.Attributes;
 import brooklyn.entity.basic.BasicApplication;
 import brooklyn.entity.basic.Entities;
 import brooklyn.entity.basic.EntityInternal;
@@ -273,7 +274,9 @@ public class BrooklynRestResourceUtils {
                     Entity soleChild = mgmt.getEntityManager().createEntity(toCoreEntitySpec(eclazz, name, configO));
                     instance.addChild(soleChild);
                     instance.addEnricher(Enrichers.builder()
-                            .propagatingAll()
+                            .propagatingAllBut(Attributes.SERVICE_UP, Attributes.SERVICE_NOT_UP_INDICATORS, 
+                                    Attributes.SERVICE_STATE_ACTUAL, Attributes.SERVICE_STATE_EXPECTED, 
+                                    Attributes.SERVICE_PROBLEMS)
                             .from(soleChild)
                             .build());
 


[3/3] git commit: This closes #290

Posted by al...@apache.org.
This closes #290


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/aa20ae71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/aa20ae71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/aa20ae71

Branch: refs/heads/master
Commit: aa20ae71bb2307a5a3926eac12bac6fbe4bfcf7a
Parents: e830251 86aa90b
Author: Aled Sage <al...@gmail.com>
Authored: Mon Nov 3 12:26:49 2014 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Mon Nov 3 12:26:49 2014 +0000

----------------------------------------------------------------------
 .../java/brooklyn/entity/basic/EntityLocal.java |  22 +++-
 .../brooklyn/entity/basic/AbstractEntity.java   |  25 ++++
 .../entity/basic/ServiceStateLogic.java         |  72 +++++++----
 .../java/brooklyn/event/basic/AttributeMap.java |  22 +++-
 .../brooklyn/entity/basic/AttributeMapTest.java | 127 ++++++++++++++-----
 .../ApplicationResourceIntegrationTest.java     |  29 ++---
 .../rest/util/BrooklynRestResourceUtils.java    |   5 +-
 7 files changed, 223 insertions(+), 79 deletions(-)
----------------------------------------------------------------------



[2/3] git commit: Add EntityLocal.modifyAttribute(AttributeSensor, Function)

Posted by al...@apache.org.
Add EntityLocal.modifyAttribute(AttributeSensor, Function)

- For atomic (sequential) updates to an attribute, where the new
  value is computed from the old value.


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/2b4e8323
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/2b4e8323
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/2b4e8323

Branch: refs/heads/master
Commit: 2b4e832357de3341f60a4eaaa4d05c7c97db4271
Parents: e830251
Author: Aled Sage <al...@gmail.com>
Authored: Fri Oct 31 22:57:14 2014 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Mon Nov 3 10:05:23 2014 +0000

----------------------------------------------------------------------
 .../java/brooklyn/entity/basic/EntityLocal.java |  22 +++-
 .../brooklyn/entity/basic/AbstractEntity.java   |  25 ++++
 .../java/brooklyn/event/basic/AttributeMap.java |  22 +++-
 .../brooklyn/entity/basic/AttributeMapTest.java | 127 ++++++++++++++-----
 4 files changed, 159 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2b4e8323/api/src/main/java/brooklyn/entity/basic/EntityLocal.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/brooklyn/entity/basic/EntityLocal.java b/api/src/main/java/brooklyn/entity/basic/EntityLocal.java
index 3405443..772161c 100644
--- a/api/src/main/java/brooklyn/entity/basic/EntityLocal.java
+++ b/api/src/main/java/brooklyn/entity/basic/EntityLocal.java
@@ -33,8 +33,10 @@ import brooklyn.management.SubscriptionContext;
 import brooklyn.management.SubscriptionHandle;
 import brooklyn.management.SubscriptionManager;
 import brooklyn.management.Task;
+import brooklyn.util.guava.Maybe;
 
 import com.google.common.annotations.Beta;
+import com.google.common.base.Function;
 
 /** 
  * Extended Entity interface for use in places where the caller should have certain privileges,
@@ -66,14 +68,30 @@ public interface EntityLocal extends Entity, Configurable {
     <T> T setConfig(HasConfigKey<T> key, Task<T> val);
 
     /**
-     * Sets the {@link Sensor} data for the given attribute to the specified value.
+     * Sets the {@link AttributeSensor} data for the given attribute to the specified value.
      * 
      * This can be used to "enrich" the entity, such as adding aggregated information, 
      * rolling averages, etc.
      * 
      * @return the old value for the attribute (possibly {@code null})
      */
-    <T> T setAttribute(AttributeSensor<T> sensor, T val);
+    <T> T setAttribute(AttributeSensor<T> attribute, T val);
+
+    /**
+     * Atomically modifies the {@link AttributeSensor}, ensuring that only one modification is done
+     * at a time.
+     * 
+     * If the modifier returns {@link Maybe#absent()} then the attribute will be
+     * left unmodified, and the existing value will be returned.
+     * 
+     * For details of the synchronization model used to achieve this, refer to the underlying 
+     * attribute store (e.g. AttributeMap).
+     * 
+     * @return the old value for the attribute (possibly {@code null})
+     * @since 0.7.0-M2
+     */
+    @Beta
+    <T> T modifyAttribute(AttributeSensor<T> attribute, Function<? super T, Maybe<T>> modifier);
 
     /**
      * @deprecated in 0.5; use {@link #getConfig(ConfigKey)}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2b4e8323/core/src/main/java/brooklyn/entity/basic/AbstractEntity.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/basic/AbstractEntity.java b/core/src/main/java/brooklyn/entity/basic/AbstractEntity.java
index 00b58b7..e66bdef 100644
--- a/core/src/main/java/brooklyn/entity/basic/AbstractEntity.java
+++ b/core/src/main/java/brooklyn/entity/basic/AbstractEntity.java
@@ -92,6 +92,7 @@ import brooklyn.util.task.DeferredSupplier;
 import brooklyn.util.text.Strings;
 
 import com.google.common.annotations.Beta;
+import com.google.common.base.Function;
 import com.google.common.base.Objects;
 import com.google.common.base.Objects.ToStringHelper;
 import com.google.common.collect.ImmutableList;
@@ -829,6 +830,30 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E
         return result;
     }
 
+    @Beta
+    @Override
+    public <T> T modifyAttribute(AttributeSensor<T> attribute, Function<? super T, Maybe<T>> modifier) {
+        if (LOG.isTraceEnabled())
+            LOG.trace(""+this+" modifyAttribute "+attribute+" "+modifier);
+        
+        if (Boolean.TRUE.equals(getManagementSupport().isReadOnlyRaw())) {
+            if (WARNED_READ_ONLY_ATTRIBUTES.add(attribute.getName())) {
+                LOG.warn(""+this+" modifying "+attribute+" = "+modifier+" in read only mode; will have no effect (future messages for this sensor logged at trace)");
+            } else if (LOG.isTraceEnabled()) {
+                LOG.trace(""+this+" setting "+attribute+" = "+modifier+" in read only mode; will have no effect");
+            }
+        }
+        T result = attributesInternal.modify(attribute, modifier);
+        if (result == null) {
+            // could be this is a new sensor
+            entityType.addSensorIfAbsent(attribute);
+        }
+        
+        // TODO Conditionally set onAttributeChanged, only if was modified
+        getManagementSupport().getEntityChangeListener().onAttributeChanged(attribute);
+        return result;
+    }
+
     @Override
     public void removeAttribute(AttributeSensor<?> attribute) {
         if (LOG.isTraceEnabled())

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2b4e8323/core/src/main/java/brooklyn/event/basic/AttributeMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/event/basic/AttributeMap.java b/core/src/main/java/brooklyn/event/basic/AttributeMap.java
index ca7322d..08f1f91 100644
--- a/core/src/main/java/brooklyn/event/basic/AttributeMap.java
+++ b/core/src/main/java/brooklyn/event/basic/AttributeMap.java
@@ -31,7 +31,9 @@ import brooklyn.entity.Entity;
 import brooklyn.entity.basic.AbstractEntity;
 import brooklyn.event.AttributeSensor;
 import brooklyn.util.flags.TypeCoercions;
+import brooklyn.util.guava.Maybe;
 
+import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
@@ -53,7 +55,7 @@ public final class AttributeMap implements Serializable {
     
     private final AbstractEntity entity;
 
-    // Note that we synchronize on the top-level map, to handle concurrent updates and and gets (ENGR-2111)
+    // Assumed to be something like a ConcurrentMap passed in.
     private final Map<Collection<String>, Object> values;
 
     /**
@@ -132,6 +134,24 @@ public final class AttributeMap implements Serializable {
         return (isNull(oldValue)) ? null : oldValue;
     }
 
+    /**
+     * Where atomicity is desired, the methods in this class synchronize on the {@link #values} map.
+     */
+    public <T> T modify(AttributeSensor<T> attribute, Function<? super T, Maybe<T>> modifier) {
+        synchronized (values) {
+            T oldValue = getValue(attribute);
+            Maybe<? extends T> newValue = modifier.apply(oldValue);
+
+            if (newValue.isPresent()) {
+                if (log.isTraceEnabled()) log.trace("modified attribute {} to {} (was {}) on {}", new Object[] {attribute.getName(), newValue, oldValue, entity});
+                return update(attribute, newValue.get());
+            } else {
+                if (log.isTraceEnabled()) log.trace("modified attribute {} unchanged; not emitting on {}", new Object[] {attribute.getName(), newValue, this});
+                return oldValue;
+            }
+        }
+    }
+
     public void remove(AttributeSensor<?> attribute) {
         if (log.isDebugEnabled()) {
             log.debug("removing attribute {} on {}", attribute.getName(), entity);

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2b4e8323/core/src/test/java/brooklyn/entity/basic/AttributeMapTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/basic/AttributeMapTest.java b/core/src/test/java/brooklyn/entity/basic/AttributeMapTest.java
index e3d992d..3224041 100644
--- a/core/src/test/java/brooklyn/entity/basic/AttributeMapTest.java
+++ b/core/src/test/java/brooklyn/entity/basic/AttributeMapTest.java
@@ -19,6 +19,7 @@
 package brooklyn.entity.basic;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -34,75 +35,73 @@ import org.testng.annotations.Test;
 
 import brooklyn.entity.Application;
 import brooklyn.event.AttributeSensor;
+import brooklyn.event.SensorEvent;
+import brooklyn.event.SensorEventListener;
 import brooklyn.event.basic.AttributeMap;
 import brooklyn.event.basic.Sensors;
+import brooklyn.test.Asserts;
 import brooklyn.test.entity.TestApplication;
 import brooklyn.test.entity.TestEntityImpl;
 import brooklyn.util.collections.MutableMap;
+import brooklyn.util.guava.Maybe;
 
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 public class AttributeMapTest {
 
     Application app;
+    TestEntityImpl entity;
     AttributeMap map;
-
+    ExecutorService executor;
+    
     @BeforeMethod(alwaysRun=true)
     public void setUp() {
         app = TestApplication.Factory.newManagedInstanceForTests();
-        TestEntityImpl e = new TestEntityImpl(app);
-        map = new AttributeMap(e, Collections.synchronizedMap(MutableMap.<Collection<String>,Object>of()));
-        Entities.startManagement(app);
+        entity = new TestEntityImpl(app);
+        map = new AttributeMap(entity, Collections.synchronizedMap(MutableMap.<Collection<String>,Object>of()));
+        Entities.manage(entity);
+        executor = Executors.newCachedThreadPool();
     }
     
     @AfterMethod(alwaysRun=true)
     public void tearDown() {
+        if (executor != null) executor.shutdownNow();
         if (app != null) Entities.destroyAll(app.getManagementContext());
     }
     
     // See ENGR-2111
     @Test
     public void testConcurrentUpdatesDoNotCauseConcurrentModificationException() throws Exception {
-        ExecutorService executor = Executors.newCachedThreadPool();
         List<Future<?>> futures = Lists.newArrayList();
         
-        try {
-            for (int i = 0; i < 1000; i++) {
-                final AttributeSensor<String> nextSensor = Sensors.newStringSensor("attributeMapTest.exampleSensor"+i, "");
-                Future<?> future = executor.submit(newUpdateMapRunnable(map, nextSensor, "a"));
-                futures.add(future);
-            }
-            
-            for (Future<?> future : futures) {
-                future.get();
-            }
-            
-        } finally {
-            executor.shutdownNow();
+        for (int i = 0; i < 1000; i++) {
+            final AttributeSensor<String> nextSensor = Sensors.newStringSensor("attributeMapTest.exampleSensor"+i, "");
+            Future<?> future = executor.submit(newUpdateMapRunnable(map, nextSensor, "a"));
+            futures.add(future);
+        }
+        
+        for (Future<?> future : futures) {
+            future.get();
         }
     }
     
     @Test
     public void testConcurrentUpdatesAndGetsDoNotCauseConcurrentModificationException() throws Exception {
-        ExecutorService executor = Executors.newCachedThreadPool();
         List<Future<?>> futures = Lists.newArrayList();
         
-        try {
-            for (int i = 0; i < 1000; i++) {
-                final AttributeSensor<String> nextSensor = Sensors.newStringSensor("attributeMapTest.exampleSensor"+i, "");
-                Future<?> future = executor.submit(newUpdateMapRunnable(map, nextSensor, "a"));
-                Future<?> future2 = executor.submit(newGetAttributeCallable(map, nextSensor));
-                futures.add(future);
-                futures.add(future2);
-            }
+        for (int i = 0; i < 1000; i++) {
+            final AttributeSensor<String> nextSensor = Sensors.newStringSensor("attributeMapTest.exampleSensor"+i, "");
+            Future<?> future = executor.submit(newUpdateMapRunnable(map, nextSensor, "a"));
+            Future<?> future2 = executor.submit(newGetAttributeCallable(map, nextSensor));
+            futures.add(future);
+            futures.add(future2);
+        }
 
-            for (Future<?> future : futures) {
-                future.get();
-            }
-            
-        } finally {
-            executor.shutdownNow();
+        for (Future<?> future : futures) {
+            future.get();
         }
     }
     
@@ -147,7 +146,7 @@ public class AttributeMapTest {
         assertEquals(map.getValue(childSensor), "childValue");
         assertEquals(map.getValue(sensor), "parentValue");
     }
-        
+    
     @Test
     public void testCanStoreChildThenParentSensor() throws Exception {
         AttributeSensor<String> sensor = Sensors.newStringSensor("a", "");
@@ -160,6 +159,46 @@ public class AttributeMapTest {
         assertEquals(map.getValue(sensor), "parentValue");
     }
     
+    @Test
+    public void testConcurrentModifyAttributeCalls() throws Exception {
+        AttributeSensor<Integer> sensor = Sensors.newIntegerSensor("a", "");
+        
+        Function<Integer, Maybe<Integer>> modifier = new Function<Integer, Maybe<Integer>>() {
+            @Override public Maybe<Integer> apply(Integer input) {
+                return Maybe.of((input == null) ? 1 : input + 1);
+            }
+        };
+        
+        List<Future<?>> futures = Lists.newArrayList();
+        
+        for (int i = 0; i < 1000; i++) {
+            Future<?> future = executor.submit(newModifyAttributeCallable(map, sensor, modifier));
+            futures.add(future);
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(map.getValue(sensor), Integer.valueOf(1000));
+    }
+    
+    @Test
+    public void testModifyAttributeReturningAbsentDoesNotEmit() throws Exception {
+        AttributeSensor<Integer> sensor = Sensors.newIntegerSensor("a", "");
+        AttributeSensor<Integer> childSensor = Sensors.newIntegerSensor("a.b", "");
+        
+        final RecordingSensorEventListener listener = new RecordingSensorEventListener();
+        entity.subscribe(entity, sensor, listener);
+        
+        map.modify(childSensor, Functions.constant(Maybe.<Integer>absent()));
+        
+        Asserts.succeedsContinually(new Runnable() {
+            @Override public void run() {
+                assertTrue(listener.getEvents().isEmpty(), "events="+listener.getEvents());
+            }});
+    }
+    
     protected <T> Runnable newUpdateMapRunnable(final AttributeMap map, final AttributeSensor<T> attribute, final T val) {
         return new Runnable() {
             @Override public void run() {
@@ -175,4 +214,24 @@ public class AttributeMapTest {
             }
         };
     }
+    
+    protected <T> Callable<T> newModifyAttributeCallable(final AttributeMap map, final AttributeSensor<T> attribute, final Function<? super T, Maybe<T>> modifier) {
+        return new Callable<T>() {
+            @Override public T call() {
+                return map.modify(attribute, modifier);
+            }
+        };
+    }
+    
+    public static class RecordingSensorEventListener implements SensorEventListener<Object> {
+        private List<SensorEvent<Object>> events = Collections.synchronizedList(Lists.<SensorEvent<Object>>newArrayList());
+
+        @Override public void onEvent(SensorEvent<Object> event) {
+            events.add(event);
+        }
+        
+        public List<SensorEvent<Object>> getEvents() {
+            return ImmutableList.copyOf(events);
+        }
+    }
 }