You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2015/10/09 10:37:14 UTC
[3/8] incubator-brooklyn git commit: Adds reducer enricher
Adds reducer enricher
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/7e39e466
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/7e39e466
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/7e39e466
Branch: refs/heads/master
Commit: 7e39e46686a4206e69a85756fb1c18f83a7c5b41
Parents: 0c85cd9
Author: Martin Harris <gi...@nakomis.com>
Authored: Tue Sep 22 14:56:02 2015 +0100
Committer: Robert Moss <ro...@gmail.com>
Committed: Thu Oct 1 16:37:52 2015 +0100
----------------------------------------------------------------------
.../brooklyn/enricher/stock/Enrichers.java | 52 ++++++++++++
.../apache/brooklyn/enricher/stock/Reducer.java | 89 ++++++++++++++++++++
.../brooklyn/enricher/stock/Transformer.java | 2 +
.../brooklyn/enricher/stock/ReducerTest.java | 81 ++++++++++++++++++
4 files changed, 224 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7e39e466/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java
index baed4f1..4273dec 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java
@@ -171,6 +171,9 @@ public class Enrichers {
public JoinerBuilder joining(AttributeSensor<?> source) {
return new JoinerBuilder(source);
}
+ public ReducerBuilder reducing(List<AttributeSensor<?>> sourceSensors) {
+ return new ReducerBuilder(sourceSensors);
+ }
}
@@ -676,6 +679,49 @@ public class Enrichers {
.toString();
}
}
+
+ protected abstract static class AbstractReducerBuilder<S, B extends AbstractReducerBuilder<S, B>> extends AbstractEnricherBuilder<B> {
+ protected AttributeSensor<S> publishing;
+ protected Entity fromEntity;
+ protected List<AttributeSensor<?>> reducing;
+ protected Function<List<AttributeSensor<?>>, String> computing;
+
+ public AbstractReducerBuilder(List<AttributeSensor<?>> val) {
+ super(Reducer.class);
+ this.reducing = checkNotNull(val);
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public <S> ReducerBuilder<S> publishing(AttributeSensor<? extends S> val) {
+ this.publishing = (AttributeSensor) checkNotNull(val);
+ return (ReducerBuilder) this;
+ }
+
+ public B from(Entity val) {
+ this.fromEntity = checkNotNull(val);
+ return self();
+ }
+
+ public B computing(Function<List<AttributeSensor<?>>, String> val) {
+ this.computing = checkNotNull(val);
+ return self();
+ }
+
+ public EnricherSpec<?> build() {
+ return super.build().configure(MutableMap.builder()
+ .put(Reducer.SOURCE_SENSORS, reducing)
+ .put(Reducer.PRODUCER, fromEntity)
+ .put(Reducer.TARGET_SENSOR, publishing)
+ .put(Reducer.REDUCER_FUNCTION, computing)
+ .build()
+ );
+ }
+
+ @Override
+ protected String getDefaultUniqueTag() {
+ return "reducer:" + reducing.toString();
+ }
+ }
public static class InitialBuilder extends AbstractInitialBuilder<InitialBuilder> {
}
@@ -729,6 +775,12 @@ public class Enrichers {
}
}
+ public static class ReducerBuilder<S> extends AbstractReducerBuilder<S, ReducerBuilder<S>> {
+ public ReducerBuilder(List<AttributeSensor<?>> val) {
+ super(val);
+ }
+ }
+
@Beta
private abstract static class ComputingNumber<T extends Number> implements Function<Collection<T>, T> {
protected final Number defaultValueForUnreportedSensors;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7e39e466/core/src/main/java/org/apache/brooklyn/enricher/stock/Reducer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Reducer.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Reducer.java
new file mode 100644
index 0000000..e22fcff
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Reducer.java
@@ -0,0 +1,89 @@
+package org.apache.brooklyn.enricher.stock;
+
+import java.util.List;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.Sensor;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.enricher.AbstractEnricher;
+import org.apache.brooklyn.util.core.flags.SetFromFlag;
+import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.core.task.ValueResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.api.client.util.Lists;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.reflect.TypeToken;
+
+public class Reducer extends AbstractEnricher implements SensorEventListener<Object> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Reducer.class);
+
+ @SetFromFlag("producer")
+ public static ConfigKey<Entity> PRODUCER = ConfigKeys.newConfigKey(Entity.class, "enricher.producer");
+
+ public static ConfigKey<Sensor<?>> TARGET_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.targetSensor");
+ public static ConfigKey<List<? extends AttributeSensor<?>>> SOURCE_SENSORS = ConfigKeys.newConfigKey(new TypeToken<List<? extends AttributeSensor<?>>>() {}, "enricher.sourceSensors");
+ public static ConfigKey<Function<List<?>,?>> REDUCER_FUNCTION = ConfigKeys.newConfigKey(new TypeToken<Function<List<?>, ?>>() {}, "enricher.reducerFunction");
+
+ protected Entity producer;
+
+ protected List<AttributeSensor<?>> subscribedSensors;
+ protected Sensor<?> targetSensor;
+
+ protected Function<List<?>, ?> reducerFunction;
+
+
+ @Override
+ public void setEntity(EntityLocal entity) {
+ super.setEntity(entity);
+ Preconditions.checkNotNull(getConfig(SOURCE_SENSORS), "source sensors");
+
+ this.producer = getConfig(PRODUCER) == null ? entity : getConfig(PRODUCER);
+ List<AttributeSensor<?>> sensorListTemp = Lists.newArrayList();
+
+ for (Object sensorO : getConfig(SOURCE_SENSORS)) {
+ AttributeSensor<?> sensor = Tasks.resolving(sensorO).as(AttributeSensor.class).timeout(ValueResolver.REAL_QUICK_WAIT).context(producer).get();
+ if(!sensorListTemp.contains(sensor)) {
+ sensorListTemp.add(sensor);
+ }
+ }
+
+ reducerFunction = config().get(REDUCER_FUNCTION);
+ Preconditions.checkState(sensorListTemp.size() > 0, "Nothing to reduce");
+
+ for (Sensor<?> sensor : sensorListTemp) {
+ subscribe(producer, sensor, this);
+ }
+
+ subscribedSensors = ImmutableList.copyOf(sensorListTemp);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public void onEvent(SensorEvent<Object> event) {
+ Sensor<?> destinationSensor = getConfig(TARGET_SENSOR);
+
+ List<Object> values = Lists.newArrayList();
+
+ for (AttributeSensor<?> sourceSensor : subscribedSensors) {
+ Object resolvedSensorValue = entity.sensors().get(sourceSensor);
+ values.add(resolvedSensorValue);
+ }
+
+ Object result = reducerFunction.apply(values);
+
+ if (LOG.isTraceEnabled()) LOG.trace("enricher {} got {}, propagating via {} as {}",
+ new Object[] {this, event, entity, reducerFunction, destinationSensor});
+
+ emit((Sensor)destinationSensor, result);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7e39e466/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java
index ef23ab4..f15b2b2 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java
@@ -23,6 +23,8 @@ import static com.google.common.base.Preconditions.checkArgument;
import org.apache.brooklyn.api.sensor.SensorEvent;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.entity.Attributes;
+import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.util.collections.MutableSet;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.core.task.ValueResolver;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7e39e466/core/src/test/java/org/apache/brooklyn/enricher/stock/ReducerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/enricher/stock/ReducerTest.java b/core/src/test/java/org/apache/brooklyn/enricher/stock/ReducerTest.java
new file mode 100644
index 0000000..7a350eb
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/enricher/stock/ReducerTest.java
@@ -0,0 +1,81 @@
+package org.apache.brooklyn.enricher.stock;
+
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.EnricherSpec;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.test.EntityTestUtils;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+
+public class ReducerTest extends BrooklynAppUnitTestSupport {
+
+ public static final AttributeSensor<String> STR1 = Sensors.newStringSensor("test.str1");
+ public static final AttributeSensor<String> STR2 = Sensors.newStringSensor("test.str2");
+ public static final AttributeSensor<String> STR3 = Sensors.newStringSensor("test.str3");
+
+ private TestEntity entity;
+
+ @BeforeMethod(alwaysRun=true)
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+ }
+
+ @Test
+ public void testBasicReducer() {
+ entity.addEnricher(EnricherSpec.create(Reducer.class)
+ .configure(Reducer.PRODUCER, entity)
+ .configure(Reducer.SOURCE_SENSORS, ImmutableList.of(STR1, STR2))
+ .configure(Reducer.TARGET_SENSOR, STR3)
+ .configure(Reducer.REDUCER_FUNCTION, new Concatenator())
+ );
+ entity.sensors().set(STR1, "foo");
+ EntityTestUtils.assertAttributeEqualsContinually(entity, STR3, null);
+
+ entity.sensors().set(STR2, "bar");
+ EntityTestUtils.assertAttributeEqualsEventually(entity, STR3, "foobar");
+ }
+
+ @Test
+ public void testReducingBuilder() {
+ entity.addEnricher(Enrichers.builder().reducing(ImmutableList.<AttributeSensor<?>>of(STR1, STR2))
+ .from(entity)
+ .computing(new Concatenator())
+ .publishing(STR3)
+ .build()
+ );
+
+ entity.sensors().set(STR1, "foo");
+ EntityTestUtils.assertAttributeEqualsContinually(entity, STR3, null);
+
+ entity.sensors().set(STR2, "bar");
+ EntityTestUtils.assertAttributeEqualsEventually(entity, STR3, "foobar");
+ }
+
+ private class Concatenator implements Function<List<?>, String> {
+ @Nullable
+ @Override
+ public String apply(List<?> values) {
+ String result = "";
+ for (Object value : values) {
+ if (value == null) {
+ return null;
+ } else {
+ result += String.valueOf(value);
+ }
+ }
+ return result;
+ }
+ }
+}