You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2015/10/09 10:37:13 UTC

[2/8] incubator-brooklyn git commit: add generic type reducer and tests

add generic type reducer and tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/f0ca5748
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/f0ca5748
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/f0ca5748

Branch: refs/heads/master
Commit: f0ca574819a66cb4799eb794f63929e174e44f95
Parents: 7e39e46
Author: Robert Moss <ro...@gmail.com>
Authored: Wed Sep 23 12:24:24 2015 +0100
Committer: Robert Moss <ro...@gmail.com>
Committed: Thu Oct 1 16:37:52 2015 +0100

----------------------------------------------------------------------
 .../brooklyn/enricher/stock/Enrichers.java      |  50 ++++--
 .../apache/brooklyn/enricher/stock/Reducer.java | 113 ++++++++++---
 .../brooklyn/enricher/stock/ReducerTest.java    | 163 +++++++++++++++++--
 3 files changed, 274 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/f0ca5748/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java
index 4273dec..25d186f 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java
@@ -48,6 +48,7 @@ import com.google.common.base.Predicate;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
@@ -171,8 +172,8 @@ public class Enrichers {
         public JoinerBuilder joining(AttributeSensor<?> source) {
             return new JoinerBuilder(source);
         }
-        public ReducerBuilder reducing(List<AttributeSensor<?>> sourceSensors) {
-            return new ReducerBuilder(sourceSensors);
+        public <S, T> ReducerBuilder<S, T> reducing(Class<? extends Reducer<S, T>> clazz, List<AttributeSensor<S>> sourceSensors) {
+            return new ReducerBuilder<S, T>(clazz, sourceSensors);
         }
     }
 
@@ -680,21 +681,22 @@ public class Enrichers {
         }
     }
 
-    protected abstract static class AbstractReducerBuilder<S, B extends AbstractReducerBuilder<S, B>> extends AbstractEnricherBuilder<B> {
-        protected AttributeSensor<S> publishing;
+    protected abstract static class AbstractReducerBuilder<S, T, B extends AbstractReducerBuilder<S, T, B>> extends AbstractEnricherBuilder<B> {
+        protected AttributeSensor<T> publishing;
         protected Entity fromEntity;
-        protected List<AttributeSensor<?>> reducing;
-        protected Function<List<AttributeSensor<?>>, String> computing;
+        protected List<AttributeSensor<S>> reducing;
+        protected Function<List<S>, T> computing;
+        protected String functionName;
+        private Map<String, Object> parameters;
 
-        public AbstractReducerBuilder(List<AttributeSensor<?>> val) {
-            super(Reducer.class);
+        public AbstractReducerBuilder(Class<? extends Reducer<S, T>> clazz, List<AttributeSensor<S>> val) {
+            super(clazz);
             this.reducing = checkNotNull(val);
         }
-
-        @SuppressWarnings({ "unchecked", "rawtypes" })
-        public <S> ReducerBuilder<S> publishing(AttributeSensor<? extends S> val) {
-            this.publishing = (AttributeSensor) checkNotNull(val);
-            return (ReducerBuilder) this;
+        
+        public B publishing(AttributeSensor<T> val) {
+            this.publishing =  checkNotNull(val);
+            return self();
         }
 
         public B from(Entity val) {
@@ -702,17 +704,29 @@ public class Enrichers {
             return self();
         }
 
-        public B computing(Function<List<AttributeSensor<?>>, String> val) {
+        public B computing(Function<List<S>, T> val) {
             this.computing = checkNotNull(val);
             return self();
         }
+        
+        public B computing(String functionName) {
+            return computing(functionName, ImmutableMap.<String, Object>of());
+        }
+        
+        public B computing(String functionName, Map<String, Object> parameters) {
+            this.functionName = functionName;
+            this.parameters = parameters;
+            return self();
+        }
 
         public EnricherSpec<?> build() {
             return super.build().configure(MutableMap.builder()
                     .put(Reducer.SOURCE_SENSORS, reducing)
                     .put(Reducer.PRODUCER, fromEntity)
                     .put(Reducer.TARGET_SENSOR, publishing)
-                    .put(Reducer.REDUCER_FUNCTION, computing)
+                    .putIfNotNull(Reducer.REDUCER_FUNCTION, computing)
+                    .putIfNotNull(Reducer.REDUCER_FUNCTION_UNTYPED, functionName)
+                    .putIfNotNull(Reducer.PARAMETERS, parameters)
                     .build()
             );
         }
@@ -775,9 +789,9 @@ public class Enrichers {
         }
     }
 
-    public static class ReducerBuilder<S> extends AbstractReducerBuilder<S, ReducerBuilder<S>> {
-        public ReducerBuilder(List<AttributeSensor<?>> val) {
-            super(val);
+    public static class ReducerBuilder<S, T> extends AbstractReducerBuilder<S, T, ReducerBuilder<S, T>> {
+        public ReducerBuilder(Class<? extends Reducer<S, T>> clazz, List<AttributeSensor<S>> val) {
+            super(clazz, val);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/f0ca5748/core/src/main/java/org/apache/brooklyn/enricher/stock/Reducer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Reducer.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Reducer.java
index e22fcff..938cf22 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/Reducer.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Reducer.java
@@ -1,6 +1,8 @@
 package org.apache.brooklyn.enricher.stock;
 
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntityLocal;
@@ -23,59 +25,76 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.reflect.TypeToken;
 
-public class Reducer extends AbstractEnricher implements SensorEventListener<Object> {
+@SuppressWarnings("serial")
+public abstract class Reducer<S, T> extends AbstractEnricher implements SensorEventListener<Object> {
 
     private static final Logger LOG = LoggerFactory.getLogger(Reducer.class);
 
     @SetFromFlag("producer")
     public static ConfigKey<Entity> PRODUCER = ConfigKeys.newConfigKey(Entity.class, "enricher.producer");
-
     public static ConfigKey<Sensor<?>> TARGET_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.targetSensor");
     public static ConfigKey<List<? extends AttributeSensor<?>>> SOURCE_SENSORS = ConfigKeys.newConfigKey(new TypeToken<List<? extends AttributeSensor<?>>>() {}, "enricher.sourceSensors");
     public static ConfigKey<Function<List<?>,?>> REDUCER_FUNCTION = ConfigKeys.newConfigKey(new TypeToken<Function<List<?>, ?>>() {}, "enricher.reducerFunction");
-
+    @SetFromFlag("transformation")
+    public static final ConfigKey<String> REDUCER_FUNCTION_UNTYPED = ConfigKeys.newStringConfigKey("enricher.reducerFunction.untyped",
+        "A string matching a pre-defined named reducer function, such as join");
+    public static final ConfigKey<Map<String, Object>> PARAMETERS = ConfigKeys.newConfigKey(new TypeToken<Map<String, Object>>() {}, "enricher.reducerFunctionParameters", 
+        "A map of parameters to pass into the reducer function");
+   
     protected Entity producer;
+    protected List<AttributeSensor<S>> subscribedSensors;
+    protected Sensor<T> targetSensor;
+    protected Function<List<S>, T> reducerFunction;
 
-    protected List<AttributeSensor<?>> subscribedSensors;
-    protected Sensor<?> targetSensor;
-
-    protected Function<List<?>, ?> reducerFunction;
-
-
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     @Override
     public void setEntity(EntityLocal entity) {
         super.setEntity(entity);
         Preconditions.checkNotNull(getConfig(SOURCE_SENSORS), "source sensors");
 
         this.producer = getConfig(PRODUCER) == null ? entity : getConfig(PRODUCER);
-        List<AttributeSensor<?>> sensorListTemp = Lists.newArrayList();
+        List<AttributeSensor<S>> sensorListTemp = Lists.newArrayList();
 
         for (Object sensorO : getConfig(SOURCE_SENSORS)) {
-            AttributeSensor<?> sensor = Tasks.resolving(sensorO).as(AttributeSensor.class).timeout(ValueResolver.REAL_QUICK_WAIT).context(producer).get();
+            AttributeSensor<S> sensor = Tasks.resolving(sensorO).as(AttributeSensor.class).timeout(ValueResolver.REAL_QUICK_WAIT).context(producer).get();
             if(!sensorListTemp.contains(sensor)) {
                 sensorListTemp.add(sensor);
             }
         }
+        
+        String reducerName = config().get(REDUCER_FUNCTION_UNTYPED);
+        Function<List<S>, T> reducerFunction = (Function) config().get(REDUCER_FUNCTION);
+        if(reducerFunction == null){
+            Map<String, ?> parameters = config().get(PARAMETERS);
+            reducerFunction = createReducerFunction(reducerName, parameters);
+        }
 
-        reducerFunction = config().get(REDUCER_FUNCTION);
+        this.reducerFunction = reducerFunction;
         Preconditions.checkState(sensorListTemp.size() > 0, "Nothing to reduce");
 
-        for (Sensor<?> sensor : sensorListTemp) {
+        for (Sensor<S> sensor : sensorListTemp) {
             subscribe(producer, sensor, this);
         }
 
         subscribedSensors = ImmutableList.copyOf(sensorListTemp);
     }
 
-    @SuppressWarnings({ "rawtypes", "unchecked" })
+    protected abstract Function<List<S>, T> createReducerFunction(String reducerName, Map<String, ?> parameters);
+    
+    @SuppressWarnings("unchecked")
     @Override
     public void onEvent(SensorEvent<Object> event) {
-        Sensor<?> destinationSensor = getConfig(TARGET_SENSOR);
+        Sensor<T> destinationSensor = (Sensor<T>) getConfig(TARGET_SENSOR);
 
-        List<Object> values = Lists.newArrayList();
+        List<S> values = Lists.newArrayList();
+
+        for (AttributeSensor<S> sourceSensor : subscribedSensors) {
+            S resolvedSensorValue = entity.sensors().get(sourceSensor);
+            if (resolvedSensorValue == null) {
+                // only apply function if all values are resolved
+                return;
+            }
 
-        for (AttributeSensor<?> sourceSensor : subscribedSensors) {
-            Object resolvedSensorValue = entity.sensors().get(sourceSensor);
             values.add(resolvedSensorValue);
         }
 
@@ -84,6 +103,60 @@ public class Reducer extends AbstractEnricher implements SensorEventListener<Obj
         if (LOG.isTraceEnabled()) LOG.trace("enricher {} got {}, propagating via {} as {}",
                 new Object[] {this, event, entity, reducerFunction, destinationSensor});
 
-        emit((Sensor)destinationSensor, result);
+        emit((Sensor<T>)destinationSensor, result);
+    }
+    
+    public static class StringStringReducer extends Reducer<String, String> {
+        
+        public StringStringReducer() {}
+
+        @Override
+        protected Function<List<String>, String> createReducerFunction(
+                String reducerName, Map<String, ?> parameters) {
+            if(reducerName.equals("joiner")){
+                return new JoinerFunction(parameters.get("separator"));
+            }
+            throw new IllegalStateException("unknown function: " + reducerName);
+        }
+    }
+
+    public static class JoinerReducerFunction<A> implements Function<List<A>, String> {
+        
+        private Object separator;
+
+        public JoinerReducerFunction(Object separator) {
+            this.separator = (separator == null) ? ", " : separator;
+        }
+
+        @Override
+        public String apply(List<A> input) {
+            
+            StringBuilder sb = new StringBuilder();
+            Iterator<A> it = input.iterator();
+            while(it.hasNext()) {
+                sb.append(it.next().toString());
+                if(it.hasNext()){
+                    sb.append(separator);
+                }
+            }
+            return sb.toString();
+        }
+        
+    }
+
+    public static class JoinerFunction extends JoinerReducerFunction<String>{
+        
+        public JoinerFunction(Object separator) {
+            super(separator);
+        }
+    }
+
+    public static class ToStringReducerFunction<A> implements Function<List<A>, String> {
+
+        @Override
+        public String apply(List<A> input) {
+            return input.toString();
+        }
+        
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/f0ca5748/core/src/test/java/org/apache/brooklyn/enricher/stock/ReducerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/enricher/stock/ReducerTest.java b/core/src/test/java/org/apache/brooklyn/enricher/stock/ReducerTest.java
index 7a350eb..a39d76c 100644
--- a/core/src/test/java/org/apache/brooklyn/enricher/stock/ReducerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/enricher/stock/ReducerTest.java
@@ -1,6 +1,7 @@
 package org.apache.brooklyn.enricher.stock;
 
 import java.util.List;
+import java.util.Map;
 
 import javax.annotation.Nullable;
 
@@ -10,18 +11,25 @@ import org.apache.brooklyn.api.sensor.EnricherSpec;
 import org.apache.brooklyn.core.sensor.Sensors;
 import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
 import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.enricher.stock.Reducer.StringStringReducer;
+import org.apache.brooklyn.test.Asserts;
 import org.apache.brooklyn.test.EntityTestUtils;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 
 public class ReducerTest extends BrooklynAppUnitTestSupport {
 
     public static final AttributeSensor<String> STR1 = Sensors.newStringSensor("test.str1");
     public static final AttributeSensor<String> STR2 = Sensors.newStringSensor("test.str2");
     public static final AttributeSensor<String> STR3 = Sensors.newStringSensor("test.str3");
+    public static final AttributeSensor<Integer> INT1 = Sensors.newIntegerSensor("test.int1");
 
     private TestEntity entity;
 
@@ -33,13 +41,18 @@ public class ReducerTest extends BrooklynAppUnitTestSupport {
     }
 
     @Test
-    public void testBasicReducer() {
-        entity.addEnricher(EnricherSpec.create(Reducer.class)
-            .configure(Reducer.PRODUCER, entity)
-            .configure(Reducer.SOURCE_SENSORS, ImmutableList.of(STR1, STR2))
-            .configure(Reducer.TARGET_SENSOR, STR3)
-            .configure(Reducer.REDUCER_FUNCTION, new Concatenator())
+    public void testBasicReducer(){
+        entity.addEnricher(EnricherSpec.create(StringStringReducer.class).configure(
+                MutableMap.of(
+                Reducer.SOURCE_SENSORS, ImmutableList.of(STR1, STR2),
+                Reducer.PRODUCER, entity,
+                Reducer.TARGET_SENSOR, STR3,
+                Reducer.REDUCER_FUNCTION, new Concatenator())
+            )
         );
+
+        EntityTestUtils.assertAttributeEquals(entity, STR3, null);
+        
         entity.sensors().set(STR1, "foo");
         EntityTestUtils.assertAttributeEqualsContinually(entity, STR3, null);
 
@@ -48,34 +61,156 @@ public class ReducerTest extends BrooklynAppUnitTestSupport {
     }
 
     @Test
-    public void testReducingBuilder() {
-        entity.addEnricher(Enrichers.builder().reducing(ImmutableList.<AttributeSensor<?>>of(STR1, STR2))
+    public void testReducingBuilderWithConcatenator() {
+        entity.addEnricher(Enrichers.builder()
+                .reducing(StringStringReducer.class, ImmutableList.of(STR1, STR2))
                 .from(entity)
                 .computing(new Concatenator())
                 .publishing(STR3)
                 .build()
         );
 
+        EntityTestUtils.assertAttributeEquals(entity, STR3, null);
+        
         entity.sensors().set(STR1, "foo");
         EntityTestUtils.assertAttributeEqualsContinually(entity, STR3, null);
 
         entity.sensors().set(STR2, "bar");
         EntityTestUtils.assertAttributeEqualsEventually(entity, STR3, "foobar");
     }
+    
+    @Test
+    public void testReducingBuilderWithLengthCalculator() {
+        entity.addEnricher(Enrichers.builder()
+                .reducing(StringIntegerReducer.class, ImmutableList.of(STR1, STR2))
+                .from(entity)
+                .computing(new LengthCalculator())
+                .publishing(INT1)
+                .build()
+        );
+
+        EntityTestUtils.assertAttributeEquals(entity, INT1, null);
+        
+        entity.sensors().set(STR1, "foo");
+        EntityTestUtils.assertAttributeEqualsContinually(entity, INT1, null);
+
+        entity.sensors().set(STR2, "bar");
+        EntityTestUtils.assertAttributeEqualsEventually(entity, INT1, 6);
+    }
+    
+    @Test
+    public void testReducingBuilderWithJoinerFunction() {
+        entity.addEnricher(Enrichers.builder()
+                .reducing(StringStringReducer.class, ImmutableList.of(STR1, STR2))
+                .from(entity)
+                .computing("joiner", ImmutableMap.<String, Object>of("separator", "-"))
+                .publishing(STR3)
+                .build()
+        );
+
+        EntityTestUtils.assertAttributeEquals(entity, STR3, null);
+        
+        entity.sensors().set(STR1, "foo");
+        EntityTestUtils.assertAttributeEqualsContinually(entity, STR3, null);
+
+        entity.sensors().set(STR2, "bar");
+        EntityTestUtils.assertAttributeEqualsEventually(entity, STR3, "foo-bar");
+    }
+    
+    @Test
+    public void testReducingBuilderWithJoinerFunctionWithDefaultParameter() {
+        entity.addEnricher(Enrichers.builder()
+            .reducing(StringStringReducer.class, ImmutableList.of(STR1, STR2))
+            .from(entity)
+            .computing("joiner")
+            .publishing(STR3)
+            .build()
+        );
+        EntityTestUtils.assertAttributeEquals(entity, STR3, null);
+        
+        entity.sensors().set(STR1, "foo");
+        EntityTestUtils.assertAttributeEqualsContinually(entity, STR3, null);
+
+        entity.sensors().set(STR2, "bar");
+        EntityTestUtils.assertAttributeEqualsEventually(entity, STR3, "foo, bar");
+    }
+    
+    @Test
+    public void testReducingBuilderWithJoinerFunctionAndUnusedParameter() {
+        
+        entity.addEnricher(Enrichers.builder()
+            .reducing(StringStringReducer.class, ImmutableList.of(STR1, STR2))
+            .from(entity)
+            .computing("joiner", ImmutableMap.<String, Object>of("non.existent.parameter", "-"))
+            .publishing(STR3)
+            .build()
+        );
+        EntityTestUtils.assertAttributeEquals(entity, STR3, null);
+        
+        entity.sensors().set(STR1, "foo");
+        EntityTestUtils.assertAttributeEqualsContinually(entity, STR3, null);
 
-    private class Concatenator implements Function<List<?>, String> {
+        entity.sensors().set(STR2, "bar");
+        EntityTestUtils.assertAttributeEqualsEventually(entity, STR3, "foo, bar");
+    }
+    
+    @Test
+    public void testReducingBuilderWithNamedNonExistentFunction() {
+        try { 
+            entity.addEnricher(Enrichers.builder()
+                .reducing(StringStringReducer.class, ImmutableList.of(STR1, STR2))
+                .from(entity)
+                .computing("unknown function name", ImmutableMap.<String, Object>of("separator", "-"))
+                .publishing(STR3)
+                .build()
+            );
+            Asserts.fail("Expected exception when adding reducing enricher with unknown named function");
+        } catch (Exception e) {
+            Throwable t = Exceptions.getFirstThrowableOfType(e, IllegalStateException.class);
+            Assert.assertNotNull(t);
+        }
+    }
+    
+    private static class Concatenator implements Function<List<String>, String> {
         @Nullable
         @Override
-        public String apply(List<?> values) {
-            String result = "";
-            for (Object value : values) {
+        public String apply(List<String> values) {
+            StringBuilder result = new StringBuilder();
+            for (String value : values) {
                 if (value == null) {
                     return null;
                 } else {
-                    result += String.valueOf(value);
+                    result.append(value);
+                }
+            }
+            return result.toString();
+        }
+    }
+    
+    public static class StringIntegerReducer extends Reducer<String, Integer> {
+        
+        public StringIntegerReducer() {}
+
+        @Override
+        protected Function<List<String>, Integer> createReducerFunction(
+                String reducerName, Map<String, ?> parameters) {
+            throw new IllegalStateException("unknown function: " + reducerName);
+        }
+        
+    }
+    
+    private static class LengthCalculator implements Function<List<String>, Integer>{
+
+        @Override
+        public Integer apply(List<String> values) {
+            int acc = 0;
+            for (String value : values) {
+                if (value != null) {
+                    acc += value.length();
                 }
             }
-            return result;
+            return acc;
         }
+        
     }
 }