You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by sj...@apache.org on 2016/12/01 12:01:04 UTC

[2/3] brooklyn-server git commit: Adds maxConcurrentChildCommands parameter to DynamicCluster

Adds maxConcurrentChildCommands parameter to DynamicCluster

The option configures the maximum number of simultaneous Startable
effector invocations that will be made on members of the group.


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/4e30074c
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/4e30074c
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/4e30074c

Branch: refs/heads/master
Commit: 4e30074ca9b09a49c6a4b052d6f28d05608b36eb
Parents: 5b9f896
Author: Sam Corbett <sa...@cloudsoftcorp.com>
Authored: Tue Nov 29 16:11:12 2016 +0000
Committer: Sam Corbett <sa...@cloudsoftcorp.com>
Committed: Tue Nov 29 16:11:12 2016 +0000

----------------------------------------------------------------------
 .../apache/brooklyn/core/entity/Entities.java   |   2 +-
 .../brooklyn/entity/group/DynamicCluster.java   |  11 +-
 .../entity/group/DynamicClusterImpl.java        | 153 ++++++++++++++++++-
 .../entity/group/DynamicClusterRebindTest.java  |  54 +++++++
 .../entity/group/DynamicClusterTest.java        | 132 ++++++++++++++++
 5 files changed, 343 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4e30074c/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java b/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java
index 2821652..69670ec 100644
--- a/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java
+++ b/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java
@@ -625,7 +625,7 @@ public class Entities {
     /**
      * Return all descendants of given entity matching the given predicate and optionally the entity itself.
      * 
-     * @see {@link EntityPredicates} for useful second arguments.
+     * @see EntityPredicates
      */
     @SuppressWarnings("unused")
     public static Iterable<Entity> descendants(Entity root, Predicate<? super Entity> matching, boolean includeSelf) {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4e30074c/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java
index f2112e8..3f62f82 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java
@@ -103,7 +103,7 @@ public interface DynamicCluster extends AbstractGroup, Cluster, MemberReplaceabl
             "dynamiccluster.restartMode", 
             "How this cluster should handle restarts; "
             + "by default it is disallowed, but this key can specify a different mode. "
-            + "Modes supported by dynamic cluster are 'off', 'sequqential', or 'parallel'. "
+            + "Modes supported by dynamic cluster are 'off', 'sequential', or 'parallel'. "
             + "However subclasses can define their own modes or may ignore this.", null);
 
     @SetFromFlag("quarantineFailedEntities")
@@ -183,6 +183,15 @@ public interface DynamicCluster extends AbstractGroup, Cluster, MemberReplaceabl
     ConfigKey<Integer> CLUSTER_MEMBER_ID = ConfigKeys.newIntegerConfigKey(
             "cluster.member.id", "The unique ID number (sequential) of a member of a cluster");
 
+    @Beta
+    @SetFromFlag("maxConcurrentChildCommands")
+    ConfigKey<Integer> MAX_CONCURRENT_CHILD_COMMANDS = ConfigKeys.builder(Integer.class)
+            .name("dynamiccluster.maxConcurrentChildCommands")
+            .description("[Beta] The maximum number of effector invocations that will be made on children at once " +
+                    "(e.g. start, stop, restart). Any value null or less than or equal to zero means invocations are unbounded")
+            .defaultValue(0)
+            .build();
+
     AttributeSensor<List<Location>> SUB_LOCATIONS = new BasicAttributeSensor<List<Location>>(
             new TypeToken<List<Location>>() {},
             "dynamiccluster.subLocations", "Locations for each availability zone to use");

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4e30074c/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
index 8725b12..4ed0ac0 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
@@ -30,10 +30,12 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import javax.annotation.Nullable;
 
+import org.apache.brooklyn.api.effector.Effector;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntitySpec;
 import org.apache.brooklyn.api.entity.Group;
@@ -98,6 +100,8 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import com.google.common.reflect.TypeToken;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 
 /**
  * A cluster of entities that can dynamically increase or decrease the number of entities.
@@ -108,6 +112,12 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
     private static final AttributeSensor<Supplier<Integer>> NEXT_CLUSTER_MEMBER_ID = Sensors.newSensor(new TypeToken<Supplier<Integer>>() {},
             "next.cluster.member.id", "Returns the ID number of the next member to be added");
 
+    /**
+     * Controls the maximum number of effector invocations the cluster will make on members at once.
+     * Only used if {@link #MAX_CONCURRENT_CHILD_COMMANDS} is configured.
+     */
+    private transient Semaphore childTaskSemaphore;
+
     private volatile FunctionFeed clusterOneAndAllMembersUp;
 
     // TODO better mechanism for arbitrary class name to instance type coercion
@@ -212,9 +222,16 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
     public void init() {
         super.init();
         initialiseMemberId();
+        initialiseTaskPermitSemaphore();
         connectAllMembersUp();
     }
 
+    @Override
+    public void rebind() {
+        super.rebind();
+        initialiseTaskPermitSemaphore();
+    }
+
     private void initialiseMemberId() {
         synchronized (mutex) {
             if (sensors().get(NEXT_CLUSTER_MEMBER_ID) == null) {
@@ -223,6 +240,17 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
         }
     }
 
+    private void initialiseTaskPermitSemaphore() {
+        synchronized (mutex) {
+            if (getChildTaskSemaphore() == null) {
+                Integer maxChildTasks = config().get(MAX_CONCURRENT_CHILD_COMMANDS);
+                if (maxChildTasks != null && maxChildTasks > 0) {
+                    childTaskSemaphore = new Semaphore(maxChildTasks);
+                }
+            }
+        }
+    }
+
     private void connectAllMembersUp() {
         clusterOneAndAllMembersUp = FunctionFeed.builder()
                 .entity(this)
@@ -551,8 +579,9 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
                 Iterables.filter(getChildren(), Predicates.and(Predicates.instanceOf(Startable.class), EntityPredicates.isManaged()))));
         } else if ("parallel".equalsIgnoreCase(mode)) {
             ServiceStateLogic.setExpectedState(this, Lifecycle.STARTING);
-            DynamicTasks.queue(Effectors.invocationParallel(Startable.RESTART, null, 
-                Iterables.filter(getChildren(), Predicates.and(Predicates.instanceOf(Startable.class), EntityPredicates.isManaged()))));
+            for (Entity member : Iterables.filter(getChildren(), Predicates.and(Predicates.instanceOf(Startable.class), EntityPredicates.isManaged()))) {
+                DynamicTasks.queue(newThrottledEffectorTask(member, Startable.RESTART, Collections.emptyMap()));
+            }
         } else {
             throw new IllegalArgumentException("Unknown "+RESTART_MODE.getName()+" '"+mode+"'");
         }
@@ -788,7 +817,12 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
 
         // FIXME symmetry in order of added as child, managed, started, and added to group
         final Iterable<Entity> removedStartables = (Iterable<Entity>) (Iterable<?>) Iterables.filter(removedEntities, Startable.class);
-        Task<?> invoke = Entities.invokeEffector(this, removedStartables, Startable.STOP, Collections.<String,Object>emptyMap());
+        ImmutableList.Builder<Task<?>> tasks = ImmutableList.builder();
+        for (Entity member : removedStartables) {
+            tasks.add(newThrottledEffectorTask(member, Startable.STOP, Collections.emptyMap()));
+        }
+        Task<?> invoke = Tasks.parallel(tasks.build());
+        DynamicTasks.queueIfPossible(invoke).orSubmitAsync();
         try {
             invoke.get();
             return removedEntities;
@@ -826,8 +860,11 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
             addedEntities.add(entity);
             addedEntityLocations.put(entity, loc);
             if (entity instanceof Startable) {
+                // First members are used when subsequent members need some attributes from them
+                // before they start; make sure they're in the first batch.
+                boolean privileged = Boolean.TRUE.equals(entity.sensors().get(AbstractGroup.FIRST_MEMBER));
                 Map<String, ?> args = ImmutableMap.of("locations", MutableList.builder().addIfNotNull(loc).buildImmutable());
-                Task<Void> task = Effectors.invocation(entity, Startable.START, args).asTask();
+                Task<?> task = newThrottledEffectorTask(entity, Startable.START, args, privileged);
                 tasks.put(entity, task);
             }
         }
@@ -1041,14 +1078,116 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
 
     protected void stopAndRemoveNode(Entity member) {
         removeMember(member);
-
         try {
             if (member instanceof Startable) {
-                Task<?> task = member.invoke(Startable.STOP, Collections.<String,Object>emptyMap());
+                Task<?> task = newThrottledEffectorTask(member, Startable.STOP, Collections.<String, Object>emptyMap());
+                DynamicTasks.queueIfPossible(task).orSubmitAsync();
                 task.getUnchecked();
             }
         } finally {
             Entities.unmanage(member);
         }
     }
+
+    @Nullable
+    protected Semaphore getChildTaskSemaphore() {
+        return childTaskSemaphore;
+    }
+
+    /**
+     * @return An unprivileged effector task.
+     * @see #newThrottledEffectorTask(Entity, Effector, Map, boolean)
+     */
+    protected <T> Task<?> newThrottledEffectorTask(Entity target, Effector<T> effector, Map<?, ?> arguments) {
+        return newThrottledEffectorTask(target, effector, arguments, false);
+    }
+
+    /**
+     * Creates tasks that obtain permits from {@link #childTaskSemaphore} before invoking <code>effector</code>
+     * on <code>target</code>. Permits are released in a {@link ListenableFuture#addListener listener}. No
+     * permits are obtained if {@link #childTaskSemaphore} is <code>null</code>.
+     * @param target Entity to invoke effector on
+     * @param effector Effector to invoke on target
+     * @param arguments Effector arguments
+     * @param isPrivileged If true the method obtains a permit from {@link #childTaskSemaphore}
+     *                     immediately and returns the effector invocation task, otherwise it
+     *                     returns a task that sequentially obtains a permit then runs the effector.
+     * @return An unsubmitted task.
+     */
+    protected <T> Task<?> newThrottledEffectorTask(Entity target, Effector<T> effector, Map<?, ?> arguments, boolean isPrivileged) {
+        final Task<?> toSubmit;
+        final Task<T> effectorTask = Effectors.invocation(target, effector, arguments).asTask();
+        if (getChildTaskSemaphore() != null) {
+            // permitObtained communicates to the release task whether the permit should really be released
+            // or not. ObtainPermit sets it to true when a permit is acquired.
+            final AtomicBoolean permitObtained = new AtomicBoolean();
+            final String description = "Waiting for permit to run " + effector.getName() + " on " + target;
+            final Runnable obtain = new ObtainPermit(getChildTaskSemaphore(), description, permitObtained);
+            // Acquire the permit now for the privileged task and just queue the effector invocation.
+            // If it's unprivileged then queue a task to obtain a permit first.
+            if (isPrivileged) {
+                obtain.run();
+                toSubmit = effectorTask;
+            } else {
+                Task<?> obtainMutex = Tasks.builder()
+                        .description(description)
+                        .body(new ObtainPermit(getChildTaskSemaphore(), description, permitObtained))
+                        .build();
+                toSubmit = Tasks.sequential(
+                        "Waiting for permit then running " + effector.getName() + " on " + target,
+                        obtainMutex, effectorTask);
+            }
+            toSubmit.addListener(new ReleasePermit(getChildTaskSemaphore(), permitObtained), MoreExecutors.sameThreadExecutor());
+        } else {
+            toSubmit = effectorTask;
+        }
+        return toSubmit;
+    }
+
+    private static class ObtainPermit implements Runnable {
+        private final Semaphore permit;
+        private final String description;
+        private final AtomicBoolean hasObtainedPermit;
+
+        private ObtainPermit(Semaphore permit, String description, AtomicBoolean hasObtainedPermit) {
+            this.permit = permit;
+            this.description = description;
+            this.hasObtainedPermit = hasObtainedPermit;
+        }
+
+        @Override
+        public void run() {
+            String oldDetails = Tasks.setBlockingDetails(description);
+            LOG.debug("{} acquiring permit from {}", this, permit);
+            try {
+                permit.acquire();
+                hasObtainedPermit.set(true);
+            } catch (InterruptedException e) {
+                throw Exceptions.propagate(e);
+            } finally {
+                Tasks.setBlockingDetails(oldDetails);
+            }
+        }
+    }
+
+    private static class ReleasePermit implements Runnable {
+        private final Semaphore permit;
+        private final AtomicBoolean wasPermitObtained;
+
+        private ReleasePermit(Semaphore permit, AtomicBoolean wasPermitObtained) {
+            this.permit = permit;
+            this.wasPermitObtained = wasPermitObtained;
+        }
+
+        @Override
+        public void run() {
+            if (wasPermitObtained.get()) {
+                LOG.debug("{} releasing permit from {}", this, permit);
+                permit.release();
+            } else {
+                LOG.debug("{} not releasing a permit from {} because it appears one was never obtained", this, permit);
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4e30074c/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterRebindTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterRebindTest.java b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterRebindTest.java
new file mode 100644
index 0000000..bbf3a2a
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterRebindTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.brooklyn.entity.group;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.core.entity.Attributes;
+import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.entity.EntityAsserts;
+import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
+import org.apache.brooklyn.core.mgmt.rebind.RebindOptions;
+import org.apache.brooklyn.core.mgmt.rebind.RebindTestFixtureWithApp;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+
+public class DynamicClusterRebindTest extends RebindTestFixtureWithApp {
+
+    @Test
+    public void testThrottleAppliesAfterRebind() throws Exception {
+        DynamicCluster cluster = origApp.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+                .configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, 1)
+                .configure(DynamicCluster.INITIAL_SIZE, 1)
+                .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(DynamicClusterTest.ThrowOnAsyncStartEntity.class))
+                        .configure(DynamicClusterTest.ThrowOnAsyncStartEntity.COUNTER, new AtomicInteger()));
+        app().start(ImmutableList.of(origApp.newLocalhostProvisioningLocation()));
+        EntityAsserts.assertAttributeEquals(cluster, DynamicCluster.GROUP_SIZE, 1);
+
+        rebind(RebindOptions.create().terminateOrigManagementContext(true));
+        cluster = Entities.descendants(app(), DynamicCluster.class).iterator().next();
+        cluster.resize(10);
+        EntityAsserts.assertAttributeEqualsEventually(cluster, DynamicCluster.GROUP_SIZE, 10);
+        EntityAsserts.assertAttributeEquals(cluster, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4e30074c/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
index 36d3c39..c3e7d7f 100644
--- a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
+++ b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -41,12 +42,16 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.brooklyn.api.effector.Effector;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.entity.ImplementedBy;
 import org.apache.brooklyn.api.location.Location;
 import org.apache.brooklyn.api.location.NoMachinesAvailableException;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.entity.Attributes;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.entity.EntityAsserts;
@@ -59,6 +64,7 @@ import org.apache.brooklyn.core.entity.trait.FailingEntity;
 import org.apache.brooklyn.core.entity.trait.Resizable;
 import org.apache.brooklyn.core.location.SimulatedLocation;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
+import org.apache.brooklyn.core.sensor.DependentConfiguration;
 import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
 import org.apache.brooklyn.core.test.entity.TestEntity;
 import org.apache.brooklyn.core.test.entity.TestEntityImpl;
@@ -67,10 +73,15 @@ import org.apache.brooklyn.test.Asserts;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.collections.QuorumCheck.QuorumChecks;
+import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Function;
@@ -1225,4 +1236,125 @@ public class DynamicClusterTest extends BrooklynAppUnitTestSupport {
         assertEquals(found.size(), expectedNonFirstCount);
     }
 
+    @DataProvider
+    public Object[][] maxConcurrentCommandsTestProvider() {
+        return new Object[][]{{1}, {2}, {3}};
+    }
+
+    @Test(dataProvider = "maxConcurrentCommandsTestProvider")
+    public void testEntitiesStartAndStopSequentiallyWhenMaxConcurrentCommandsIsOne(int maxConcurrentCommands) {
+        EntitySpec<ThrowOnAsyncStartEntity> memberSpec = EntitySpec.create(ThrowOnAsyncStartEntity.class)
+                .configure(ThrowOnAsyncStartEntity.MAX_CONCURRENCY, maxConcurrentCommands)
+                .configure(ThrowOnAsyncStartEntity.COUNTER, new AtomicInteger());
+        DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+                .configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, maxConcurrentCommands)
+                .configure(DynamicCluster.INITIAL_SIZE, 10)
+                .configure(DynamicCluster.MEMBER_SPEC, memberSpec));
+        app.start(ImmutableList.of(app.newSimulatedLocation()));
+        assertEquals(cluster.sensors().get(Attributes.SERVICE_STATE_ACTUAL), Lifecycle.RUNNING);
+    }
+
+    // Tests handling of the first member of a cluster by asserting that a group, whose
+    // other members wait for the first, always starts.
+    @Test
+    public void testFirstMemberInFirstBatchWhenMaxConcurrentCommandsSet() throws Exception {
+        final AtomicInteger counter = new AtomicInteger();
+        final DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+                .configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, 1)
+                .configure(DynamicCluster.INITIAL_SIZE, 3));
+
+        Task<Boolean> firstMemberUp = Tasks.<Boolean>builder()
+                .body(new Callable<Boolean>() {
+                    @Override
+                    public Boolean call() throws Exception {
+                        Task<Entity> first = DependentConfiguration.attributeWhenReady(cluster, DynamicCluster.FIRST);
+                        DynamicTasks.queueIfPossible(first).orSubmitAsync();
+                        final Entity source = first.get();
+                        final Task<Boolean> booleanTask = DependentConfiguration.attributeWhenReady(source, Attributes.SERVICE_UP);
+                        DynamicTasks.queueIfPossible(booleanTask).orSubmitAsync();
+                        return booleanTask.get();
+                    }
+                })
+                .build();
+
+        EntitySpec<ThrowOnAsyncStartEntity> firstMemberSpec = EntitySpec.create(ThrowOnAsyncStartEntity.class)
+                .configure(ThrowOnAsyncStartEntity.COUNTER, counter)
+                .configure(ThrowOnAsyncStartEntity.START_LATCH, true);
+
+        EntitySpec<ThrowOnAsyncStartEntity> memberSpec = EntitySpec.create(ThrowOnAsyncStartEntity.class)
+                .configure(ThrowOnAsyncStartEntity.COUNTER, counter)
+                .configure(ThrowOnAsyncStartEntity.START_LATCH, firstMemberUp);
+
+        cluster.config().set(DynamicCluster.FIRST_MEMBER_SPEC, firstMemberSpec);
+        cluster.config().set(DynamicCluster.MEMBER_SPEC, memberSpec);
+
+        // app.start blocks so in the failure case this test would block forever.
+        Asserts.assertReturnsEventually(new Runnable() {
+            public void run() {
+                app.start(ImmutableList.of(app.newSimulatedLocation()));
+                EntityAsserts.assertAttributeEqualsEventually(cluster, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+            }
+        }, Asserts.DEFAULT_LONG_TIMEOUT);
+    }
+
+    @Test
+    public void testChildCommandPermitNotReleasedWhenMemberStartTaskCancelledBeforeSubmission() {
+        // Tests that permits are not released when their start task is cancelled.
+        // Expected behaviour is:
+        // - permit obtained for first member. cancelled task submitted. permit released.
+        // - no permit obtained for second member. cancelled task submitted. no permit released.
+        DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(CancelEffectorInvokeCluster.class)
+                .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(TestEntity.class))
+                .configure(DynamicCluster.INITIAL_SIZE, 2)
+                .configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, 1));
+        final DynamicClusterImpl clusterImpl = DynamicClusterImpl.class.cast(Entities.deproxy(cluster));
+        assertNotNull(clusterImpl.getChildTaskSemaphore());
+        assertEquals(clusterImpl.getChildTaskSemaphore().availablePermits(), 1);
+        try {
+            app.start(ImmutableList.<Location>of(app.newSimulatedLocation()));
+            Asserts.shouldHaveFailedPreviously("Cluster start should have failed because the member start was cancelled");
+        } catch (Exception e) {
+            // ignored.
+        }
+        assertEquals(clusterImpl.getChildTaskSemaphore().availablePermits(), 1);
+    }
+
+    @ImplementedBy(ThrowOnAsyncStartEntityImpl.class)
+    public interface ThrowOnAsyncStartEntity extends TestEntity {
+        ConfigKey<Integer> MAX_CONCURRENCY = ConfigKeys.newConfigKey(Integer.class, "concurrency", "max concurrency", 1);
+        ConfigKey<AtomicInteger> COUNTER = ConfigKeys.newConfigKey(AtomicInteger.class, "counter");
+        ConfigKey<Boolean> START_LATCH = ConfigKeys.newConfigKey(Boolean.class, "startlatch");
+    }
+
+    public static class ThrowOnAsyncStartEntityImpl extends TestEntityImpl implements ThrowOnAsyncStartEntity {
+        private static final Logger LOG = LoggerFactory.getLogger(ThrowOnAsyncStartEntityImpl.class);
+        @Override
+        public void start(Collection<? extends Location> locs) {
+            int count = config().get(COUNTER).incrementAndGet();
+            try {
+                LOG.debug("{} starting (first={})", new Object[]{this, sensors().get(AbstractGroup.FIRST_MEMBER)});
+                config().get(START_LATCH);
+                // Throw if more than one entity is starting at the same time as this.
+                assertTrue(count <= config().get(MAX_CONCURRENCY), "expected " + count + " <= " + config().get(MAX_CONCURRENCY));
+                super.start(locs);
+            } finally {
+                config().get(COUNTER).decrementAndGet();
+            }
+        }
+    }
+
+    /** Used in {@link #testChildCommandPermitNotReleasedWhenMemberStartTaskCancelledBeforeSubmission}. */
+    @ImplementedBy(CancelEffectorInvokeClusterImpl.class)
+    public interface CancelEffectorInvokeCluster extends DynamicCluster {}
+
+    /** Overrides {@link DynamicClusterImpl#newThrottledEffectorTask} to cancel each task before it's submitted. */
+    public static class CancelEffectorInvokeClusterImpl extends DynamicClusterImpl implements CancelEffectorInvokeCluster {
+        @Override
+        protected <T> Task<?> newThrottledEffectorTask(Entity target, Effector<T> effector, Map<?, ?> arguments, boolean isPrivileged) {
+            Task<?> unsubmitted = super.newThrottledEffectorTask(target, effector, arguments, isPrivileged);
+            unsubmitted.cancel(true);
+            return unsubmitted;
+        }
+    }
+
 }