You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2015/10/09 10:37:13 UTC
[2/8] incubator-brooklyn git commit: add generic type reducer and tests
add generic type reducer and tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/f0ca5748
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/f0ca5748
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/f0ca5748
Branch: refs/heads/master
Commit: f0ca574819a66cb4799eb794f63929e174e44f95
Parents: 7e39e46
Author: Robert Moss <ro...@gmail.com>
Authored: Wed Sep 23 12:24:24 2015 +0100
Committer: Robert Moss <ro...@gmail.com>
Committed: Thu Oct 1 16:37:52 2015 +0100
----------------------------------------------------------------------
.../brooklyn/enricher/stock/Enrichers.java | 50 ++++--
.../apache/brooklyn/enricher/stock/Reducer.java | 113 ++++++++++---
.../brooklyn/enricher/stock/ReducerTest.java | 163 +++++++++++++++++--
3 files changed, 274 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/f0ca5748/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java
index 4273dec..25d186f 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java
@@ -48,6 +48,7 @@ import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
@@ -171,8 +172,8 @@ public class Enrichers {
public JoinerBuilder joining(AttributeSensor<?> source) {
return new JoinerBuilder(source);
}
- public ReducerBuilder reducing(List<AttributeSensor<?>> sourceSensors) {
- return new ReducerBuilder(sourceSensors);
+ public <S, T> ReducerBuilder<S, T> reducing(Class<? extends Reducer<S, T>> clazz, List<AttributeSensor<S>> sourceSensors) {
+ return new ReducerBuilder<S, T>(clazz, sourceSensors);
}
}
@@ -680,21 +681,22 @@ public class Enrichers {
}
}
- protected abstract static class AbstractReducerBuilder<S, B extends AbstractReducerBuilder<S, B>> extends AbstractEnricherBuilder<B> {
- protected AttributeSensor<S> publishing;
+ protected abstract static class AbstractReducerBuilder<S, T, B extends AbstractReducerBuilder<S, T, B>> extends AbstractEnricherBuilder<B> {
+ protected AttributeSensor<T> publishing;
protected Entity fromEntity;
- protected List<AttributeSensor<?>> reducing;
- protected Function<List<AttributeSensor<?>>, String> computing;
+ protected List<AttributeSensor<S>> reducing;
+ protected Function<List<S>, T> computing;
+ protected String functionName;
+ private Map<String, Object> parameters;
- public AbstractReducerBuilder(List<AttributeSensor<?>> val) {
- super(Reducer.class);
+ public AbstractReducerBuilder(Class<? extends Reducer<S, T>> clazz, List<AttributeSensor<S>> val) {
+ super(clazz);
this.reducing = checkNotNull(val);
}
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public <S> ReducerBuilder<S> publishing(AttributeSensor<? extends S> val) {
- this.publishing = (AttributeSensor) checkNotNull(val);
- return (ReducerBuilder) this;
+
+ public B publishing(AttributeSensor<T> val) {
+ this.publishing = checkNotNull(val);
+ return self();
}
public B from(Entity val) {
@@ -702,17 +704,29 @@ public class Enrichers {
return self();
}
- public B computing(Function<List<AttributeSensor<?>>, String> val) {
+ public B computing(Function<List<S>, T> val) {
this.computing = checkNotNull(val);
return self();
}
+ /** Selects a reducer function by name, passing an empty parameter map. */
+ public B computing(String functionName) {
+ return computing(functionName, ImmutableMap.<String, Object>of());
+ }
+ /** Selects a reducer function by name, to be resolved by the Reducer subclass; {@code parameters} may be null, in which case build() leaves it out of the config. */
+ public B computing(String functionName, Map<String, Object> parameters) {
+ this.functionName = checkNotNull(functionName); // fail fast here, as the other setters of this builder do
+ this.parameters = parameters;
+ return self();
+ }
public EnricherSpec<?> build() {
return super.build().configure(MutableMap.builder()
.put(Reducer.SOURCE_SENSORS, reducing)
.put(Reducer.PRODUCER, fromEntity)
.put(Reducer.TARGET_SENSOR, publishing)
- .put(Reducer.REDUCER_FUNCTION, computing)
+ .putIfNotNull(Reducer.REDUCER_FUNCTION, computing)
+ .putIfNotNull(Reducer.REDUCER_FUNCTION_UNTYPED, functionName)
+ .putIfNotNull(Reducer.PARAMETERS, parameters)
.build()
);
}
@@ -775,9 +789,9 @@ public class Enrichers {
}
}
- public static class ReducerBuilder<S> extends AbstractReducerBuilder<S, ReducerBuilder<S>> {
- public ReducerBuilder(List<AttributeSensor<?>> val) {
- super(val);
+ public static class ReducerBuilder<S, T> extends AbstractReducerBuilder<S, T, ReducerBuilder<S, T>> {
+ public ReducerBuilder(Class<? extends Reducer<S, T>> clazz, List<AttributeSensor<S>> val) {
+ super(clazz, val);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/f0ca5748/core/src/main/java/org/apache/brooklyn/enricher/stock/Reducer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Reducer.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Reducer.java
index e22fcff..938cf22 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/Reducer.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Reducer.java
@@ -1,6 +1,8 @@
package org.apache.brooklyn.enricher.stock;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntityLocal;
@@ -23,59 +25,76 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.reflect.TypeToken;
-public class Reducer extends AbstractEnricher implements SensorEventListener<Object> {
+@SuppressWarnings("serial")
+public abstract class Reducer<S, T> extends AbstractEnricher implements SensorEventListener<Object> {
private static final Logger LOG = LoggerFactory.getLogger(Reducer.class);
@SetFromFlag("producer")
public static ConfigKey<Entity> PRODUCER = ConfigKeys.newConfigKey(Entity.class, "enricher.producer");
-
public static ConfigKey<Sensor<?>> TARGET_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.targetSensor");
public static ConfigKey<List<? extends AttributeSensor<?>>> SOURCE_SENSORS = ConfigKeys.newConfigKey(new TypeToken<List<? extends AttributeSensor<?>>>() {}, "enricher.sourceSensors");
public static ConfigKey<Function<List<?>,?>> REDUCER_FUNCTION = ConfigKeys.newConfigKey(new TypeToken<Function<List<?>, ?>>() {}, "enricher.reducerFunction");
-
+ @SetFromFlag("transformation") // name of a pre-defined function; only consulted when no REDUCER_FUNCTION is configured
+ public static final ConfigKey<String> REDUCER_FUNCTION_UNTYPED = ConfigKeys.newStringConfigKey("enricher.reducerFunction.untyped",
+ "A string matching a pre-defined named reducer function, such as joiner");
+ public static final ConfigKey<Map<String, Object>> PARAMETERS = ConfigKeys.newConfigKey(new TypeToken<Map<String, Object>>() {}, "enricher.reducerFunctionParameters",
+ "A map of parameters to pass into the reducer function");
+
protected Entity producer;
+ protected List<AttributeSensor<S>> subscribedSensors;
+ protected Sensor<T> targetSensor;
+ protected Function<List<S>, T> reducerFunction;
- protected List<AttributeSensor<?>> subscribedSensors;
- protected Sensor<?> targetSensor;
-
- protected Function<List<?>, ?> reducerFunction;
-
-
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void setEntity(EntityLocal entity) {
super.setEntity(entity);
Preconditions.checkNotNull(getConfig(SOURCE_SENSORS), "source sensors");
this.producer = getConfig(PRODUCER) == null ? entity : getConfig(PRODUCER);
- List<AttributeSensor<?>> sensorListTemp = Lists.newArrayList();
+ List<AttributeSensor<S>> sensorListTemp = Lists.newArrayList();
for (Object sensorO : getConfig(SOURCE_SENSORS)) {
- AttributeSensor<?> sensor = Tasks.resolving(sensorO).as(AttributeSensor.class).timeout(ValueResolver.REAL_QUICK_WAIT).context(producer).get();
+ AttributeSensor<S> sensor = Tasks.resolving(sensorO).as(AttributeSensor.class).timeout(ValueResolver.REAL_QUICK_WAIT).context(producer).get();
if(!sensorListTemp.contains(sensor)) {
sensorListTemp.add(sensor);
}
}
+
+ String reducerName = config().get(REDUCER_FUNCTION_UNTYPED);
+ Function<List<S>, T> reducerFunction = (Function) config().get(REDUCER_FUNCTION);
+ if(reducerFunction == null){
+ Map<String, ?> parameters = config().get(PARAMETERS);
+ reducerFunction = createReducerFunction(reducerName, parameters);
+ }
- reducerFunction = config().get(REDUCER_FUNCTION);
+ this.reducerFunction = reducerFunction;
Preconditions.checkState(sensorListTemp.size() > 0, "Nothing to reduce");
- for (Sensor<?> sensor : sensorListTemp) {
+ for (Sensor<S> sensor : sensorListTemp) {
subscribe(producer, sensor, this);
}
subscribedSensors = ImmutableList.copyOf(sensorListTemp);
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
+ protected abstract Function<List<S>, T> createReducerFunction(String reducerName, Map<String, ?> parameters);
+
+ @SuppressWarnings("unchecked")
@Override
public void onEvent(SensorEvent<Object> event) {
- Sensor<?> destinationSensor = getConfig(TARGET_SENSOR);
+ Sensor<T> destinationSensor = (Sensor<T>) getConfig(TARGET_SENSOR);
- List<Object> values = Lists.newArrayList();
+ List<S> values = Lists.newArrayList();
+
+ for (AttributeSensor<S> sourceSensor : subscribedSensors) {
+ S resolvedSensorValue = entity.sensors().get(sourceSensor);
+ if (resolvedSensorValue == null) {
+ // only apply function if all values are resolved
+ return;
+ }
- for (AttributeSensor<?> sourceSensor : subscribedSensors) {
- Object resolvedSensorValue = entity.sensors().get(sourceSensor);
values.add(resolvedSensorValue);
}
@@ -84,6 +103,60 @@ public class Reducer extends AbstractEnricher implements SensorEventListener<Obj
if (LOG.isTraceEnabled()) LOG.trace("enricher {} got {}, propagating via {} as {}",
new Object[] {this, event, entity, reducerFunction, destinationSensor});
- emit((Sensor)destinationSensor, result);
+ emit((Sensor<T>)destinationSensor, result);
+ }
+ /** Reducer from String sensor values to a String; the only named function it resolves is "joiner". */
+ public static class StringStringReducer extends Reducer<String, String> {
+
+ public StringStringReducer() {}
+ /** Resolves "joiner" (optional "separator" parameter); any other name, including null, throws IllegalStateException. */
+ @Override
+ protected Function<List<String>, String> createReducerFunction(
+ String reducerName, Map<String, ?> parameters) {
+ if("joiner".equals(reducerName)){ // constant-first: reducerName may be null when no named function was configured
+ return new JoinerFunction(parameters == null ? null : parameters.get("separator")); // a null separator selects the joiner's default
+ }
+ throw new IllegalStateException("unknown function: " + reducerName);
+ }
+ }
+
+ public static class JoinerReducerFunction<A> implements Function<List<A>, String> {
+
+ private Object separator;
+
+ public JoinerReducerFunction(Object separator) {
+ this.separator = (separator == null) ? ", " : separator;
+ }
+
+ @Override
+ public String apply(List<A> input) {
+
+ StringBuilder sb = new StringBuilder();
+ Iterator<A> it = input.iterator();
+ while(it.hasNext()) {
+ sb.append(it.next().toString());
+ if(it.hasNext()){
+ sb.append(separator);
+ }
+ }
+ return sb.toString();
+ }
+
+ }
+ /** {@link JoinerReducerFunction} fixed to lists of Strings; this is what StringStringReducer returns for the name "joiner". */
+ public static class JoinerFunction extends JoinerReducerFunction<String>{
+
+ public JoinerFunction(Object separator) {
+ super(separator);
+ }
+ }
+ /** Reduces a list to the text produced by the list's own {@code toString()}. */
+ public static class ToStringReducerFunction<A> implements Function<List<A>, String> {
+
+ @Override
+ public String apply(List<A> input) {
+ return input.toString();
+ }
+
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/f0ca5748/core/src/test/java/org/apache/brooklyn/enricher/stock/ReducerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/enricher/stock/ReducerTest.java b/core/src/test/java/org/apache/brooklyn/enricher/stock/ReducerTest.java
index 7a350eb..a39d76c 100644
--- a/core/src/test/java/org/apache/brooklyn/enricher/stock/ReducerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/enricher/stock/ReducerTest.java
@@ -1,6 +1,7 @@
package org.apache.brooklyn.enricher.stock;
import java.util.List;
+import java.util.Map;
import javax.annotation.Nullable;
@@ -10,18 +11,25 @@ import org.apache.brooklyn.api.sensor.EnricherSpec;
import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.enricher.stock.Reducer.StringStringReducer;
+import org.apache.brooklyn.test.Asserts;
import org.apache.brooklyn.test.EntityTestUtils;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
public class ReducerTest extends BrooklynAppUnitTestSupport {
public static final AttributeSensor<String> STR1 = Sensors.newStringSensor("test.str1");
public static final AttributeSensor<String> STR2 = Sensors.newStringSensor("test.str2");
public static final AttributeSensor<String> STR3 = Sensors.newStringSensor("test.str3");
+ public static final AttributeSensor<Integer> INT1 = Sensors.newIntegerSensor("test.int1");
private TestEntity entity;
@@ -33,13 +41,18 @@ public class ReducerTest extends BrooklynAppUnitTestSupport {
}
@Test
- public void testBasicReducer() {
- entity.addEnricher(EnricherSpec.create(Reducer.class)
- .configure(Reducer.PRODUCER, entity)
- .configure(Reducer.SOURCE_SENSORS, ImmutableList.of(STR1, STR2))
- .configure(Reducer.TARGET_SENSOR, STR3)
- .configure(Reducer.REDUCER_FUNCTION, new Concatenator())
+ public void testBasicReducer(){
+ entity.addEnricher(EnricherSpec.create(StringStringReducer.class).configure(
+ MutableMap.of(
+ Reducer.SOURCE_SENSORS, ImmutableList.of(STR1, STR2),
+ Reducer.PRODUCER, entity,
+ Reducer.TARGET_SENSOR, STR3,
+ Reducer.REDUCER_FUNCTION, new Concatenator())
+ )
);
+
+ EntityTestUtils.assertAttributeEquals(entity, STR3, null);
+
entity.sensors().set(STR1, "foo");
EntityTestUtils.assertAttributeEqualsContinually(entity, STR3, null);
@@ -48,34 +61,156 @@ public class ReducerTest extends BrooklynAppUnitTestSupport {
}
@Test
- public void testReducingBuilder() {
- entity.addEnricher(Enrichers.builder().reducing(ImmutableList.<AttributeSensor<?>>of(STR1, STR2))
+ public void testReducingBuilderWithConcatenator() {
+ entity.addEnricher(Enrichers.builder()
+ .reducing(StringStringReducer.class, ImmutableList.of(STR1, STR2))
.from(entity)
.computing(new Concatenator())
.publishing(STR3)
.build()
);
+ EntityTestUtils.assertAttributeEquals(entity, STR3, null);
+
entity.sensors().set(STR1, "foo");
EntityTestUtils.assertAttributeEqualsContinually(entity, STR3, null);
entity.sensors().set(STR2, "bar");
EntityTestUtils.assertAttributeEqualsEventually(entity, STR3, "foobar");
}
+ /** Reduces two String sensors to an Integer sensor: the summed length of their values. */
+ @Test
+ public void testReducingBuilderWithLengthCalculator() {
+ entity.addEnricher(Enrichers.builder()
+ .reducing(StringIntegerReducer.class, ImmutableList.of(STR1, STR2))
+ .from(entity)
+ .computing(new LengthCalculator())
+ .publishing(INT1)
+ .build()
+ );
+
+ EntityTestUtils.assertAttributeEquals(entity, INT1, null);
+ // only one of the two sources is set next, so nothing should be published yet
+ entity.sensors().set(STR1, "foo");
+ EntityTestUtils.assertAttributeEqualsContinually(entity, INT1, null);
+ // "foo" and "bar": 3 + 3 characters
+ entity.sensors().set(STR2, "bar");
+ EntityTestUtils.assertAttributeEqualsEventually(entity, INT1, 6);
+ }
+ /** Selects the reducer function by name ("joiner") and passes an explicit "separator" parameter. */
+ @Test
+ public void testReducingBuilderWithJoinerFunction() {
+ entity.addEnricher(Enrichers.builder()
+ .reducing(StringStringReducer.class, ImmutableList.of(STR1, STR2))
+ .from(entity)
+ .computing("joiner", ImmutableMap.<String, Object>of("separator", "-"))
+ .publishing(STR3)
+ .build()
+ );
+
+ EntityTestUtils.assertAttributeEquals(entity, STR3, null);
+ // only one of the two sources is set next, so nothing should be published yet
+ entity.sensors().set(STR1, "foo");
+ EntityTestUtils.assertAttributeEqualsContinually(entity, STR3, null);
+
+ entity.sensors().set(STR2, "bar");
+ EntityTestUtils.assertAttributeEqualsEventually(entity, STR3, "foo-bar");
+ }
+ /** With no parameters supplied, the named "joiner" function falls back to its default separator ", ". */
+ @Test
+ public void testReducingBuilderWithJoinerFunctionWithDefaultParameter() {
+ entity.addEnricher(Enrichers.builder()
+ .reducing(StringStringReducer.class, ImmutableList.of(STR1, STR2))
+ .from(entity)
+ .computing("joiner")
+ .publishing(STR3)
+ .build()
+ );
+ EntityTestUtils.assertAttributeEquals(entity, STR3, null);
+ // only one of the two sources is set next, so nothing should be published yet
+ entity.sensors().set(STR1, "foo");
+ EntityTestUtils.assertAttributeEqualsContinually(entity, STR3, null);
+
+ entity.sensors().set(STR2, "bar");
+ EntityTestUtils.assertAttributeEqualsEventually(entity, STR3, "foo, bar");
+ }
+
+ @Test
+ public void testReducingBuilderWithJoinerFunctionAndUnusedParameter() {
+
+ entity.addEnricher(Enrichers.builder()
+ .reducing(StringStringReducer.class, ImmutableList.of(STR1, STR2))
+ .from(entity)
+ .computing("joiner", ImmutableMap.<String, Object>of("non.existent.parameter", "-"))
+ .publishing(STR3)
+ .build()
+ );
+ EntityTestUtils.assertAttributeEquals(entity, STR3, null);
+
+ entity.sensors().set(STR1, "foo");
+ EntityTestUtils.assertAttributeEqualsContinually(entity, STR3, null);
- private class Concatenator implements Function<List<?>, String> {
+ entity.sensors().set(STR2, "bar");
+ EntityTestUtils.assertAttributeEqualsEventually(entity, STR3, "foo, bar");
+ }
+ /** Adding an enricher with an unrecognised function name must fail with an IllegalStateException somewhere in the cause chain. */
+ @Test
+ public void testReducingBuilderWithNamedNonExistentFunction() {
+ try {
+ entity.addEnricher(Enrichers.builder()
+ .reducing(StringStringReducer.class, ImmutableList.of(STR1, STR2))
+ .from(entity)
+ .computing("unknown function name", ImmutableMap.<String, Object>of("separator", "-"))
+ .publishing(STR3)
+ .build()
+ );
+ Asserts.fail("Expected exception when adding reducing enricher with unknown named function");
+ } catch (Exception e) {
+ Throwable t = Exceptions.getFirstThrowableOfType(e, IllegalStateException.class);
+ Assert.assertNotNull(t);
+ }
+ }
+
+ private static class Concatenator implements Function<List<String>, String> {
@Nullable
@Override
- public String apply(List<?> values) {
- String result = "";
- for (Object value : values) {
+ public String apply(List<String> values) {
+ StringBuilder result = new StringBuilder();
+ for (String value : values) {
if (value == null) {
return null;
} else {
- result += String.valueOf(value);
+ result.append(value);
+ }
+ }
+ return result.toString();
+ }
+ }
+ /** Test-only Reducer from String inputs to an Integer result; it defines no named functions. */
+ public static class StringIntegerReducer extends Reducer<String, Integer> {
+
+ public StringIntegerReducer() {}
+ /** Always throws: this reducer has no functions to look up by name. */
+ @Override
+ protected Function<List<String>, Integer> createReducerFunction(
+ String reducerName, Map<String, ?> parameters) {
+ throw new IllegalStateException("unknown function: " + reducerName);
+ }
+
+ }
+
+ private static class LengthCalculator implements Function<List<String>, Integer>{
+
+ @Override
+ public Integer apply(List<String> values) {
+ int acc = 0;
+ for (String value : values) {
+ if (value != null) {
+ acc += value.length();
}
}
- return result;
+ return acc;
}
+
}
}