You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@brooklyn.apache.org by sjcorbett <gi...@git.apache.org> on 2016/10/18 12:47:48 UTC

[GitHub] brooklyn-server pull request #386: Add AbstractInvokeEffectorPolicy

GitHub user sjcorbett opened a pull request:

    https://github.com/apache/brooklyn-server/pull/386

    Add AbstractInvokeEffectorPolicy

    Supports tracking the number of effector tasks that are ongoing and publishing that as an "is busy" sensor on the entity the policy is attached to. Tracking is enabled by providing a value for `isBusySensor` when configuring the policy. 

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sjcorbett/brooklyn-server invoke-effector-is-busy

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/brooklyn-server/pull/386.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #386
    
----
commit 499c3489d283df3f292fdea55293ccfd95bb0ebb
Author: Sam Corbett <sa...@cloudsoftcorp.com>
Date:   2016-10-17T16:29:07Z

    Add AbstractInvokeEffectorPolicy
    
    Supports tracking the number of effector tasks that are ongoing and
    publishing that as an "is busy" sensor on the entity the policy is
    attached to.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server issue #386: Add AbstractInvokeEffectorPolicy

Posted by sjcorbett <gi...@git.apache.org>.
Github user sjcorbett commented on the issue:

    https://github.com/apache/brooklyn-server/pull/386
  
    @aledsage thanks for your comments. I've replaced `TASK_COUNTER` with a field and have added a test for the sensor to `InvokeEffectorOnCollectionSensorChangeTest`. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server pull request #386: Add AbstractInvokeEffectorPolicy

Posted by neykov <gi...@git.apache.org>.
Github user neykov commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/386#discussion_r85702979
  
    --- Diff: core/src/main/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicy.java ---
    @@ -0,0 +1,145 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.brooklyn.policy;
    +
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import javax.annotation.Nonnull;
    +
    +import org.apache.brooklyn.api.effector.Effector;
    +import org.apache.brooklyn.api.entity.EntityLocal;
    +import org.apache.brooklyn.api.mgmt.Task;
    +import org.apache.brooklyn.api.sensor.AttributeSensor;
    +import org.apache.brooklyn.config.ConfigKey;
    +import org.apache.brooklyn.core.config.ConfigKeys;
    +import org.apache.brooklyn.core.policy.AbstractPolicy;
    +import org.apache.brooklyn.core.sensor.Sensors;
    +import org.apache.brooklyn.util.guava.Maybe;
    +import org.apache.brooklyn.util.text.Strings;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Objects;
    +import com.google.common.util.concurrent.MoreExecutors;
    +
    +public abstract class AbstractInvokeEffectorPolicy extends AbstractPolicy {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(AbstractInvokeEffectorPolicy.class);
    +
    +    public static final ConfigKey<String> IS_BUSY_SENSOR_NAME = ConfigKeys.newStringConfigKey(
    +            "isBusySensor",
    +            "Name of the sensor to publish on the entity that indicates that this policy has incomplete effectors. " +
    +                    "If unset running tasks will not be tracked.");
    +
    +    private final AtomicInteger taskCounter = new AtomicInteger();
    +
    +    /**
    +     * Indicates that onEvent was notified of an value of is not the latest sensor value.
    +     */
    +    private boolean moreUpdatesComing;
    +    /**
    +     * The timestamp of the event that informed moreUpdatesComing. Subsequent notifications
    +     * of earlier events will not cause updates of moreUpdatesComing.
    +     */
    +    private long mostRecentUpdate = 0;
    +    /**
    +     * Guards {@link #moreUpdatesComing} and {@link #mostRecentUpdate}.
    +     */
    +    private final Object[] moreUpdatesLock = new Object[0];
    +
    +    @Override
    +    public void setEntity(EntityLocal entity) {
    +        super.setEntity(entity);
    +        if (isBusySensorEnabled()) {
    +            // Republishes when the entity rebinds.
    +            publishIsBusy();
    +        }
    +    }
    +
    +    /**
    +     * Invoke effector with parameters on the entity that the policy is attached to.
    +     */
    +    protected <T> Task<T> invoke(Effector<T> effector, Map<String, ?> parameters) {
    +        if (isBusySensorEnabled()) {
    +            getTaskCounter().incrementAndGet();
    +            publishIsBusy();
    +        }
    +        Task<T> task = entity.invoke(effector, parameters);
    +        if (isBusySensorEnabled()) {
    +            task.addListener(new EffectorListener(), MoreExecutors.sameThreadExecutor());
    +        }
    +        return task;
    +    }
    +
    +    protected boolean isBusy() {
    +        synchronized (moreUpdatesLock) {
    --- End diff --
    
    No need to lock here - `taskCounter` & `moreUpdatesComing` are independent.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server pull request #386: Add AbstractInvokeEffectorPolicy

Posted by geomacy <gi...@git.apache.org>.
Github user geomacy commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/386#discussion_r85076312
  
    --- Diff: core/src/main/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicy.java ---
    @@ -0,0 +1,110 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.brooklyn.policy;
    +
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import javax.annotation.Nonnull;
    +
    +import org.apache.brooklyn.api.effector.Effector;
    +import org.apache.brooklyn.api.entity.EntityLocal;
    +import org.apache.brooklyn.api.mgmt.Task;
    +import org.apache.brooklyn.api.sensor.AttributeSensor;
    +import org.apache.brooklyn.config.ConfigKey;
    +import org.apache.brooklyn.core.config.ConfigKeys;
    +import org.apache.brooklyn.core.policy.AbstractPolicy;
    +import org.apache.brooklyn.core.sensor.Sensors;
    +import org.apache.brooklyn.util.guava.Maybe;
    +import org.apache.brooklyn.util.text.Strings;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.util.concurrent.MoreExecutors;
    +
    +public abstract class AbstractInvokeEffectorPolicy extends AbstractPolicy {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(AbstractInvokeEffectorPolicy.class);
    +
    +    public static final ConfigKey<String> IS_BUSY_SENSOR_NAME = ConfigKeys.newStringConfigKey(
    +            "isBusySensor",
    +            "Name of the sensor to publish on the entity that indicates that this policy has incomplete effectors. " +
    +                    "If unset running tasks will not be tracked.");
    +
    +    private final AtomicInteger taskCounter = new AtomicInteger();
    +
    +    @Override
    +    public void setEntity(EntityLocal entity) {
    +        super.setEntity(entity);
    +        if (isBusySensorEnabled()) {
    +            // Republishes when the entity rebinds.
    +            publishIsBusy();
    --- End diff --
    
    ah, thanks @sjcorbett that's great


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server pull request #386: Add AbstractInvokeEffectorPolicy

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/brooklyn-server/pull/386


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server issue #386: Add AbstractInvokeEffectorPolicy

Posted by neykov <gi...@git.apache.org>.
Github user neykov commented on the issue:

    https://github.com/apache/brooklyn-server/pull/386
  
    Good to merge as is but will wait for response on above comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server pull request #386: Add AbstractInvokeEffectorPolicy

Posted by sjcorbett <gi...@git.apache.org>.
Github user sjcorbett commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/386#discussion_r84946013
  
    --- Diff: core/src/main/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicy.java ---
    @@ -0,0 +1,110 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.brooklyn.policy;
    +
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import javax.annotation.Nonnull;
    +
    +import org.apache.brooklyn.api.effector.Effector;
    +import org.apache.brooklyn.api.entity.EntityLocal;
    +import org.apache.brooklyn.api.mgmt.Task;
    +import org.apache.brooklyn.api.sensor.AttributeSensor;
    +import org.apache.brooklyn.config.ConfigKey;
    +import org.apache.brooklyn.core.config.ConfigKeys;
    +import org.apache.brooklyn.core.policy.AbstractPolicy;
    +import org.apache.brooklyn.core.sensor.Sensors;
    +import org.apache.brooklyn.util.guava.Maybe;
    +import org.apache.brooklyn.util.text.Strings;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.util.concurrent.MoreExecutors;
    +
    +public abstract class AbstractInvokeEffectorPolicy extends AbstractPolicy {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(AbstractInvokeEffectorPolicy.class);
    +
    +    public static final ConfigKey<String> IS_BUSY_SENSOR_NAME = ConfigKeys.newStringConfigKey(
    +            "isBusySensor",
    +            "Name of the sensor to publish on the entity that indicates that this policy has incomplete effectors. " +
    +                    "If unset running tasks will not be tracked.");
    +
    +    private final AtomicInteger taskCounter = new AtomicInteger();
    +
    +    @Override
    +    public void setEntity(EntityLocal entity) {
    +        super.setEntity(entity);
    +        if (isBusySensorEnabled()) {
    +            // Republishes when the entity rebinds.
    +            publishIsBusy();
    --- End diff --
    
    It relies on `taskCounter` _not_ being persisted. When Brooklyn rebinds `taskCounter` is a fresh instance whose count is zero so `isBusy` will be `false`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server pull request #386: Add AbstractInvokeEffectorPolicy

Posted by neykov <gi...@git.apache.org>.
Github user neykov commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/386#discussion_r84884682
  
    --- Diff: core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnCollectionSensorChange.java ---
    @@ -128,10 +132,25 @@ public void setEntity(EntityLocal entity) {
         }
     
         @Override
    +    protected boolean isBusy() {
    +        return super.isBusy() || moreUpdatesComing.get();
    +    }
    +
    +    @Override
         public void onEvent(SensorEvent<Collection<?>> event) {
             final Set<Object> newValue = event.getValue() != null
                     ? new LinkedHashSet<>(event.getValue())
                     : ImmutableSet.of();
    +        if (isBusySensorEnabled()) {
    +            // There are more events coming that this policy hasn't been notified of if the
    +            // value received in the event does not match the current value of the sensor.
    +            final Collection<?> sensorVal = entity.sensors().get(getTriggerSensor());
    +            final Set<Object> sensorValSet = sensorVal != null
    +                ? new LinkedHashSet<>(sensorVal)
    +                : ImmutableSet.of();
    +            final boolean moreComing = !Objects.equals(newValue, sensorValSet);
    +            moreUpdatesComing.set(moreComing);
    +        }
    --- End diff --
    
    Why not do this in the parent class? It would be useful in "InvokeEffectorOnSensorChange" as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server pull request #386: Add AbstractInvokeEffectorPolicy

Posted by neykov <gi...@git.apache.org>.
Github user neykov commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/386#discussion_r85719367
  
    --- Diff: core/src/main/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicy.java ---
    @@ -0,0 +1,145 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.brooklyn.policy;
    +
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import javax.annotation.Nonnull;
    +
    +import org.apache.brooklyn.api.effector.Effector;
    +import org.apache.brooklyn.api.entity.EntityLocal;
    +import org.apache.brooklyn.api.mgmt.Task;
    +import org.apache.brooklyn.api.sensor.AttributeSensor;
    +import org.apache.brooklyn.config.ConfigKey;
    +import org.apache.brooklyn.core.config.ConfigKeys;
    +import org.apache.brooklyn.core.policy.AbstractPolicy;
    +import org.apache.brooklyn.core.sensor.Sensors;
    +import org.apache.brooklyn.util.guava.Maybe;
    +import org.apache.brooklyn.util.text.Strings;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Objects;
    +import com.google.common.util.concurrent.MoreExecutors;
    +
    +public abstract class AbstractInvokeEffectorPolicy extends AbstractPolicy {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(AbstractInvokeEffectorPolicy.class);
    +
    +    public static final ConfigKey<String> IS_BUSY_SENSOR_NAME = ConfigKeys.newStringConfigKey(
    +            "isBusySensor",
    +            "Name of the sensor to publish on the entity that indicates that this policy has incomplete effectors. " +
    +                    "If unset running tasks will not be tracked.");
    +
    +    private final AtomicInteger taskCounter = new AtomicInteger();
    +
    +    /**
    +     * Indicates that onEvent was notified of an value of is not the latest sensor value.
    +     */
    +    private boolean moreUpdatesComing;
    +    /**
    +     * The timestamp of the event that informed moreUpdatesComing. Subsequent notifications
    +     * of earlier events will not cause updates of moreUpdatesComing.
    +     */
    +    private long mostRecentUpdate = 0;
    +    /**
    +     * Guards {@link #moreUpdatesComing} and {@link #mostRecentUpdate}.
    +     */
    +    private final Object[] moreUpdatesLock = new Object[0];
    +
    +    @Override
    +    public void setEntity(EntityLocal entity) {
    +        super.setEntity(entity);
    +        if (isBusySensorEnabled()) {
    +            // Republishes when the entity rebinds.
    +            publishIsBusy();
    +        }
    +    }
    +
    +    /**
    +     * Invoke effector with parameters on the entity that the policy is attached to.
    +     */
    +    protected <T> Task<T> invoke(Effector<T> effector, Map<String, ?> parameters) {
    +        if (isBusySensorEnabled()) {
    +            getTaskCounter().incrementAndGet();
    +            publishIsBusy();
    +        }
    +        Task<T> task = entity.invoke(effector, parameters);
    +        if (isBusySensorEnabled()) {
    +            task.addListener(new EffectorListener(), MoreExecutors.sameThreadExecutor());
    +        }
    +        return task;
    +    }
    +
    +    protected boolean isBusy() {
    +        synchronized (moreUpdatesLock) {
    --- End diff --
    
    Good point - good as is.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server pull request #386: Add AbstractInvokeEffectorPolicy

Posted by sjcorbett <gi...@git.apache.org>.
Github user sjcorbett commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/386#discussion_r85716905
  
    --- Diff: core/src/main/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicy.java ---
    @@ -0,0 +1,145 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.brooklyn.policy;
    +
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import javax.annotation.Nonnull;
    +
    +import org.apache.brooklyn.api.effector.Effector;
    +import org.apache.brooklyn.api.entity.EntityLocal;
    +import org.apache.brooklyn.api.mgmt.Task;
    +import org.apache.brooklyn.api.sensor.AttributeSensor;
    +import org.apache.brooklyn.config.ConfigKey;
    +import org.apache.brooklyn.core.config.ConfigKeys;
    +import org.apache.brooklyn.core.policy.AbstractPolicy;
    +import org.apache.brooklyn.core.sensor.Sensors;
    +import org.apache.brooklyn.util.guava.Maybe;
    +import org.apache.brooklyn.util.text.Strings;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Objects;
    +import com.google.common.util.concurrent.MoreExecutors;
    +
    +public abstract class AbstractInvokeEffectorPolicy extends AbstractPolicy {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(AbstractInvokeEffectorPolicy.class);
    +
    +    public static final ConfigKey<String> IS_BUSY_SENSOR_NAME = ConfigKeys.newStringConfigKey(
    +            "isBusySensor",
    +            "Name of the sensor to publish on the entity that indicates that this policy has incomplete effectors. " +
    +                    "If unset running tasks will not be tracked.");
    +
    +    private final AtomicInteger taskCounter = new AtomicInteger();
    +
    +    /**
    +     * Indicates that onEvent was notified of an value of is not the latest sensor value.
    +     */
    +    private boolean moreUpdatesComing;
    +    /**
    +     * The timestamp of the event that informed moreUpdatesComing. Subsequent notifications
    +     * of earlier events will not cause updates of moreUpdatesComing.
    +     */
    +    private long mostRecentUpdate = 0;
    +    /**
    +     * Guards {@link #moreUpdatesComing} and {@link #mostRecentUpdate}.
    +     */
    +    private final Object[] moreUpdatesLock = new Object[0];
    +
    +    @Override
    +    public void setEntity(EntityLocal entity) {
    +        super.setEntity(entity);
    +        if (isBusySensorEnabled()) {
    +            // Republishes when the entity rebinds.
    +            publishIsBusy();
    +        }
    +    }
    +
    +    /**
    +     * Invoke effector with parameters on the entity that the policy is attached to.
    +     */
    +    protected <T> Task<T> invoke(Effector<T> effector, Map<String, ?> parameters) {
    +        if (isBusySensorEnabled()) {
    +            getTaskCounter().incrementAndGet();
    +            publishIsBusy();
    +        }
    +        Task<T> task = entity.invoke(effector, parameters);
    +        if (isBusySensorEnabled()) {
    +            task.addListener(new EffectorListener(), MoreExecutors.sameThreadExecutor());
    +        }
    +        return task;
    +    }
    +
    +    protected boolean isBusy() {
    +        synchronized (moreUpdatesLock) {
    --- End diff --
    
    I included the synchronisation to make sure that `moreUpdatesComing` is visible to whatever thread invokes the effector's completion listener (which calls `publishIsBusy` which calls `isBusy`). At a minimum the field should be volatile then. Is it a big one for you? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server pull request #386: Add AbstractInvokeEffectorPolicy

Posted by neykov <gi...@git.apache.org>.
Github user neykov commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/386#discussion_r85703704
  
    --- Diff: core/src/main/java/org/apache/brooklyn/policy/InvokeEffectorOnSensorChange.java ---
    @@ -40,28 +44,56 @@
      * * support conditions
      * * allow to be triggered by sensors on members
      */
    -public class InvokeEffectorOnSensorChange extends AbstractPolicy implements SensorEventListener<Object> {
    +public class InvokeEffectorOnSensorChange extends AbstractInvokeEffectorPolicy implements SensorEventListener<Object> {
         
         private static final Logger LOG = LoggerFactory.getLogger(InvokeEffectorOnSensorChange.class);
     
    -    public static final ConfigKey<Object> SENSOR = ConfigKeys.newConfigKey(Object.class, 
    -            "sensor", "Sensor to be monitored, as string or sensor type");
    +    public static final ConfigKey<Object> SENSOR = ConfigKeys.builder(Object.class)
    +            .name("sensor")
    +            .description("Sensor to be monitored, as string or sensor type")
    +            .constraint(Predicates.notNull())
    +            .build();
     
    -    public static final ConfigKey<String> EFFECTOR = ConfigKeys.newStringConfigKey(
    -            "effector", "Name of effector to invoke");
    +    public static final ConfigKey<String> EFFECTOR = ConfigKeys.builder(String.class)
    +            .name("effector")
    +            .description("Name of effector to invoke")
    +            .constraint(StringPredicates.isNonBlank())
    +            .build();
    +
    +    private AttributeSensor<Object> sensor;
     
         @Override
         public void setEntity(EntityLocal entity) {
             super.setEntity(entity);
             Preconditions.checkNotNull(getConfig(EFFECTOR), EFFECTOR);
    -        Object sensor = Preconditions.checkNotNull(getConfig(SENSOR), SENSOR);
    -        if (sensor instanceof String) sensor = Sensors.newSensor(Object.class, (String)sensor);
    -        subscriptions().subscribe(entity, (Sensor<?>)sensor, this);
    +        sensor = getSensor();
    +        subscriptions().subscribe(entity, sensor, this);
    +        LOG.debug("{} subscribed to {} events on {}", new Object[]{this, sensor, entity});
         }
     
         @Override
         public void onEvent(SensorEvent<Object> event) {
    -        entity.invoke(entity.getEntityType().getEffectorByName(getConfig(EFFECTOR)).get(), MutableMap.<String, Object>of());
    +        final Effector<?> eff = getEffectorNamed(getConfig(EFFECTOR)).get();
    +        if (isBusySensorEnabled()) {
    +            final Object currentSensorValue = entity.sensors().get(sensor);
    +            setMoreUpdatesComing(event.getTimestamp(), event.getValue(), currentSensorValue);
    +        }
    +        invoke(eff, MutableMap.<String, Object>of());
    +    }
    +
    +    private AttributeSensor<Object> getSensor() {
    +        final Object configVal = Preconditions.checkNotNull(getConfig(SENSOR), SENSOR);
    +        final AttributeSensor<Object> sensor;
    +        if (configVal == null) {
    +            throw new NullPointerException("Value for " + SENSOR.getName() + " is null");
    +        } else if (configVal instanceof String) {
    +            sensor = Sensors.newSensor(Object.class, (String) configVal);
    +        } else if (configVal instanceof AttributeSensor) {
    +            sensor = (AttributeSensor<Object>) configVal;
    +        } else {
    +            sensor = TypeCoercions.tryCoerce(configVal, new TypeToken<AttributeSensor<Object>>() {}).get();
    +        }
    --- End diff --
    
    No need for `if`s, could replace it all with `coerce`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server pull request #386: Add AbstractInvokeEffectorPolicy

Posted by aledsage <gi...@git.apache.org>.
Github user aledsage commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/386#discussion_r84328465
  
    --- Diff: core/src/main/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicy.java ---
    @@ -0,0 +1,120 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.brooklyn.policy;
    +
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import javax.annotation.Nonnull;
    +
    +import org.apache.brooklyn.api.effector.Effector;
    +import org.apache.brooklyn.api.entity.EntityLocal;
    +import org.apache.brooklyn.api.mgmt.Task;
    +import org.apache.brooklyn.api.sensor.AttributeSensor;
    +import org.apache.brooklyn.config.ConfigKey;
    +import org.apache.brooklyn.core.config.ConfigKeys;
    +import org.apache.brooklyn.core.policy.AbstractPolicy;
    +import org.apache.brooklyn.core.sensor.Sensors;
    +import org.apache.brooklyn.util.guava.Maybe;
    +import org.apache.brooklyn.util.text.Strings;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.util.concurrent.MoreExecutors;
    +
    +public abstract class AbstractInvokeEffectorPolicy extends AbstractPolicy {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(AbstractInvokeEffectorPolicy.class);
    +
    +    public static final ConfigKey<AtomicInteger> TASK_COUNTER = ConfigKeys.newConfigKey(AtomicInteger.class,
    +            "taskCounter", "Counts the number of incomplete effector invocations made by the policy");
    +
    +    public static final ConfigKey<String> IS_BUSY_SENSOR_NAME = ConfigKeys.newStringConfigKey(
    +            "isBusySensor",
    +            "Name of the sensor to publish on the entity that indicates that this policy has incomplete effectors. " +
    +                    "If unset running tasks will not be tracked.");
    +
    +    @Override
    +    public void setEntity(EntityLocal entity) {
    +        super.setEntity(entity);
    +        if (isBusySensorEnabled()) {
    +            // Overwrites persisted values and republishes when the entity rebinds.
    +            config().set(TASK_COUNTER, new AtomicInteger());
    --- End diff --
    
    Should we be storing this as config? On rebind, our effector will not resume so the counter should be reset. However, because we've persisted it as a config key, then it will keep its old value.
    
    Maybe it's better for now to just have this as a field (without `@SetFromFlag` obviously).
    
    What should we do on rebind when we were executing the effector? Currently it will keep the count of 1 and publish busy "true". If we changed to a field, then it would publish "false".


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server issue #386: Add AbstractInvokeEffectorPolicy

Posted by aledsage <gi...@git.apache.org>.
Github user aledsage commented on the issue:

    https://github.com/apache/brooklyn-server/pull/386
  
    Looks good. As well as the task-counter config comment, my only other comment is... should we add to the tests for `InvokeEffectorOnSensorChange` and `InvokeEffectorOnCollectionSensorChange`, that the busy-sensor is set?
    
    Except that highlights the absence of tests for `InvokeEffectorOnSensorChange`.
    
    I think this PR is fine without adding to the tests for `InvokeEffectorOnCollectionSensorChangeTest`, and should not be blocked adding tests for `InvokeEffectorOnSensorChange` (but we should add tests for that separately!).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server pull request #386: Add AbstractInvokeEffectorPolicy

Posted by geomacy <gi...@git.apache.org>.
Github user geomacy commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/386#discussion_r84929449
  
    --- Diff: core/src/main/java/org/apache/brooklyn/policy/AbstractInvokeEffectorPolicy.java ---
    @@ -0,0 +1,110 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.brooklyn.policy;
    +
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import javax.annotation.Nonnull;
    +
    +import org.apache.brooklyn.api.effector.Effector;
    +import org.apache.brooklyn.api.entity.EntityLocal;
    +import org.apache.brooklyn.api.mgmt.Task;
    +import org.apache.brooklyn.api.sensor.AttributeSensor;
    +import org.apache.brooklyn.config.ConfigKey;
    +import org.apache.brooklyn.core.config.ConfigKeys;
    +import org.apache.brooklyn.core.policy.AbstractPolicy;
    +import org.apache.brooklyn.core.sensor.Sensors;
    +import org.apache.brooklyn.util.guava.Maybe;
    +import org.apache.brooklyn.util.text.Strings;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.util.concurrent.MoreExecutors;
    +
    +public abstract class AbstractInvokeEffectorPolicy extends AbstractPolicy {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(AbstractInvokeEffectorPolicy.class);
    +
    +    public static final ConfigKey<String> IS_BUSY_SENSOR_NAME = ConfigKeys.newStringConfigKey(
    +            "isBusySensor",
    +            "Name of the sensor to publish on the entity that indicates that this policy has incomplete effectors. " +
    +                    "If unset running tasks will not be tracked.");
    +
    +    private final AtomicInteger taskCounter = new AtomicInteger();
    +
    +    @Override
    +    public void setEntity(EntityLocal entity) {
    +        super.setEntity(entity);
    +        if (isBusySensorEnabled()) {
    +            // Republishes when the entity rebinds.
    +            publishIsBusy();
    --- End diff --
    
    I take it this relies on the atomic integer being persisted upon shutdown. Say you have just invoked one effector - what would happen if the server were shut down before the effector task had been invoked?  In this case does the task get cancelled? Upon rebind, would we get a non zero value on the counter, but there would no longer be a task that would ever decrement it to zero?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server issue #386: Add AbstractInvokeEffectorPolicy

Posted by sjcorbett <gi...@git.apache.org>.
Github user sjcorbett commented on the issue:

    https://github.com/apache/brooklyn-server/pull/386
  
    @neykov I've pushed some updates that extend the 'more updates coming' notion to `InvokeEffectorOnSensorChange`. I'd appreciate you casting your eye over `AbstractInvokeEffectorPolicy` once more.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] brooklyn-server issue #386: Add AbstractInvokeEffectorPolicy

Posted by neykov <gi...@git.apache.org>.
Github user neykov commented on the issue:

    https://github.com/apache/brooklyn-server/pull/386
  
    Thanks @sjcorbett, merging.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---