You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@brooklyn.apache.org by sjcorbett <gi...@git.apache.org> on 2016/11/18 15:44:08 UTC

[GitHub] brooklyn-server pull request #443: Adds maxConcurrentChildCommands paramter ...

GitHub user sjcorbett opened a pull request:

    https://github.com/apache/brooklyn-server/pull/443

    Adds maxConcurrentChildCommands paramter to DynamicCluster

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sjcorbett/brooklyn-server throttle-task

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/brooklyn-server/pull/443.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #443
    
----
commit 5b9f89653ee5979665bff445d38750e1d0f234ae
Author: Sam Corbett <sa...@cloudsoftcorp.com>
Date:   2016-11-16T15:36:45Z

    Readability

commit e5a0fe5ee6b71ca6e250b8c64bac48adf4813372
Author: Sam Corbett <sa...@cloudsoftcorp.com>
Date:   2016-11-18T15:31:07Z

    Adds maxConcurrentChildCommands paramter to DynamicCluster
    
    The option configures the maximum number of simultaneous Startable
    effector invocations that will be made on members of the group.

commit faeffbe18dfec7fde0d18c818592c46b0a333d7e
Author: Sam Corbett <sa...@cloudsoftcorp.com>
Date:   2016-11-18T15:31:31Z

    BROOKLYN-396: Add failing test to demonstrate problem

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server issue #443: Adds maxConcurrentChildCommands parameter to Dyn...

Posted by sjcorbett <gi...@git.apache.org>.
Github user sjcorbett commented on the issue:

    https://github.com/apache/brooklyn-server/pull/443
  
    @geomacy I agree in principle but think it's outwith this pull request. Completing a resize before beginning another is currently part of `DynamicCluster's` contract. Your comment about stopping two-at-a-time is interesting and I'll look into it more. My preference is to merge this pull request now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server pull request #443: Adds maxConcurrentChildCommands parameter...

Posted by geomacy <gi...@git.apache.org>.
Github user geomacy commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/443#discussion_r88745096
  
    --- Diff: core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java ---
    @@ -1225,4 +1235,93 @@ private void assertFirstAndNonFirstCounts(Collection<Entity> members, int expect
             assertEquals(found.size(), expectedNonFirstCount);
         }
     
    +    @DataProvider
    +    public Object[][] maxConcurrentCommandsTestProvider() {
    --- End diff --
    
    nice!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server pull request #443: Adds maxConcurrentChildCommands parameter...

Posted by aledsage <gi...@git.apache.org>.
Github user aledsage commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/443#discussion_r89777765
  
    --- Diff: core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java ---
    @@ -1225,4 +1235,93 @@ private void assertFirstAndNonFirstCounts(Collection<Entity> members, int expect
             assertEquals(found.size(), expectedNonFirstCount);
         }
     
    +    @DataProvider
    +    public Object[][] maxConcurrentCommandsTestProvider() {
    +        return new Object[][]{{1}, {2}, {3}};
    +    }
    +
    +    @Test(dataProvider = "maxConcurrentCommandsTestProvider")
    +    public void testEntitiesStartAndStopSequentiallyWhenMaxConcurrentCommandsIsOne(int maxConcurrentCommands) {
    --- End diff --
    
    I wonder what the changes are of this failing without your semaphore. The problem is that the work done inside the `start()` method (between the increment + decrement of the counter) is pretty small. We're probably ok with 10 entities and such a small `maxConcurrentCommands` but still...
    
    I'm loathe to suggest it, but maybe we want to configure the entity with a very short sleep in the middle of `start()`. Even just a few milliseconds would force the thread to yield, and thus make the concurrent commands much more likely to try to execute.
    
    Up to you whether you think that's worth it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server pull request #443: Adds maxConcurrentChildCommands paramter ...

Posted by geomacy <gi...@git.apache.org>.
Github user geomacy commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/443#discussion_r88696685
  
    --- Diff: core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java ---
    @@ -825,8 +858,12 @@ protected Entity replaceMember(Entity member, @Nullable Location memberLoc, Map<
                 addedEntities.add(entity);
                 addedEntityLocations.put(entity, loc);
                 if (entity instanceof Startable) {
    +                boolean privileged = false;
    +                if (Boolean.TRUE.equals(entity.sensors().get(AbstractGroup.FIRST_MEMBER))) {
    --- End diff --
    
    Remind me why not just `privileged = entity.sensors().get(AbstractGroup.FIRST_MEMBER)`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server pull request #443: Adds maxConcurrentChildCommands paramter ...

Posted by geomacy <gi...@git.apache.org>.
Github user geomacy commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/443#discussion_r88693348
  
    --- Diff: core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java ---
    @@ -551,8 +577,9 @@ public void restart() {
                     Iterables.filter(getChildren(), Predicates.and(Predicates.instanceOf(Startable.class), EntityPredicates.isManaged()))));
             } else if ("parallel".equalsIgnoreCase(mode)) {
                 ServiceStateLogic.setExpectedState(this, Lifecycle.STARTING);
    -            DynamicTasks.queue(Effectors.invocationParallel(Startable.RESTART, null, 
    -                Iterables.filter(getChildren(), Predicates.and(Predicates.instanceOf(Startable.class), EntityPredicates.isManaged()))));
    +            for (Entity member : Iterables.filter(getChildren(), Predicates.and(Predicates.instanceOf(Startable.class), EntityPredicates.isManaged()))) {
    --- End diff --
    
    worth a few static imports for readability?
    ```
                for (Entity member : filter(getChildren(), and(instanceOf(Startable.class), isManaged()))) {
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server pull request #443: Adds maxConcurrentChildCommands paramter ...

Posted by sjcorbett <gi...@git.apache.org>.
Github user sjcorbett commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/443#discussion_r88712700
  
    --- Diff: core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterRebindTest.java ---
    @@ -0,0 +1,54 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.brooklyn.entity.group;
    +
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.brooklyn.api.entity.EntitySpec;
    +import org.apache.brooklyn.core.entity.Attributes;
    +import org.apache.brooklyn.core.entity.Entities;
    +import org.apache.brooklyn.core.entity.EntityAsserts;
    +import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
    +import org.apache.brooklyn.core.mgmt.rebind.RebindOptions;
    +import org.apache.brooklyn.core.mgmt.rebind.RebindTestFixtureWithApp;
    +import org.testng.annotations.Test;
    +
    +import com.google.common.collect.ImmutableList;
    +
    +public class DynamicClusterRebindTest extends RebindTestFixtureWithApp {
    +
    +    @Test
    +    public void testThrottleAppliesAfterRebind() throws Exception {
    +        DynamicCluster cluster = origApp.createAndManageChild(EntitySpec.create(DynamicCluster.class)
    --- End diff --
    
    Probably. I'll add it in.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server issue #443: Adds maxConcurrentChildCommands parameter to Dyn...

Posted by geomacy <gi...@git.apache.org>.
Github user geomacy commented on the issue:

    https://github.com/apache/brooklyn-server/pull/443
  
    Have caught up and can confirm the tests work for me.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server pull request #443: Adds maxConcurrentChildCommands parameter...

Posted by aledsage <gi...@git.apache.org>.
Github user aledsage commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/443#discussion_r89775723
  
    --- Diff: core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java ---
    @@ -1225,4 +1235,93 @@ private void assertFirstAndNonFirstCounts(Collection<Entity> members, int expect
             assertEquals(found.size(), expectedNonFirstCount);
         }
     
    +    @DataProvider
    +    public Object[][] maxConcurrentCommandsTestProvider() {
    +        return new Object[][]{{1}, {2}, {3}};
    +    }
    +
    +    @Test(dataProvider = "maxConcurrentCommandsTestProvider")
    +    public void testEntitiesStartAndStopSequentiallyWhenMaxConcurrentCommandsIsOne(int maxConcurrentCommands) {
    +        EntitySpec<ThrowOnAsyncStartEntity> memberSpec = EntitySpec.create(ThrowOnAsyncStartEntity.class)
    +                .configure(ThrowOnAsyncStartEntity.MAX_CONCURRENCY, maxConcurrentCommands)
    +                .configure(ThrowOnAsyncStartEntity.COUNTER, new AtomicInteger());
    +        DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
    +                .configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, maxConcurrentCommands)
    +                .configure(DynamicCluster.INITIAL_SIZE, 10)
    +                .configure(DynamicCluster.MEMBER_SPEC, memberSpec));
    +        app.start(ImmutableList.of(app.newSimulatedLocation()));
    +        assertEquals(cluster.sensors().get(Attributes.SERVICE_STATE_ACTUAL), Lifecycle.RUNNING);
    +    }
    +
    +    @Test
    +    public void testFirstMemberStartsFirstWhenMaxConcurrentCommandsIsGiven() {
    +        final AtomicInteger counter = new AtomicInteger();
    +
    +        final DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
    +                .configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, 1)
    +                .configure(DynamicCluster.INITIAL_SIZE, 10));
    +
    +        Task<Boolean> firstMemberUp = Tasks.<Boolean>builder()
    +                .body(new Callable<Boolean>() {
    +                    @Override
    +                    public Boolean call() throws Exception {
    +                        Task<Entity> first = DependentConfiguration.attributeWhenReady(cluster, DynamicCluster.FIRST);
    +                        DynamicTasks.queueIfPossible(first).orSubmitAsync();
    +                        final Entity source = first.get();
    +                        final Task<Boolean> booleanTask = DependentConfiguration.attributeWhenReady(source, Attributes.SERVICE_UP);
    +                        DynamicTasks.queueIfPossible(booleanTask).orSubmitAsync();
    +                        return booleanTask.get();
    +                    }
    +                })
    +                .build();
    +
    +        EntitySpec<ThrowOnAsyncStartEntity> firstMemberSpec = EntitySpec.create(ThrowOnAsyncStartEntity.class)
    +                .configure(ThrowOnAsyncStartEntity.COUNTER, counter)
    +                .configure(ThrowOnAsyncStartEntity.START_LATCH, true);
    +
    +        EntitySpec<ThrowOnAsyncStartEntity> memberSpec = EntitySpec.create(ThrowOnAsyncStartEntity.class)
    +                .configure(ThrowOnAsyncStartEntity.COUNTER, counter)
    +                .configure(ThrowOnAsyncStartEntity.START_LATCH, firstMemberUp);
    +
    +        cluster.config().set(DynamicCluster.FIRST_MEMBER_SPEC, firstMemberSpec);
    +        cluster.config().set(DynamicCluster.MEMBER_SPEC, memberSpec);
    +
    +        // app.start blocks so in the failure case this test would block forever.
    +        Thread t = new Thread() {
    +            @Override public void run() {
    +                app.start(ImmutableList.of(app.newSimulatedLocation()));
    +            }
    +        };
    +        t.start();
    +        try {
    +            EntityAsserts.assertAttributeEqualsEventually(cluster, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
    +        } finally {
    +            if (t.isAlive()) {
    +                t.interrupt();
    +            }
    +            Entities.unmanage(cluster);
    +        }
    +    }
    +
    +    @ImplementedBy(ThrowOnAsyncStartEntityImpl.class)
    +    public interface ThrowOnAsyncStartEntity extends TestEntity {
    +        ConfigKey<Integer> MAX_CONCURRENCY = ConfigKeys.newConfigKey(Integer.class, "concurrency", "max concurrency", 1);
    +        ConfigKey<AtomicInteger> COUNTER = ConfigKeys.newConfigKey(AtomicInteger.class, "counter");
    +        ConfigKey<Boolean> START_LATCH = ConfigKeys.newConfigKey(Boolean.class, "startlatch");
    +    }
    +
    +    public static class ThrowOnAsyncStartEntityImpl extends TestEntityImpl implements ThrowOnAsyncStartEntity {
    +        private static final Logger LOG = LoggerFactory.getLogger(ThrowOnAsyncStartEntityImpl.class);
    +        @Override
    +        public void start(Collection<? extends Location> locs) {
    +            int count = config().get(COUNTER).incrementAndGet();
    +            LOG.debug("{} starting (first={})", new Object[]{this, sensors().get(AbstractGroup.FIRST_MEMBER)});
    +            config().get(START_LATCH);
    +            // Throw if more than one entity is starting at the same time as this.
    +            assertTrue(count <= config().get(MAX_CONCURRENCY), "expected " + count + " <= " + config().get(MAX_CONCURRENCY));
    +            super.start(locs);
    +            config().get(COUNTER).decrementAndGet();
    --- End diff --
    
    Wrap in a try-finally block, to ensure the counter is deprecated? Probably not necessary for the existing tests, as they're not failing the start or interrupting. But still worth adding I think.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server pull request #443: Adds maxConcurrentChildCommands paramter ...

Posted by geomacy <gi...@git.apache.org>.
Github user geomacy commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/443#discussion_r88691729
  
    --- Diff: core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java ---
    @@ -183,6 +183,14 @@
         ConfigKey<Integer> CLUSTER_MEMBER_ID = ConfigKeys.newIntegerConfigKey(
                 "cluster.member.id", "The unique ID number (sequential) of a member of a cluster");
     
    +    @SetFromFlag("maxConcurrentChildCommands")
    +    ConfigKey<Integer> MAX_CONCURRENT_CHILD_COMMANDS = ConfigKeys.builder(Integer.class)
    +            .name("cluster.maxConcurrentChildCommands")
    +            .description("The maximum number of effector invocations that will be made on children at once " +
    --- End diff --
    
    maybe "... invocations on children that will run concurrently at any time"?  & ... "less than or equal to zero means that no limit is placed on concurrent execution"??


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server issue #443: Adds maxConcurrentChildCommands parameter to Dyn...

Posted by sjcorbett <gi...@git.apache.org>.
Github user sjcorbett commented on the issue:

    https://github.com/apache/brooklyn-server/pull/443
  
    I'm going to merge this now. Happy to address any further comments separately.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server pull request #443: Adds maxConcurrentChildCommands paramter ...

Posted by geomacy <gi...@git.apache.org>.
Github user geomacy commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/443#discussion_r88699540
  
    --- Diff: core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java ---
    @@ -1040,14 +1077,80 @@ protected void discardNode(Entity entity) {
     
         protected void stopAndRemoveNode(Entity member) {
             removeMember(member);
    -
             try {
                 if (member instanceof Startable) {
    -                Task<?> task = member.invoke(Startable.STOP, Collections.<String,Object>emptyMap());
    +                Task<?> task = newThrottledEffectorTask(member, Startable.STOP, Collections.<String, Object>emptyMap());
                     task.getUnchecked();
                 }
             } finally {
                 Entities.unmanage(member);
             }
         }
    +
    +    protected <T> Task<?> newThrottledEffectorTask(Entity target, Effector<T> effector, Map<?, ?> arguments) {
    +        return newThrottledEffectorTask(target, effector, arguments, false);
    +    }
    +
    +    protected <T> Task<?> newThrottledEffectorTask(Entity target, Effector<T> effector, Map<?, ?> arguments, boolean isPrivileged) {
    +        final Semaphore permit = childTaskSemaphore;
    +        final Task<?> toSubmit;
    +        final Task<T> effectorTask = Effectors.invocation(target, effector, arguments).asTask();
    +        if (permit != null) {
    +            // Acquire the permit now for the privileged task and just queue the effector invocation.
    +            // If it's unprivileged then queue a task to obtain a permit first.
    +            final String description = "Waiting for permit to run " + effector.getName() + " on " + target;
    +            final Runnable obtain = new ObtainPermit(permit, description);
    +            if (isPrivileged) {
    +                obtain.run();
    +                toSubmit = effectorTask;
    +            } else {
    +                // acquire semaphore here if privileged?
    +                Task<?> obtainMutex = Tasks.builder()
    --- End diff --
    
    Is it worth moving this and `ObtainPermit` into `Tasks.java`, so that this call would become something like 
    ```
                    Task<?> obtainMutex = Tasks.builder()
                            .description(description)
                            .throttleOn(permit)
                            .build();
    ```



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server issue #443: Adds maxConcurrentChildCommands parameter to Dyn...

Posted by sjcorbett <gi...@git.apache.org>.
Github user sjcorbett commented on the issue:

    https://github.com/apache/brooklyn-server/pull/443
  
    @aledsage regarding the obtain and release of permits, I've pushed an update and test case that checks how permits are handled when tasks are cancelled. See https://github.com/apache/brooklyn-server/pull/443/files#diff-a0b8d0880424918e737424abac6a9694R1123 for the implementation and https://github.com/apache/brooklyn-server/pull/443/files#diff-f27a4972ae513473372a4703cb4526caR1301 for the test. Could you re-review this? Thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server pull request #443: Adds maxConcurrentChildCommands paramter ...

Posted by geomacy <gi...@git.apache.org>.
Github user geomacy commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/443#discussion_r88697566
  
    --- Diff: core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java ---
    @@ -1040,14 +1077,80 @@ protected void discardNode(Entity entity) {
     
         protected void stopAndRemoveNode(Entity member) {
             removeMember(member);
    -
             try {
                 if (member instanceof Startable) {
    -                Task<?> task = member.invoke(Startable.STOP, Collections.<String,Object>emptyMap());
    +                Task<?> task = newThrottledEffectorTask(member, Startable.STOP, Collections.<String, Object>emptyMap());
                     task.getUnchecked();
                 }
             } finally {
                 Entities.unmanage(member);
             }
         }
    +
    +    protected <T> Task<?> newThrottledEffectorTask(Entity target, Effector<T> effector, Map<?, ?> arguments) {
    +        return newThrottledEffectorTask(target, effector, arguments, false);
    +    }
    +
    +    protected <T> Task<?> newThrottledEffectorTask(Entity target, Effector<T> effector, Map<?, ?> arguments, boolean isPrivileged) {
    +        final Semaphore permit = childTaskSemaphore;
    --- End diff --
    
    why the local variable?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server pull request #443: Adds maxConcurrentChildCommands paramter ...

Posted by geomacy <gi...@git.apache.org>.
Github user geomacy commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/443#discussion_r88690173
  
    --- Diff: core/src/main/java/org/apache/brooklyn/core/effector/Effectors.java ---
    @@ -183,14 +183,18 @@ private EffectorBuilder(Class<T> returnType, String effectorName) {
         public static TaskAdaptable<List<?>> invocationParallel(Effector<?> eff, Map<?,?> params, Iterable<? extends Entity> entities) {
             List<TaskAdaptable<?>> tasks = new ArrayList<TaskAdaptable<?>>();
             for (Entity e: entities) tasks.add(invocation(e, eff, params));
    -        return Tasks.parallel("invoking "+eff+" on "+tasks.size()+" node"+(Strings.s(tasks.size())), tasks.toArray(new TaskAdaptable[tasks.size()]));
    +        return Tasks.parallel(
    --- End diff --
    
    Hooray! readable code!   \U0001f44d \U0001f44d 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server pull request #443: Adds maxConcurrentChildCommands parameter...

Posted by sjcorbett <gi...@git.apache.org>.
Github user sjcorbett commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/443#discussion_r89785111
  
    --- Diff: core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java ---
    @@ -1225,4 +1235,93 @@ private void assertFirstAndNonFirstCounts(Collection<Entity> members, int expect
             assertEquals(found.size(), expectedNonFirstCount);
         }
     
    +    @DataProvider
    +    public Object[][] maxConcurrentCommandsTestProvider() {
    +        return new Object[][]{{1}, {2}, {3}};
    +    }
    +
    +    @Test(dataProvider = "maxConcurrentCommandsTestProvider")
    +    public void testEntitiesStartAndStopSequentiallyWhenMaxConcurrentCommandsIsOne(int maxConcurrentCommands) {
    --- End diff --
    
    It always fails for me when I remove `.configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, maxConcurrentCommands)` from a few lines below.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server pull request #443: Adds maxConcurrentChildCommands paramter ...

Posted by geomacy <gi...@git.apache.org>.
Github user geomacy commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/443#discussion_r88699755
  
    --- Diff: core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterRebindTest.java ---
    @@ -0,0 +1,54 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.brooklyn.entity.group;
    +
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.brooklyn.api.entity.EntitySpec;
    +import org.apache.brooklyn.core.entity.Attributes;
    +import org.apache.brooklyn.core.entity.Entities;
    +import org.apache.brooklyn.core.entity.EntityAsserts;
    +import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
    +import org.apache.brooklyn.core.mgmt.rebind.RebindOptions;
    +import org.apache.brooklyn.core.mgmt.rebind.RebindTestFixtureWithApp;
    +import org.testng.annotations.Test;
    +
    +import com.google.common.collect.ImmutableList;
    +
    +public class DynamicClusterRebindTest extends RebindTestFixtureWithApp {
    +
    +    @Test
    +    public void testThrottleAppliesAfterRebind() throws Exception {
    +        DynamicCluster cluster = origApp.createAndManageChild(EntitySpec.create(DynamicCluster.class)
    --- End diff --
    
    Worth having a repeat of the test with MAX > 1?  Either here and/or in DynamicClusterTest?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server pull request #443: Adds maxConcurrentChildCommands parameter...

Posted by sjcorbett <gi...@git.apache.org>.
Github user sjcorbett commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/443#discussion_r89813930
  
    --- Diff: core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java ---
    @@ -1040,14 +1077,81 @@ protected void discardNode(Entity entity) {
     
         protected void stopAndRemoveNode(Entity member) {
             removeMember(member);
    -
             try {
                 if (member instanceof Startable) {
    -                Task<?> task = member.invoke(Startable.STOP, Collections.<String,Object>emptyMap());
    +                Task<?> task = newThrottledEffectorTask(member, Startable.STOP, Collections.<String, Object>emptyMap());
    +                DynamicTasks.queueIfPossible(task).orSubmitAsync();
                     task.getUnchecked();
                 }
             } finally {
                 Entities.unmanage(member);
             }
         }
    +
    +    protected <T> Task<?> newThrottledEffectorTask(Entity target, Effector<T> effector, Map<?, ?> arguments) {
    +        return newThrottledEffectorTask(target, effector, arguments, false);
    +    }
    +
    +    protected <T> Task<?> newThrottledEffectorTask(Entity target, Effector<T> effector, Map<?, ?> arguments, boolean isPrivileged) {
    +        final Semaphore permit = childTaskSemaphore;
    +        final Task<?> toSubmit;
    +        final Task<T> effectorTask = Effectors.invocation(target, effector, arguments).asTask();
    +        if (permit != null) {
    +            // Acquire the permit now for the privileged task and just queue the effector invocation.
    +            // If it's unprivileged then queue a task to obtain a permit first.
    +            final String description = "Waiting for permit to run " + effector.getName() + " on " + target;
    +            final Runnable obtain = new ObtainPermit(permit, description);
    +            if (isPrivileged) {
    +                obtain.run();
    +                toSubmit = effectorTask;
    +            } else {
    +                // acquire semaphore here if privileged?
    +                Task<?> obtainMutex = Tasks.builder()
    +                        .description(description)
    +                        .body(new ObtainPermit(permit, description))
    +                        .build();
    +                toSubmit = Tasks.sequential(obtainMutex, effectorTask);
    +            }
    +            toSubmit.addListener(new ReleasePermit(permit), MoreExecutors.sameThreadExecutor());
    --- End diff --
    
    Do you think it's better to queue the release as another task?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server issue #443: Adds maxConcurrentChildCommands parameter to Dyn...

Posted by sjcorbett <gi...@git.apache.org>.
Github user sjcorbett commented on the issue:

    https://github.com/apache/brooklyn-server/pull/443
  
    Build failure external:
    ```
    [WARNING] Failed to notify spy hudson.maven.Maven3Builder$JenkinsEventSpy: Invalid object ID 9 iota=46
    [WARNING] Failed to notify spy hudson.maven.Maven3Builder$JenkinsEventSpy: Invalid object ID 9 iota=46
    [WARNING] Failed to notify spy hudson.maven.Maven3Builder$JenkinsEventSpy: Invalid object ID 25 iota=46
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server pull request #443: Adds maxConcurrentChildCommands paramter ...

Posted by sjcorbett <gi...@git.apache.org>.
Github user sjcorbett commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/443#discussion_r88712272
  
    --- Diff: core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java ---
    @@ -183,6 +183,14 @@
         ConfigKey<Integer> CLUSTER_MEMBER_ID = ConfigKeys.newIntegerConfigKey(
                 "cluster.member.id", "The unique ID number (sequential) of a member of a cluster");
     
    +    @SetFromFlag("maxConcurrentChildCommands")
    +    ConfigKey<Integer> MAX_CONCURRENT_CHILD_COMMANDS = ConfigKeys.builder(Integer.class)
    +            .name("cluster.maxConcurrentChildCommands")
    --- End diff --
    
    Agree. However, it's consistent with the other config and sensors in the class.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server pull request #443: Adds maxConcurrentChildCommands paramter ...

Posted by sjcorbett <gi...@git.apache.org>.
Github user sjcorbett commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/443#discussion_r88720447
  
    --- Diff: core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterRebindTest.java ---
    @@ -0,0 +1,54 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.brooklyn.entity.group;
    +
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.apache.brooklyn.api.entity.EntitySpec;
    +import org.apache.brooklyn.core.entity.Attributes;
    +import org.apache.brooklyn.core.entity.Entities;
    +import org.apache.brooklyn.core.entity.EntityAsserts;
    +import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
    +import org.apache.brooklyn.core.mgmt.rebind.RebindOptions;
    +import org.apache.brooklyn.core.mgmt.rebind.RebindTestFixtureWithApp;
    +import org.testng.annotations.Test;
    +
    +import com.google.common.collect.ImmutableList;
    +
    +public class DynamicClusterRebindTest extends RebindTestFixtureWithApp {
    +
    +    @Test
    +    public void testThrottleAppliesAfterRebind() throws Exception {
    +        DynamicCluster cluster = origApp.createAndManageChild(EntitySpec.create(DynamicCluster.class)
    --- End diff --
    
    I added this to `DynamicClusterTest`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server pull request #443: Adds maxConcurrentChildCommands parameter...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/brooklyn-server/pull/443


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server issue #443: Adds maxConcurrentChildCommands parameter to Dyn...

Posted by geomacy <gi...@git.apache.org>.
Github user geomacy commented on the issue:

    https://github.com/apache/brooklyn-server/pull/443
  
    Have done some more testing with this, with the blueprint:
    ```
    name: test
    location: amazon-eu-central-1
    services:
    - type: org.apache.brooklyn.entity.group.DynamicCluster:0.10.0-SNAPSHOT
      brooklyn.config:
        dynamiccluster.firstmemberspec:
          $brooklyn:entitySpec:
            type: org.apache.brooklyn.entity.software.base.EmptySoftwareProcess
            name: first
        dynamiccluster.memberspec:
          $brooklyn:entitySpec:
            type: org.apache.brooklyn.entity.software.base.EmptySoftwareProcess
            name: VM
        cluster.initial.size: 20
        cluster.maxConcurrentChildCommands: 3
    ```
    and one issue I noticed was that, once you've deployed the cluster, if you have second thoughts and decide to stop it when only a few nodes have started, say three, the stop effector call "goes to the back of the queue", and the remaining seventeen nodes gradually start up, three at a time, before the stop finally gets its chance to do anything.   Then the nodes gradually shut down (oddly, two at a time?).
    
    I think this would be sufficiently annoying for users that we should think about allowing priority to stop commands?
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server pull request #443: Adds maxConcurrentChildCommands paramter ...

Posted by geomacy <gi...@git.apache.org>.
Github user geomacy commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/443#discussion_r88691001
  
    --- Diff: core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java ---
    @@ -183,6 +183,14 @@
         ConfigKey<Integer> CLUSTER_MEMBER_ID = ConfigKeys.newIntegerConfigKey(
                 "cluster.member.id", "The unique ID number (sequential) of a member of a cluster");
     
    +    @SetFromFlag("maxConcurrentChildCommands")
    +    ConfigKey<Integer> MAX_CONCURRENT_CHILD_COMMANDS = ConfigKeys.builder(Integer.class)
    +            .name("cluster.maxConcurrentChildCommands")
    --- End diff --
    
    "cluster" is a bit redundant here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server pull request #443: Adds maxConcurrentChildCommands parameter...

Posted by aledsage <gi...@git.apache.org>.
Github user aledsage commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/443#discussion_r89776930
  
    --- Diff: core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java ---
    @@ -1225,4 +1235,93 @@ private void assertFirstAndNonFirstCounts(Collection<Entity> members, int expect
             assertEquals(found.size(), expectedNonFirstCount);
         }
     
    +    @DataProvider
    +    public Object[][] maxConcurrentCommandsTestProvider() {
    +        return new Object[][]{{1}, {2}, {3}};
    +    }
    +
    +    @Test(dataProvider = "maxConcurrentCommandsTestProvider")
    +    public void testEntitiesStartAndStopSequentiallyWhenMaxConcurrentCommandsIsOne(int maxConcurrentCommands) {
    +        EntitySpec<ThrowOnAsyncStartEntity> memberSpec = EntitySpec.create(ThrowOnAsyncStartEntity.class)
    +                .configure(ThrowOnAsyncStartEntity.MAX_CONCURRENCY, maxConcurrentCommands)
    +                .configure(ThrowOnAsyncStartEntity.COUNTER, new AtomicInteger());
    +        DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
    +                .configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, maxConcurrentCommands)
    +                .configure(DynamicCluster.INITIAL_SIZE, 10)
    +                .configure(DynamicCluster.MEMBER_SPEC, memberSpec));
    +        app.start(ImmutableList.of(app.newSimulatedLocation()));
    +        assertEquals(cluster.sensors().get(Attributes.SERVICE_STATE_ACTUAL), Lifecycle.RUNNING);
    +    }
    +
    +    @Test
    +    public void testFirstMemberStartsFirstWhenMaxConcurrentCommandsIsGiven() {
    +        final AtomicInteger counter = new AtomicInteger();
    +
    +        final DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
    +                .configure(DynamicCluster.MAX_CONCURRENT_CHILD_COMMANDS, 1)
    +                .configure(DynamicCluster.INITIAL_SIZE, 10));
    +
    +        Task<Boolean> firstMemberUp = Tasks.<Boolean>builder()
    +                .body(new Callable<Boolean>() {
    +                    @Override
    +                    public Boolean call() throws Exception {
    +                        Task<Entity> first = DependentConfiguration.attributeWhenReady(cluster, DynamicCluster.FIRST);
    +                        DynamicTasks.queueIfPossible(first).orSubmitAsync();
    +                        final Entity source = first.get();
    +                        final Task<Boolean> booleanTask = DependentConfiguration.attributeWhenReady(source, Attributes.SERVICE_UP);
    +                        DynamicTasks.queueIfPossible(booleanTask).orSubmitAsync();
    +                        return booleanTask.get();
    +                    }
    +                })
    +                .build();
    +
    +        EntitySpec<ThrowOnAsyncStartEntity> firstMemberSpec = EntitySpec.create(ThrowOnAsyncStartEntity.class)
    +                .configure(ThrowOnAsyncStartEntity.COUNTER, counter)
    +                .configure(ThrowOnAsyncStartEntity.START_LATCH, true);
    +
    +        EntitySpec<ThrowOnAsyncStartEntity> memberSpec = EntitySpec.create(ThrowOnAsyncStartEntity.class)
    +                .configure(ThrowOnAsyncStartEntity.COUNTER, counter)
    +                .configure(ThrowOnAsyncStartEntity.START_LATCH, firstMemberUp);
    +
    +        cluster.config().set(DynamicCluster.FIRST_MEMBER_SPEC, firstMemberSpec);
    +        cluster.config().set(DynamicCluster.MEMBER_SPEC, memberSpec);
    +
    +        // app.start blocks so in the failure case this test would block forever.
    +        Thread t = new Thread() {
    --- End diff --
    
    Very minor - you could use `Asserts.assertReturnsEventually` to do the thread interrupt for you. For example, something like:
    ```
    Asserts.assertReturnsEventually(new Runnable() {
        public void run() {
            app.start(ImmutableList.of(app.newSimulatedLocation()));
        }});
    ```
    
    Also, I don't think you need to do `Entities.unmanage(cluster)` - will that not happen automatically in `tearDown` anyway?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server issue #443: Adds maxConcurrentChildCommands parameter to Dyn...

Posted by sjcorbett <gi...@git.apache.org>.
Github user sjcorbett commented on the issue:

    https://github.com/apache/brooklyn-server/pull/443
  
    Jenkins did its job. I had forgotten to `submit` a task so `stopAndRemoveNode` blocked forever.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server issue #443: Adds maxConcurrentChildCommands parameter to Dyn...

Posted by aledsage <gi...@git.apache.org>.
Github user aledsage commented on the issue:

    https://github.com/apache/brooklyn-server/pull/443
  
    @sjcorbett (cc @geomacy) LGTM. A few minor comments. Sam, if you can take a look through them and see what you think is worth changing. Then either merge it yourself or ping me if you want me to take another look.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server issue #443: Adds maxConcurrentChildCommands parameter to Dyn...

Posted by sjcorbett <gi...@git.apache.org>.
Github user sjcorbett commented on the issue:

    https://github.com/apache/brooklyn-server/pull/443
  
    Actually I wonder if that's true. The build timed out testing DynamicCluster. I'll double-check locally.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---