You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2014/08/30 01:01:03 UTC
[03/26] git commit: add a new "updatingMap" enricher which helps when
multiple enrichers contribute to a map,
and use it for the new chaining of service_up behaviour
add a new "updatingMap" enricher which helps when multiple enrichers contribute to a map, and use it for the new chaining of service_up behaviour
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/a8bff36e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/a8bff36e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/a8bff36e
Branch: refs/heads/master
Commit: a8bff36ec5cbaaa4c4f10d0adfd464ba8d75b8a7
Parents: 45e3035
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Wed Aug 6 17:05:41 2014 -0400
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Mon Aug 25 09:32:26 2014 +0100
----------------------------------------------------------------------
.../main/java/brooklyn/enricher/Enrichers.java | 72 +++++++++
.../brooklyn/enricher/basic/UpdatingMap.java | 149 +++++++++++++++++++
.../java/brooklyn/enricher/EnrichersTest.java | 50 ++++++-
.../entity/basic/SoftwareProcessImpl.java | 26 +---
4 files changed, 274 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a8bff36e/core/src/main/java/brooklyn/enricher/Enrichers.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/enricher/Enrichers.java b/core/src/main/java/brooklyn/enricher/Enrichers.java
index dab423c..6ddc780 100644
--- a/core/src/main/java/brooklyn/enricher/Enrichers.java
+++ b/core/src/main/java/brooklyn/enricher/Enrichers.java
@@ -30,6 +30,7 @@ import brooklyn.enricher.basic.Aggregator;
import brooklyn.enricher.basic.Combiner;
import brooklyn.enricher.basic.Propagator;
import brooklyn.enricher.basic.Transformer;
+import brooklyn.enricher.basic.UpdatingMap;
import brooklyn.entity.Entity;
import brooklyn.event.AttributeSensor;
import brooklyn.event.Sensor;
@@ -97,6 +98,12 @@ public class Enrichers {
public <S> AggregatorBuilder<S, Object, ?> aggregating(AttributeSensor<S> val) {
return new ConcreteAggregatorBuilder<S,Object>(val);
}
+ /** creates an {@link UpdatingMap} enricher:
+ * {@link UpdatingMapBuilder#from(AttributeSensor)} and {@link UpdatingMapBuilder#computing(Function)} are required
+ **/
+ public <S,TKey,TVal> UpdatingMapBuilder<S, TKey, TVal> updatingMap(AttributeSensor<Map<TKey,TVal>> target) {
+ return new UpdatingMapBuilder<S, TKey, TVal>(target);
+ }
}
@@ -117,6 +124,8 @@ public class Enrichers {
public AggregatorBuilder(AttributeSensor<S> aggregating) {
this.aggregating = aggregating;
}
+ // TODO change the signature of this to have correct type info as done for UpdatingMapBuilder.from(Sensor)
+ // (including change *Builder to Abstract*Builder and Concrete*Builder to *Builder, for all other enricher types)
@SuppressWarnings({ "unchecked", "rawtypes" })
public B publishing(AttributeSensor<? extends T> val) {
this.publishing = (AttributeSensor) checkNotNull(val);
@@ -443,6 +452,61 @@ public class Enrichers {
}
}
+    /**
+     * Fluent builder for an {@link UpdatingMap} enricher spec.
+     * <p>
+     * The target sensor is fixed at construction. {@link #from(AttributeSensor)} and
+     * {@link #computing(Function)} must both be called before {@link #build()};
+     * {@link #key} and {@link #removingIfResultIsNull(boolean)} are optional.
+     *
+     * @param <S> source sensor type
+     * @param <TKey> key type in target sensor map
+     * @param <TVal> value type in target sensor map
+     * @param <B> concrete builder type returned by the chaining methods
+     */
+    public abstract static class AbstractUpdatingMapBuilder<S, TKey, TVal, B extends AbstractUpdatingMapBuilder<S, TKey, TVal, B>> extends Builder<B> {
+        protected AttributeSensor<Map<TKey,TVal>> targetSensor;
+        protected AttributeSensor<? extends S> fromSensor;
+        protected TKey key;
+        protected Function<S, ? extends TVal> computing;
+        protected Boolean removingIfResultIsNull;
+
+        /**
+         * @param target map-valued sensor whose entry the enricher will maintain; must not be null
+         * @throws NullPointerException if {@code target} is null
+         */
+        public AbstractUpdatingMapBuilder(AttributeSensor<Map<TKey,TVal>> target) {
+            // fail here rather than later, when the enricher looks up its required target sensor
+            this.targetSensor = checkNotNull(target, "target sensor");
+        }
+
+        /**
+         * Sets the sensor whose value is passed to the computing function, narrowing the source type to {@code S2}.
+         * <p>
+         * The same builder instance is returned, re-typed through an unchecked cast;
+         * that cast relies on this instance actually being an {@link UpdatingMapBuilder}.
+         *
+         * @throws NullPointerException if {@code fromSensor} is null
+         */
+        @SuppressWarnings({ "unchecked", "rawtypes" })
+        public <S2 extends S> UpdatingMapBuilder<S2,TKey,TVal> from(AttributeSensor<S2> fromSensor) {
+            this.fromSensor = checkNotNull(fromSensor);
+            return (UpdatingMapBuilder) this;
+        }
+
+        /**
+         * Sets the function applied to the source sensor value to produce the map entry's value.
+         *
+         * @throws NullPointerException if {@code val} is null
+         */
+        public B computing(Function<S,? extends TVal> val) {
+            this.computing = checkNotNull(val);
+            return self();
+        }
+
+        /** sets an explicit key to use; defaults to using the name of the source sensor specified in {@link #from(AttributeSensor)} */
+        public B key(TKey key) {
+            this.key = key;
+            return self();
+        }
+
+        /** sets explicit behaviour for treating <code>null</code> return values;
+         * default is to remove */
+        public B removingIfResultIsNull(boolean val) {
+            this.removingIfResultIsNull = val;
+            return self();
+        }
+
+        /**
+         * Creates the {@link UpdatingMap} enricher spec from the values collected so far.
+         * Optional settings (key, removingIfResultIsNull) are only written to the spec when they were set.
+         *
+         * @throws NullPointerException if {@link #from(AttributeSensor)} or {@link #computing(Function)} was never called
+         */
+        public EnricherSpec<?> build() {
+            // both are read as required config by UpdatingMap, so report a missing one where the mistake was made
+            checkNotNull(fromSensor, "from(sensor) must be specified before build()");
+            checkNotNull(computing, "computing(function) must be specified before build()");
+            return EnricherSpec.create(UpdatingMap.class)
+                    .uniqueTag("updating:"+targetSensor+"<-"+fromSensor)
+                    .configure(MutableMap.builder()
+                            .put(UpdatingMap.TARGET_SENSOR, targetSensor)
+                            .put(UpdatingMap.SOURCE_SENSOR, fromSensor)
+                            .putIfNotNull(UpdatingMap.KEY_IN_TARGET_SENSOR, key)
+                            .put(UpdatingMap.COMPUTING, computing)
+                            .putIfNotNull(UpdatingMap.REMOVING_IF_RESULT_IS_NULL, removingIfResultIsNull)
+                            .build());
+        }
+
+        @Override
+        public String toString() {
+            return Objects.toStringHelper(this)
+                    .omitNullValues()
+                    .add("publishing", targetSensor)
+                    .add("fromSensor", fromSensor)
+                    .add("key", key)
+                    .add("computing", computing)
+                    .add("removingIfResultIsNull", removingIfResultIsNull)
+                    .toString();
+        }
+    }
+
private static class ConcreteInitialBuilder extends InitialBuilder<ConcreteInitialBuilder> {
}
@@ -482,6 +546,12 @@ public class Enrichers {
}
}
+    /** Concrete form of {@link AbstractUpdatingMapBuilder}; it adds no behaviour beyond fixing the builder's self-type. */
+    public static class UpdatingMapBuilder<S, TKey, TVal> extends AbstractUpdatingMapBuilder<S, TKey, TVal, UpdatingMapBuilder<S, TKey, TVal>> {
+        /** @param target map-valued sensor, handed straight to the superclass constructor */
+        public UpdatingMapBuilder(AttributeSensor<Map<TKey,TVal>> target) {
+            super(target);
+        }
+    }
+
protected static <T extends Number> T average(Collection<T> vals, Number defaultValueForUnreportedSensors, Number valueToReportIfNoSensors, TypeToken<T> type) {
Double doubleValueToReportIfNoSensors = (valueToReportIfNoSensors == null) ? null : valueToReportIfNoSensors.doubleValue();
int count = count(vals, defaultValueForUnreportedSensors!=null);
@@ -529,4 +599,6 @@ public class Enrichers {
}
return result;
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a8bff36e/core/src/main/java/brooklyn/enricher/basic/UpdatingMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/enricher/basic/UpdatingMap.java b/core/src/main/java/brooklyn/enricher/basic/UpdatingMap.java
new file mode 100644
index 0000000..f85852c
--- /dev/null
+++ b/core/src/main/java/brooklyn/enricher/basic/UpdatingMap.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.enricher.basic;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.EntityLocal;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.Sensor;
+import brooklyn.event.SensorEvent;
+import brooklyn.event.SensorEventListener;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.flags.SetFromFlag;
+
+import com.google.common.base.Function;
+import com.google.common.reflect.TypeToken;
+
+/**
+ * Enricher which updates an entry in a sensor map ({@link #TARGET_SENSOR})
+ * based on the value of another sensor ({@link #SOURCE_SENSOR}).
+ * <p>
+ * The key used defaults to the name of the source sensor but can be specified with {@link #KEY_IN_TARGET_SENSOR}.
+ * The value placed in the map is the result of applying the function in {@link #COMPUTING} to the sensor value,
+ * with default behaviour being to remove an entry if <code>null</code> is returned
+ * but this can be overridden by setting {@link #REMOVING_IF_RESULT_IS_NULL} false.
+ * {@link Entities#REMOVE} and {@link Entities#UNCHANGED} are also respected as return values for the computation
+ * (ignoring generics).
+ * <p>
+ * The map currently published on the target sensor is never modified in place:
+ * each update is applied to a copy, and the copy is emitted if anything changed.
+ *
+ * @author alex
+ *
+ * @param <S> source sensor type
+ * @param <TKey> key type in target sensor map
+ * @param <TVal> value type in target sensor map
+ */
+@SuppressWarnings("serial")
+public class UpdatingMap<S,TKey,TVal> extends AbstractEnricher implements SensorEventListener<S> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(UpdatingMap.class);
+
+    /** Sensor whose value is fed to {@link #COMPUTING}; required. */
+    @SetFromFlag("fromSensor")
+    public static final ConfigKey<Sensor<?>> SOURCE_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.sourceSensor");
+    /** Map-valued sensor in which an entry is maintained; required. */
+    @SetFromFlag("targetSensor")
+    public static final ConfigKey<Sensor<?>> TARGET_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.targetSensor");
+    @SetFromFlag("key")
+    public static final ConfigKey<?> KEY_IN_TARGET_SENSOR = ConfigKeys.newConfigKey(Object.class, "enricher.updatingMap.keyInTargetSensor",
+        "Key to update in the target sensor map, defaulting to the name of the source sensor");
+    /** Function applied to the source sensor value to obtain the value for the map entry; required. */
+    @SetFromFlag("computing")
+    public static final ConfigKey<Function<?, ?>> COMPUTING = ConfigKeys.newConfigKey(new TypeToken<Function<?,?>>() {}, "enricher.updatingMap.computing");
+    @SetFromFlag("removingIfResultIsNull")
+    public static final ConfigKey<Boolean> REMOVING_IF_RESULT_IS_NULL = ConfigKeys.newBooleanConfigKey("enricher.updatingMap.removingIfResultIsNull",
+        "Whether the key in the target map is removed if the result if the computation is null");
+
+    protected AttributeSensor<S> sourceSensor;
+    protected AttributeSensor<Map<TKey,TVal>> targetSensor;
+    protected TKey key;
+    protected Function<S,? extends TVal> computing;
+    protected Boolean removingIfResultIsNull;
+
+    public UpdatingMap() {
+    }
+
+    /**
+     * Reads this enricher's configuration into its fields, subscribes to the source sensor
+     * on the given entity, and then performs an initial update.
+     */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Override
+    public void setEntity(EntityLocal entity) {
+        super.setEntity(entity);
+        this.sourceSensor = (AttributeSensor<S>) getRequiredConfig(SOURCE_SENSOR);
+        this.targetSensor = (AttributeSensor<Map<TKey,TVal>>) getRequiredConfig(TARGET_SENSOR);
+        this.key = (TKey) getConfig(KEY_IN_TARGET_SENSOR);
+        this.computing = (Function) getRequiredConfig(COMPUTING);
+        this.removingIfResultIsNull = getConfig(REMOVING_IF_RESULT_IS_NULL);
+
+        subscribe(entity, sourceSensor, this);
+        onUpdated();
+    }
+
+    /** Recomputes the map entry whenever the source sensor publishes an event; the event payload itself is not used. */
+    @Override
+    public void onEvent(SensorEvent<S> event) {
+        onUpdated();
+    }
+
+    /**
+     * Recomputes this enricher's entry from the current value of the source sensor and,
+     * if the target map needs to change (or has not been published yet), emits the new map.
+     * <p>
+     * Called once from {@link #setEntity(EntityLocal)} and again on every source sensor event.
+     * Any failure is logged and then rethrown via {@link Exceptions#propagate(Throwable)}.
+     */
+    @SuppressWarnings("unchecked")
+    protected void onUpdated() {
+        try {
+            Object v = computing.apply(entity.getAttribute(sourceSensor));
+            if (v == null && !Boolean.FALSE.equals(removingIfResultIsNull)) {
+                v = Entities.REMOVE;
+            }
+            if (v == Entities.UNCHANGED) {
+                // the computation asked for the map to be left alone
+                return;
+            }
+
+            // TODO check synchronization (the read-copy-emit sequence below is not atomic)
+            TKey key = this.key;
+            if (key==null) key = (TKey) sourceSensor.getName();
+
+            Map<TKey, TVal> current = entity.getAttribute(targetSensor);
+            boolean created = (current==null);
+
+            // work on a copy so the instance already published on the sensor is never mutated in place
+            Map<TKey, TVal> map = MutableMap.of();
+            if (!created) map.putAll(current);
+
+            boolean changed;
+            if (v == Entities.REMOVE) {
+                changed = map.containsKey(key);
+                if (changed)
+                    map.remove(key);
+            } else {
+                TVal oldV = map.get(key);
+                if (oldV==null)
+                    // either a real value replaces a null/absent one, or an explicit null entry is being added
+                    changed = (v!=null || !map.containsKey(key));
+                else
+                    changed = !oldV.equals(v);
+                if (changed)
+                    map.put(key, (TVal)v);
+            }
+            if (changed || created)
+                emit(targetSensor, map);
+        } catch (Throwable t) {
+            LOG.warn("Error calculating map update for enricher "+this, t);
+            throw Exceptions.propagate(t);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a8bff36e/core/src/test/java/brooklyn/enricher/EnrichersTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/enricher/EnrichersTest.java b/core/src/test/java/brooklyn/enricher/EnrichersTest.java
index 8dc9615..8bfd2bb 100644
--- a/core/src/test/java/brooklyn/enricher/EnrichersTest.java
+++ b/core/src/test/java/brooklyn/enricher/EnrichersTest.java
@@ -19,6 +19,7 @@
package brooklyn.enricher;
import java.util.Collection;
+import java.util.Map;
import java.util.Set;
import org.testng.annotations.BeforeMethod;
@@ -33,8 +34,9 @@ import brooklyn.event.SensorEvent;
import brooklyn.event.basic.Sensors;
import brooklyn.test.EntityTestUtils;
import brooklyn.test.entity.TestEntity;
+import brooklyn.util.collections.MutableMap;
import brooklyn.util.collections.MutableSet;
-import brooklyn.util.guava.TypeTokens;
+import brooklyn.util.guava.Functionals;
import brooklyn.util.text.StringFunctions;
import com.google.common.base.Function;
@@ -45,6 +47,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.reflect.TypeToken;
+@SuppressWarnings("serial")
public class EnrichersTest extends BrooklynAppUnitTestSupport {
public static final AttributeSensor<Integer> NUM1 = Sensors.newIntegerSensor("test.num1");
@@ -54,6 +57,9 @@ public class EnrichersTest extends BrooklynAppUnitTestSupport {
public static final AttributeSensor<String> STR2 = Sensors.newStringSensor("test.str2");
public static final AttributeSensor<Set<Object>> SET1 = Sensors.newSensor(new TypeToken<Set<Object>>() {}, "test.set1", "set1 descr");
public static final AttributeSensor<Long> LONG1 = Sensors.newLongSensor("test.long1");
+ public static final AttributeSensor<Map<String,String>> MAP1 = Sensors.newSensor(new TypeToken<Map<String,String>>() {}, "test.map1", "map1 descr");
+ @SuppressWarnings("rawtypes")
+ public static final AttributeSensor<Map> MAP2 = Sensors.newSensor(Map.class, "test.map2");
private TestEntity entity;
private TestEntity entity2;
@@ -68,6 +74,7 @@ public class EnrichersTest extends BrooklynAppUnitTestSupport {
group = app.createAndManageChild(EntitySpec.create(BasicGroup.class));
}
+ @SuppressWarnings("unchecked")
@Test
public void testAdding() {
entity.addEnricher(Enrichers.builder()
@@ -81,6 +88,7 @@ public class EnrichersTest extends BrooklynAppUnitTestSupport {
EntityTestUtils.assertAttributeEqualsEventually(entity, NUM3, 5);
}
+ @SuppressWarnings("unchecked")
@Test
public void testCombiningWithCustomFunction() {
entity.addEnricher(Enrichers.builder()
@@ -94,6 +102,7 @@ public class EnrichersTest extends BrooklynAppUnitTestSupport {
EntityTestUtils.assertAttributeEqualsEventually(entity, NUM3, 1);
}
+ @SuppressWarnings("unchecked")
@Test(groups="Integration") // because takes a second
public void testCombiningRespectsUnchanged() {
entity.addEnricher(Enrichers.builder()
@@ -147,7 +156,7 @@ public class EnrichersTest extends BrooklynAppUnitTestSupport {
entity.addEnricher(Enrichers.builder()
.transforming(NUM1)
.publishing(LONG1)
- .computing((Function)Functions.constant(Integer.valueOf(1)))
+ .computing(Functions.constant(Integer.valueOf(1)))
.build());
entity.setAttribute(NUM1, 123);
@@ -312,7 +321,7 @@ public class EnrichersTest extends BrooklynAppUnitTestSupport {
.aggregating(NUM1)
.publishing(LONG1)
.fromMembers()
- .computing((Function)Functions.constant(Integer.valueOf(1)))
+ .computing(Functions.constant(Integer.valueOf(1)))
.build());
entity.setAttribute(NUM1, 123);
@@ -342,4 +351,39 @@ public class EnrichersTest extends BrooklynAppUnitTestSupport {
entity.setAttribute(NUM1, 987654);
EntityTestUtils.assertAttributeEqualsContinually(group, LONG1, Long.valueOf(123));
}
+ /** Runs the shared updating-map checks against MAP1, the target sensor declared with full Map&lt;String,String&gt; type information. */
+ @Test
+ public void testUpdatingMap1() {
+ entity.addEnricher(Enrichers.builder()
+ .updatingMap(MAP1)
+ .from(LONG1)
+ // presumably yields the message when LONG1 is -1 and null otherwise (see doUpdatingMapChecks) -- confirm against Functionals
+ .computing(Functionals.when(-1L).value("-1 is not allowed"))
+ .build());
+
+ doUpdatingMapChecks(MAP1);
+ }
+ }
+
+ /** Same scenario as testUpdatingMap1, but against MAP2, which is declared with the raw Map type and so has to be cast to a raw AttributeSensor. */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Test
+ public void testUpdatingMap2() {
+ entity.addEnricher(Enrichers.builder()
+ // raw cast needed because MAP2 is AttributeSensor<Map> rather than AttributeSensor<Map<K,V>>
+ .updatingMap((AttributeSensor)MAP2)
+ .from(LONG1)
+ .computing(Functionals.when(-1L).value("-1 is not allowed"))
+ .build());
+
+ doUpdatingMapChecks(MAP2);
+ }
+
+    /**
+     * Shared assertions for the updating-map tests: the map sensor should start out as an empty map,
+     * gain an entry keyed by LONG1's name once LONG1 is set to -1, and return to empty once LONG1 is set to 1.
+     *
+     * @param mapSensor the target sensor the enricher under test publishes to
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    protected void doUpdatingMapChecks(AttributeSensor mapSensor) {
+        Map<String,String> nothingReported = MutableMap.<String,String>of();
+        Map<String,String> rejectionReported = MutableMap.<String,String>of(
+                LONG1.getName(), "-1 is not allowed");
+
+        EntityTestUtils.assertAttributeEqualsEventually(entity, mapSensor, nothingReported);
+
+        entity.setAttribute(LONG1, -1L);
+        EntityTestUtils.assertAttributeEqualsEventually(entity, mapSensor, rejectionReported);
+
+        entity.setAttribute(LONG1, 1L);
+        EntityTestUtils.assertAttributeEqualsEventually(entity, mapSensor, nothingReported);
+    }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a8bff36e/software/base/src/main/java/brooklyn/entity/basic/SoftwareProcessImpl.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/basic/SoftwareProcessImpl.java b/software/base/src/main/java/brooklyn/entity/basic/SoftwareProcessImpl.java
index fb872b0..56bb319 100644
--- a/software/base/src/main/java/brooklyn/entity/basic/SoftwareProcessImpl.java
+++ b/software/base/src/main/java/brooklyn/entity/basic/SoftwareProcessImpl.java
@@ -51,13 +51,13 @@ import brooklyn.util.collections.MutableMap;
import brooklyn.util.collections.MutableSet;
import brooklyn.util.config.ConfigBag;
import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.guava.Functionals;
import brooklyn.util.task.DynamicTasks;
import brooklyn.util.task.Tasks;
import brooklyn.util.time.CountdownTimer;
import brooklyn.util.time.Duration;
import brooklyn.util.time.Time;
-import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
@@ -163,25 +163,11 @@ public abstract class SoftwareProcessImpl extends AbstractEntity implements Soft
}))
.build();
- // FIXME quick-and-dirty change
- Function<Boolean, Map<String,Object>> f = new Function<Boolean, Map<String,Object>>() {
- @Override
- public Map<String, Object> apply(Boolean input) {
- Map<String, Object> result = getAttribute(Attributes.SERVICE_NOT_UP_INDICATORS);
- if (result==null) result = MutableMap.of();
- // TODO only change/publish if it needs changing...
- if (Boolean.TRUE.equals(input)) {
- result.remove(SERVICE_PROCESS_IS_RUNNING.getName());
- return result;
- } else {
- result.put(SERVICE_PROCESS_IS_RUNNING.getName(), "Process not running (according to driver checkRunning)");
- return result;
- }
- }
- };
- addEnricher(Enrichers.builder().transforming(SERVICE_PROCESS_IS_RUNNING).publishing(Attributes.SERVICE_NOT_UP_INDICATORS)
- .computing(f).build());
-
+ addEnricher(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS)
+ .from(SERVICE_PROCESS_IS_RUNNING)
+ .computing(Functionals.when(false).value("Process not running (according to driver checkRunning)"))
+ .build());
+
// FIXME lives elsewhere
addEnricher(Enrichers.builder().transforming(Attributes.SERVICE_NOT_UP_INDICATORS).publishing(Attributes.SERVICE_UP)
.computing( Functions.forPredicate(CollectionFunctionals.<String>mapSizeEquals(0)) ).build());