You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2016/02/01 18:50:01 UTC
[04/50] [abbrv] brooklyn-server git commit: yaml-friendly enrichers
for delta and rolling avg
yaml-friendly enrichers for delta and rolling avg
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/1e69cc97
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/1e69cc97
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/1e69cc97
Branch: refs/heads/0.7.0-incubating
Commit: 1e69cc9754e22c49b502546b046380c153bd58e9
Parents: ea8da37
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Fri Jun 19 04:56:39 2015 -0700
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Wed Jun 24 00:40:32 2015 -0700
----------------------------------------------------------------------
.../enricher/basic/AbstractTransformer.java | 106 +++++++++++
.../brooklyn/enricher/basic/Transformer.java | 64 +------
.../YamlRollingTimeWindowMeanEnricher.java | 178 +++++++++++++++++++
.../basic/YamlTimeWeightedDeltaEnricher.java | 81 +++++++++
.../YamlRollingTimeWindowMeanEnricherTest.java | 178 +++++++++++++++++++
.../YamlTimeWeightedDeltaEnricherTest.java | 107 +++++++++++
.../enricher/RollingTimeWindowMeanEnricher.java | 4 +-
.../enricher/TimeWeightedDeltaEnricher.java | 3 +
8 files changed, 658 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1e69cc97/core/src/main/java/brooklyn/enricher/basic/AbstractTransformer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/enricher/basic/AbstractTransformer.java b/core/src/main/java/brooklyn/enricher/basic/AbstractTransformer.java
new file mode 100644
index 0000000..28a6de1
--- /dev/null
+++ b/core/src/main/java/brooklyn/enricher/basic/AbstractTransformer.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.enricher.basic;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.EntityLocal;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.Sensor;
+import brooklyn.event.SensorEvent;
+import brooklyn.event.SensorEventListener;
+import brooklyn.event.basic.BasicSensorEvent;
+import brooklyn.util.collections.MutableSet;
+import brooklyn.util.task.Tasks;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Function;
+import com.google.common.reflect.TypeToken;
+
+@SuppressWarnings("serial")
+public abstract class AbstractTransformer<T,U> extends AbstractEnricher implements SensorEventListener<T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractTransformer.class);
+
+ public static ConfigKey<Entity> PRODUCER = ConfigKeys.newConfigKey(Entity.class, "enricher.producer");
+
+ public static ConfigKey<Sensor<?>> SOURCE_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.sourceSensor");
+
+ public static ConfigKey<Sensor<?>> TARGET_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.targetSensor");
+
+ protected Entity producer;
+ protected Sensor<T> sourceSensor;
+ protected Sensor<U> targetSensor;
+
+ public AbstractTransformer() {
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Override
+ public void setEntity(EntityLocal entity) {
+ super.setEntity(entity);
+
+ Function<SensorEvent<T>, U> transformation = getTransformation();
+ this.producer = getConfig(PRODUCER) == null ? entity: getConfig(PRODUCER);
+ this.sourceSensor = (Sensor<T>) getRequiredConfig(SOURCE_SENSOR);
+ Sensor<?> targetSensorSpecified = getConfig(TARGET_SENSOR);
+ this.targetSensor = targetSensorSpecified!=null ? (Sensor<U>) targetSensorSpecified : (Sensor<U>) this.sourceSensor;
+ if (producer.equals(entity) && targetSensorSpecified==null) {
+ LOG.error("Refusing to add an enricher which reads and publishes on the same sensor: "+
+ producer+"."+sourceSensor+" (computing "+transformation+")");
+ // we don't throw because this error may manifest itself after a lengthy deployment,
+ // and failing it at that point simply because of an enricher is not very pleasant
+ // (at least not until we have good re-run support across the board)
+ return;
+ }
+
+ subscribe(producer, sourceSensor, this);
+
+ if (sourceSensor instanceof AttributeSensor) {
+ Object value = producer.getAttribute((AttributeSensor<?>)sourceSensor);
+ // TODO would be useful to have a convenience to "subscribeAndThenIfItIsAlreadySetRunItOnce"
+ if (value!=null) {
+ onEvent(new BasicSensorEvent(sourceSensor, producer, value, -1));
+ }
+ }
+ }
+
+ /** returns a function for transformation, for immediate use only (not for caching, as it may change) */
+ protected abstract Function<SensorEvent<T>, U> getTransformation();
+
+ @Override
+ public void onEvent(SensorEvent<T> event) {
+ emit(targetSensor, compute(event));
+ }
+
+ protected Object compute(SensorEvent<T> event) {
+ // transformation is not going to change, but this design makes it easier to support changing config in future.
+ // if it's an efficiency hole we can switch to populate the transformation at start.
+ U result = getTransformation().apply(event);
+ if (LOG.isTraceEnabled())
+ LOG.trace("Enricher "+this+" computed "+result+" from "+event);
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1e69cc97/core/src/main/java/brooklyn/enricher/basic/Transformer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/enricher/basic/Transformer.java b/core/src/main/java/brooklyn/enricher/basic/Transformer.java
index c6c88a6..2fa85fe 100644
--- a/core/src/main/java/brooklyn/enricher/basic/Transformer.java
+++ b/core/src/main/java/brooklyn/enricher/basic/Transformer.java
@@ -24,14 +24,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import brooklyn.config.ConfigKey;
-import brooklyn.entity.Entity;
import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.entity.basic.EntityLocal;
-import brooklyn.event.AttributeSensor;
-import brooklyn.event.Sensor;
import brooklyn.event.SensorEvent;
-import brooklyn.event.SensorEventListener;
-import brooklyn.event.basic.BasicSensorEvent;
import brooklyn.util.collections.MutableSet;
import brooklyn.util.task.Tasks;
import brooklyn.util.time.Duration;
@@ -41,7 +35,7 @@ import com.google.common.reflect.TypeToken;
//@Catalog(name="Transformer", description="Transforms attributes of an entity; see Enrichers.builder().transforming(...)")
@SuppressWarnings("serial")
-public class Transformer<T,U> extends AbstractEnricher implements SensorEventListener<T> {
+public class Transformer<T,U> extends AbstractTransformer<T,U> {
private static final Logger LOG = LoggerFactory.getLogger(Transformer.class);
@@ -50,50 +44,11 @@ public class Transformer<T,U> extends AbstractEnricher implements SensorEventLis
public static ConfigKey<Function<?, ?>> TRANSFORMATION_FROM_VALUE = ConfigKeys.newConfigKey(new TypeToken<Function<?, ?>>() {}, "enricher.transformation");
public static ConfigKey<Function<?, ?>> TRANSFORMATION_FROM_EVENT = ConfigKeys.newConfigKey(new TypeToken<Function<?, ?>>() {}, "enricher.transformation.fromevent");
- public static ConfigKey<Entity> PRODUCER = ConfigKeys.newConfigKey(Entity.class, "enricher.producer");
-
- public static ConfigKey<Sensor<?>> SOURCE_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.sourceSensor");
-
- public static ConfigKey<Sensor<?>> TARGET_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.targetSensor");
-
- protected Entity producer;
- protected Sensor<T> sourceSensor;
- protected Sensor<U> targetSensor;
-
public Transformer() {
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
- @Override
- public void setEntity(EntityLocal entity) {
- super.setEntity(entity);
-
- Function<SensorEvent<T>, U> transformation = getTransformation();
- this.producer = getConfig(PRODUCER) == null ? entity: getConfig(PRODUCER);
- this.sourceSensor = (Sensor<T>) getRequiredConfig(SOURCE_SENSOR);
- Sensor<?> targetSensorSpecified = getConfig(TARGET_SENSOR);
- this.targetSensor = targetSensorSpecified!=null ? (Sensor<U>) targetSensorSpecified : (Sensor<U>) this.sourceSensor;
- if (producer.equals(entity) && targetSensorSpecified==null) {
- LOG.error("Refusing to add an enricher which reads and publishes on the same sensor: "+
- producer+"."+sourceSensor+" (computing "+transformation+")");
- // we don't throw because this error may manifest itself after a lengthy deployment,
- // and failing it at that point simply because of an enricher is not very pleasant
- // (at least not until we have good re-run support across the board)
- return;
- }
-
- subscribe(producer, sourceSensor, this);
-
- if (sourceSensor instanceof AttributeSensor) {
- Object value = producer.getAttribute((AttributeSensor<?>)sourceSensor);
- // TODO would be useful to have a convenience to "subscribeAndThenIfItIsAlreadySetRunItOnce"
- if (value!=null) {
- onEvent(new BasicSensorEvent(sourceSensor, producer, value, -1));
- }
- }
- }
-
/** returns a function for transformation, for immediate use only (not for caching, as it may change) */
+ @Override
@SuppressWarnings("unchecked")
protected Function<SensorEvent<T>, U> getTransformation() {
MutableSet<Object> suppliers = MutableSet.of();
@@ -144,18 +99,5 @@ public class Transformer<T,U> extends AbstractEnricher implements SensorEventLis
}
};
}
-
- @Override
- public void onEvent(SensorEvent<T> event) {
- emit(targetSensor, compute(event));
- }
-
- protected Object compute(SensorEvent<T> event) {
- // transformation is not going to change, but this design makes it easier to support changing config in future.
- // if it's an efficiency hole we can switch to populate the transformation at start.
- U result = getTransformation().apply(event);
- if (LOG.isTraceEnabled())
- LOG.trace("Enricher "+this+" computed "+result+" from "+event);
- return result;
- }
+
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1e69cc97/core/src/main/java/brooklyn/enricher/basic/YamlRollingTimeWindowMeanEnricher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/enricher/basic/YamlRollingTimeWindowMeanEnricher.java b/core/src/main/java/brooklyn/enricher/basic/YamlRollingTimeWindowMeanEnricher.java
new file mode 100644
index 0000000..64333d4
--- /dev/null
+++ b/core/src/main/java/brooklyn/enricher/basic/YamlRollingTimeWindowMeanEnricher.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.enricher.basic;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.event.Sensor;
+import brooklyn.event.SensorEvent;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Function;
+
+/**
+ * Transforms {@link Sensor} data into a rolling average based on a time window.
+ *
+ * All values within the window are weighted or discarded based on the timestamps associated with
+ * them (discards occur when a new value is added or an average is requested)
+ * <p>
+ * This will not extrapolate figures - it is assumed a value is valid and correct for the entire
+ * time period between it and the previous value. Normally, the average attribute is only updated
+ * when a new value arrives so it can give a fully informed average, but there is a danger of this
+ * going stale.
+ * <p>
+ * When an average is requested, it is likely there will be a segment of the window for which there
+ * isn't a value. Instead of extrapolating a value and providing different extrapolation techniques,
+ * the average is reported with a confidence value which reflects the fraction of the time
+ * window for which the values were valid.
+ * <p>
+ * Consumers of the average may ignore the confidence value and just use the last known average.
+ * They could multiply the returned value by the confidence value to get a decay-type behavior as
+ * the window empties. A third alternative is to, at a certain confidence threshold, report that
+ * the average is no longer meaningful.
+ * <p>
+ * The default average when no data has been received is 0, with a confidence of 0
+ */
+public class YamlRollingTimeWindowMeanEnricher<T extends Number> extends AbstractTransformer<T,Double> {
+
+ public static ConfigKey<Duration> WINDOW_DURATION = ConfigKeys.newConfigKey(Duration.class, "enricher.window.duration",
+ "Duration for which this window should store data, default one minute", Duration.ONE_MINUTE);
+
+ public static ConfigKey<Double> CONFIDENCE_REQUIRED_TO_PUBLISH = ConfigKeys.newDoubleConfigKey("enricher.window.confidenceRequired",
+ "Minimum confidence level (ie period covered) required to publish a rolling average", 0.8d);
+
+ public static class ConfidenceQualifiedNumber {
+ final Double value;
+ final double confidence;
+
+ public ConfidenceQualifiedNumber(Double value, double confidence) {
+ this.value = value;
+ this.confidence = confidence;
+ }
+
+ @Override
+ public String toString() {
+ return ""+value+" ("+(int)(confidence*100)+"%)";
+ }
+
+ }
+
+ private final LinkedList<T> values = new LinkedList<T>();
+ private final LinkedList<Long> timestamps = new LinkedList<Long>();
+ volatile ConfidenceQualifiedNumber lastAverage = new ConfidenceQualifiedNumber(0d,0d);
+
+ @Override
+ protected Function<SensorEvent<T>, Double> getTransformation() {
+ return new Function<SensorEvent<T>, Double>() {
+ @Override
+ public Double apply(SensorEvent<T> event) {
+ long eventTime = event.getTimestamp();
+ if (event.getValue()==null) {
+ return null;
+ }
+ values.addLast(event.getValue());
+ timestamps.addLast(eventTime);
+ if (eventTime>0) {
+ ConfidenceQualifiedNumber average = getAverage(eventTime, 0);
+
+ if (average.confidence > getConfig(CONFIDENCE_REQUIRED_TO_PUBLISH)) {
+ // without confidence, we might publish wildly varying estimates,
+ // causing spurious resizes, so allow it to be configured, and
+ // by default require a high value
+
+ // TODO would be nice to include timestamp, etc
+ return average.value;
+ }
+ }
+ return null;
+ }
+ };
+ }
+
+ public ConfidenceQualifiedNumber getAverage(long fromTime, long graceAllowed) {
+ if (timestamps.isEmpty()) {
+ return lastAverage = new ConfidenceQualifiedNumber(lastAverage.value, 0.0d);
+ }
+
+ long firstTimestamp = -1;
+ Iterator<Long> ti = timestamps.iterator();
+ while (ti.hasNext()) {
+ firstTimestamp = ti.next();
+ if (firstTimestamp>0) break;
+ }
+ if (firstTimestamp<=0) {
+ // no values with reasonable timestamps
+ return lastAverage = new ConfidenceQualifiedNumber(values.get(values.size()-1).doubleValue(), 0.0d);
+ }
+
+ long lastTimestamp = timestamps.get(timestamps.size()-1);
+
+ long now = fromTime;
+ if (lastTimestamp > fromTime - graceAllowed) {
+ // without this, if the computation takes place X seconds after the publish,
+ // we treat X seconds as time for which we have no confidence in the data
+ now = lastTimestamp;
+ }
+ pruneValues(now);
+
+ Duration timePeriod = getConfig(WINDOW_DURATION);
+ long windowStart = Math.max(now-timePeriod.toMilliseconds(), firstTimestamp);
+ long windowEnd = Math.max(now-timePeriod.toMilliseconds(), lastTimestamp);
+ Double confidence = ((double)(windowEnd - windowStart)) / timePeriod.toMilliseconds();
+ if (confidence <= 0.0000001d) {
+ // not enough timestamps in window
+ double lastValue = values.get(values.size()-1).doubleValue();
+ return lastAverage = new ConfidenceQualifiedNumber(lastValue, 0.0d);
+ }
+
+ long start = windowStart;
+ long end;
+ double weightedAverage = 0.0d;
+
+ Iterator<T> valuesIter = values.iterator();
+ Iterator<Long> timestampsIter = timestamps.iterator();
+ while (valuesIter.hasNext()) {
+ // Ignores null and out-of-date values (and also values that are received out-of-order, but that shouldn't happen!)
+ Number val = valuesIter.next();
+ Long timestamp = timestampsIter.next();
+ if (val!=null && timestamp >= start) {
+ end = timestamp;
+ weightedAverage += ((end - start) / (confidence * timePeriod.toMilliseconds())) * val.doubleValue();
+ start = timestamp;
+ }
+ }
+
+ return lastAverage = new ConfidenceQualifiedNumber(weightedAverage, confidence);
+ }
+
+ /**
+ * Discards out-of-date values, but keeps at least one value.
+ */
+ private void pruneValues(long now) {
+ // keep one value from before the period, so that we can tell the window's start time
+ Duration timePeriod = getConfig(WINDOW_DURATION);
+ while(timestamps.size() > 1 && timestamps.get(1) < (now - timePeriod.toMilliseconds())) {
+ timestamps.removeFirst();
+ values.removeFirst();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1e69cc97/core/src/main/java/brooklyn/enricher/basic/YamlTimeWeightedDeltaEnricher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/enricher/basic/YamlTimeWeightedDeltaEnricher.java b/core/src/main/java/brooklyn/enricher/basic/YamlTimeWeightedDeltaEnricher.java
new file mode 100644
index 0000000..b515da4
--- /dev/null
+++ b/core/src/main/java/brooklyn/enricher/basic/YamlTimeWeightedDeltaEnricher.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.enricher.basic;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.enricher.basic.AbstractTransformer;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.event.SensorEvent;
+import brooklyn.util.flags.TypeCoercions;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Function;
+
+/**
+ * Converts an absolute count sensor into a delta sensor (i.e. the diff between the current and previous value),
+ * presented as a units/timeUnit based on the event timing.
+ * <p>
+ * For example, given a requests.count sensor, this can make a requests.per_sec sensor with {@link #DELTA_PERIOD} set to "1s" (the default).
+ * <p>
+ * Suitable for configuration from YAML.
+ */
+public class YamlTimeWeightedDeltaEnricher<T extends Number> extends AbstractTransformer<T,Double> {
+ private static final Logger LOG = LoggerFactory.getLogger(YamlTimeWeightedDeltaEnricher.class);
+
+ Number lastValue;
+ long lastTime = -1;
+
+ public static ConfigKey<Duration> DELTA_PERIOD = ConfigKeys.newConfigKey(Duration.class, "enricher.delta.period",
+ "Duration that this delta should compute for, default per second", Duration.ONE_SECOND);
+
+ @Override
+ protected Function<SensorEvent<T>, Double> getTransformation() {
+ return new Function<SensorEvent<T>, Double>() {
+ @Override
+ public Double apply(SensorEvent<T> event) {
+ Number current = TypeCoercions.coerce(event.getValue(), Double.class);
+
+ if (current == null) return null;
+
+ long eventTime = event.getTimestamp();
+ long unitMillis = getConfig(DELTA_PERIOD).toMilliseconds();
+ Double result = null;
+
+ if (eventTime > 0 && eventTime > lastTime) {
+ if (lastValue == null || lastTime < 0) {
+ // cannot calculate time-based delta with a single value
+ if (LOG.isTraceEnabled()) LOG.trace("{} received event but no last value so will not emit, null -> {} at {}", new Object[] {this, current, eventTime});
+ } else {
+ double duration = eventTime - lastTime;
+ result = (current.doubleValue() - lastValue.doubleValue()) / (duration / unitMillis);
+ }
+ }
+
+ lastValue = current;
+ lastTime = eventTime;
+
+ return result;
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1e69cc97/core/src/test/java/brooklyn/enricher/basic/YamlRollingTimeWindowMeanEnricherTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/enricher/basic/YamlRollingTimeWindowMeanEnricherTest.java b/core/src/test/java/brooklyn/enricher/basic/YamlRollingTimeWindowMeanEnricherTest.java
new file mode 100644
index 0000000..45b7ec3
--- /dev/null
+++ b/core/src/test/java/brooklyn/enricher/basic/YamlRollingTimeWindowMeanEnricherTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.enricher.basic;
+
+import static org.testng.Assert.assertEquals;
+
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.enricher.basic.YamlRollingTimeWindowMeanEnricher.ConfidenceQualifiedNumber;
+import brooklyn.entity.basic.AbstractApplication;
+import brooklyn.entity.basic.BasicEntity;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensor;
+import brooklyn.event.basic.BasicSensorEvent;
+import brooklyn.management.SubscriptionContext;
+import brooklyn.policy.EnricherSpec;
+import brooklyn.util.time.Duration;
+
+public class YamlRollingTimeWindowMeanEnricherTest {
+
+ AbstractApplication app;
+
+ BasicEntity producer;
+
+ AttributeSensor<Integer> intSensor;
+ AttributeSensor<Double> avgSensor, deltaSensor;
+
+ Duration timePeriod = Duration.ONE_SECOND;
+
+ YamlTimeWeightedDeltaEnricher<Double> delta;
+ YamlRollingTimeWindowMeanEnricher<Double> averager;
+
+ ConfidenceQualifiedNumber average;
+ SubscriptionContext subscription;
+
+ @SuppressWarnings("unchecked")
+ @BeforeMethod
+ public void before() {
+ app = new AbstractApplication() {};
+ Entities.startManagement(app);
+ producer = app.addChild(EntitySpec.create(BasicEntity.class));
+
+ intSensor = new BasicAttributeSensor<Integer>(Integer.class, "int sensor");
+ deltaSensor = new BasicAttributeSensor<Double>(Double.class, "delta sensor");
+ avgSensor = new BasicAttributeSensor<Double>(Double.class, "avg sensor");
+
+ delta = producer.addEnricher(EnricherSpec.create(YamlTimeWeightedDeltaEnricher.class)
+ .configure(YamlTimeWeightedDeltaEnricher.PRODUCER, producer)
+ .configure(YamlTimeWeightedDeltaEnricher.SOURCE_SENSOR, intSensor)
+ .configure(YamlTimeWeightedDeltaEnricher.TARGET_SENSOR, deltaSensor));
+
+ averager = producer.addEnricher(EnricherSpec.create(YamlRollingTimeWindowMeanEnricher.class)
+ .configure(YamlRollingTimeWindowMeanEnricher.PRODUCER, producer)
+ .configure(YamlRollingTimeWindowMeanEnricher.SOURCE_SENSOR, deltaSensor)
+ .configure(YamlRollingTimeWindowMeanEnricher.TARGET_SENSOR, avgSensor)
+ .configure(YamlRollingTimeWindowMeanEnricher.WINDOW_DURATION, timePeriod));
+ }
+
+ @AfterMethod(alwaysRun=true)
+ public void tearDown() throws Exception {
+ if (app != null) Entities.destroyAll(app.getManagementContext());
+ }
+
+ @Test
+ public void testDefaultAverageWhenEmpty() {
+ ConfidenceQualifiedNumber average = averager.getAverage(0, 0);
+ assertEquals(average.value, 0d);
+ assertEquals(average.confidence, 0.0d);
+ }
+
+ protected BasicSensorEvent<Integer> newIntSensorEvent(int value, long timestamp) {
+ return new BasicSensorEvent<Integer>(intSensor, producer, value, timestamp);
+ }
+ protected BasicSensorEvent<Double> newDeltaSensorEvent(double value, long timestamp) {
+ return new BasicSensorEvent<Double>(deltaSensor, producer, value, timestamp);
+ }
+
+ @Test
+ public void testNoRecentValuesAverage() {
+ averager.onEvent(newDeltaSensorEvent(10, 0));
+ average = averager.getAverage(timePeriod.toMilliseconds()+1000, 0);
+ assertEquals(average.value, 10d);
+ assertEquals(average.confidence, 0d);
+ }
+
+ @Test
+ public void testNoRecentValuesUsesLastForAverage() {
+ averager.onEvent(newDeltaSensorEvent(10, 0));
+ averager.onEvent(newDeltaSensorEvent(20, 10));
+ average = averager.getAverage(timePeriod.toMilliseconds()+1000, 0);
+ assertEquals(average.value, 20d);
+ assertEquals(average.confidence, 0d);
+ }
+
+ @Test
+ public void testSingleValueTimeAverage() {
+ averager.onEvent(newDeltaSensorEvent(10, 1000));
+ average = averager.getAverage(1000, 0);
+ assertEquals(average.confidence, 0d);
+ }
+
+ @Test
+ public void testTwoValueAverageForPeriod() {
+ averager.onEvent(newDeltaSensorEvent(10, 1000));
+ averager.onEvent(newDeltaSensorEvent(10, 2000));
+ average = averager.getAverage(2000, 0);
+ assertEquals(average.value, 10 /1d);
+ assertEquals(average.confidence, 1d);
+ }
+
+ @Test
+ public void testMonospacedAverage() {
+ averager.onEvent(newDeltaSensorEvent(10, 1000));
+ averager.onEvent(newDeltaSensorEvent(20, 1250));
+ averager.onEvent(newDeltaSensorEvent(30, 1500));
+ averager.onEvent(newDeltaSensorEvent(40, 1750));
+ averager.onEvent(newDeltaSensorEvent(50, 2000));
+ average = averager.getAverage(2000, 0);
+ assertEquals(average.value, (20+30+40+50)/4d);
+ assertEquals(average.confidence, 1d);
+ }
+
+ @Test
+ public void testWeightedAverage() {
+ averager.onEvent(newDeltaSensorEvent(10, 1000));
+ averager.onEvent(newDeltaSensorEvent(20, 1100));
+ averager.onEvent(newDeltaSensorEvent(30, 1300));
+ averager.onEvent(newDeltaSensorEvent(40, 1600));
+ averager.onEvent(newDeltaSensorEvent(50, 2000));
+
+ average = averager.getAverage(2000, 0);
+ assertEquals(average.value, (20*0.1d)+(30*0.2d)+(40*0.3d)+(50*0.4d));
+ assertEquals(average.confidence, 1d);
+ }
+
+ @Test
+ public void testConfidenceDecay() {
+ averager.onEvent(newDeltaSensorEvent(10, 1000));
+ averager.onEvent(newDeltaSensorEvent(20, 1250));
+ averager.onEvent(newDeltaSensorEvent(30, 1500));
+ averager.onEvent(newDeltaSensorEvent(40, 1750));
+ averager.onEvent(newDeltaSensorEvent(50, 2000));
+
+ average = averager.getAverage(2250, 0);
+ assertEquals(average.value, (30+40+50)/3d);
+ assertEquals(average.confidence, 0.75d);
+ average = averager.getAverage(2500, 0);
+ assertEquals(average.value, (40+50)/2d);
+ assertEquals(average.confidence, 0.5d);
+ average = averager.getAverage(2750, 0);
+ assertEquals(average.value, 50d);
+ assertEquals(average.confidence, 0.25d);
+ average = averager.getAverage(3000, 0);
+ assertEquals(average.value, 50d);
+ assertEquals(average.confidence, 0d);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1e69cc97/core/src/test/java/brooklyn/enricher/basic/YamlTimeWeightedDeltaEnricherTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/enricher/basic/YamlTimeWeightedDeltaEnricherTest.java b/core/src/test/java/brooklyn/enricher/basic/YamlTimeWeightedDeltaEnricherTest.java
new file mode 100644
index 0000000..2a7a974
--- /dev/null
+++ b/core/src/test/java/brooklyn/enricher/basic/YamlTimeWeightedDeltaEnricherTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.enricher.basic;
+
+import static org.testng.Assert.assertEquals;
+
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.AbstractApplication;
+import brooklyn.entity.basic.BasicEntity;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensor;
+import brooklyn.event.basic.BasicSensorEvent;
+import brooklyn.management.SubscriptionContext;
+import brooklyn.policy.EnricherSpec;
+
+public class YamlTimeWeightedDeltaEnricherTest {
+
+ AbstractApplication app;
+
+ BasicEntity producer;
+
+ AttributeSensor<Integer> intSensor;
+ AttributeSensor<Double> avgSensor, deltaSensor;
+ SubscriptionContext subscription;
+
+ @BeforeMethod
+ public void before() {
+ app = new AbstractApplication() {};
+ Entities.startManagement(app);
+ producer = app.addChild(EntitySpec.create(BasicEntity.class));
+
+ intSensor = new BasicAttributeSensor<Integer>(Integer.class, "int sensor");
+ deltaSensor = new BasicAttributeSensor<Double>(Double.class, "delta sensor");
+ }
+
+ @AfterMethod(alwaysRun=true)
+ public void tearDown() throws Exception {
+ if (app != null) Entities.destroyAll(app.getManagementContext());
+ }
+
+ @Test
+ public void testMonospaceTimeWeightedDeltaEnricher() {
+ @SuppressWarnings("unchecked")
+ YamlTimeWeightedDeltaEnricher<Integer> delta = producer.addEnricher(EnricherSpec.create(YamlTimeWeightedDeltaEnricher.class)
+ .configure(YamlTimeWeightedDeltaEnricher.PRODUCER, producer)
+ .configure(YamlTimeWeightedDeltaEnricher.SOURCE_SENSOR, intSensor)
+ .configure(YamlTimeWeightedDeltaEnricher.TARGET_SENSOR, deltaSensor));
+
+ delta.onEvent(newIntSensorEvent(0, 0));
+ assertEquals(producer.getAttribute(deltaSensor), null);
+ delta.onEvent(newIntSensorEvent(0, 1000));
+ assertEquals(producer.getAttribute(deltaSensor), 0d);
+ delta.onEvent(newIntSensorEvent(1, 2000));
+ assertEquals(producer.getAttribute(deltaSensor), 1d);
+ delta.onEvent(newIntSensorEvent(3, 3000));
+ assertEquals(producer.getAttribute(deltaSensor), 2d);
+ delta.onEvent(newIntSensorEvent(8, 4000));
+ assertEquals(producer.getAttribute(deltaSensor), 5d);
+ }
+
+ protected BasicSensorEvent<Integer> newIntSensorEvent(int value, long timestamp) {
+ return new BasicSensorEvent<Integer>(intSensor, producer, value, timestamp);
+ }
+
+ @Test
+ public void testVariableTimeWeightedDeltaEnricher() {
+ @SuppressWarnings("unchecked")
+ YamlTimeWeightedDeltaEnricher<Integer> delta = producer.addEnricher(EnricherSpec.create(YamlTimeWeightedDeltaEnricher.class)
+ .configure(YamlTimeWeightedDeltaEnricher.PRODUCER, producer)
+ .configure(YamlTimeWeightedDeltaEnricher.SOURCE_SENSOR, intSensor)
+ .configure(YamlTimeWeightedDeltaEnricher.TARGET_SENSOR, deltaSensor));
+
+ delta.onEvent(newIntSensorEvent(0, 0));
+ delta.onEvent(newIntSensorEvent(0, 2000));
+ assertEquals(producer.getAttribute(deltaSensor), 0d);
+ delta.onEvent(newIntSensorEvent(3, 5000));
+ assertEquals(producer.getAttribute(deltaSensor), 1d);
+ delta.onEvent(newIntSensorEvent(7, 7000));
+ assertEquals(producer.getAttribute(deltaSensor), 2d);
+ delta.onEvent(newIntSensorEvent(12, 7500));
+ assertEquals(producer.getAttribute(deltaSensor), 10d);
+ delta.onEvent(newIntSensorEvent(15, 9500));
+ assertEquals(producer.getAttribute(deltaSensor), 1.5d);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1e69cc97/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java b/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java
index 694977c..7c55a81 100644
--- a/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java
+++ b/policy/src/main/java/brooklyn/enricher/RollingTimeWindowMeanEnricher.java
@@ -128,12 +128,12 @@ public class RollingTimeWindowMeanEnricher<T extends Number> extends AbstractTyp
}
}
- @Deprecated /** @deprecatedsince 0.7.0; not used; use the 2-arg method */
+ @Deprecated /** @deprecated since 0.7.0; not used except in groovy tests; use the 2-arg method */
public ConfidenceQualifiedNumber getAverage() {
return getAverage(System.currentTimeMillis(), 0);
}
- @Deprecated /** @deprecated since 0.7.0; not used; use the 2-arg method */
+ @Deprecated /** @deprecated since 0.7.0; not used except in groovy tests; use the 2-arg method */
public ConfidenceQualifiedNumber getAverage(long fromTimeExact) {
return getAverage(fromTimeExact, 0);
}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/1e69cc97/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java b/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java
index b746edd..42e418f 100644
--- a/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java
+++ b/policy/src/main/java/brooklyn/enricher/TimeWeightedDeltaEnricher.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import brooklyn.enricher.basic.AbstractTypeTransformingEnricher;
+import brooklyn.enricher.basic.YamlTimeWeightedDeltaEnricher;
import brooklyn.entity.Entity;
import brooklyn.event.AttributeSensor;
import brooklyn.event.Sensor;
@@ -41,6 +42,8 @@ import com.google.common.base.Functions;
* presented as a units/timeUnit based on the event timing.
* <p>
* NB for time (e.g. "total milliseconds consumed") use {@link TimeFractionDeltaEnricher}
+ * <p>
+ * See also {@link YamlTimeWeightedDeltaEnricher} designed for use from YAML.
*/
//@Catalog(name="Time-weighted Delta", description="Converts an absolute sensor into a delta sensor "
// + "(i.e. the diff between the current and previous value), presented as a units/timeUnit "