You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2015/12/17 14:04:31 UTC
[1/2] incubator-brooklyn git commit: Fix concurrent use of
entity.setAttribute()
Repository: incubator-brooklyn
Updated Branches:
refs/heads/master a5f359f04 -> cde2ad5ab
Fix concurrent use of entity.setAttribute()
Previously we were using a vanilla LinkedHashMap for storing attributes
(ever since the FEATURE_USE_BROOKLYN_LIVE_OBJECTS_DATAGRID_STORAGE
was disabled by default!)
Now uses ConcurrentMap, or for some other things a synchronised set
so that we preserve order with the underlying LinkedHashSet.
Adds EntityConcurrencyTest for concurrently:
- setting attributes
- setting config
- adding tags
- adding groups
- adding children
- adding locations
- adding policies
- adding enrichers
- adding feeds
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/29ab8e29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/29ab8e29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/29ab8e29
Branch: refs/heads/master
Commit: 29ab8e298ee6a50c29f54f8f6cf57490a7703d17
Parents: a5f359f
Author: Aled Sage <al...@gmail.com>
Authored: Tue Dec 15 21:41:29 2015 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Thu Dec 17 13:03:06 2015 +0000
----------------------------------------------------------------------
.../brooklyn/core/entity/AbstractEntity.java | 34 ++-
.../core/entity/internal/EntityConfigMap.java | 21 +-
.../brooklyn/core/sensor/AttributeMap.java | 30 +-
.../brooklyn/core/entity/AttributeMapTest.java | 20 ++
.../core/entity/EntityConcurrencyTest.java | 275 +++++++++++++++++++
5 files changed, 355 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/29ab8e29/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
index 36b425f..0599373 100644
--- a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
+++ b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
@@ -44,7 +44,6 @@ import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.mgmt.rebind.RebindSupport;
import org.apache.brooklyn.api.mgmt.rebind.mementos.EntityMemento;
import org.apache.brooklyn.api.objs.EntityAdjunct;
-import org.apache.brooklyn.api.objs.SpecParameter;
import org.apache.brooklyn.api.policy.Policy;
import org.apache.brooklyn.api.policy.PolicySpec;
import org.apache.brooklyn.api.sensor.AttributeSensor;
@@ -96,7 +95,6 @@ import org.apache.brooklyn.util.core.flags.FlagUtils;
import org.apache.brooklyn.util.core.flags.TypeCoercions;
import org.apache.brooklyn.util.core.task.DeferredSupplier;
import org.apache.brooklyn.util.guava.Maybe;
-import org.apache.brooklyn.util.guava.TypeTokens;
import org.apache.brooklyn.util.javalang.Equals;
import org.apache.brooklyn.util.text.Strings;
import org.apache.commons.lang3.builder.EqualsBuilder;
@@ -199,18 +197,20 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E
private Entity selfProxy;
private volatile Application application;
- // TODO Because some things still don't use EntitySpec (e.g. the EntityFactory stuff for cluster/fabric),
- // then we need temp vals here. When setManagementContext is called, we'll switch these out for the read-deal;
- // i.e. for the values backed by storage
+ // If FEATURE_USE_BROOKLYN_LIVE_OBJECTS_DATAGRID_STORAGE, then these are just temporary values
+ // (but may still be needed if something, such as an EntityFactory in a cluster/fabric, did not
+ // use EntitySpec.
+ // If that feature is disabled, then these are not "temporary" values - these are the production
+ // values. They must be thread-safe, and where necessary (e.g. group) they should preserve order
+ // if possible.
private Reference<Entity> parent = new BasicReference<Entity>();
- private Set<Group> groupsInternal = Sets.newLinkedHashSet();
- private Set<Entity> children = Sets.newLinkedHashSet();
+ private Set<Group> groupsInternal = Collections.synchronizedSet(Sets.<Group>newLinkedHashSet());
+ private Set<Entity> children = Collections.synchronizedSet(Sets.<Entity>newLinkedHashSet());
private Reference<List<Location>> locations = new BasicReference<List<Location>>(ImmutableList.<Location>of()); // dups removed in addLocations
private Reference<Long> creationTimeUtc = new BasicReference<Long>(System.currentTimeMillis());
private Reference<String> displayName = new BasicReference<String>();
private Reference<String> iconUrl = new BasicReference<String>();
- Map<String,Object> presentationAttributes = Maps.newLinkedHashMap();
private Collection<AbstractPolicy> policiesInternal = Lists.newCopyOnWriteArrayList();
private Collection<AbstractEnricher> enrichersInternal = Lists.newCopyOnWriteArrayList();
Collection<Feed> feeds = Lists.newCopyOnWriteArrayList();
@@ -245,15 +245,15 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E
* The config values of this entity. Updating this map should be done
* via getConfig/setConfig.
*/
- // TODO Assigning temp value because not everything uses EntitySpec; see setManagementContext()
- private EntityConfigMap configsInternal = new EntityConfigMap(this, Maps.<ConfigKey<?>, Object>newLinkedHashMap());
+ // If FEATURE_USE_BROOKLYN_LIVE_OBJECTS_DATAGRID_STORAGE, this value will be only temporary.
+ private EntityConfigMap configsInternal = new EntityConfigMap(this);
/**
* The sensor-attribute values of this entity. Updating this map should be done
* via getAttribute/setAttribute; it will automatically emit an attribute-change event.
*/
- // TODO Assigning temp value because not everything uses EntitySpec; see setManagementContext()
- private AttributeMap attributesInternal = new AttributeMap(this, Maps.<Collection<String>, Object>newLinkedHashMap());
+ // If FEATURE_USE_BROOKLYN_LIVE_OBJECTS_DATAGRID_STORAGE, this value will be only temporary.
+ private AttributeMap attributesInternal = new AttributeMap(this);
/**
* For temporary data, e.g. timestamps etc for calculating real attribute values, such as when
@@ -743,8 +743,10 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E
return asList().isEmpty();
}
- protected List<Group> asList() {
- return ImmutableList.copyOf(groupsInternal);
+ protected List<Group> asList() {
+ synchronized (groupsInternal) {
+ return ImmutableList.copyOf(groupsInternal);
+ }
}
@Override
@@ -802,7 +804,9 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E
@Override
public Collection<Entity> getChildren() {
- return ImmutableList.copyOf(children);
+ synchronized (children) {
+ return ImmutableList.copyOf(children);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/29ab8e29/core/src/main/java/org/apache/brooklyn/core/entity/internal/EntityConfigMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/internal/EntityConfigMap.java b/core/src/main/java/org/apache/brooklyn/core/entity/internal/EntityConfigMap.java
index 111eee0..da209e1 100644
--- a/core/src/main/java/org/apache/brooklyn/core/entity/internal/EntityConfigMap.java
+++ b/core/src/main/java/org/apache/brooklyn/core/entity/internal/EntityConfigMap.java
@@ -21,6 +21,7 @@ package org.apache.brooklyn.core.entity.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.brooklyn.util.groovy.GroovyJavaMethods.elvis;
+import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -68,6 +69,12 @@ public class EntityConfigMap extends AbstractConfigMapImpl {
private final ConfigBag localConfigBag;
private final ConfigBag inheritedConfigBag;
+ public EntityConfigMap(AbstractEntity entity) {
+ // Not using ConcurrentMap, because want to (continue to) allow null values.
+ // Could use ConcurrentMapAcceptingNullVals (with the associated performance hit on entrySet() etc).
+ this(entity, Collections.synchronizedMap(Maps.<ConfigKey<?>, Object>newLinkedHashMap()));
+ }
+
public EntityConfigMap(AbstractEntity entity, Map<ConfigKey<?>, Object> storage) {
this.entity = checkNotNull(entity, "entity must be specified");
this.ownConfig = checkNotNull(storage, "storage map must be specified");
@@ -292,15 +299,21 @@ public class EntityConfigMap extends AbstractConfigMapImpl {
for (Map.Entry<ConfigKey<?>,Object> entry: inheritedConfig.entrySet())
if (filter.apply(entry.getKey()))
m.inheritedConfig.put(entry.getKey(), entry.getValue());
- for (Map.Entry<ConfigKey<?>,Object> entry: ownConfig.entrySet())
- if (filter.apply(entry.getKey()))
- m.ownConfig.put(entry.getKey(), entry.getValue());
+ synchronized (ownConfig) {
+ for (Map.Entry<ConfigKey<?>,Object> entry: ownConfig.entrySet())
+ if (filter.apply(entry.getKey()))
+ m.ownConfig.put(entry.getKey(), entry.getValue());
+ }
return m;
}
@Override
public String toString() {
- return super.toString()+"[own="+Sanitizer.sanitize(ownConfig)+"; inherited="+Sanitizer.sanitize(inheritedConfig)+"]";
+ Map<ConfigKey<?>, Object> sanitizeConfig;
+ synchronized (ownConfig) {
+ sanitizeConfig = Sanitizer.sanitize(ownConfig);
+ }
+ return super.toString()+"[own="+sanitizeConfig+"; inherited="+Sanitizer.sanitize(inheritedConfig)+"]";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/29ab8e29/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java b/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java
index 72d6d23..75f087e 100644
--- a/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java
@@ -21,6 +21,7 @@ package org.apache.brooklyn.core.sensor;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
import org.apache.brooklyn.api.entity.Entity;
@@ -59,7 +60,20 @@ public final class AttributeMap {
* Creates a new AttributeMap.
*
* @param entity the EntityLocal this AttributeMap belongs to.
- * @throws IllegalArgumentException if entity is null
+ * @throws NullPointerException if entity is null
+ */
+ public AttributeMap(AbstractEntity entity) {
+ // Not using ConcurrentMap, because want to (continue to) allow null values.
+ // Could use ConcurrentMapAcceptingNullVals (with the associated performance hit on entrySet() etc).
+ this(entity, Collections.synchronizedMap(Maps.<Collection<String>, Object>newLinkedHashMap()));
+ }
+
+ /**
+ * Creates a new AttributeMap.
+ *
+ * @param entity the EntityLocal this AttributeMap belongs to.
+ * @param storage the Map in which to store the values - should be concurrent or synchronized.
+ * @throws NullPointerException if entity is null
*/
public AttributeMap(AbstractEntity entity, Map<Collection<String>, Object> storage) {
this.entity = checkNotNull(entity, "entity must be specified");
@@ -67,15 +81,19 @@ public final class AttributeMap {
}
public Map<Collection<String>, Object> asRawMap() {
- return ImmutableMap.copyOf(values);
+ synchronized (values) {
+ return ImmutableMap.copyOf(values);
+ }
}
public Map<String, Object> asMap() {
Map<String, Object> result = Maps.newLinkedHashMap();
- for (Map.Entry<Collection<String>, Object> entry : values.entrySet()) {
- String sensorName = Joiner.on('.').join(entry.getKey());
- Object val = (isNull(entry.getValue())) ? null : entry.getValue();
- result.put(sensorName, val);
+ synchronized (values) {
+ for (Map.Entry<Collection<String>, Object> entry : values.entrySet()) {
+ String sensorName = Joiner.on('.').join(entry.getKey());
+ Object val = (isNull(entry.getValue())) ? null : entry.getValue();
+ result.put(sensorName, val);
+ }
}
return result;
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/29ab8e29/core/src/test/java/org/apache/brooklyn/core/entity/AttributeMapTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/AttributeMapTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/AttributeMapTest.java
index c1ae306..77ba9c6 100644
--- a/core/src/test/java/org/apache/brooklyn/core/entity/AttributeMapTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/AttributeMapTest.java
@@ -108,6 +108,26 @@ public class AttributeMapTest {
}
@Test
+ public void testConcurrentUpdatesAllApplied() throws Exception {
+ List<Future<?>> futures = Lists.newArrayList();
+
+ for (int i = 0; i < NUM_TASKS; i++) {
+ AttributeSensor<Integer> nextSensor = Sensors.newIntegerSensor("attributeMapTest.exampleSensor"+i);
+ Future<?> future = executor.submit(newUpdateMapRunnable(map, nextSensor, i));
+ futures.add(future);
+ }
+
+ for (Future<?> future : futures) {
+ future.get();
+ }
+
+ for (int i = 0; i < NUM_TASKS; i++) {
+ AttributeSensor<Integer> nextSensor = Sensors.newIntegerSensor("attributeMapTest.exampleSensor"+i);
+ assertEquals(map.getValue(nextSensor), (Integer)i);
+ }
+ }
+
+ @Test
public void testStoredSensorsCanBeRetrieved() throws Exception {
AttributeSensor<String> sensor1 = Sensors.newStringSensor("a", "");
AttributeSensor<String> sensor2 = Sensors.newStringSensor("b.c", "");
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/29ab8e29/core/src/test/java/org/apache/brooklyn/core/entity/EntityConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/EntityConcurrencyTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/EntityConcurrencyTest.java
new file mode 100644
index 0000000..f606226
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/EntityConcurrencyTest.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.entity;
+
+import static org.testng.Assert.assertEquals;
+
+import java.util.List;
+import java.util.concurrent.Executors;
+
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.location.LocationSpec;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.EnricherSpec;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.enricher.BasicEnricherTest;
+import org.apache.brooklyn.core.feed.AbstractFeed;
+import org.apache.brooklyn.core.location.SimulatedLocation;
+import org.apache.brooklyn.core.policy.basic.BasicPolicyTest;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.entity.group.BasicGroup;
+import org.apache.brooklyn.entity.stock.BasicEntity;
+import org.apache.brooklyn.test.Asserts;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+public class EntityConcurrencyTest extends BrooklynAppUnitTestSupport {
+ TestEntity entity;
+ ListeningExecutorService executor;
+
+ @BeforeMethod(alwaysRun=true)
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ entity = app.addChild(EntitySpec.create(TestEntity.class));
+ executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+ }
+
+ @AfterMethod(alwaysRun=true)
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ if (executor != null) executor.shutdownNow();
+ }
+
+ @Test
+ public void testConcurrentSetAttribute() throws Exception {
+ final int NUM_TASKS = Math.min(500 * Runtime.getRuntime().availableProcessors(), 1000);
+
+ List<ListenableFuture<?>> futures = Lists.newArrayList();
+
+ for (int i = 0; i < NUM_TASKS; i++) {
+ final AttributeSensor<Integer> nextSensor = Sensors.newIntegerSensor("EntityConcurrencyTest.exampleSensor"+i);
+ final int val = i;
+ ListenableFuture<?> future = executor.submit(new Runnable() {
+ @Override public void run() {
+ entity.sensors().set(nextSensor, val);
+ }});
+ futures.add(future);
+ }
+
+ Futures.allAsList(futures).get();
+
+ for (int i = 0; i < NUM_TASKS; i++) {
+ AttributeSensor<Integer> nextSensor = Sensors.newIntegerSensor("EntityConcurrencyTest.exampleSensor"+i);
+ assertEquals(entity.sensors().get(nextSensor), (Integer)i, "i="+i);
+ }
+ }
+
+ @Test
+ public void testConcurrentSetConfig() throws Exception {
+ final int NUM_TASKS = Math.min(500 * Runtime.getRuntime().availableProcessors(), 1000);
+
+ List<ListenableFuture<?>> futures = Lists.newArrayList();
+
+ for (int i = 0; i < NUM_TASKS; i++) {
+ final ConfigKey<Integer> nextKey = ConfigKeys.newIntegerConfigKey("EntityConcurrencyTest.exampleConfig"+i);
+ final int val = i;
+ ListenableFuture<?> future = executor.submit(new Runnable() {
+ @Override public void run() {
+ entity.config().set(nextKey, val);
+ }});
+ futures.add(future);
+ }
+
+ Futures.allAsList(futures).get();
+
+ for (int i = 0; i < NUM_TASKS; i++) {
+ final ConfigKey<Integer> nextKey = ConfigKeys.newIntegerConfigKey("EntityConcurrencyTest.exampleConfig"+i);
+ assertEquals(entity.config().get(nextKey), (Integer)i, "i="+i);
+ }
+ }
+
+ @Test
+ public void testConcurrentAddTag() throws Exception {
+ final int NUM_TASKS = Math.min(500 * Runtime.getRuntime().availableProcessors(), 1000);
+
+ List<ListenableFuture<?>> futures = Lists.newArrayList();
+ List<Integer> tags = Lists.newArrayList();
+
+ for (int i = 0; i < NUM_TASKS; i++) {
+ final int val = i;
+ ListenableFuture<?> future = executor.submit(new Runnable() {
+ @Override public void run() {
+ entity.tags().addTag(val);
+ }});
+ futures.add(future);
+ tags.add(val);
+ }
+
+ Futures.allAsList(futures).get();
+
+ Asserts.assertEqualsIgnoringOrder(entity.tags().getTags(), tags);
+ }
+
+ @Test
+ public void testConcurrentAddGroup() throws Exception {
+ final int NUM_TASKS = 100;
+
+ List<BasicGroup> groups = Lists.newArrayList();
+ for (int i = 0; i < NUM_TASKS; i++) {
+ groups.add(app.addChild(EntitySpec.create(BasicGroup.class)));
+ }
+
+ List<ListenableFuture<?>> futures = Lists.newArrayList();
+
+ for (final BasicGroup group : groups) {
+ ListenableFuture<?> future = executor.submit(new Runnable() {
+ @Override public void run() {
+ group.addMember(entity);
+ }});
+ futures.add(future);
+ }
+
+ Futures.allAsList(futures).get();
+
+ Asserts.assertEqualsIgnoringOrder(entity.groups(), groups);
+ }
+
+ @Test
+ public void testConcurrentAddChild() throws Exception {
+ final int NUM_TASKS = 100;
+
+ List<ListenableFuture<?>> futures = Lists.newArrayList();
+
+ for (int i = 0; i < NUM_TASKS; i++) {
+ ListenableFuture<?> future = executor.submit(new Runnable() {
+ @Override public void run() {
+ entity.addChild(EntitySpec.create(BasicEntity.class));
+ }});
+ futures.add(future);
+ }
+
+ Futures.allAsList(futures).get();
+
+ assertEquals(entity.getChildren().size(), NUM_TASKS);
+ Asserts.assertEqualsIgnoringOrder(entity.getChildren(), mgmt.getEntityManager().findEntities(Predicates.instanceOf(BasicEntity.class)));
+ }
+
+ @Test
+ public void testConcurrentAddLocation() throws Exception {
+ final int NUM_TASKS = 100;
+
+ List<Location> locs = Lists.newArrayList();
+ for (int i = 0; i < NUM_TASKS; i++) {
+ locs.add(mgmt.getLocationManager().createLocation(LocationSpec.create(SimulatedLocation.class)));
+ }
+
+ List<ListenableFuture<?>> futures = Lists.newArrayList();
+
+ for (final Location loc : locs) {
+ ListenableFuture<?> future = executor.submit(new Runnable() {
+ @Override public void run() {
+ entity.addLocations(ImmutableList.of(loc));
+ }});
+ futures.add(future);
+ }
+
+ Futures.allAsList(futures).get();
+
+ Asserts.assertEqualsIgnoringOrder(entity.getLocations(), locs);
+ }
+
+ @Test
+ public void testConcurrentAddPolicy() throws Exception {
+ final int NUM_TASKS = 100;
+
+ int numPrePolicies = entity.policies().size();
+
+ List<ListenableFuture<?>> futures = Lists.newArrayList();
+
+ for (int i = 0; i < NUM_TASKS; i++) {
+ ListenableFuture<?> future = executor.submit(new Runnable() {
+ @Override public void run() {
+ entity.policies().add(PolicySpec.create(BasicPolicyTest.MyPolicy.class));
+ }});
+ futures.add(future);
+ }
+
+ Futures.allAsList(futures).get();
+
+ assertEquals(entity.policies().size(), NUM_TASKS+numPrePolicies);
+ }
+
+ @Test
+ public void testConcurrentAddEnricher() throws Exception {
+ final int NUM_TASKS = 100;
+
+ int numPreEnrichers = entity.enrichers().size();
+
+ List<ListenableFuture<?>> futures = Lists.newArrayList();
+
+ for (int i = 0; i < NUM_TASKS; i++) {
+ ListenableFuture<?> future = executor.submit(new Runnable() {
+ @Override public void run() {
+ entity.enrichers().add(EnricherSpec.create(BasicEnricherTest.MyEnricher.class));
+ }});
+ futures.add(future);
+ }
+
+ Futures.allAsList(futures).get();
+
+ assertEquals(entity.enrichers().size(), NUM_TASKS+numPreEnrichers);
+ }
+
+ @Test
+ public void testConcurrentAddFeed() throws Exception {
+ final int NUM_TASKS = 100;
+
+ List<ListenableFuture<?>> futures = Lists.newArrayList();
+
+ for (int i = 0; i < NUM_TASKS; i++) {
+ ListenableFuture<?> future = executor.submit(new Runnable() {
+ @Override public void run() {
+ entity.feeds().addFeed(new MyFeed());
+ }});
+ futures.add(future);
+ }
+
+ Futures.allAsList(futures).get();
+
+ assertEquals(entity.feeds().getFeeds().size(), NUM_TASKS);
+ }
+ private static class MyFeed extends AbstractFeed {
+ }
+}
[2/2] incubator-brooklyn git commit: This closes #1110
Posted by al...@apache.org.
This closes #1110
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/cde2ad5a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/cde2ad5a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/cde2ad5a
Branch: refs/heads/master
Commit: cde2ad5abea0dcb741573d30c0598a5b93f734c8
Parents: a5f359f 29ab8e2
Author: Aled Sage <al...@gmail.com>
Authored: Thu Dec 17 13:04:15 2015 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Thu Dec 17 13:04:15 2015 +0000
----------------------------------------------------------------------
.../brooklyn/core/entity/AbstractEntity.java | 34 ++-
.../core/entity/internal/EntityConfigMap.java | 21 +-
.../brooklyn/core/sensor/AttributeMap.java | 30 +-
.../brooklyn/core/entity/AttributeMapTest.java | 20 ++
.../core/entity/EntityConcurrencyTest.java | 275 +++++++++++++++++++
5 files changed, 355 insertions(+), 25 deletions(-)
----------------------------------------------------------------------