You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by sj...@apache.org on 2016/12/01 12:01:03 UTC
[1/3] brooklyn-server git commit: Readability
Repository: brooklyn-server
Updated Branches:
refs/heads/master 23f9a715c -> d1ef42a1e
Readability
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/5b9f8965
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/5b9f8965
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/5b9f8965
Branch: refs/heads/master
Commit: 5b9f89653ee5979665bff445d38750e1d0f234ae
Parents: a569463
Author: Sam Corbett <sa...@cloudsoftcorp.com>
Authored: Wed Nov 16 15:36:45 2016 +0000
Committer: Sam Corbett <sa...@cloudsoftcorp.com>
Committed: Fri Nov 18 15:08:31 2016 +0000
----------------------------------------------------------------------
.../brooklyn/core/effector/Effectors.java | 8 ++++--
.../entity/group/DynamicClusterImpl.java | 3 ++-
.../brooklyn/util/core/task/ParallelTask.java | 27 +++++++++++++++-----
.../brooklyn/util/core/task/SequentialTask.java | 27 ++++++++++++++------
4 files changed, 47 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/5b9f8965/core/src/main/java/org/apache/brooklyn/core/effector/Effectors.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/effector/Effectors.java b/core/src/main/java/org/apache/brooklyn/core/effector/Effectors.java
index 9b10d1d..6240240 100644
--- a/core/src/main/java/org/apache/brooklyn/core/effector/Effectors.java
+++ b/core/src/main/java/org/apache/brooklyn/core/effector/Effectors.java
@@ -183,14 +183,18 @@ public class Effectors {
public static TaskAdaptable<List<?>> invocationParallel(Effector<?> eff, Map<?,?> params, Iterable<? extends Entity> entities) {
List<TaskAdaptable<?>> tasks = new ArrayList<TaskAdaptable<?>>();
for (Entity e: entities) tasks.add(invocation(e, eff, params));
- return Tasks.parallel("invoking "+eff+" on "+tasks.size()+" node"+(Strings.s(tasks.size())), tasks.toArray(new TaskAdaptable[tasks.size()]));
+ return Tasks.parallel(
+ "invoking " + eff + " on " + tasks.size() + " node" + (Strings.s(tasks.size())),
+ tasks.toArray(new TaskAdaptable[tasks.size()]));
}
/** as {@link #invocationParallel(Effector, Map, Iterable)} but executing sequentially */
public static TaskAdaptable<List<?>> invocationSequential(Effector<?> eff, Map<?,?> params, Iterable<? extends Entity> entities) {
List<TaskAdaptable<?>> tasks = new ArrayList<TaskAdaptable<?>>();
for (Entity e: entities) tasks.add(invocation(e, eff, params));
- return Tasks.sequential("invoking "+eff+" on "+tasks.size()+" node"+(Strings.s(tasks.size())), tasks.toArray(new TaskAdaptable[tasks.size()]));
+ return Tasks.sequential(
+ "invoking " + eff + " on " + tasks.size() + " node" + (Strings.s(tasks.size())),
+ tasks.toArray(new TaskAdaptable[tasks.size()]));
}
/** returns an unsubmitted task which will invoke the given effector on the given entities
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/5b9f8965/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
index eedefa7..8725b12 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
@@ -787,7 +787,8 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
Collection<Entity> removedEntities = pickAndRemoveMembers(delta * -1);
// FIXME symmetry in order of added as child, managed, started, and added to group
- Task<?> invoke = Entities.invokeEffector(this, (Iterable<Entity>)(Iterable<?>)Iterables.filter(removedEntities, Startable.class), Startable.STOP, Collections.<String,Object>emptyMap());
+ final Iterable<Entity> removedStartables = (Iterable<Entity>) (Iterable<?>) Iterables.filter(removedEntities, Startable.class);
+ Task<?> invoke = Entities.invokeEffector(this, removedStartables, Startable.STOP, Collections.<String,Object>emptyMap());
try {
invoke.get();
return removedEntities;
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/5b9f8965/core/src/main/java/org/apache/brooklyn/util/core/task/ParallelTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ParallelTask.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ParallelTask.java
index 82ec564..a2425f7 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/ParallelTask.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ParallelTask.java
@@ -38,13 +38,26 @@ import com.google.common.collect.Lists;
* order they were passed as arguments.
*/
public class ParallelTask<T> extends CompoundTask<T> {
- public ParallelTask(Object... tasks) { super(tasks); }
-
- public ParallelTask(Map<String,?> flags, Collection<? extends Object> tasks) { super(flags, tasks); }
- public ParallelTask(Collection<? extends Object> tasks) { super(tasks); }
-
- public ParallelTask(Map<String,?> flags, Iterable<? extends Object> tasks) { super(flags, ImmutableList.copyOf(tasks)); }
- public ParallelTask(Iterable<? extends Object> tasks) { super(ImmutableList.copyOf(tasks)); }
+
+ public ParallelTask(Object... tasks) {
+ super(tasks);
+ }
+
+ public ParallelTask(Map<String, ?> flags, Collection<?> tasks) {
+ super(flags, tasks);
+ }
+
+ public ParallelTask(Collection<?> tasks) {
+ super(tasks);
+ }
+
+ public ParallelTask(Map<String, ?> flags, Iterable<?> tasks) {
+ super(flags, ImmutableList.copyOf(tasks));
+ }
+
+ public ParallelTask(Iterable<?> tasks) {
+ super(ImmutableList.copyOf(tasks));
+ }
@Override
protected List<T> runJobs() throws InterruptedException, ExecutionException {
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/5b9f8965/core/src/main/java/org/apache/brooklyn/util/core/task/SequentialTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/SequentialTask.java b/core/src/main/java/org/apache/brooklyn/util/core/task/SequentialTask.java
index 9bd40af..1a9fbac 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/SequentialTask.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/SequentialTask.java
@@ -28,19 +28,30 @@ import org.apache.brooklyn.api.mgmt.Task;
import com.google.common.collect.ImmutableList;
-
/** runs tasks in order, waiting for one to finish before starting the next; return value here is TBD;
* (currently is all the return values of individual tasks, but we
* might want some pipeline support and eventually only to return final value...) */
public class SequentialTask<T> extends CompoundTask<T> {
- public SequentialTask(Object... tasks) { super(tasks); }
-
- public SequentialTask(Map<String,?> flags, Collection<? extends Object> tasks) { super(flags, tasks); }
- public SequentialTask(Collection<? extends Object> tasks) { super(tasks); }
-
- public SequentialTask(Map<String,?> flags, Iterable<? extends Object> tasks) { super(flags, ImmutableList.copyOf(tasks)); }
- public SequentialTask(Iterable<? extends Object> tasks) { super(ImmutableList.copyOf(tasks)); }
+ public SequentialTask(Object... tasks) {
+ super(tasks);
+ }
+
+ public SequentialTask(Map<String, ?> flags, Collection<?> tasks) {
+ super(flags, tasks);
+ }
+
+ public SequentialTask(Collection<?> tasks) {
+ super(tasks);
+ }
+
+ public SequentialTask(Map<String, ?> flags, Iterable<?> tasks) {
+ super(flags, ImmutableList.copyOf(tasks));
+ }
+
+ public SequentialTask(Iterable<?> tasks) {
+ super(ImmutableList.copyOf(tasks));
+ }
protected List<T> runJobs() throws InterruptedException, ExecutionException {
setBlockingDetails("Executing "+
[2/3] brooklyn-server git commit: Adds maxConcurrentChildCommands parameter to DynamicCluster
Posted by sj...@apache.org.
Adds maxConcurrentChildCommands parameter to DynamicCluster
The option configures the maximum number of simultaneous Startable
effector invocations that will be made on members of the group.
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/4e30074c
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/4e30074c
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/4e30074c
Branch: refs/heads/master
Commit: 4e30074ca9b09a49c6a4b052d6f28d05608b36eb
Parents: 5b9f896
Author: Sam Corbett <sa...@cloudsoftcorp.com>
Authored: Tue Nov 29 16:11:12 2016 +0000
Committer: Sam Corbett <sa...@cloudsoftcorp.com>
Committed: Tue Nov 29 16:11:12 2016 +0000
----------------------------------------------------------------------
.../apache/brooklyn/core/entity/Entities.java | 2 +-
.../brooklyn/entity/group/DynamicCluster.java | 11 +-
.../entity/group/DynamicClusterImpl.java | 153 ++++++++++++++++++-
.../entity/group/DynamicClusterRebindTest.java | 54 +++++++
.../entity/group/DynamicClusterTest.java | 132 ++++++++++++++++
5 files changed, 343 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4e30074c/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java b/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java
index 2821652..69670ec 100644
--- a/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java
+++ b/core/src/main/java/org/apache/brooklyn/core/entity/Entities.java
@@ -625,7 +625,7 @@ public class Entities {
/**
* Return all descendants of given entity matching the given predicate and optionally the entity itself.
*
- * @see {@link EntityPredicates} for useful second arguments.
+ * @see EntityPredicates
*/
@SuppressWarnings("unused")
public static Iterable<Entity> descendants(Entity root, Predicate<? super Entity> matching, boolean includeSelf) {
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4e30074c/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java
index f2112e8..3f62f82 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java
@@ -103,7 +103,7 @@ public interface DynamicCluster extends AbstractGroup, Cluster, MemberReplaceabl
"dynamiccluster.restartMode",
"How this cluster should handle restarts; "
+ "by default it is disallowed, but this key can specify a different mode. "
- + "Modes supported by dynamic cluster are 'off', 'sequqential', or 'parallel'. "
+ + "Modes supported by dynamic cluster are 'off', 'sequential', or 'parallel'. "
+ "However subclasses can define their own modes or may ignore this.", null);
@SetFromFlag("quarantineFailedEntities")
@@ -183,6 +183,15 @@ public interface DynamicCluster extends AbstractGroup, Cluster, MemberReplaceabl
ConfigKey<Integer> CLUSTER_MEMBER_ID = ConfigKeys.newIntegerConfigKey(
"cluster.member.id", "The unique ID number (sequential) of a member of a cluster");
+ @Beta
+ @SetFromFlag("maxConcurrentChildCommands")
+ ConfigKey<Integer> MAX_CONCURRENT_CHILD_COMMANDS = ConfigKeys.builder(Integer.class)
+ .name("dynamiccluster.maxConcurrentChildCommands")
+ .description("[Beta] The maximum number of effector invocations that will be made on children at once " +
+ "(e.g. start, stop, restart). Any value null or less than or equal to zero means invocations are unbounded")
+ .defaultValue(0)
+ .build();
+
AttributeSensor<List<Location>> SUB_LOCATIONS = new BasicAttributeSensor<List<Location>>(
new TypeToken<List<Location>>() {},
"dynamiccluster.subLocations", "Locations for each availability zone to use");
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4e30074c/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
index 8725b12..4ed0ac0 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
@@ -30,10 +30,12 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-
import javax.annotation.Nullable;
+import org.apache.brooklyn.api.effector.Effector;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.entity.Group;
@@ -98,6 +100,8 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.reflect.TypeToken;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
/**
* A cluster of entities that can dynamically increase or decrease the number of entities.
@@ -108,6 +112,12 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
private static final AttributeSensor<Supplier<Integer>> NEXT_CLUSTER_MEMBER_ID = Sensors.newSensor(new TypeToken<Supplier<Integer>>() {},
"next.cluster.member.id", "Returns the ID number of the next member to be added");
+ /**
+ * Controls the maximum number of effector invocations the cluster will make on members at once.
+ * Only used if {@link #MAX_CONCURRENT_CHILD_COMMANDS} is configured.
+ */
+ private transient Semaphore childTaskSemaphore;
+
private volatile FunctionFeed clusterOneAndAllMembersUp;
// TODO better mechanism for arbitrary class name to instance type coercion
@@ -212,9 +222,16 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
public void init() {
super.init();
initialiseMemberId();
+ initialiseTaskPermitSemaphore();
connectAllMembersUp();
}
+ @Override
+ public void rebind() {
+ super.rebind();
+ initialiseTaskPermitSemaphore();
+ }
+
private void initialiseMemberId() {
synchronized (mutex) {
if (sensors().get(NEXT_CLUSTER_MEMBER_ID) == null) {
@@ -223,6 +240,17 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
}
}
+ private void initialiseTaskPermitSemaphore() {
+ synchronized (mutex) {
+ if (getChildTaskSemaphore() == null) {
+ Integer maxChildTasks = config().get(MAX_CONCURRENT_CHILD_COMMANDS);
+ if (maxChildTasks != null && maxChildTasks > 0) {
+ childTaskSemaphore = new Semaphore(maxChildTasks);
+ }
+ }
+ }
+ }
+
private void connectAllMembersUp() {
clusterOneAndAllMembersUp = FunctionFeed.builder()
.entity(this)
@@ -551,8 +579,9 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
Iterables.filter(getChildren(), Predicates.and(Predicates.instanceOf(Startable.class), EntityPredicates.isManaged()))));
} else if ("parallel".equalsIgnoreCase(mode)) {
ServiceStateLogic.setExpectedState(this, Lifecycle.STARTING);
- DynamicTasks.queue(Effectors.invocationParallel(Startable.RESTART, null,
- Iterables.filter(getChildren(), Predicates.and(Predicates.instanceOf(Startable.class), EntityPredicates.isManaged()))));
+ for (Entity member : Iterables.filter(getChildren(), Predicates.and(Predicates.instanceOf(Startable.class), EntityPredicates.isManaged()))) {
+ DynamicTasks.queue(newThrottledEffectorTask(member, Startable.RESTART, Collections.emptyMap()));
+ }
} else {
throw new IllegalArgumentException("Unknown "+RESTART_MODE.getName()+" '"+mode+"'");
}
@@ -788,7 +817,12 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
// FIXME symmetry in order of added as child, managed, started, and added to group
final Iterable<Entity> removedStartables = (Iterable<Entity>) (Iterable<?>) Iterables.filter(removedEntities, Startable.class);
- Task<?> invoke = Entities.invokeEffector(this, removedStartables, Startable.STOP, Collections.<String,Object>emptyMap());
+ ImmutableList.Builder<Task<?>> tasks = ImmutableList.builder();
+ for (Entity member : removedStartables) {
+ tasks.add(newThrottledEffectorTask(member, Startable.STOP, Collections.emptyMap()));
+ }
+ Task<?> invoke = Tasks.parallel(tasks.build());
+ DynamicTasks.queueIfPossible(invoke).orSubmitAsync();
try {
invoke.get();
return removedEntities;
@@ -826,8 +860,11 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
addedEntities.add(entity);
addedEntityLocations.put(entity, loc);
if (entity instanceof Startable) {
+ // First members are used when subsequent members need some attributes from them
+ // before they start; make sure they're in the first batch.
+ boolean privileged = Boolean.TRUE.equals(entity.sensors().get(AbstractGroup.FIRST_MEMBER));
Map<String, ?> args = ImmutableMap.of("locations", MutableList.builder().addIfNotNull(loc).buildImmutable());
- Task<Void> task = Effectors.invocation(entity, Startable.START, args).asTask();
+ Task<?> task = newThrottledEffectorTask(entity, Startable.START, args, privileged);
tasks.put(entity, task);
}
}
@@ -1041,14 +1078,116 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus
protected void stopAndRemoveNode(Entity member) {
removeMember(member);
-
try {
if (member instanceof Startable) {
- Task<?> task = member.invoke(Startable.STOP, Collections.<String,Object>emptyMap());
+ Task<?> task = newThrottledEffectorTask(member, Startable.STOP, Collections.<String, Object>emptyMap());
+ DynamicTasks.queueIfPossible(task).orSubmitAsync();
task.getUnchecked();
}
} finally {
Entities.unmanage(member);
}
}
+
+ @Nullable
+ protected Semaphore getChildTaskSemaphore() {
+ return childTaskSemaphore;
+ }
+
+ /**
+ * @return An unprivileged effector task.
+ * @see #newThrottledEffectorTask(Entity, Effector, Map, boolean)
+ */
+ protected <T> Task<?> newThrottledEffectorTask(Entity target, Effector<T> effector, Map<?, ?> arguments) {
+ return newThrottledEffectorTask(target, effector, arguments, false);
+ }
+
+ /**
+ * Creates tasks that obtain permits from {@link #childTaskSemaphore} before invoking <code>effector</code>
+ * on <code>target</code>. Permits are released in a {@link ListenableFuture#addListener listener}. No
+ * permits are obtained if {@link #childTaskSemaphore} is <code>null</code>.
+ * @param target Entity to invoke effector on
+ * @param effector Effector to invoke on target
+ * @param arguments Effector arguments
+ * @param isPrivileged If true the method obtains a permit from {@link #childTaskSemaphore}
+ * immediately and returns the effector invocation task, otherwise it
+ * returns a task that sequentially obtains a permit then runs the effector.
+ * @return An unsubmitted task.
+ */
+ protected <T> Task<?> newThrottledEffectorTask(Entity target, Effector<T> effector, Map<?, ?> arguments, boolean isPrivileged) {
+ final Task<?> toSubmit;
+ final Task<T> effectorTask = Effectors.invocation(target, effector, arguments).asTask();
+ if (getChildTaskSemaphore() != null) {
+ // permitObtained communicates to the release task whether the permit should really be released
+ // or not. ObtainPermit sets it to true when a permit is acquired.
+ final AtomicBoolean permitObtained = new AtomicBoolean();
+ final String description = "Waiting for permit to run " + effector.getName() + " on " + target;
+ final Runnable obtain = new ObtainPermit(getChildTaskSemaphore(), description, permitObtained);
+ // Acquire the permit now for the privileged task and just queue the effector invocation.
+ // If it's unprivileged then queue a task to obtain a permit first.
+ if (isPrivileged) {
+ obtain.run();
+ toSubmit = effectorTask;
+ } else {
+ Task<?> obtainMutex = Tasks.builder()
+ .description(description)
+ .body(new ObtainPermit(getChildTaskSemaphore(), description, permitObtained))
+ .build();
+ toSubmit = Tasks.sequential(
+ "Waiting for permit then running " + effector.getName() + " on " + target,
+ obtainMutex, effectorTask);
+ }
+ toSubmit.addListener(new ReleasePermit(getChildTaskSemaphore(), permitObtained), MoreExecutors.sameThreadExecutor());
+ } else {
+ toSubmit = effectorTask;
+ }
+ return toSubmit;
+ }
+
+ private static class ObtainPermit implements Runnable {
+ private final Semaphore permit;
+ private final String description;
+ private final AtomicBoolean hasObtainedPermit;
+
+ private ObtainPermit(Semaphore permit, String description, AtomicBoolean hasObtainedPermit) {
+ this.permit = permit;
+ this.description = description;
+ this.hasObtainedPermit = hasObtainedPermit;
+ }
+
+ @Override
+ public void run() {
+ String oldDetails = Tasks.setBlockingDetails(description);
+ LOG.debug("{} acquiring permit from {}", this, permit);
+ try {
+ permit.acquire();
+ hasObtainedPermit.set(true);
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ } finally {
+ Tasks.setBlockingDetails(oldDetails);
+ }
+ }
+ }
+
+ private static class ReleasePermit implements Runnable {
+ private final Semaphore permit;
+ private final AtomicBoolean wasPermitObtained;
+
+ private ReleasePermit(Semaphore permit, AtomicBoolean wasPermitObtained) {
+ this.permit = permit;
+ this.wasPermitObtained = wasPermitObtained;
+ }
+
+ @Override
+ public void run() {
+ if (wasPermitObtained.get()) {
+ LOG.debug("{} releasing permit from {}", this, permit);
+ permit.release();
+ } else {
+ LOG.debug("{} not releasing a permit from {} because it appears one was never obtained", this, permit);
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4e30074c/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterRebindTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterRebindTest.java b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterRebindTest.java
new file mode 100644
index 0000000..bbf3a2a
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterRebindTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.brooklyn.entity.group;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.core.entity.Attributes;
+import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.entity.EntityAsserts;
+import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
+import org.apache.brooklyn.core.mgmt.rebind.RebindOptions;
+import org.apache.brooklyn.core.mgmt.rebind.RebindTestFixtureWithApp;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+
+public class DynamicClusterRebindTest extends RebindTestFixtureWithApp {
+
+ @Test
+ public void testThrottleAppliesAfterRebind() throws Exception {
+ DynamicCluster cluster = origApp.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+ .configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, 1)
+ .configure(DynamicCluster.INITIAL_SIZE, 1)
+ .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(DynamicClusterTest.ThrowOnAsyncStartEntity.class))
+ .configure(DynamicClusterTest.ThrowOnAsyncStartEntity.COUNTER, new AtomicInteger()));
+ app().start(ImmutableList.of(origApp.newLocalhostProvisioningLocation()));
+ EntityAsserts.assertAttributeEquals(cluster, DynamicCluster.GROUP_SIZE, 1);
+
+ rebind(RebindOptions.create().terminateOrigManagementContext(true));
+ cluster = Entities.descendants(app(), DynamicCluster.class).iterator().next();
+ cluster.resize(10);
+ EntityAsserts.assertAttributeEqualsEventually(cluster, DynamicCluster.GROUP_SIZE, 10);
+ EntityAsserts.assertAttributeEquals(cluster, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4e30074c/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
index 36d3c39..c3e7d7f 100644
--- a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
+++ b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@@ -41,12 +42,16 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.brooklyn.api.effector.Effector;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.entity.ImplementedBy;
import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.api.location.NoMachinesAvailableException;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.entity.Attributes;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.EntityAsserts;
@@ -59,6 +64,7 @@ import org.apache.brooklyn.core.entity.trait.FailingEntity;
import org.apache.brooklyn.core.entity.trait.Resizable;
import org.apache.brooklyn.core.location.SimulatedLocation;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
+import org.apache.brooklyn.core.sensor.DependentConfiguration;
import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
import org.apache.brooklyn.core.test.entity.TestEntity;
import org.apache.brooklyn.core.test.entity.TestEntityImpl;
@@ -67,10 +73,15 @@ import org.apache.brooklyn.test.Asserts;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.collections.MutableSet;
import org.apache.brooklyn.util.collections.QuorumCheck.QuorumChecks;
+import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import com.google.common.base.Function;
@@ -1225,4 +1236,125 @@ public class DynamicClusterTest extends BrooklynAppUnitTestSupport {
assertEquals(found.size(), expectedNonFirstCount);
}
+ /** Supplies the concurrency limits (1, 2 and 3) used by the tests that name this provider. */
+ @DataProvider
+ public Object[][] maxConcurrentCommandsTestProvider() {
+ final Object[][] concurrencyLimits = {{1}, {2}, {3}};
+ return concurrencyLimits;
+ }
+
+ /**
+ * Starts a ten-member cluster of {@link ThrowOnAsyncStartEntity} with the cluster's
+ * {@code MAX_CONCURRENT_CHILD_COMMANDS} and the members' {@code MAX_CONCURRENCY} both set to
+ * {@code maxConcurrentCommands}, then checks that the cluster reports {@code RUNNING}.
+ * <p>
+ * Note that, despite the method name, the limit is not fixed at one: it is supplied by
+ * {@link #maxConcurrentCommandsTestProvider()}. Only start is exercised here; stop is not.
+ *
+ * @param maxConcurrentCommands the concurrency limit applied to both the cluster and its members
+ */
+ @Test(dataProvider = "maxConcurrentCommandsTestProvider")
+ public void testEntitiesStartAndStopSequentiallyWhenMaxConcurrentCommandsIsOne(int maxConcurrentCommands) {
+ final AtomicInteger startsInProgress = new AtomicInteger();
+ EntitySpec<ThrowOnAsyncStartEntity> throwingMemberSpec = EntitySpec.create(ThrowOnAsyncStartEntity.class)
+ .configure(ThrowOnAsyncStartEntity.MAX_CONCURRENCY, maxConcurrentCommands)
+ .configure(ThrowOnAsyncStartEntity.COUNTER, startsInProgress);
+ EntitySpec<DynamicCluster> clusterSpec = EntitySpec.create(DynamicCluster.class)
+ .configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, maxConcurrentCommands)
+ .configure(DynamicCluster.INITIAL_SIZE, 10)
+ .configure(DynamicCluster.MEMBER_SPEC, throwingMemberSpec);
+ DynamicCluster cluster = app.createAndManageChild(clusterSpec);
+ // A member whose start assertion fails is expected to stop the cluster from reaching RUNNING.
+ app.start(ImmutableList.of(app.newSimulatedLocation()));
+ assertEquals(cluster.sensors().get(Attributes.SERVICE_STATE_ACTUAL), Lifecycle.RUNNING);
+ }
+
+ /**
+ * Tests handling of the first member of a cluster by asserting that a group, whose
+ * other members wait for the first, always starts.
+ * <p>
+ * The cluster allows one concurrent child command and has three members. The first member's
+ * start latch is already {@code true}; every other member's latch is a task that completes
+ * only once the cluster's {@code FIRST} member reports {@code SERVICE_UP}. The start is
+ * therefore expected to finish only if the first member is started before the others.
+ */
+ @Test
+ public void testFirstMemberInFirstBatchWhenMaxConcurrentCommandsSet() throws Exception {
+ final AtomicInteger counter = new AtomicInteger();
+ final DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+ .configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, 1)
+ .configure(DynamicCluster.INITIAL_SIZE, 3));
+
+ // Resolves to the first member's SERVICE_UP value once both the member and the value are available.
+ Task<Boolean> firstMemberUp = Tasks.<Boolean>builder()
+ .body(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ // Wait for the cluster to publish its first member...
+ Task<Entity> first = DependentConfiguration.attributeWhenReady(cluster, DynamicCluster.FIRST);
+ DynamicTasks.queueIfPossible(first).orSubmitAsync();
+ final Entity source = first.get();
+ // ...then wait for that member to come up.
+ final Task<Boolean> booleanTask = DependentConfiguration.attributeWhenReady(source, Attributes.SERVICE_UP);
+ DynamicTasks.queueIfPossible(booleanTask).orSubmitAsync();
+ return booleanTask.get();
+ }
+ })
+ .build();
+
+ // The first member may start immediately.
+ EntitySpec<ThrowOnAsyncStartEntity> firstMemberSpec = EntitySpec.create(ThrowOnAsyncStartEntity.class)
+ .configure(ThrowOnAsyncStartEntity.COUNTER, counter)
+ .configure(ThrowOnAsyncStartEntity.START_LATCH, true);
+
+ // All other members are latched on the first member being up.
+ EntitySpec<ThrowOnAsyncStartEntity> memberSpec = EntitySpec.create(ThrowOnAsyncStartEntity.class)
+ .configure(ThrowOnAsyncStartEntity.COUNTER, counter)
+ .configure(ThrowOnAsyncStartEntity.START_LATCH, firstMemberUp);
+
+ cluster.config().set(DynamicCluster.FIRST_MEMBER_SPEC, firstMemberSpec);
+ cluster.config().set(DynamicCluster.MEMBER_SPEC, memberSpec);
+
+ // app.start blocks so in the failure case this test would block forever.
+ Asserts.assertReturnsEventually(new Runnable() {
+ @Override
+ public void run() {
+ app.start(ImmutableList.of(app.newSimulatedLocation()));
+ EntityAsserts.assertAttributeEqualsEventually(cluster, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+ }
+ }, Asserts.DEFAULT_LONG_TIMEOUT);
+ }
+
+ /**
+ * Tests that permits are not released when their start task is cancelled.
+ * <p>
+ * Uses a two-member {@link CancelEffectorInvokeCluster} limited to one concurrent child command.
+ * Expected behaviour is:
+ * <ul>
+ * <li>permit obtained for first member. cancelled task submitted. permit released.</li>
+ * <li>no permit obtained for second member. cancelled task submitted. no permit released.</li>
+ * </ul>
+ * The child-task semaphore must therefore hold exactly one permit both before and after the failed start.
+ */
+ @Test
+ public void testChildCommandPermitNotReleasedWhenMemberStartTaskCancelledBeforeSubmission() {
+ EntitySpec<CancelEffectorInvokeCluster> clusterSpec = EntitySpec.create(CancelEffectorInvokeCluster.class)
+ .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(TestEntity.class))
+ .configure(DynamicCluster.INITIAL_SIZE, 2)
+ .configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, 1);
+ DynamicCluster cluster = app.createAndManageChild(clusterSpec);
+ // The semaphore is only reachable on the implementation, not on the entity proxy.
+ final DynamicClusterImpl impl = DynamicClusterImpl.class.cast(Entities.deproxy(cluster));
+ assertNotNull(impl.getChildTaskSemaphore());
+ assertEquals(impl.getChildTaskSemaphore().availablePermits(), 1);
+ try {
+ app.start(ImmutableList.<Location>of(app.newSimulatedLocation()));
+ Asserts.shouldHaveFailedPreviously("Cluster start should have failed because the member start was cancelled");
+ } catch (Exception expected) {
+ // The start is meant to fail; only the permit count afterwards is of interest.
+ }
+ assertEquals(impl.getChildTaskSemaphore().availablePermits(), 1);
+ }
+
+ /**
+ * Test entity whose start fails an assertion when too many starts are in progress at once.
+ * The behaviour lives in {@link ThrowOnAsyncStartEntityImpl}; this interface only declares its config.
+ */
+ @ImplementedBy(ThrowOnAsyncStartEntityImpl.class)
+ public interface ThrowOnAsyncStartEntity extends TestEntity {
+ /** Counter incremented when a start begins and decremented when it ends. */
+ ConfigKey<AtomicInteger> COUNTER = ConfigKeys.newConfigKey(AtomicInteger.class, "counter");
+ /** Highest counter value a starting entity may observe without failing; defaults to 1. */
+ ConfigKey<Integer> MAX_CONCURRENCY = ConfigKeys.newConfigKey(
+ Integer.class, "concurrency", "max concurrency", 1);
+ /**
+ * Read at the beginning of start, with the value itself discarded. Tests may set it to a task,
+ * presumably so that reading it blocks until that task completes.
+ */
+ ConfigKey<Boolean> START_LATCH = ConfigKeys.newConfigKey(Boolean.class, "startlatch");
+ }
+
+ /**
+ * Implementation of {@link ThrowOnAsyncStartEntity}: counts the starts in progress via the
+ * configured counter and fails an assertion when that count exceeds {@code MAX_CONCURRENCY}.
+ */
+ public static class ThrowOnAsyncStartEntityImpl extends TestEntityImpl implements ThrowOnAsyncStartEntity {
+ private static final Logger LOG = LoggerFactory.getLogger(ThrowOnAsyncStartEntityImpl.class);
+
+ /**
+ * Registers this start on the counter, reads the start latch, checks the concurrency limit and
+ * then delegates to the superclass. The counter is always decremented again on the way out.
+ */
+ @Override
+ public void start(Collection<? extends Location> locs) {
+ final AtomicInteger startsInProgress = config().get(COUNTER);
+ // Counter value as seen when this start began, this entity included.
+ final int count = startsInProgress.incrementAndGet();
+ try {
+ LOG.debug("{} starting (first={})", this, sensors().get(AbstractGroup.FIRST_MEMBER));
+ config().get(START_LATCH);
+ // Throw if more than MAX_CONCURRENCY entities were starting when this one began.
+ final int limit = config().get(MAX_CONCURRENCY);
+ assertTrue(count <= limit, "expected " + count + " <= " + limit);
+ super.start(locs);
+ } finally {
+ startsInProgress.decrementAndGet();
+ }
+ }
+ }
+
+ /**
+ * Cluster type used in {@link DynamicClusterTest#testChildCommandPermitNotReleasedWhenMemberStartTaskCancelledBeforeSubmission}.
+ * Declares nothing beyond {@link DynamicCluster}; it exists to bind {@link CancelEffectorInvokeClusterImpl}.
+ */
+ @ImplementedBy(CancelEffectorInvokeClusterImpl.class)
+ public interface CancelEffectorInvokeCluster extends DynamicCluster {
+ }
+
+ /**
+ * Overrides {@link DynamicClusterImpl#newThrottledEffectorTask} to cancel each task before it's submitted.
+ */
+ public static class CancelEffectorInvokeClusterImpl extends DynamicClusterImpl implements CancelEffectorInvokeCluster {
+ /** Builds the throttled task via the superclass, cancels it, and returns the cancelled task. */
+ @Override
+ protected <T> Task<?> newThrottledEffectorTask(Entity target, Effector<T> effector, Map<?, ?> arguments, boolean isPrivileged) {
+ final Task<?> cancelledTask = super.newThrottledEffectorTask(target, effector, arguments, isPrivileged);
+ cancelledTask.cancel(true);
+ return cancelledTask;
+ }
+ }
+
}
[3/3] brooklyn-server git commit: This closes #443
Posted by sj...@apache.org.
This closes #443
Adds maxConcurrentChildCommands parameter to DynamicCluster
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/d1ef42a1
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/d1ef42a1
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/d1ef42a1
Branch: refs/heads/master
Commit: d1ef42a1e004cde87fe9bd7f1fda144b0fcf0a5d
Parents: 23f9a71 4e30074
Author: Sam Corbett <sa...@cloudsoftcorp.com>
Authored: Thu Dec 1 12:00:29 2016 +0000
Committer: Sam Corbett <sa...@cloudsoftcorp.com>
Committed: Thu Dec 1 12:00:29 2016 +0000
----------------------------------------------------------------------
.../brooklyn/core/effector/Effectors.java | 8 +-
.../apache/brooklyn/core/entity/Entities.java | 2 +-
.../brooklyn/entity/group/DynamicCluster.java | 11 +-
.../entity/group/DynamicClusterImpl.java | 154 ++++++++++++++++++-
.../brooklyn/util/core/task/ParallelTask.java | 27 +++-
.../brooklyn/util/core/task/SequentialTask.java | 27 +++-
.../entity/group/DynamicClusterRebindTest.java | 54 +++++++
.../entity/group/DynamicClusterTest.java | 132 ++++++++++++++++
8 files changed, 389 insertions(+), 26 deletions(-)
----------------------------------------------------------------------