You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2015/10/09 10:37:14 UTC

[3/8] incubator-brooklyn git commit: Adds reducer enricher

Adds reducer enricher


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/7e39e466
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/7e39e466
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/7e39e466

Branch: refs/heads/master
Commit: 7e39e46686a4206e69a85756fb1c18f83a7c5b41
Parents: 0c85cd9
Author: Martin Harris <gi...@nakomis.com>
Authored: Tue Sep 22 14:56:02 2015 +0100
Committer: Robert Moss <ro...@gmail.com>
Committed: Thu Oct 1 16:37:52 2015 +0100

----------------------------------------------------------------------
 .../brooklyn/enricher/stock/Enrichers.java      | 52 ++++++++++++
 .../apache/brooklyn/enricher/stock/Reducer.java | 89 ++++++++++++++++++++
 .../brooklyn/enricher/stock/Transformer.java    |  2 +
 .../brooklyn/enricher/stock/ReducerTest.java    | 81 ++++++++++++++++++
 4 files changed, 224 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7e39e466/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java
index baed4f1..4273dec 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java
@@ -171,6 +171,9 @@ public class Enrichers {
         public JoinerBuilder joining(AttributeSensor<?> source) {
             return new JoinerBuilder(source);
         }
+        public ReducerBuilder reducing(List<AttributeSensor<?>> sourceSensors) {
+            return new ReducerBuilder(sourceSensors);
+        }
     }
 
 
@@ -676,6 +679,49 @@ public class Enrichers {
                     .toString();
         }
     }
+
+    protected abstract static class AbstractReducerBuilder<S, B extends AbstractReducerBuilder<S, B>> extends AbstractEnricherBuilder<B> {
+        protected AttributeSensor<S> publishing;
+        protected Entity fromEntity;
+        protected List<AttributeSensor<?>> reducing;
+        protected Function<List<AttributeSensor<?>>, String> computing;
+
+        public AbstractReducerBuilder(List<AttributeSensor<?>> val) {
+            super(Reducer.class);
+            this.reducing = checkNotNull(val);
+        }
+
+        @SuppressWarnings({ "unchecked", "rawtypes" })
+        public <S> ReducerBuilder<S> publishing(AttributeSensor<? extends S> val) {
+            this.publishing = (AttributeSensor) checkNotNull(val);
+            return (ReducerBuilder) this;
+        }
+
+        public B from(Entity val) {
+            this.fromEntity = checkNotNull(val);
+            return self();
+        }
+
+        public B computing(Function<List<AttributeSensor<?>>, String> val) {
+            this.computing = checkNotNull(val);
+            return self();
+        }
+
+        public EnricherSpec<?> build() {
+            return super.build().configure(MutableMap.builder()
+                    .put(Reducer.SOURCE_SENSORS, reducing)
+                    .put(Reducer.PRODUCER, fromEntity)
+                    .put(Reducer.TARGET_SENSOR, publishing)
+                    .put(Reducer.REDUCER_FUNCTION, computing)
+                    .build()
+            );
+        }
+
+        @Override
+        protected String getDefaultUniqueTag() {
+            return "reducer:" + reducing.toString();
+        }
+    }
     
     public static class InitialBuilder extends AbstractInitialBuilder<InitialBuilder> {
     }
@@ -729,6 +775,12 @@ public class Enrichers {
         }
     }
 
+    public static class ReducerBuilder<S> extends AbstractReducerBuilder<S, ReducerBuilder<S>> {
+        public ReducerBuilder(List<AttributeSensor<?>> val) {
+            super(val);
+        }
+    }
+
     @Beta
     private abstract static class ComputingNumber<T extends Number> implements Function<Collection<T>, T> {
         protected final Number defaultValueForUnreportedSensors;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7e39e466/core/src/main/java/org/apache/brooklyn/enricher/stock/Reducer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Reducer.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Reducer.java
new file mode 100644
index 0000000..e22fcff
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Reducer.java
@@ -0,0 +1,89 @@
+package org.apache.brooklyn.enricher.stock;
+
+import java.util.List;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.Sensor;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.enricher.AbstractEnricher;
+import org.apache.brooklyn.util.core.flags.SetFromFlag;
+import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.core.task.ValueResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.api.client.util.Lists;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.reflect.TypeToken;
+
+public class Reducer extends AbstractEnricher implements SensorEventListener<Object> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Reducer.class);
+
+    @SetFromFlag("producer")
+    public static ConfigKey<Entity> PRODUCER = ConfigKeys.newConfigKey(Entity.class, "enricher.producer");
+
+    public static ConfigKey<Sensor<?>> TARGET_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.targetSensor");
+    public static ConfigKey<List<? extends AttributeSensor<?>>> SOURCE_SENSORS = ConfigKeys.newConfigKey(new TypeToken<List<? extends AttributeSensor<?>>>() {}, "enricher.sourceSensors");
+    public static ConfigKey<Function<List<?>,?>> REDUCER_FUNCTION = ConfigKeys.newConfigKey(new TypeToken<Function<List<?>, ?>>() {}, "enricher.reducerFunction");
+
+    protected Entity producer;
+
+    protected List<AttributeSensor<?>> subscribedSensors;
+    protected Sensor<?> targetSensor;
+
+    protected Function<List<?>, ?> reducerFunction;
+
+
+    @Override
+    public void setEntity(EntityLocal entity) {
+        super.setEntity(entity);
+        Preconditions.checkNotNull(getConfig(SOURCE_SENSORS), "source sensors");
+
+        this.producer = getConfig(PRODUCER) == null ? entity : getConfig(PRODUCER);
+        List<AttributeSensor<?>> sensorListTemp = Lists.newArrayList();
+
+        for (Object sensorO : getConfig(SOURCE_SENSORS)) {
+            AttributeSensor<?> sensor = Tasks.resolving(sensorO).as(AttributeSensor.class).timeout(ValueResolver.REAL_QUICK_WAIT).context(producer).get();
+            if(!sensorListTemp.contains(sensor)) {
+                sensorListTemp.add(sensor);
+            }
+        }
+
+        reducerFunction = config().get(REDUCER_FUNCTION);
+        Preconditions.checkState(sensorListTemp.size() > 0, "Nothing to reduce");
+
+        for (Sensor<?> sensor : sensorListTemp) {
+            subscribe(producer, sensor, this);
+        }
+
+        subscribedSensors = ImmutableList.copyOf(sensorListTemp);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public void onEvent(SensorEvent<Object> event) {
+        Sensor<?> destinationSensor = getConfig(TARGET_SENSOR);
+
+        List<Object> values = Lists.newArrayList();
+
+        for (AttributeSensor<?> sourceSensor : subscribedSensors) {
+            Object resolvedSensorValue = entity.sensors().get(sourceSensor);
+            values.add(resolvedSensorValue);
+        }
+
+        Object result = reducerFunction.apply(values);
+
+        if (LOG.isTraceEnabled()) LOG.trace("enricher {} got {}, propagating via {} as {}",
+                new Object[] {this, event, entity, reducerFunction, destinationSensor});
+
+        emit((Sensor)destinationSensor, result);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7e39e466/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java
index ef23ab4..f15b2b2 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java
@@ -23,6 +23,8 @@ import static com.google.common.base.Preconditions.checkArgument;
 import org.apache.brooklyn.api.sensor.SensorEvent;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.entity.Attributes;
+import org.apache.brooklyn.core.sensor.Sensors;
 import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.core.task.ValueResolver;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7e39e466/core/src/test/java/org/apache/brooklyn/enricher/stock/ReducerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/enricher/stock/ReducerTest.java b/core/src/test/java/org/apache/brooklyn/enricher/stock/ReducerTest.java
new file mode 100644
index 0000000..7a350eb
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/enricher/stock/ReducerTest.java
@@ -0,0 +1,81 @@
+package org.apache.brooklyn.enricher.stock;
+
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.EnricherSpec;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.test.EntityTestUtils;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+
+public class ReducerTest extends BrooklynAppUnitTestSupport {
+
+    public static final AttributeSensor<String> STR1 = Sensors.newStringSensor("test.str1");
+    public static final AttributeSensor<String> STR2 = Sensors.newStringSensor("test.str2");
+    public static final AttributeSensor<String> STR3 = Sensors.newStringSensor("test.str3");
+
+    private TestEntity entity;
+
+    @BeforeMethod(alwaysRun=true)
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+    }
+
+    @Test
+    public void testBasicReducer() {
+        entity.addEnricher(EnricherSpec.create(Reducer.class)
+            .configure(Reducer.PRODUCER, entity)
+            .configure(Reducer.SOURCE_SENSORS, ImmutableList.of(STR1, STR2))
+            .configure(Reducer.TARGET_SENSOR, STR3)
+            .configure(Reducer.REDUCER_FUNCTION, new Concatenator())
+        );
+        entity.sensors().set(STR1, "foo");
+        EntityTestUtils.assertAttributeEqualsContinually(entity, STR3, null);
+
+        entity.sensors().set(STR2, "bar");
+        EntityTestUtils.assertAttributeEqualsEventually(entity, STR3, "foobar");
+    }
+
+    @Test
+    public void testReducingBuilder() {
+        entity.addEnricher(Enrichers.builder().reducing(ImmutableList.<AttributeSensor<?>>of(STR1, STR2))
+                .from(entity)
+                .computing(new Concatenator())
+                .publishing(STR3)
+                .build()
+        );
+
+        entity.sensors().set(STR1, "foo");
+        EntityTestUtils.assertAttributeEqualsContinually(entity, STR3, null);
+
+        entity.sensors().set(STR2, "bar");
+        EntityTestUtils.assertAttributeEqualsEventually(entity, STR3, "foobar");
+    }
+
+    private class Concatenator implements Function<List<?>, String> {
+        @Nullable
+        @Override
+        public String apply(List<?> values) {
+            String result = "";
+            for (Object value : values) {
+                if (value == null) {
+                    return null;
+                } else {
+                    result += String.valueOf(value);
+                }
+            }
+            return result;
+        }
+    }
+}