You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@brooklyn.apache.org by sj...@apache.org on 2016/12/01 12:01:03 UTC

[1/3] brooklyn-server git commit: Readability

Repository: brooklyn-server
Updated Branches:
  refs/heads/master 23f9a715c -> d1ef42a1e


Readability


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/5b9f8965
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/5b9f8965
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/5b9f8965

Branch: refs/heads/master
Commit: 5b9f89653ee5979665bff445d38750e1d0f234ae
Parents: a569463
Author: Sam Corbett <sa...@cloudsoftcorp.com>
Authored: Wed Nov 16 15:36:45 2016 +0000
Committer: Sam Corbett <sa...@cloudsoftcorp.com>
Committed: Fri Nov 18 15:08:31 2016 +0000

----------------------------------------------------------------------
 .../brooklyn/core/effector/Effectors.java       |  8 ++++--
 .../entity/group/DynamicClusterImpl.java        |  3 ++-
 .../brooklyn/util/core/task/ParallelTask.java   | 27 +++++++++++++++-----
 .../brooklyn/util/core/task/SequentialTask.java | 27 ++++++++++++++------
 4 files changed, 47 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/5b9f8965/core/src/main/java/org/apache/brooklyn/core/effector/Effectors.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/effector/Effectors.java b/core/src/main/java/org/apache/brooklyn/core/effector/Effectors.java
index 9b10d1d..6240240 100644
--- a/core/src/main/java/org/apache/brooklyn/core/effector/Effectors.java
+++ b/core/src/main/java/org/apache/brooklyn/core/effector/Effectors.java
@@ -183,14 +183,18 @@ public class Effectors {
     public static TaskAdaptable<List<?>> invocationParallel(Effector<?> eff, Map<?,?> params, Iterable<? extends Entity> entities) {
         List<TaskAdaptable<?>> tasks = new ArrayList<TaskAdaptable<?>>();
         for (Entity e: entities) tasks.add(invocation(e, eff, params));
-        return Tasks.parallel("invoking "+eff+" on "+tasks.size()+" node"+(Strings.s(tasks.size())), tasks.toArray(new TaskAdaptable[tasks.size()]));
+        return Tasks.parallel(
+                "invoking " + eff + " on " + tasks.size() + " node" + (Strings.s(tasks.size())),
+                tasks.toArray(new TaskAdaptable[tasks.size()]));
     }
 
     /** as {@link #invocationParallel(Effector, Map, Iterable)} but executing sequentially */
     public static TaskAdaptable<List<?>> invocationSequential(Effector<?> eff, Map<?,?> params, Iterable<? extends Entity> entities) {
         List<TaskAdaptable<?>> tasks = new ArrayList<TaskAdaptable<?>>();
         for (Entity e: entities) tasks.add(invocation(e, eff, params));
-        return Tasks.sequential("invoking "+eff+" on "+tasks.size()+" node"+(Strings.s(tasks.size())), tasks.toArray(new TaskAdaptable[tasks.size()]));
+        return Tasks.sequential(
+                "invoking " + eff + " on " + tasks.size() + " node" + (Strings.s(tasks.size())),
+                tasks.toArray(new TaskAdaptable[tasks.size()]));
     }
 
     /** returns an unsubmitted task which will invoke the given effector on the given entities

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/5b9f8965/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
index eedefa7..8725b12 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
@@ -787,7 +787,8 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
         Collection<Entity> removedEntities = pickAndRemoveMembers(delta * -1);
 
         // FIXME symmetry in order of added as child, managed, started, and added to group
-        Task<?> invoke = Entities.invokeEffector(this, (Iterable<Entity>)(Iterable<?>)Iterables.filter(removedEntities, Startable.class), Startable.STOP, Collections.<String,Object>emptyMap());
+        final Iterable<Entity> removedStartables = (Iterable<Entity>) (Iterable<?>) Iterables.filter(removedEntities, Startable.class);
+        Task<?> invoke = Entities.invokeEffector(this, removedStartables, Startable.STOP, Collections.<String,Object>emptyMap());
         try {
             invoke.get();
             return removedEntities;

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/5b9f8965/core/src/main/java/org/apache/brooklyn/util/core/task/ParallelTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ParallelTask.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ParallelTask.java
index 82ec564..a2425f7 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/ParallelTask.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ParallelTask.java
@@ -38,13 +38,26 @@ import com.google.common.collect.Lists;
  * order they were passed as arguments.
  */
 public class ParallelTask<T> extends CompoundTask<T> {
-    public ParallelTask(Object... tasks) { super(tasks); }
-    
-    public ParallelTask(Map<String,?> flags, Collection<? extends Object> tasks) { super(flags, tasks); }
-    public ParallelTask(Collection<? extends Object> tasks) { super(tasks); }
-    
-    public ParallelTask(Map<String,?> flags, Iterable<? extends Object> tasks) { super(flags, ImmutableList.copyOf(tasks)); }
-    public ParallelTask(Iterable<? extends Object> tasks) { super(ImmutableList.copyOf(tasks)); }
+
+    public ParallelTask(Object... tasks) {
+        super(tasks);
+    }
+
+    public ParallelTask(Map<String, ?> flags, Collection<?> tasks) {
+        super(flags, tasks);
+    }
+
+    public ParallelTask(Collection<?> tasks) {
+        super(tasks);
+    }
+
+    public ParallelTask(Map<String, ?> flags, Iterable<?> tasks) {
+        super(flags, ImmutableList.copyOf(tasks));
+    }
+
+    public ParallelTask(Iterable<?> tasks) {
+        super(ImmutableList.copyOf(tasks));
+    }
 
     @Override
     protected List<T> runJobs() throws InterruptedException, ExecutionException {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/5b9f8965/core/src/main/java/org/apache/brooklyn/util/core/task/SequentialTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/SequentialTask.java b/core/src/main/java/org/apache/brooklyn/util/core/task/SequentialTask.java
index 9bd40af..1a9fbac 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/SequentialTask.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/SequentialTask.java
@@ -28,19 +28,30 @@ import org.apache.brooklyn.api.mgmt.Task;
 
 import com.google.common.collect.ImmutableList;
 
-
 /** runs tasks in order, waiting for one to finish before starting the next; return value here is TBD;
  * (currently is all the return values of individual tasks, but we
  * might want some pipeline support and eventually only to return final value...) */
 public class SequentialTask<T> extends CompoundTask<T> {
 
-    public SequentialTask(Object... tasks) { super(tasks); }
-    
-    public SequentialTask(Map<String,?> flags, Collection<? extends Object> tasks) { super(flags, tasks); }
-    public SequentialTask(Collection<? extends Object> tasks) { super(tasks); }
-    
-    public SequentialTask(Map<String,?> flags, Iterable<? extends Object> tasks) { super(flags, ImmutableList.copyOf(tasks)); }
-    public SequentialTask(Iterable<? extends Object> tasks) { super(ImmutableList.copyOf(tasks)); }
+    public SequentialTask(Object... tasks) {
+        super(tasks);
+    }
+
+    public SequentialTask(Map<String, ?> flags, Collection<?> tasks) {
+        super(flags, tasks);
+    }
+
+    public SequentialTask(Collection<?> tasks) {
+        super(tasks);
+    }
+
+    public SequentialTask(Map<String, ?> flags, Iterable<?> tasks) {
+        super(flags, ImmutableList.copyOf(tasks));
+    }
+
+    public SequentialTask(Iterable<?> tasks) {
+        super(ImmutableList.copyOf(tasks));
+    }
     
     protected List<T> runJobs() throws InterruptedException, ExecutionException {
         setBlockingDetails("Executing "+


[2/3] brooklyn-server git commit: Adds maxConcurrentChildCommands parameter to DynamicCluster

Posted by sj...@apache.org.
Adds maxConcurrentChildCommands parameter to DynamicCluster

The option configures the maximum number of simultaneous Startable
effector invocations that will be made on members of the group.


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/4e30074c
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/4e30074c
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/4e30074c

Branch: refs/heads/master
Commit: 4e30074ca9b09a49c6a4b052d6f28d05608b36eb
Parents: 5b9f896
Author: Sam Corbett <sa...@cloudsoftcorp.com>
Authored: Tue Nov 29 16:11:12 2016 +0000
Committer: Sam Corbett <sa...@cloudsoftcorp.com>
Committed: Tue Nov 29 16:11:12 2016 +0000

----------------------------------------------------------------------
 .../apache/brooklyn/core/entity/Entities.java   |   2 +-
 .../brooklyn/entity/group/DynamicCluster.java   |  11 +-
 .../entity/group/DynamicClusterImpl.java        | 153 ++++++++++++++++++-
 .../entity/group/DynamicClusterRebindTest.java  |  54 +++++++
 .../entity/group/DynamicClusterTest.java        | 132 ++++++++++++++++
 5 files changed, 343 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4e30074c/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java b/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java
index 2821652..69670ec 100644
--- a/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java
+++ b/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java
@@ -625,7 +625,7 @@ public class Entities {
     /**
      * Return all descendants of given entity matching the given predicate and optionally the entity itself.
      * 
-     * @see {@link EntityPredicates} for useful second arguments.
+     * @see EntityPredicates
      */
     @SuppressWarnings("unused")
     public static Iterable<Entity> descendants(Entity root, Predicate<? super Entity> matching, boolean includeSelf) {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4e30074c/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java
index f2112e8..3f62f82 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java
@@ -103,7 +103,7 @@ public interface DynamicCluster extends AbstractGroup, Cluster, MemberReplaceabl
             "dynamiccluster.restartMode", 
             "How this cluster should handle restarts; "
             + "by default it is disallowed, but this key can specify a different mode. "
-            + "Modes supported by dynamic cluster are 'off', 'sequqential', or 'parallel'. "
+            + "Modes supported by dynamic cluster are 'off', 'sequential', or 'parallel'. "
             + "However subclasses can define their own modes or may ignore this.", null);
 
     @SetFromFlag("quarantineFailedEntities")
@@ -183,6 +183,15 @@ public interface DynamicCluster extends AbstractGroup, Cluster, MemberReplaceabl
     ConfigKey<Integer> CLUSTER_MEMBER_ID = ConfigKeys.newIntegerConfigKey(
             "cluster.member.id", "The unique ID number (sequential) of a member of a cluster");
 
+    @Beta
+    @SetFromFlag("maxConcurrentChildCommands")
+    ConfigKey<Integer> MAX_CONCURRENT_CHILD_COMMANDS = ConfigKeys.builder(Integer.class)
+            .name("dynamiccluster.maxConcurrentChildCommands")
+            .description("[Beta] The maximum number of effector invocations that will be made on children at once " +
+                    "(e.g. start, stop, restart). Any value null or less than or equal to zero means invocations are unbounded")
+            .defaultValue(0)
+            .build();
+
     AttributeSensor<List<Location>> SUB_LOCATIONS = new BasicAttributeSensor<List<Location>>(
             new TypeToken<List<Location>>() {},
             "dynamiccluster.subLocations", "Locations for each availability zone to use");

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4e30074c/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
index 8725b12..4ed0ac0 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
@@ -30,10 +30,12 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import javax.annotation.Nullable;
 
+import org.apache.brooklyn.api.effector.Effector;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntitySpec;
 import org.apache.brooklyn.api.entity.Group;
@@ -98,6 +100,8 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import com.google.common.reflect.TypeToken;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 
 /**
  * A cluster of entities that can dynamically increase or decrease the number of entities.
@@ -108,6 +112,12 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
     private static final AttributeSensor<Supplier<Integer>> NEXT_CLUSTER_MEMBER_ID = Sensors.newSensor(new TypeToken<Supplier<Integer>>() {},
             "next.cluster.member.id", "Returns the ID number of the next member to be added");
 
+    /**
+     * Controls the maximum number of effector invocations the cluster will make on members at once.
+     * Only used if {@link #MAX_CONCURRENT_CHILD_COMMANDS} is configured.
+     */
+    private transient Semaphore childTaskSemaphore;
+
     private volatile FunctionFeed clusterOneAndAllMembersUp;
 
     // TODO better mechanism for arbitrary class name to instance type coercion
@@ -212,9 +222,16 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
     public void init() {
         super.init();
         initialiseMemberId();
+        initialiseTaskPermitSemaphore();
         connectAllMembersUp();
     }
 
+    @Override
+    public void rebind() {
+        super.rebind();
+        initialiseTaskPermitSemaphore();
+    }
+
     private void initialiseMemberId() {
         synchronized (mutex) {
             if (sensors().get(NEXT_CLUSTER_MEMBER_ID) == null) {
@@ -223,6 +240,17 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
         }
     }
 
+    private void initialiseTaskPermitSemaphore() {
+        synchronized (mutex) {
+            if (getChildTaskSemaphore() == null) {
+                Integer maxChildTasks = config().get(MAX_CONCURRENT_CHILD_COMMANDS);
+                if (maxChildTasks != null && maxChildTasks > 0) {
+                    childTaskSemaphore = new Semaphore(maxChildTasks);
+                }
+            }
+        }
+    }
+
     private void connectAllMembersUp() {
         clusterOneAndAllMembersUp = FunctionFeed.builder()
                 .entity(this)
@@ -551,8 +579,9 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
                 Iterables.filter(getChildren(), Predicates.and(Predicates.instanceOf(Startable.class), EntityPredicates.isManaged()))));
         } else if ("parallel".equalsIgnoreCase(mode)) {
             ServiceStateLogic.setExpectedState(this, Lifecycle.STARTING);
-            DynamicTasks.queue(Effectors.invocationParallel(Startable.RESTART, null, 
-                Iterables.filter(getChildren(), Predicates.and(Predicates.instanceOf(Startable.class), EntityPredicates.isManaged()))));
+            for (Entity member : Iterables.filter(getChildren(), Predicates.and(Predicates.instanceOf(Startable.class), EntityPredicates.isManaged()))) {
+                DynamicTasks.queue(newThrottledEffectorTask(member, Startable.RESTART, Collections.emptyMap()));
+            }
         } else {
             throw new IllegalArgumentException("Unknown "+RESTART_MODE.getName()+" '"+mode+"'");
         }
@@ -788,7 +817,12 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
 
         // FIXME symmetry in order of added as child, managed, started, and added to group
         final Iterable<Entity> removedStartables = (Iterable<Entity>) (Iterable<?>) Iterables.filter(removedEntities, Startable.class);
-        Task<?> invoke = Entities.invokeEffector(this, removedStartables, Startable.STOP, Collections.<String,Object>emptyMap());
+        ImmutableList.Builder<Task<?>> tasks = ImmutableList.builder();
+        for (Entity member : removedStartables) {
+            tasks.add(newThrottledEffectorTask(member, Startable.STOP, Collections.emptyMap()));
+        }
+        Task<?> invoke = Tasks.parallel(tasks.build());
+        DynamicTasks.queueIfPossible(invoke).orSubmitAsync();
         try {
             invoke.get();
             return removedEntities;
@@ -826,8 +860,11 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
             addedEntities.add(entity);
             addedEntityLocations.put(entity, loc);
             if (entity instanceof Startable) {
+                // First members are used when subsequent members need some attributes from them
+                // before they start; make sure they're in the first batch.
+                boolean privileged = Boolean.TRUE.equals(entity.sensors().get(AbstractGroup.FIRST_MEMBER));
                 Map<String, ?> args = ImmutableMap.of("locations", MutableList.builder().addIfNotNull(loc).buildImmutable());
-                Task<Void> task = Effectors.invocation(entity, Startable.START, args).asTask();
+                Task<?> task = newThrottledEffectorTask(entity, Startable.START, args, privileged);
                 tasks.put(entity, task);
             }
         }
@@ -1041,14 +1078,116 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
 
     protected void stopAndRemoveNode(Entity member) {
         removeMember(member);
-
         try {
             if (member instanceof Startable) {
-                Task<?> task = member.invoke(Startable.STOP, Collections.<String,Object>emptyMap());
+                Task<?> task = newThrottledEffectorTask(member, Startable.STOP, Collections.<String, Object>emptyMap());
+                DynamicTasks.queueIfPossible(task).orSubmitAsync();
                 task.getUnchecked();
             }
         } finally {
             Entities.unmanage(member);
         }
     }
+
+    @Nullable
+    protected Semaphore getChildTaskSemaphore() {
+        return childTaskSemaphore;
+    }
+
+    /**
+     * @return An unprivileged effector task.
+     * @see #newThrottledEffectorTask(Entity, Effector, Map, boolean)
+     */
+    protected <T> Task<?> newThrottledEffectorTask(Entity target, Effector<T> effector, Map<?, ?> arguments) {
+        return newThrottledEffectorTask(target, effector, arguments, false);
+    }
+
+    /**
+     * Creates tasks that obtain permits from {@link #childTaskSemaphore} before invoking <code>effector</code>
+     * on <code>target</code>. Permits are released in a {@link ListenableFuture#addListener listener}. No
+     * permits are obtained if {@link #childTaskSemaphore} is <code>null</code>.
+     * @param target Entity to invoke effector on
+     * @param effector Effector to invoke on target
+     * @param arguments Effector arguments
+     * @param isPrivileged If true the method obtains a permit from {@link #childTaskSemaphore}
+     *                     immediately and returns the effector invocation task, otherwise it
+     *                     returns a task that sequentially obtains a permit then runs the effector.
+     * @return An unsubmitted task.
+     */
+    protected <T> Task<?> newThrottledEffectorTask(Entity target, Effector<T> effector, Map<?, ?> arguments, boolean isPrivileged) {
+        final Task<?> toSubmit;
+        final Task<T> effectorTask = Effectors.invocation(target, effector, arguments).asTask();
+        if (getChildTaskSemaphore() != null) {
+            // permitObtained communicates to the release task whether the permit should really be released
+            // or not. ObtainPermit sets it to true when a permit is acquired.
+            final AtomicBoolean permitObtained = new AtomicBoolean();
+            final String description = "Waiting for permit to run " + effector.getName() + " on " + target;
+            final Runnable obtain = new ObtainPermit(getChildTaskSemaphore(), description, permitObtained);
+            // Acquire the permit now for the privileged task and just queue the effector invocation.
+            // If it's unprivileged then queue a task to obtain a permit first.
+            if (isPrivileged) {
+                obtain.run();
+                toSubmit = effectorTask;
+            } else {
+                Task<?> obtainMutex = Tasks.builder()
+                        .description(description)
+                        .body(new ObtainPermit(getChildTaskSemaphore(), description, permitObtained))
+                        .build();
+                toSubmit = Tasks.sequential(
+                        "Waiting for permit then running " + effector.getName() + " on " + target,
+                        obtainMutex, effectorTask);
+            }
+            toSubmit.addListener(new ReleasePermit(getChildTaskSemaphore(), permitObtained), MoreExecutors.sameThreadExecutor());
+        } else {
+            toSubmit = effectorTask;
+        }
+        return toSubmit;
+    }
+
+    private static class ObtainPermit implements Runnable {
+        private final Semaphore permit;
+        private final String description;
+        private final AtomicBoolean hasObtainedPermit;
+
+        private ObtainPermit(Semaphore permit, String description, AtomicBoolean hasObtainedPermit) {
+            this.permit = permit;
+            this.description = description;
+            this.hasObtainedPermit = hasObtainedPermit;
+        }
+
+        @Override
+        public void run() {
+            String oldDetails = Tasks.setBlockingDetails(description);
+            LOG.debug("{} acquiring permit from {}", this, permit);
+            try {
+                permit.acquire();
+                hasObtainedPermit.set(true);
+            } catch (InterruptedException e) {
+                throw Exceptions.propagate(e);
+            } finally {
+                Tasks.setBlockingDetails(oldDetails);
+            }
+        }
+    }
+
+    private static class ReleasePermit implements Runnable {
+        private final Semaphore permit;
+        private final AtomicBoolean wasPermitObtained;
+
+        private ReleasePermit(Semaphore permit, AtomicBoolean wasPermitObtained) {
+            this.permit = permit;
+            this.wasPermitObtained = wasPermitObtained;
+        }
+
+        @Override
+        public void run() {
+            if (wasPermitObtained.get()) {
+                LOG.debug("{} releasing permit from {}", this, permit);
+                permit.release();
+            } else {
+                LOG.debug("{} not releasing a permit from {} because it appears one was never obtained", this, permit);
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4e30074c/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterRebindTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterRebindTest.java b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterRebindTest.java
new file mode 100644
index 0000000..bbf3a2a
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterRebindTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.brooklyn.entity.group;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.core.entity.Attributes;
+import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.entity.EntityAsserts;
+import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
+import org.apache.brooklyn.core.mgmt.rebind.RebindOptions;
+import org.apache.brooklyn.core.mgmt.rebind.RebindTestFixtureWithApp;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+
+public class DynamicClusterRebindTest extends RebindTestFixtureWithApp {
+
+    @Test
+    public void testThrottleAppliesAfterRebind() throws Exception {
+        DynamicCluster cluster = origApp.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+                .configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, 1)
+                .configure(DynamicCluster.INITIAL_SIZE, 1)
+                .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(DynamicClusterTest.ThrowOnAsyncStartEntity.class))
+                        .configure(DynamicClusterTest.ThrowOnAsyncStartEntity.COUNTER, new AtomicInteger()));
+        app().start(ImmutableList.of(origApp.newLocalhostProvisioningLocation()));
+        EntityAsserts.assertAttributeEquals(cluster, DynamicCluster.GROUP_SIZE, 1);
+
+        rebind(RebindOptions.create().terminateOrigManagementContext(true));
+        cluster = Entities.descendants(app(), DynamicCluster.class).iterator().next();
+        cluster.resize(10);
+        EntityAsserts.assertAttributeEqualsEventually(cluster, DynamicCluster.GROUP_SIZE, 10);
+        EntityAsserts.assertAttributeEquals(cluster, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4e30074c/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
index 36d3c39..c3e7d7f 100644
--- a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
+++ b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -41,12 +42,16 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.brooklyn.api.effector.Effector;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.entity.ImplementedBy;
 import org.apache.brooklyn.api.location.Location;
 import org.apache.brooklyn.api.location.NoMachinesAvailableException;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.entity.Attributes;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.entity.EntityAsserts;
@@ -59,6 +64,7 @@ import org.apache.brooklyn.core.entity.trait.FailingEntity;
 import org.apache.brooklyn.core.entity.trait.Resizable;
 import org.apache.brooklyn.core.location.SimulatedLocation;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
+import org.apache.brooklyn.core.sensor.DependentConfiguration;
 import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
 import org.apache.brooklyn.core.test.entity.TestEntity;
 import org.apache.brooklyn.core.test.entity.TestEntityImpl;
@@ -67,10 +73,15 @@ import org.apache.brooklyn.test.Asserts;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.collections.QuorumCheck.QuorumChecks;
+import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Function;
@@ -1225,4 +1236,125 @@ public class DynamicClusterTest extends BrooklynAppUnitTestSupport {
         assertEquals(found.size(), expectedNonFirstCount);
     }
 
+    @DataProvider
+    public Object[][] maxConcurrentCommandsTestProvider() {
+        return new Object[][]{{1}, {2}, {3}}; // one row per maxConcurrentCommands value fed to the data-driven test
+    }
+
+    @Test(dataProvider = "maxConcurrentCommandsTestProvider") // NOTE(review): runs once per provider row, not only for the "One" in the method name
+    public void testEntitiesStartAndStopSequentiallyWhenMaxConcurrentCommandsIsOne(int maxConcurrentCommands) {
+        EntitySpec<ThrowOnAsyncStartEntity> memberSpec = EntitySpec.create(ThrowOnAsyncStartEntity.class)
+                .configure(ThrowOnAsyncStartEntity.MAX_CONCURRENCY, maxConcurrentCommands) // bound that each member asserts in its start()
+                .configure(ThrowOnAsyncStartEntity.COUNTER, new AtomicInteger()); // presumably one counter shared by every member built from this spec - TODO confirm
+        DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+                .configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, maxConcurrentCommands) // same limit handed to the cluster under test
+                .configure(DynamicCluster.INITIAL_SIZE, 10)
+                .configure(DynamicCluster.MEMBER_SPEC, memberSpec));
+        app.start(ImmutableList.of(app.newSimulatedLocation()));
+        assertEquals(cluster.sensors().get(Attributes.SERVICE_STATE_ACTUAL), Lifecycle.RUNNING); // NOTE(review): only start is exercised; stop is never invoked despite the name
+    }
+
+    // Checks that a cluster whose non-first members wait for the first member to be up
+    // still reaches RUNNING when MAX_CONCURRENT_CHILD_COMMANDS is 1.
+    @Test
+    public void testFirstMemberInFirstBatchWhenMaxConcurrentCommandsSet() throws Exception {
+        final AtomicInteger counter = new AtomicInteger();
+        final DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+                .configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, 1)
+                .configure(DynamicCluster.INITIAL_SIZE, 3));
+
+        Task<Boolean> firstMemberUp = Tasks.<Boolean>builder()
+                .body(new Callable<Boolean>() {
+                    @Override
+                    public Boolean call() throws Exception {
+                        Task<Entity> first = DependentConfiguration.attributeWhenReady(cluster, DynamicCluster.FIRST); // presumably completes once FIRST is set - TODO confirm
+                        DynamicTasks.queueIfPossible(first).orSubmitAsync();
+                        final Entity source = first.get(); // blocks for the entity held in the cluster's FIRST attribute
+                        final Task<Boolean> booleanTask = DependentConfiguration.attributeWhenReady(source, Attributes.SERVICE_UP); // same pattern for that entity's SERVICE_UP
+                        DynamicTasks.queueIfPossible(booleanTask).orSubmitAsync();
+                        return booleanTask.get();
+                    }
+                })
+                .build();
+
+        EntitySpec<ThrowOnAsyncStartEntity> firstMemberSpec = EntitySpec.create(ThrowOnAsyncStartEntity.class)
+                .configure(ThrowOnAsyncStartEntity.COUNTER, counter)
+                .configure(ThrowOnAsyncStartEntity.START_LATCH, true); // literal value: the first member has nothing to wait for
+
+        EntitySpec<ThrowOnAsyncStartEntity> memberSpec = EntitySpec.create(ThrowOnAsyncStartEntity.class)
+                .configure(ThrowOnAsyncStartEntity.COUNTER, counter)
+                .configure(ThrowOnAsyncStartEntity.START_LATCH, firstMemberUp); // every other member gets the task above as its latch
+
+        cluster.config().set(DynamicCluster.FIRST_MEMBER_SPEC, firstMemberSpec);
+        cluster.config().set(DynamicCluster.MEMBER_SPEC, memberSpec);
+
+        // app.start blocks, so without the timeout below a failure would hang this test forever.
+        Asserts.assertReturnsEventually(new Runnable() {
+            public void run() {
+                app.start(ImmutableList.of(app.newSimulatedLocation()));
+                EntityAsserts.assertAttributeEqualsEventually(cluster, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+            }
+        }, Asserts.DEFAULT_LONG_TIMEOUT);
+    }
+
+    @Test
+    public void testChildCommandPermitNotReleasedWhenMemberStartTaskCancelledBeforeSubmission() {
+        // Tests that permits are not released when their start task is cancelled.
+        // Expected behaviour is:
+        // - permit obtained for first member. cancelled task submitted. permit released.
+        // - no permit obtained for second member. cancelled task submitted. no permit released.
+        DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(CancelEffectorInvokeCluster.class)
+                .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(TestEntity.class))
+                .configure(DynamicCluster.INITIAL_SIZE, 2)
+                .configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, 1));
+        final DynamicClusterImpl clusterImpl = DynamicClusterImpl.class.cast(Entities.deproxy(cluster)); // the semaphore getter is called on the impl class
+        assertNotNull(clusterImpl.getChildTaskSemaphore());
+        assertEquals(clusterImpl.getChildTaskSemaphore().availablePermits(), 1); // baseline before start
+        try {
+            app.start(ImmutableList.<Location>of(app.newSimulatedLocation()));
+            Asserts.shouldHaveFailedPreviously("Cluster start should have failed because the member start was cancelled");
+        } catch (Exception e) {
+            // Expected: start is meant to fail because the member start tasks were cancelled.
+        }
+        assertEquals(clusterImpl.getChildTaskSemaphore().availablePermits(), 1); // must equal the pre-start count
+    }
+
+    @ImplementedBy(ThrowOnAsyncStartEntityImpl.class)
+    public interface ThrowOnAsyncStartEntity extends TestEntity { // test entity whose impl asserts a concurrency bound in start()
+        ConfigKey<Integer> MAX_CONCURRENCY = ConfigKeys.newConfigKey(Integer.class, "concurrency", "max concurrency", 1); // upper bound compared against the counter
+        ConfigKey<AtomicInteger> COUNTER = ConfigKeys.newConfigKey(AtomicInteger.class, "counter"); // incremented on entry to start(), decremented on exit
+        ConfigKey<Boolean> START_LATCH = ConfigKeys.newConfigKey(Boolean.class, "startlatch"); // read in start() before the concurrency assertion
+    }
+
+    public static class ThrowOnAsyncStartEntityImpl extends TestEntityImpl implements ThrowOnAsyncStartEntity {
+        private static final Logger LOG = LoggerFactory.getLogger(ThrowOnAsyncStartEntityImpl.class);
+        @Override
+        public void start(Collection<? extends Location> locs) {
+            int count = config().get(COUNTER).incrementAndGet(); // counter value on entry, this entity included
+            try {
+                LOG.debug("{} starting (first={})", new Object[]{this, sensors().get(AbstractGroup.FIRST_MEMBER)});
+                config().get(START_LATCH); // value is discarded; presumably the read blocks until the latch resolves - TODO confirm
+                // Throw if the counter exceeded MAX_CONCURRENCY when this entity entered start().
+                assertTrue(count <= config().get(MAX_CONCURRENCY), "expected " + count + " <= " + config().get(MAX_CONCURRENCY));
+                super.start(locs);
+            } finally {
+                config().get(COUNTER).decrementAndGet(); // always undo the increment, even when the assertion or start fails
+            }
+        }
+    }
+
+    /** Cluster type implemented by {@link CancelEffectorInvokeClusterImpl}; used in {@link #testChildCommandPermitNotReleasedWhenMemberStartTaskCancelledBeforeSubmission}. */
+    @ImplementedBy(CancelEffectorInvokeClusterImpl.class)
+    public interface CancelEffectorInvokeCluster extends DynamicCluster {}
+
+    /** Overrides {@link DynamicClusterImpl#newThrottledEffectorTask} to cancel each task before returning it, i.e. before it's submitted. */
+    public static class CancelEffectorInvokeClusterImpl extends DynamicClusterImpl implements CancelEffectorInvokeCluster {
+        @Override
+        protected <T> Task<?> newThrottledEffectorTask(Entity target, Effector<T> effector, Map<?, ?> arguments, boolean isPrivileged) {
+            Task<?> unsubmitted = super.newThrottledEffectorTask(target, effector, arguments, isPrivileged); // build the task exactly as the parent would
+            unsubmitted.cancel(true); // cancelled here, so the caller only ever receives an already-cancelled task
+            return unsubmitted;
+        }
+    }
+
 }


[3/3] brooklyn-server git commit: This closes #443

Posted by sj...@apache.org.
This closes #443

Adds maxConcurrentChildCommands parameter to DynamicCluster


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/d1ef42a1
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/d1ef42a1
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/d1ef42a1

Branch: refs/heads/master
Commit: d1ef42a1e004cde87fe9bd7f1fda144b0fcf0a5d
Parents: 23f9a71 4e30074
Author: Sam Corbett <sa...@cloudsoftcorp.com>
Authored: Thu Dec 1 12:00:29 2016 +0000
Committer: Sam Corbett <sa...@cloudsoftcorp.com>
Committed: Thu Dec 1 12:00:29 2016 +0000

----------------------------------------------------------------------
 .../brooklyn/core/effector/Effectors.java       |   8 +-
 .../apache/brooklyn/core/entity/Entities.java   |   2 +-
 .../brooklyn/entity/group/DynamicCluster.java   |  11 +-
 .../entity/group/DynamicClusterImpl.java        | 154 ++++++++++++++++++-
 .../brooklyn/util/core/task/ParallelTask.java   |  27 +++-
 .../brooklyn/util/core/task/SequentialTask.java |  27 +++-
 .../entity/group/DynamicClusterRebindTest.java  |  54 +++++++
 .../entity/group/DynamicClusterTest.java        | 132 ++++++++++++++++
 8 files changed, 389 insertions(+), 26 deletions(-)
----------------------------------------------------------------------