You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/20 00:54:23 UTC
[35/36] incubator-brooklyn git commit: Rename o.a.b.sensor.enricher to o.a.b.core.enricher
Rename o.a.b.sensor.enricher to o.a.b.core.enricher
- and o.a.b.enricher.stock for actual enricher impls
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/6f15e8a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/6f15e8a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/6f15e8a6
Branch: refs/heads/master
Commit: 6f15e8a6d61c2e648547cf7faba03fbc06716421
Parents: daf4091
Author: Aled Sage <al...@gmail.com>
Authored: Wed Aug 19 23:07:28 2015 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Wed Aug 19 23:07:28 2015 +0100
----------------------------------------------------------------------
.../core/enricher/AbstractEnricher.java | 115 +++
.../core/enricher/EnricherDynamicType.java | 43 +
.../core/enricher/EnricherTypeSnapshot.java | 39 +
.../brooklyn/core/entity/AbstractEntity.java | 2 +-
.../entity/lifecycle/ServiceStateLogic.java | 8 +-
.../mgmt/rebind/BasicEnricherRebindSupport.java | 2 +-
.../mgmt/rebind/BasicEntityRebindSupport.java | 2 +-
.../core/mgmt/rebind/RebindIteration.java | 2 +-
.../core/mgmt/rebind/RebindManagerImpl.java | 2 +-
.../mgmt/rebind/dto/MementosGenerators.java | 2 +-
.../core/objs/AbstractEntityAdjunct.java | 2 +-
.../brooklyn/core/objs/BrooklynTypes.java | 2 +-
.../core/objs/proxy/InternalPolicyFactory.java | 2 +-
.../brooklyn/core/sensor/StaticSensor.java | 2 +-
.../stock/AbstractAggregatingEnricher.java | 174 ++++
.../enricher/stock/AbstractAggregator.java | 238 ++++++
.../stock/AbstractMultipleSensorAggregator.java | 169 ++++
.../enricher/stock/AbstractTransformer.java | 101 +++
.../stock/AbstractTransformingEnricher.java | 38 +
.../stock/AbstractTypeTransformingEnricher.java | 68 ++
.../brooklyn/enricher/stock/AddingEnricher.java | 107 +++
.../brooklyn/enricher/stock/Aggregator.java | 222 +++++
.../brooklyn/enricher/stock/Combiner.java | 138 ++++
.../stock/CustomAggregatingEnricher.java | 320 +++++++
.../brooklyn/enricher/stock/Enrichers.java | 825 +++++++++++++++++++
.../apache/brooklyn/enricher/stock/Joiner.java | 127 +++
.../brooklyn/enricher/stock/Propagator.java | 201 +++++
.../stock/SensorPropagatingEnricher.java | 181 ++++
.../stock/SensorTransformingEnricher.java | 106 +++
.../brooklyn/enricher/stock/Transformer.java | 103 +++
.../brooklyn/enricher/stock/UpdatingMap.java | 159 ++++
.../YamlRollingTimeWindowMeanEnricher.java | 178 ++++
.../stock/YamlTimeWeightedDeltaEnricher.java | 83 ++
.../entity/group/DynamicFabricImpl.java | 2 +-
.../entity/stock/DelegateEntityImpl.java | 2 +-
.../enricher/AbstractAggregatingEnricher.java | 173 ----
.../sensor/enricher/AbstractAggregator.java | 237 ------
.../sensor/enricher/AbstractEnricher.java | 115 ---
.../AbstractMultipleSensorAggregator.java | 169 ----
.../sensor/enricher/AbstractTransformer.java | 100 ---
.../enricher/AbstractTransformingEnricher.java | 38 -
.../AbstractTypeTransformingEnricher.java | 67 --
.../sensor/enricher/AddingEnricher.java | 106 ---
.../brooklyn/sensor/enricher/Aggregator.java | 221 -----
.../brooklyn/sensor/enricher/Combiner.java | 137 ---
.../enricher/CustomAggregatingEnricher.java | 320 -------
.../sensor/enricher/EnricherDynamicType.java | 43 -
.../sensor/enricher/EnricherTypeSnapshot.java | 39 -
.../brooklyn/sensor/enricher/Enrichers.java | 824 ------------------
.../apache/brooklyn/sensor/enricher/Joiner.java | 126 ---
.../brooklyn/sensor/enricher/Propagator.java | 200 -----
.../enricher/SensorPropagatingEnricher.java | 180 ----
.../enricher/SensorTransformingEnricher.java | 106 ---
.../brooklyn/sensor/enricher/Transformer.java | 103 ---
.../brooklyn/sensor/enricher/UpdatingMap.java | 158 ----
.../YamlRollingTimeWindowMeanEnricher.java | 178 ----
.../enricher/YamlTimeWeightedDeltaEnricher.java | 83 --
.../core/enricher/BasicEnricherTest.java | 119 +++
.../core/enricher/EnricherConfigTest.java | 147 ++++
.../brooklyn/core/entity/EntitySpecTest.java | 2 +-
.../BrooklynMementoPersisterTestFixture.java | 2 +-
.../core/mgmt/rebind/RebindEnricherTest.java | 4 +-
.../core/mgmt/rebind/RebindFailuresTest.java | 2 +-
.../core/mgmt/rebind/RebindPolicyTest.java | 2 +-
.../core/policy/basic/EnricherTypeTest.java | 2 +-
.../brooklyn/core/test/policy/TestEnricher.java | 2 +-
...stomAggregatingEnricherDeprecatedTest.groovy | 368 +++++++++
.../stock/CustomAggregatingEnricherTest.java | 556 +++++++++++++
.../brooklyn/enricher/stock/EnrichersTest.java | 501 +++++++++++
...SensorPropagatingEnricherDeprecatedTest.java | 108 +++
.../stock/SensorPropagatingEnricherTest.java | 218 +++++
.../TransformingEnricherDeprecatedTest.groovy | 83 ++
.../stock/TransformingEnricherTest.java | 71 ++
.../YamlRollingTimeWindowMeanEnricherTest.java | 179 ++++
.../YamlTimeWeightedDeltaEnricherTest.java | 107 +++
.../sensor/enricher/BasicEnricherTest.java | 119 ---
...stomAggregatingEnricherDeprecatedTest.groovy | 367 ---------
.../enricher/CustomAggregatingEnricherTest.java | 556 -------------
.../sensor/enricher/EnricherConfigTest.java | 147 ----
.../brooklyn/sensor/enricher/EnrichersTest.java | 501 -----------
...SensorPropagatingEnricherDeprecatedTest.java | 108 ---
.../enricher/SensorPropagatingEnricherTest.java | 218 -----
.../TransformingEnricherDeprecatedTest.groovy | 82 --
.../enricher/TransformingEnricherTest.java | 71 --
.../YamlRollingTimeWindowMeanEnricherTest.java | 179 ----
.../YamlTimeWeightedDeltaEnricherTest.java | 107 ---
.../brooklyn/demo/ResilientMongoDbApp.java | 2 +-
.../demo/WebClusterDatabaseExample.java | 2 +-
.../demo/WebClusterDatabaseExampleApp.java | 2 +-
...lusterDatabaseExampleAppIntegrationTest.java | 2 +-
.../brooklyn/policy/enricher/DeltaEnricher.java | 2 +-
.../policy/enricher/HttpLatencyDetector.java | 2 +-
.../policy/enricher/RollingMeanEnricher.java | 2 +-
.../enricher/RollingTimeWindowMeanEnricher.java | 4 +-
.../enricher/TimeFractionDeltaEnricher.java | 2 +-
.../enricher/TimeWeightedDeltaEnricher.java | 4 +-
.../brooklynnode/BrooklynClusterImpl.java | 2 +-
.../entity/brooklynnode/BrooklynNodeImpl.java | 2 +-
.../software/base/SoftwareProcessImpl.java | 2 +-
.../system_service/SystemServiceEnricher.java | 2 +-
.../entity/database/crate/CrateNodeImpl.java | 2 +-
.../entity/database/mysql/MySqlClusterImpl.java | 2 +-
.../messaging/kafka/KafkaClusterImpl.java | 2 +-
.../messaging/storm/StormDeploymentImpl.java | 2 +-
.../bind/BindDnsServerIntegrationTest.java | 2 +-
.../network/bind/PrefixAndIdEnricher.java | 2 +-
.../cassandra/CassandraDatacenterImpl.java | 2 +-
.../nosql/couchbase/CouchbaseClusterImpl.java | 2 +-
.../nosql/mongodb/MongoDBReplicaSetImpl.java | 2 +-
.../sharding/CoLocatedMongoDBRouterImpl.java | 2 +-
.../sharding/MongoDBShardedDeploymentImpl.java | 2 +-
.../entity/nosql/redis/RedisClusterImpl.java | 2 +-
.../entity/nosql/riak/RiakClusterImpl.java | 2 +-
.../entity/nosql/riak/RiakNodeImpl.java | 2 +-
.../entity/proxy/nginx/NginxControllerImpl.java | 2 +-
.../ControlledDynamicWebAppClusterImpl.java | 2 +-
.../entity/webapp/DynamicWebAppClusterImpl.java | 2 +-
.../entity/webapp/DynamicWebAppFabricImpl.java | 2 +-
.../entity/webapp/jboss/JBoss6ServerImpl.java | 2 +-
.../entity/webapp/jboss/JBoss7ServerImpl.java | 2 +-
.../entity/webapp/jetty/Jetty6ServerImpl.java | 2 +-
.../app/ClusterWebServerDatabaseSample.java | 4 +-
.../camp/brooklyn/EnrichersYamlTest.java | 2 +-
.../camp/brooklyn/TestReferencingEnricher.java | 2 +-
...est-app-with-enrichers-slightly-simpler.yaml | 12 +-
.../test-webapp-with-averaging-enricher.yaml | 4 +-
.../apache/brooklyn/cli/lister/ClassFinder.java | 2 +-
.../qa/load/SimulatedJBoss7ServerImpl.java | 2 +-
.../brooklyn/qa/load/SimulatedTheeTierApp.java | 2 +-
.../webcluster/SinusoidalLoadGenerator.java | 2 +-
.../qa/longevity/webcluster/WebClusterApp.java | 2 +-
.../rest/util/BrooklynRestResourceUtils.java | 2 +-
132 files changed, 6271 insertions(+), 6257 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/core/enricher/AbstractEnricher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/enricher/AbstractEnricher.java b/core/src/main/java/org/apache/brooklyn/core/enricher/AbstractEnricher.java
new file mode 100644
index 0000000..0dc36f6
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/enricher/AbstractEnricher.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.enricher;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.Map;
+
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.mgmt.rebind.RebindSupport;
+import org.apache.brooklyn.api.mgmt.rebind.mementos.EnricherMemento;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.Enricher;
+import org.apache.brooklyn.api.sensor.EnricherType;
+import org.apache.brooklyn.api.sensor.Sensor;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.mgmt.rebind.BasicEnricherRebindSupport;
+import org.apache.brooklyn.core.objs.AbstractEntityAdjunct;
+import org.apache.brooklyn.util.core.flags.TypeCoercions;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
+
+/**
+* Base {@link Enricher} implementation; all enrichers should extend this or its children
+*/
+public abstract class AbstractEnricher extends AbstractEntityAdjunct implements Enricher {
+
+ public static final ConfigKey<Boolean> SUPPRESS_DUPLICATES = ConfigKeys.newBooleanConfigKey("enricher.suppressDuplicates",
+ "Whether duplicate values published by this enricher should be suppressed");
+
+ private final EnricherDynamicType enricherType;
+ protected Boolean suppressDuplicates;
+
+ public AbstractEnricher() {
+ this(Maps.newLinkedHashMap());
+ }
+
+ public AbstractEnricher(Map<?,?> flags) {
+ super(flags);
+
+ enricherType = new EnricherDynamicType(this);
+
+ if (isLegacyConstruction() && !isLegacyNoConstructionInit()) {
+ init();
+ }
+ }
+
+ @Override
+ public RebindSupport<EnricherMemento> getRebindSupport() {
+ return new BasicEnricherRebindSupport(this);
+ }
+
+ @Override
+ public EnricherType getEnricherType() {
+ return enricherType.getSnapshot();
+ }
+
+ @Override
+ public void setEntity(EntityLocal entity) {
+ super.setEntity(entity);
+ Boolean suppressDuplicates = getConfig(SUPPRESS_DUPLICATES);
+ if (suppressDuplicates!=null)
+ this.suppressDuplicates = suppressDuplicates;
+ }
+
+ @Override
+ protected void onChanged() {
+ requestPersist();
+ }
+
+ @Override
+ protected <T> void emit(Sensor<T> sensor, Object val) {
+ checkState(entity != null, "entity must first be set");
+ if (val == Entities.UNCHANGED) {
+ return;
+ }
+ if (val == Entities.REMOVE) {
+ ((EntityInternal)entity).removeAttribute((AttributeSensor<T>) sensor);
+ return;
+ }
+
+ T newVal = TypeCoercions.coerce(val, sensor.getTypeToken());
+ if (sensor instanceof AttributeSensor) {
+ if (Boolean.TRUE.equals(suppressDuplicates)) {
+ T oldValue = entity.getAttribute((AttributeSensor<T>)sensor);
+ if (Objects.equal(oldValue, newVal))
+ return;
+ }
+ entity.setAttribute((AttributeSensor<T>)sensor, newVal);
+ } else {
+ entity.emit(sensor, newVal);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/core/enricher/EnricherDynamicType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/enricher/EnricherDynamicType.java b/core/src/main/java/org/apache/brooklyn/core/enricher/EnricherDynamicType.java
new file mode 100644
index 0000000..b6a0b23
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/enricher/EnricherDynamicType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.enricher;
+
+import org.apache.brooklyn.api.sensor.Enricher;
+import org.apache.brooklyn.api.sensor.EnricherType;
+import org.apache.brooklyn.core.objs.BrooklynDynamicType;
+
+public class EnricherDynamicType extends BrooklynDynamicType<Enricher, AbstractEnricher> {
+
+ public EnricherDynamicType(Class<? extends Enricher> type) {
+ super(type);
+ }
+
+ public EnricherDynamicType(AbstractEnricher enricher) {
+ super(enricher);
+ }
+
+ public EnricherType getSnapshot() {
+ return (EnricherType) super.getSnapshot();
+ }
+
+ @Override
+ protected EnricherTypeSnapshot newSnapshot() {
+ return new EnricherTypeSnapshot(name, value(configKeys));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/core/enricher/EnricherTypeSnapshot.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/enricher/EnricherTypeSnapshot.java b/core/src/main/java/org/apache/brooklyn/core/enricher/EnricherTypeSnapshot.java
new file mode 100644
index 0000000..240d884
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/enricher/EnricherTypeSnapshot.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.enricher;
+
+import java.util.Map;
+
+import org.apache.brooklyn.api.sensor.EnricherType;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.objs.BrooklynTypeSnapshot;
+
+public class EnricherTypeSnapshot extends BrooklynTypeSnapshot implements EnricherType {
+ private static final long serialVersionUID = 4670930188951106009L;
+
+ EnricherTypeSnapshot(String name, Map<String, ConfigKey<?>> configKeys) {
+ super(name, configKeys);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ return (obj instanceof EnricherTypeSnapshot) && super.equals(obj);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
index 0ec5903..fb8f2d0 100644
--- a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
+++ b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
@@ -59,6 +59,7 @@ import org.apache.brooklyn.core.BrooklynFeatureEnablement;
import org.apache.brooklyn.core.BrooklynLogging;
import org.apache.brooklyn.core.catalog.internal.CatalogUtils;
import org.apache.brooklyn.core.config.render.RendererHints;
+import org.apache.brooklyn.core.enricher.AbstractEnricher;
import org.apache.brooklyn.core.entity.internal.EntityConfigMap;
import org.apache.brooklyn.core.entity.lifecycle.PolicyDescriptor;
import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic;
@@ -83,7 +84,6 @@ import org.apache.brooklyn.core.sensor.AttributeMap;
import org.apache.brooklyn.core.sensor.AttributeSensorAndConfigKey;
import org.apache.brooklyn.core.sensor.BasicNotificationSensor;
import org.apache.brooklyn.core.sensor.Sensors;
-import org.apache.brooklyn.sensor.enricher.AbstractEnricher;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.collections.MutableSet;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/core/entity/lifecycle/ServiceStateLogic.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/lifecycle/ServiceStateLogic.java b/core/src/main/java/org/apache/brooklyn/core/entity/lifecycle/ServiceStateLogic.java
index 654662f..c2606c1 100644
--- a/core/src/main/java/org/apache/brooklyn/core/entity/lifecycle/ServiceStateLogic.java
+++ b/core/src/main/java/org/apache/brooklyn/core/entity/lifecycle/ServiceStateLogic.java
@@ -42,16 +42,16 @@ import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.BrooklynLogging;
import org.apache.brooklyn.core.BrooklynLogging.LoggingLevel;
import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.enricher.AbstractEnricher;
import org.apache.brooklyn.core.entity.Attributes;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.EntityAdjuncts;
import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.entity.EntityPredicates;
import org.apache.brooklyn.core.entity.lifecycle.Lifecycle.Transition;
-import org.apache.brooklyn.sensor.enricher.AbstractEnricher;
-import org.apache.brooklyn.sensor.enricher.AbstractMultipleSensorAggregator;
-import org.apache.brooklyn.sensor.enricher.Enrichers;
-import org.apache.brooklyn.sensor.enricher.UpdatingMap;
+import org.apache.brooklyn.enricher.stock.AbstractMultipleSensorAggregator;
+import org.apache.brooklyn.enricher.stock.Enrichers;
+import org.apache.brooklyn.enricher.stock.UpdatingMap;
import org.apache.brooklyn.util.collections.CollectionFunctionals;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEnricherRebindSupport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEnricherRebindSupport.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEnricherRebindSupport.java
index 89e11e2..3903655 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEnricherRebindSupport.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEnricherRebindSupport.java
@@ -20,7 +20,7 @@ package org.apache.brooklyn.core.mgmt.rebind;
import org.apache.brooklyn.api.mgmt.rebind.RebindContext;
import org.apache.brooklyn.api.mgmt.rebind.mementos.EnricherMemento;
-import org.apache.brooklyn.sensor.enricher.AbstractEnricher;
+import org.apache.brooklyn.core.enricher.AbstractEnricher;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.core.flags.FlagUtils;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java
index 2a5e92e..0d80698 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java
@@ -32,13 +32,13 @@ import org.apache.brooklyn.api.mgmt.rebind.mementos.EntityMemento;
import org.apache.brooklyn.api.objs.BrooklynObjectType;
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.enricher.AbstractEnricher;
import org.apache.brooklyn.core.entity.AbstractEntity;
import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.feed.AbstractFeed;
import org.apache.brooklyn.core.mgmt.rebind.dto.MementosGenerators;
import org.apache.brooklyn.core.policy.AbstractPolicy;
import org.apache.brooklyn.entity.group.AbstractGroupImpl;
-import org.apache.brooklyn.sensor.enricher.AbstractEnricher;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
index e9478ef..3f468ba 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
@@ -64,6 +64,7 @@ import org.apache.brooklyn.core.BrooklynLogging;
import org.apache.brooklyn.core.BrooklynLogging.LoggingLevel;
import org.apache.brooklyn.core.catalog.internal.CatalogInitialization;
import org.apache.brooklyn.core.catalog.internal.CatalogUtils;
+import org.apache.brooklyn.core.enricher.AbstractEnricher;
import org.apache.brooklyn.core.entity.AbstractApplication;
import org.apache.brooklyn.core.entity.AbstractEntity;
import org.apache.brooklyn.core.entity.EntityInternal;
@@ -86,7 +87,6 @@ import org.apache.brooklyn.core.objs.proxy.InternalFactory;
import org.apache.brooklyn.core.objs.proxy.InternalLocationFactory;
import org.apache.brooklyn.core.objs.proxy.InternalPolicyFactory;
import org.apache.brooklyn.core.policy.AbstractPolicy;
-import org.apache.brooklyn.sensor.enricher.AbstractEnricher;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.flags.FlagUtils;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
index fdce617..52b984a 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
@@ -46,6 +46,7 @@ import org.apache.brooklyn.api.objs.BrooklynObject;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.BrooklynFeatureEnablement;
import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.enricher.AbstractEnricher;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.mgmt.ha.HighAvailabilityManagerImpl;
import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal;
@@ -55,7 +56,6 @@ import org.apache.brooklyn.core.mgmt.persist.PersistenceActivityMetrics;
import org.apache.brooklyn.core.mgmt.persist.BrooklynPersistenceUtils.CreateBackupMode;
import org.apache.brooklyn.core.mgmt.rebind.transformer.CompoundTransformer;
import org.apache.brooklyn.core.server.BrooklynServerConfig;
-import org.apache.brooklyn.sensor.enricher.AbstractEnricher;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.collections.QuorumCheck;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java
index 929b63c..36daf49 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java
@@ -49,6 +49,7 @@ import org.apache.brooklyn.api.sensor.Feed;
import org.apache.brooklyn.api.sensor.AttributeSensor.SensorPersistenceMode;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.catalog.internal.CatalogItemDo;
+import org.apache.brooklyn.core.enricher.AbstractEnricher;
import org.apache.brooklyn.core.entity.EntityDynamicType;
import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.feed.AbstractFeed;
@@ -58,7 +59,6 @@ import org.apache.brooklyn.core.mgmt.rebind.AbstractBrooklynObjectRebindSupport;
import org.apache.brooklyn.core.mgmt.rebind.TreeUtils;
import org.apache.brooklyn.core.objs.BrooklynTypes;
import org.apache.brooklyn.core.policy.AbstractPolicy;
-import org.apache.brooklyn.sensor.enricher.AbstractEnricher;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.core.flags.FlagUtils;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
index efd89d1..e85cc73 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java
@@ -44,10 +44,10 @@ import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.config.ConfigMap;
import org.apache.brooklyn.config.ConfigKey.HasConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.enricher.AbstractEnricher;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.mgmt.internal.SubscriptionTracker;
-import org.apache.brooklyn.sensor.enricher.AbstractEnricher;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.core.flags.FlagUtils;
import org.apache.brooklyn.util.core.flags.SetFromFlag;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynTypes.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynTypes.java b/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynTypes.java
index b6e68ff..4170613 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynTypes.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/BrooklynTypes.java
@@ -27,9 +27,9 @@ import org.apache.brooklyn.api.policy.Policy;
import org.apache.brooklyn.api.sensor.Enricher;
import org.apache.brooklyn.api.sensor.Sensor;
import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.enricher.EnricherDynamicType;
import org.apache.brooklyn.core.entity.EntityDynamicType;
import org.apache.brooklyn.core.policy.PolicyDynamicType;
-import org.apache.brooklyn.sensor.enricher.EnricherDynamicType;
import org.apache.brooklyn.util.exceptions.Exceptions;
import com.google.common.collect.Maps;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalPolicyFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalPolicyFactory.java b/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalPolicyFactory.java
index 4e45580..aaee778 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalPolicyFactory.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalPolicyFactory.java
@@ -27,10 +27,10 @@ import org.apache.brooklyn.api.sensor.Enricher;
import org.apache.brooklyn.api.sensor.EnricherSpec;
import org.apache.brooklyn.api.sensor.Feed;
import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.enricher.AbstractEnricher;
import org.apache.brooklyn.core.entity.AbstractEntity;
import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal;
import org.apache.brooklyn.core.policy.AbstractPolicy;
-import org.apache.brooklyn.sensor.enricher.AbstractEnricher;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.exceptions.Exceptions;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/core/sensor/StaticSensor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/StaticSensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/StaticSensor.java
index 4a7b1d4..b017315 100644
--- a/core/src/main/java/org/apache/brooklyn/core/sensor/StaticSensor.java
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/StaticSensor.java
@@ -23,7 +23,7 @@ import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.effector.AddSensor;
-import org.apache.brooklyn.sensor.enricher.Propagator;
+import org.apache.brooklyn.enricher.stock.Propagator;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.core.task.ValueResolver;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractAggregatingEnricher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractAggregatingEnricher.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractAggregatingEnricher.java
new file mode 100644
index 0000000..2d25a75
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractAggregatingEnricher.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.enricher.stock;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.entity.Group;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.core.enricher.AbstractEnricher;
+import org.apache.brooklyn.core.entity.trait.Changeable;
+import org.apache.brooklyn.util.groovy.GroovyJavaMethods;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableMap;
+
+
+/**
+ * An AggregatingEnricher implicitly subscribes to the same sensor<S> on all entities inside a
+ * {@link Group} and should emit an aggregate<T> on the target sensor.
+ *
+ * @deprecated since 0.7.0; use {@link Enrichers#builder()}
+ * @see Aggregator if need to sub-class
+ */
+public abstract class AbstractAggregatingEnricher<S,T> extends AbstractEnricher implements SensorEventListener<S> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractAggregatingEnricher.class);
+
+ AttributeSensor<? extends S> source;
+ protected AttributeSensor<T> target;
+ protected S defaultValue;
+
+ Set<Entity> producers;
+ List<Entity> hardCodedProducers;
+ boolean allMembers;
+ Predicate<Entity> filter;
+
+ /**
+ * Users of values should either synchronize on it when iterating over its entries or use
+ * copyOfValues to obtain an immutable copy of the map.
+ */
+ // We use a synchronizedMap over a ConcurrentHashMap for entities that store null values.
+ protected final Map<Entity, S> values = Collections.synchronizedMap(new LinkedHashMap<Entity, S>());
+
+ public AbstractAggregatingEnricher(Map<String,?> flags, AttributeSensor<? extends S> source, AttributeSensor<T> target) {
+ this(flags, source, target, null);
+ }
+
+ @SuppressWarnings("unchecked")
+ public AbstractAggregatingEnricher(Map<String,?> flags, AttributeSensor<? extends S> source, AttributeSensor<T> target, S defaultValue) {
+ super(flags);
+ this.source = source;
+ this.target = target;
+ this.defaultValue = defaultValue;
+ hardCodedProducers = (List<Entity>) (flags.containsKey("producers") ? flags.get("producers") : Collections.emptyList());
+ allMembers = (Boolean) (flags.containsKey("allMembers") ? flags.get("allMembers") : false);
+ filter = flags.containsKey("filter") ? GroovyJavaMethods.<Entity>castToPredicate(flags.get("filter")) : Predicates.<Entity>alwaysTrue();
+ }
+
+ public void addProducer(Entity producer) {
+ if (LOG.isDebugEnabled()) LOG.debug("{} linked ({}, {}) to {}", new Object[] {this, producer, source, target});
+ subscribe(producer, source, this);
+ synchronized (values) {
+ S vo = values.get(producer);
+ if (vo==null) {
+ S initialVal = ((EntityLocal)producer).getAttribute(source);
+ values.put(producer, initialVal != null ? initialVal : defaultValue);
+ //we might skip in onEvent in the short window while !values.containsKey(producer)
+ //but that's okay because the put which would have been done there is done here now
+ } else {
+ //vo will be null unless some weird race with addProducer+removeProducer is occurring
+ //(and that's something we can tolerate i think)
+ if (LOG.isDebugEnabled()) LOG.debug("{} already had value ({}) for producer ({}); but that producer has just been added", new Object[] {this, vo, producer});
+ }
+ }
+ onUpdated();
+ }
+
+ // TODO If producer removed but then get (queued) event from it after this method returns,
+ public S removeProducer(Entity producer) {
+ if (LOG.isDebugEnabled()) LOG.debug("{} unlinked ({}, {}) from {}", new Object[] {this, producer, source, target});
+ unsubscribe(producer);
+ S removed = values.remove(producer);
+ onUpdated();
+ return removed;
+ }
+
+ @Override
+ public void onEvent(SensorEvent<S> event) {
+ Entity e = event.getSource();
+ synchronized (values) {
+ if (values.containsKey(e)) {
+ values.put(e, event.getValue());
+ } else {
+ if (LOG.isDebugEnabled()) LOG.debug("{} received event for unknown producer ({}); presumably that producer has recently been removed", this, e);
+ }
+ }
+ onUpdated();
+ }
+
+ /**
+ * Called whenever the values for the set of producers changes (e.g. on an event, or on a member added/removed).
+ * Defaults to no-op
+ */
+ // TODO should this be abstract?
+ protected void onUpdated() {
+ // no-op
+ }
+
+ @Override
+ public void setEntity(EntityLocal entity) {
+ super.setEntity(entity);
+
+ for (Entity producer : hardCodedProducers) {
+ if (filter.apply(producer)) {
+ addProducer(producer);
+ }
+ }
+
+ if (allMembers) {
+ subscribe(entity, Changeable.MEMBER_ADDED, new SensorEventListener<Entity>() {
+ @Override public void onEvent(SensorEvent<Entity> it) {
+ if (filter.apply(it.getValue())) addProducer(it.getValue());
+ }
+ });
+ subscribe(entity, Changeable.MEMBER_REMOVED, new SensorEventListener<Entity>() {
+ @Override public void onEvent(SensorEvent<Entity> it) {
+ removeProducer(it.getValue());
+ }
+ });
+
+ if (entity instanceof Group) {
+ for (Entity member : ((Group)entity).getMembers()) {
+ if (filter.apply(member)) {
+ addProducer(member);
+ }
+ }
+ }
+ }
+ }
+
+ protected Map<Entity, S> copyOfValues() {
+ synchronized (values) {
+ return ImmutableMap.copyOf(values);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractAggregator.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractAggregator.java
new file mode 100644
index 0000000..926b769
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractAggregator.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.enricher.stock;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.Set;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.entity.Group;
+import org.apache.brooklyn.api.sensor.Sensor;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.enricher.AbstractEnricher;
+import org.apache.brooklyn.core.entity.AbstractEntity;
+import org.apache.brooklyn.core.entity.trait.Changeable;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.guava.Maybe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
+import com.google.common.reflect.TypeToken;
+
+/** Abstract superclass for enrichers which aggregate from children and/or members */
+@SuppressWarnings("serial")
+public abstract class AbstractAggregator<T,U> extends AbstractEnricher implements SensorEventListener<T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractAggregator.class);
+
+ public static final ConfigKey<Entity> PRODUCER = ConfigKeys.newConfigKey(Entity.class, "enricher.producer", "The entity whose children/members will be aggregated");
+
+ public static final ConfigKey<Sensor<?>> TARGET_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.targetSensor");
+
+ // FIXME this is not just for "members" i think -Alex
+ public static final ConfigKey<?> DEFAULT_MEMBER_VALUE = ConfigKeys.newConfigKey(Object.class, "enricher.defaultMemberValue");
+
+ public static final ConfigKey<Set<? extends Entity>> FROM_HARDCODED_PRODUCERS = ConfigKeys.newConfigKey(new TypeToken<Set<? extends Entity>>() {}, "enricher.aggregating.fromHardcodedProducers");
+
+ public static final ConfigKey<Boolean> FROM_MEMBERS = ConfigKeys.newBooleanConfigKey("enricher.aggregating.fromMembers",
+ "Whether this enricher looks at members; only supported if a Group producer is supplier; defaults to true for Group entities");
+
+ public static final ConfigKey<Boolean> FROM_CHILDREN = ConfigKeys.newBooleanConfigKey("enricher.aggregating.fromChildren",
+ "Whether this enricher looks at children; this is the default for non-Group producers");
+
+ public static final ConfigKey<Predicate<? super Entity>> ENTITY_FILTER = ConfigKeys.newConfigKey(new TypeToken<Predicate<? super Entity>>() {}, "enricher.aggregating.entityFilter");
+
+ public static final ConfigKey<Predicate<?>> VALUE_FILTER = ConfigKeys.newConfigKey(new TypeToken<Predicate<?>>() {}, "enricher.aggregating.valueFilter");
+
+ protected Entity producer;
+ protected Sensor<U> targetSensor;
+ protected T defaultMemberValue;
+ protected Set<? extends Entity> fromHardcodedProducers;
+ protected Boolean fromMembers;
+ protected Boolean fromChildren;
+ protected Predicate<? super Entity> entityFilter;
+ protected Predicate<? super T> valueFilter;
+
+ public AbstractAggregator() {}
+
+ @Override
+ public void setEntity(EntityLocal entity) {
+ super.setEntity(entity);
+ setEntityLoadingConfig();
+
+ if (fromHardcodedProducers == null && producer == null) producer = entity;
+ checkState(fromHardcodedProducers != null ^ producer != null, "must specify one of %s (%s) or %s (%s)",
+ PRODUCER.getName(), producer, FROM_HARDCODED_PRODUCERS.getName(), fromHardcodedProducers);
+
+ if (fromHardcodedProducers != null) {
+ for (Entity producer : Iterables.filter(fromHardcodedProducers, entityFilter)) {
+ addProducerHardcoded(producer);
+ }
+ }
+
+ if (isAggregatingMembers()) {
+ setEntityBeforeSubscribingProducerMemberEvents(entity);
+ setEntitySubscribeProducerMemberEvents();
+ setEntityAfterSubscribingProducerMemberEvents();
+ }
+
+ if (isAggregatingChildren()) {
+ setEntityBeforeSubscribingProducerChildrenEvents();
+ setEntitySubscribingProducerChildrenEvents();
+ setEntityAfterSubscribingProducerChildrenEvents();
+ }
+
+ onUpdated();
+ }
+
+ @SuppressWarnings({ "unchecked" })
+ protected void setEntityLoadingConfig() {
+ this.producer = getConfig(PRODUCER);
+ this.fromHardcodedProducers= getConfig(FROM_HARDCODED_PRODUCERS);
+ this.defaultMemberValue = (T) getConfig(DEFAULT_MEMBER_VALUE);
+ this.fromMembers = Maybe.fromNullable(getConfig(FROM_MEMBERS)).or(fromMembers);
+ this.fromChildren = Maybe.fromNullable(getConfig(FROM_CHILDREN)).or(fromChildren);
+ this.entityFilter = (Predicate<? super Entity>) (getConfig(ENTITY_FILTER) == null ? Predicates.alwaysTrue() : getConfig(ENTITY_FILTER));
+ this.valueFilter = (Predicate<? super T>) (getConfig(VALUE_FILTER) == null ? getDefaultValueFilter() : getConfig(VALUE_FILTER));
+
+ setEntityLoadingTargetConfig();
+ }
+
+ protected Predicate<?> getDefaultValueFilter() {
+ return Predicates.alwaysTrue();
+ }
+
+ @SuppressWarnings({ "unchecked" })
+ protected void setEntityLoadingTargetConfig() {
+ this.targetSensor = (Sensor<U>) getRequiredConfig(TARGET_SENSOR);
+ }
+
+ protected void setEntityBeforeSubscribingProducerMemberEvents(EntityLocal entity) {
+ checkState(producer instanceof Group, "Producer must be a group when fromMembers true: producer=%s; entity=%s; "
+ + "hardcodedProducers=%s", getConfig(PRODUCER), entity, fromHardcodedProducers);
+ }
+
+ protected void setEntitySubscribeProducerMemberEvents() {
+ subscribe(producer, Changeable.MEMBER_ADDED, new SensorEventListener<Entity>() {
+ @Override public void onEvent(SensorEvent<Entity> event) {
+ if (entityFilter.apply(event.getValue())) {
+ addProducerMember(event.getValue());
+ onUpdated();
+ }
+ }
+ });
+ subscribe(producer, Changeable.MEMBER_REMOVED, new SensorEventListener<Entity>() {
+ @Override public void onEvent(SensorEvent<Entity> event) {
+ removeProducer(event.getValue());
+ onUpdated();
+ }
+ });
+ }
+
+ protected void setEntityAfterSubscribingProducerMemberEvents() {
+ if (producer instanceof Group) {
+ for (Entity member : Iterables.filter(((Group)producer).getMembers(), entityFilter)) {
+ addProducerMember(member);
+ }
+ }
+ }
+
+ protected void setEntityBeforeSubscribingProducerChildrenEvents() {
+ }
+
+ protected void setEntitySubscribingProducerChildrenEvents() {
+ subscribe(producer, AbstractEntity.CHILD_REMOVED, new SensorEventListener<Entity>() {
+ @Override public void onEvent(SensorEvent<Entity> event) {
+ removeProducer(event.getValue());
+ onUpdated();
+ }
+ });
+ subscribe(producer, AbstractEntity.CHILD_ADDED, new SensorEventListener<Entity>() {
+ @Override public void onEvent(SensorEvent<Entity> event) {
+ if (entityFilter.apply(event.getValue())) {
+ addProducerChild(event.getValue());
+ onUpdated();
+ }
+ }
+ });
+ }
+
+ protected void setEntityAfterSubscribingProducerChildrenEvents() {
+ for (Entity child : Iterables.filter(producer.getChildren(), entityFilter)) {
+ addProducerChild(child);
+ }
+ }
+
+ /** true if this should aggregate members */
+ protected boolean isAggregatingMembers() {
+ if (Boolean.TRUE.equals(fromMembers)) return true;
+ if (Boolean.TRUE.equals(fromChildren)) return false;
+ if (fromHardcodedProducers!=null) return false;
+ if (producer instanceof Group) return true;
+ return false;
+ }
+
+ /** true if this should aggregate children */
+ protected boolean isAggregatingChildren() {
+ if (Boolean.TRUE.equals(fromChildren)) return true;
+ if (Boolean.TRUE.equals(fromMembers)) return false;
+ if (fromHardcodedProducers!=null) return false;
+ if (producer instanceof Group) return false;
+ return true;
+ }
+
+ protected abstract void addProducerHardcoded(Entity producer);
+ protected abstract void addProducerMember(Entity producer);
+ protected abstract void addProducerChild(Entity producer);
+
+ // TODO If producer removed but then get (queued) event from it after this method returns,
+ protected void removeProducer(Entity producer) {
+ if (LOG.isDebugEnabled()) LOG.debug("{} stopped listening to {}", new Object[] {this, producer });
+ unsubscribe(producer);
+ onProducerRemoved(producer);
+ }
+
+ protected abstract void onProducerAdded(Entity producer);
+
+ protected abstract void onProducerRemoved(Entity producer);
+
+
+ /**
+ * Called whenever the values for the set of producers changes (e.g. on an event, or on a member added/removed).
+ */
+ protected void onUpdated() {
+ try {
+ emit(targetSensor, compute());
+ } catch (Throwable t) {
+ LOG.warn("Error calculating and setting aggregate for enricher "+this, t);
+ throw Exceptions.propagate(t);
+ }
+ }
+
+ protected abstract Object compute();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractMultipleSensorAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractMultipleSensorAggregator.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractMultipleSensorAggregator.java
new file mode 100644
index 0000000..1d76168
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractMultipleSensorAggregator.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.enricher.stock;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.Sensor;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.core.BrooklynLogging;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.flags.TypeCoercions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+
+ /** Building on {@link AbstractAggregator} for multiple source sensors (on multiple children and/or members) */
+public abstract class AbstractMultipleSensorAggregator<U> extends AbstractAggregator<Object,U> implements SensorEventListener<Object> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractMultipleSensorAggregator.class);
+
+
+ /** access via {@link #getValues(Sensor)} */
+ private final Map<String, Map<Entity,Object>> values = Collections.synchronizedMap(new LinkedHashMap<String, Map<Entity,Object>>());
+
+ public AbstractMultipleSensorAggregator() {}
+
+ protected abstract Collection<Sensor<?>> getSourceSensors();
+
+ @Override
+ protected void setEntityLoadingConfig() {
+ super.setEntityLoadingConfig();
+ Preconditions.checkNotNull(getSourceSensors(), "sourceSensors must be set");
+ }
+
+ @Override
+ protected void setEntityBeforeSubscribingProducerChildrenEvents() {
+ BrooklynLogging.log(LOG, BrooklynLogging.levelDebugOrTraceIfReadOnly(producer),
+ "{} subscribing to children of {}", this, producer);
+ for (Sensor<?> sourceSensor: getSourceSensors()) {
+ subscribeToChildren(producer, sourceSensor, this);
+ }
+ }
+
+ @Override
+ protected void addProducerHardcoded(Entity producer) {
+ for (Sensor<?> sourceSensor: getSourceSensors()) {
+ subscribe(producer, sourceSensor, this);
+ }
+ onProducerAdded(producer);
+ }
+
+ @Override
+ protected void addProducerChild(Entity producer) {
+ // no `subscribe` call needed here, due to previous subscribeToChildren call
+ onProducerAdded(producer);
+ }
+
+ @Override
+ protected void addProducerMember(Entity producer) {
+ addProducerHardcoded(producer);
+ }
+
+ @Override
+ protected void onProducerAdded(Entity producer) {
+ BrooklynLogging.log(LOG, BrooklynLogging.levelDebugOrTraceIfReadOnly(producer),
+ "{} listening to {}", this, producer);
+ synchronized (values) {
+ for (Sensor<?> sensor: getSourceSensors()) {
+ Map<Entity,Object> vs = values.get(sensor.getName());
+ if (vs==null) {
+ vs = new LinkedHashMap<Entity,Object>();
+ values.put(sensor.getName(), vs);
+ }
+
+ Object vo = vs.get(producer);
+ if (vo==null) {
+ Object initialVal;
+ if (sensor instanceof AttributeSensor) {
+ initialVal = producer.getAttribute((AttributeSensor<?>)sensor);
+ } else {
+ initialVal = null;
+ }
+ vs.put(producer, initialVal != null ? initialVal : defaultMemberValue);
+ // NB: see notes on possible race, in Aggregator#onProducerAdded
+ }
+
+ }
+ }
+ }
+
+ @Override
+ protected void onProducerRemoved(Entity producer) {
+ synchronized (values) {
+ for (Sensor<?> sensor: getSourceSensors()) {
+ Map<Entity,Object> vs = values.get(sensor.getName());
+ if (vs!=null)
+ vs.remove(producer);
+ }
+ }
+ onUpdated();
+ }
+
+ @Override
+ public void onEvent(SensorEvent<Object> event) {
+ Entity e = event.getSource();
+ synchronized (values) {
+ Map<Entity,Object> vs = values.get(event.getSensor().getName());
+ if (vs==null) {
+ LOG.debug(this+" received event when no entry for sensor ("+event+"); likely just added or removed, and will initialize subsequently if needed");
+ } else {
+ vs.put(e, event.getValue());
+ }
+ }
+ onUpdated();
+ }
+
+ public <T> Map<Entity,T> getValues(Sensor<T> sensor) {
+ Map<Entity, T> valuesCopy = copyValues(sensor);
+ return coerceValues(valuesCopy, sensor.getType());
+ }
+
+ private <T> Map<Entity, T> coerceValues(Map<Entity, T> values, Class<? super T> type) {
+ Map<Entity, T> typedValues = MutableMap.of();
+ for (Entry<Entity, T> entry : values.entrySet()) {
+ @SuppressWarnings("unchecked")
+ T typedValue = (T) TypeCoercions.coerce(entry.getValue(), type);
+ typedValues.put(entry.getKey(), typedValue);
+ }
+ return typedValues;
+ }
+
+ private <T> Map<Entity, T> copyValues(Sensor<T> sensor) {
+ synchronized (values) {
+ @SuppressWarnings("unchecked")
+ Map<Entity, T> sv = (Map<Entity, T>) values.get(sensor.getName());
+ //use MutableMap because of potentially null values
+ return MutableMap.copyOf(sv).asUnmodifiable();
+ }
+ }
+
+ @Override
+ protected abstract Object compute();
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTransformer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTransformer.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTransformer.java
new file mode 100644
index 0000000..ab41c1a
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTransformer.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.enricher.stock;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.Sensor;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.enricher.AbstractEnricher;
+import org.apache.brooklyn.core.sensor.BasicSensorEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.reflect.TypeToken;
+
+@SuppressWarnings("serial")
+public abstract class AbstractTransformer<T,U> extends AbstractEnricher implements SensorEventListener<T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractTransformer.class);
+
+ public static ConfigKey<Entity> PRODUCER = ConfigKeys.newConfigKey(Entity.class, "enricher.producer");
+
+ public static ConfigKey<Sensor<?>> SOURCE_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.sourceSensor");
+
+ public static ConfigKey<Sensor<?>> TARGET_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.targetSensor");
+
+ protected Entity producer;
+ protected Sensor<T> sourceSensor;
+ protected Sensor<U> targetSensor;
+
+ public AbstractTransformer() {
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Override
+ public void setEntity(EntityLocal entity) {
+ super.setEntity(entity);
+
+ Function<SensorEvent<T>, U> transformation = getTransformation();
+ this.producer = getConfig(PRODUCER) == null ? entity: getConfig(PRODUCER);
+ this.sourceSensor = (Sensor<T>) getRequiredConfig(SOURCE_SENSOR);
+ Sensor<?> targetSensorSpecified = getConfig(TARGET_SENSOR);
+ this.targetSensor = targetSensorSpecified!=null ? (Sensor<U>) targetSensorSpecified : (Sensor<U>) this.sourceSensor;
+ if (producer.equals(entity) && targetSensorSpecified==null) {
+ LOG.error("Refusing to add an enricher which reads and publishes on the same sensor: "+
+ producer+"."+sourceSensor+" (computing "+transformation+")");
+ // we don't throw because this error may manifest itself after a lengthy deployment,
+ // and failing it at that point simply because of an enricher is not very pleasant
+ // (at least not until we have good re-run support across the board)
+ return;
+ }
+
+ subscribe(producer, sourceSensor, this);
+
+ if (sourceSensor instanceof AttributeSensor) {
+ Object value = producer.getAttribute((AttributeSensor<?>)sourceSensor);
+ // TODO would be useful to have a convenience to "subscribeAndThenIfItIsAlreadySetRunItOnce"
+ if (value!=null) {
+ onEvent(new BasicSensorEvent(sourceSensor, producer, value, -1));
+ }
+ }
+ }
+
+ /** returns a function for transformation, for immediate use only (not for caching, as it may change) */
+ protected abstract Function<SensorEvent<T>, U> getTransformation();
+
+ @Override
+ public void onEvent(SensorEvent<T> event) {
+ emit(targetSensor, compute(event));
+ }
+
+ protected Object compute(SensorEvent<T> event) {
+ // transformation is not going to change, but this design makes it easier to support changing config in future.
+ // if it's an efficiency hole we can switch to populate the transformation at start.
+ U result = getTransformation().apply(event);
+ if (LOG.isTraceEnabled())
+ LOG.trace("Enricher "+this+" computed "+result+" from "+event);
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTransformingEnricher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTransformingEnricher.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTransformingEnricher.java
new file mode 100644
index 0000000..a29cc7b
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTransformingEnricher.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.enricher.stock;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.sensor.Sensor;
+
+/**
+ * Convenience base for transforming a single sensor into a single new sensor of the same type
+ *
+ * @deprecated since 0.7.0; use {@link Enrichers#builder()}
+ */
+public abstract class AbstractTransformingEnricher<T> extends AbstractTypeTransformingEnricher<T,T> {
+
+ public AbstractTransformingEnricher() { // for rebinding
+ }
+
+ public AbstractTransformingEnricher(Entity producer, Sensor<T> source, Sensor<T> target) {
+ super(producer, source, target);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTypeTransformingEnricher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTypeTransformingEnricher.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTypeTransformingEnricher.java
new file mode 100644
index 0000000..1469829
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/AbstractTypeTransformingEnricher.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.enricher.stock;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.Sensor;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.core.enricher.AbstractEnricher;
+import org.apache.brooklyn.core.sensor.BasicSensorEvent;
+import org.apache.brooklyn.util.core.flags.SetFromFlag;
+
+/**
+ * Convenience base for transforming a single sensor of type {@code T} into a single new
+ * sensor of a (possibly different) type {@code U}.
+ * <p>
+ * When attached to an entity, this enricher subscribes itself to {@code source} on the
+ * {@code producer} and, if the source is an attribute that already has a value, immediately
+ * feeds that value to {@code onEvent}. This class does not define {@code onEvent} itself.
+ *
+ * @param <T> value type of the source sensor
+ * @param <U> value type of the target sensor
+ * @deprecated since 0.7.0; use {@link Enrichers#builder()}
+ */
+@Deprecated
+public abstract class AbstractTypeTransformingEnricher<T,U> extends AbstractEnricher implements SensorEventListener<T> {
+
+    /** Entity whose {@link #source} sensor is listened to; defaults to the attached entity when null. */
+    @SetFromFlag
+    private Entity producer;
+
+    /** Sensor whose events are delivered to this listener. */
+    @SetFromFlag
+    private Sensor<T> source;
+
+    /** Sensor made available to sub-classes as the destination of the transformed value. */
+    @SetFromFlag
+    protected Sensor<U> target;
+
+    /** No-arg constructor, for rebind. */
+    public AbstractTypeTransformingEnricher() {
+    }
+
+    /**
+     * @param producer the entity to listen to; if null, the entity this enricher is attached
+     *        to is used instead (see {@link #setEntity(EntityLocal)})
+     * @param source the sensor to subscribe to on the producer
+     * @param target the sensor the transformed value is intended for
+     */
+    public AbstractTypeTransformingEnricher(Entity producer, Sensor<T> source, Sensor<U> target) {
+        this.producer = producer;
+        this.source = source;
+        this.target = target;
+    }
+
+    /**
+     * Attaches this enricher to {@code entity}, subscribes to the source sensor and replays
+     * the current attribute value (if any) as an initial event.
+     *
+     * @param entity the entity this enricher is being attached to
+     */
+    @Override
+    public void setEntity(EntityLocal entity) {
+        super.setEntity(entity);
+        if (producer == null) {
+            producer = entity;
+        }
+        subscribe(producer, source, this);
+
+        // Only attribute sensors hold a current value which can be replayed.
+        if (source instanceof AttributeSensor) {
+            T value = producer.getAttribute((AttributeSensor<T>) source);
+            // TODO Aled didn't you write a convenience to "subscribeAndRunIfSet" ? (-Alex)
+            if (value != null) {
+                onEvent(new BasicSensorEvent<T>(source, producer, value, -1));
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/enricher/stock/AddingEnricher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/AddingEnricher.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/AddingEnricher.java
new file mode 100644
index 0000000..941d745
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/AddingEnricher.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.enricher.stock;
+
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.Sensor;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.core.enricher.AbstractEnricher;
+import org.apache.brooklyn.core.sensor.BasicSensorEvent;
+
+/**
+ * Enricher which adds multiple sensors on an entity to produce a new sensor.
+ * <p>
+ * The sum is recomputed on every source event. If any source attribute has no value yet,
+ * the result is {@code null}: an attribute target is then set to {@code null}, and nothing
+ * is emitted on a non-attribute target.
+ * <p>
+ * Instead, consider calling:
+ * <pre>
+ * {@code
+ * addEnricher(Enrichers.builder()
+ *         .combining(sources)
+ *         .publishing(target)
+ *         .computeSum()
+ *         .build());
+ * }
+ * </pre>
+ *
+ * @deprecated since 0.7.0; use {@link Enrichers#builder()}
+ * @see Combiner if need to sub-class
+ */
+@Deprecated
+public class AddingEnricher extends AbstractEnricher implements SensorEventListener {
+
+    private Sensor[] sources;
+    private Sensor<? extends Number> target;
+
+    /**
+     * @param sources the sensors (on the attached entity) whose values are summed
+     * @param target the numeric sensor on which the sum is published
+     */
+    public AddingEnricher(Sensor[] sources, Sensor<? extends Number> target) {
+        this.sources = sources;
+        this.target = target;
+    }
+
+    /**
+     * Attaches to {@code entity}, subscribing to every source sensor and triggering an
+     * initial computation for each attribute source that already has a value.
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public void setEntity(EntityLocal entity) {
+        super.setEntity(entity);
+
+        for (Sensor source : sources) {
+            subscribe(entity, source, this);
+            if (source instanceof AttributeSensor) {
+                Object value = entity.getAttribute((AttributeSensor) source);
+                if (value != null) {
+                    onEvent(new BasicSensorEvent(source, entity, value, -1));
+                }
+            }
+        }
+    }
+
+    /**
+     * Recomputes the sum and publishes it on the target, converted to the target's type.
+     * The content of {@code event} is not inspected; it only acts as a trigger.
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public void onEvent(SensorEvent event) {
+        Number value = recompute();
+        Number typedValue = cast(value, (Class<? extends Number>) target.getType());
+        if (target instanceof AttributeSensor) {
+            // attributes are updated even when the sum is unavailable (null)
+            entity.setAttribute((AttributeSensor) target, typedValue);
+        } else if (typedValue != null) {
+            entity.emit((Sensor) target, typedValue);
+        }
+    }
+
+    /**
+     * Converts {@code value} to the requested boxed numeric type.
+     * <p>
+     * A value which already is an instance of {@code type} is returned as-is. Otherwise
+     * {@code Integer}, {@code Long}, {@code Byte} and {@code Short} are obtained by rounding
+     * the double value with {@link Math#round(double)} (and narrowing where needed), while
+     * {@code Double} and {@code Float} use {@code doubleValue()} / {@code floatValue()}.
+     *
+     * @param value the number to convert; may be null
+     * @param type the desired result type
+     * @return the converted value, or null if {@code value} is null
+     * @throws UnsupportedOperationException if {@code type} is none of the supported types
+     */
+    @SuppressWarnings("unchecked")
+    public static <V> V cast(Number value, Class<V> type) {
+        if (value == null) {
+            return null;
+        }
+        if (type.isInstance(value)) {
+            return (V) value;
+        }
+
+        if (type == Integer.class) return (V) (Integer) (int) Math.round(value.doubleValue());
+        if (type == Long.class) return (V) (Long) Math.round(value.doubleValue());
+        if (type == Double.class) return (V) (Double) value.doubleValue();
+        if (type == Float.class) return (V) (Float) value.floatValue();
+        if (type == Byte.class) return (V) (Byte) (byte) Math.round(value.doubleValue());
+        if (type == Short.class) return (V) (Short) (short) Math.round(value.doubleValue());
+
+        throw new UnsupportedOperationException("conversion of mathematical operation to "+type+" not supported");
+    }
+
+    /**
+     * Sums the current values of all source attributes on the attached entity.
+     *
+     * @return the sum as a {@code Double}, or null if there are no sources or if any
+     *         source currently has no value
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    protected Number recompute() {
+        if (sources.length == 0) {
+            return null;
+        }
+        // primitive accumulator: avoids unboxing/reboxing a Double on every iteration
+        double total = 0d;
+        for (Sensor source : sources) {
+            // NOTE(review): unlike setEntity, this cast is not guarded by an instanceof check,
+            // so a source which is not an AttributeSensor fails here with a ClassCastException.
+            Object value = entity.getAttribute((AttributeSensor) source);
+            if (value == null) {
+                return null;
+            }
+            total += ((Number) value).doubleValue();
+        }
+        return total;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/enricher/stock/Aggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Aggregator.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Aggregator.java
new file mode 100644
index 0000000..e42d2cb
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Aggregator.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.enricher.stock;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.Sensor;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.BrooklynLogging;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.enricher.stock.Enrichers.ComputingAverage;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.flags.SetFromFlag;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.text.StringPredicates;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
+import com.google.common.reflect.TypeToken;
+
+/**
+ * Building on {@link AbstractAggregator} for a single source sensor (on multiple children and/or members).
+ * <p>
+ * The latest value seen for each producer is kept in {@link #values}. On every source event,
+ * and whenever a producer is removed, the filtered values are passed through the configured
+ * transformation (or returned as a plain list if there is none) and the result is emitted on
+ * the target sensor.
+ *
+ * @param <T> value type of the source sensor
+ * @param <U> value type produced by the transformation
+ */
+@SuppressWarnings("serial")
+//@Catalog(name="Aggregator", description="Aggregates attributes from multiple entities into a single attribute value; see Enrichers.builder().aggregating(...)")
+public class Aggregator<T,U> extends AbstractAggregator<T,U> implements SensorEventListener<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Aggregator.class);
+
+    public static final ConfigKey<Sensor<?>> SOURCE_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.sourceSensor");
+
+    @SetFromFlag("transformation")
+    public static final ConfigKey<Object> TRANSFORMATION_UNTYPED = ConfigKeys.newConfigKey(Object.class, "enricher.transformation.untyped",
+        "Specifies a transformation, as a function from a collection to the value, or as a string matching a pre-defined named transformation, "
+        + "such as 'average' (for numbers), 'sum' (for numbers), or 'list' (the default, putting any collection of items into a list)");
+    public static final ConfigKey<Function<? super Collection<?>, ?>> TRANSFORMATION = ConfigKeys.newConfigKey(new TypeToken<Function<? super Collection<?>, ?>>() {}, "enricher.transformation");
+
+    public static final ConfigKey<Boolean> EXCLUDE_BLANK = ConfigKeys.newBooleanConfigKey("enricher.aggregator.excludeBlank", "Whether explicit nulls or blank strings should be excluded (default false); this only applies if no value filter set", false);
+
+    /** Sensor read on every producer; loaded from {@link #SOURCE_SENSOR}. */
+    protected Sensor<T> sourceSensor;
+
+    /** Function applied to the filtered values; null means the values are returned as a list. */
+    protected Function<? super Collection<T>, ? extends U> transformation;
+
+    /**
+     * Users of values should either synchronize on it when iterating over its entries or use
+     * copyOfValues to obtain an immutable copy of the map.
+     */
+    // We use a synchronizedMap over a ConcurrentHashMap for entities that store null values.
+    protected final Map<Entity, T> values = Collections.synchronizedMap(new LinkedHashMap<Entity, T>());
+
+    public Aggregator() {}
+
+    /**
+     * Loads the source sensor and the transformation from config.
+     * <p>
+     * The transformation may be supplied typed via {@link #TRANSFORMATION}, or untyped via
+     * {@link #TRANSFORMATION_UNTYPED} as either a name (see {@link #lookupTransformation(String)})
+     * or a function.
+     *
+     * @throws IllegalStateException if both keys are supplied and do not resolve to equal functions
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    protected void setEntityLoadingConfig() {
+        super.setEntityLoadingConfig();
+        this.sourceSensor = (Sensor<T>) getRequiredConfig(SOURCE_SENSOR);
+
+        this.transformation = (Function<? super Collection<T>, ? extends U>) config().get(TRANSFORMATION);
+
+        Object untyped = config().get(TRANSFORMATION_UNTYPED);
+        Function<? super Collection<?>, ?> resolvedUntyped = null;
+        if (untyped instanceof String) {
+            resolvedUntyped = lookupTransformation((String) untyped);
+            if (resolvedUntyped == null) {
+                LOG.warn("Unknown transformation '"+untyped+"' for "+this+"; will use default transformation");
+            }
+        } else if (untyped instanceof Function) {
+            // the untyped key is documented as also accepting a function; honour it rather than dropping it
+            resolvedUntyped = (Function<? super Collection<?>, ?>) untyped;
+        }
+
+        if (this.transformation == null) {
+            this.transformation = (Function<? super Collection<T>, ? extends U>) resolvedUntyped;
+        } else if (untyped != null && !Objects.equals(resolvedUntyped, this.transformation)) {
+            throw new IllegalStateException("Cannot supply both "+TRANSFORMATION_UNTYPED+" and "+TRANSFORMATION+" unless they are equal.");
+        }
+    }
+
+    /**
+     * Resolves a pre-defined transformation by name, ignoring case.
+     *
+     * @param t1 one of {@code "average"}, {@code "sum"} or {@code "list"}
+     * @return the matching function, or null if the name is not recognised
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    protected Function<? super Collection<?>, ?> lookupTransformation(String t1) {
+        if ("average".equalsIgnoreCase(t1)) return new Enrichers.ComputingAverage(null, null, targetSensor.getTypeToken());
+        // "sum" must compute the sum, not the average
+        if ("sum".equalsIgnoreCase(t1)) return new Enrichers.ComputingSum(null, null, targetSensor.getTypeToken());
+        if ("list".equalsIgnoreCase(t1)) return new ComputingList();
+        return null;
+    }
+
+    /**
+     * Copies the input collection into an unmodifiable list (null in, null out).
+     * Declared static as it needs no state from the enclosing aggregator.
+     */
+    private static class ComputingList<TT> implements Function<Collection<TT>, List<TT>> {
+        @Override
+        public List<TT> apply(Collection<TT> input) {
+            if (input == null) {
+                return null;
+            }
+            return MutableList.copyOf(input).asUnmodifiable();
+        }
+
+    }
+
+    @Override
+    protected void setEntityBeforeSubscribingProducerChildrenEvents() {
+        BrooklynLogging.log(LOG, BrooklynLogging.levelDebugOrTraceIfReadOnly(producer),
+            "{} subscribing to children of {}", this, producer);
+        subscribeToChildren(producer, sourceSensor, this);
+    }
+
+    @Override
+    protected void addProducerHardcoded(Entity producer) {
+        subscribe(producer, sourceSensor, this);
+        onProducerAdded(producer);
+    }
+
+    @Override
+    protected void addProducerChild(Entity producer) {
+        // no subscription needed here, due to the subscribeToChildren call
+        onProducerAdded(producer);
+    }
+
+    @Override
+    protected void addProducerMember(Entity producer) {
+        subscribe(producer, sourceSensor, this);
+        onProducerAdded(producer);
+    }
+
+    /**
+     * Registers {@code producer} in {@link #values}, seeding it with its current attribute
+     * value, or with {@code defaultMemberValue} when no value is available.
+     */
+    @Override
+    protected void onProducerAdded(Entity producer) {
+        BrooklynLogging.log(LOG, BrooklynLogging.levelDebugOrTraceIfReadOnly(producer),
+            "{} listening to {}", this, producer);
+        synchronized (values) {
+            T vo = values.get(producer);
+            if (vo == null) {
+                T initialVal;
+                if (sourceSensor instanceof AttributeSensor) {
+                    initialVal = producer.getAttribute((AttributeSensor<T>) sourceSensor);
+                } else {
+                    initialVal = null;
+                }
+                values.put(producer, initialVal != null ? initialVal : defaultMemberValue);
+                //we might skip in onEvent in the short window while !values.containsKey(producer)
+                //but that's okay because the put which would have been done there is done here now
+            } else {
+                //vo will be null unless some weird race with addProducer+removeProducer is occurring
+                //(and that's something we can tolerate i think)
+                if (LOG.isDebugEnabled()) LOG.debug("{} already had value ({}) for producer ({}); but that producer has just been added", new Object[] {this, vo, producer});
+            }
+        }
+    }
+
+    /**
+     * @return a non-blank filter when {@link #EXCLUDE_BLANK} is true, otherwise a filter
+     *         accepting every value
+     */
+    @Override
+    protected Predicate<?> getDefaultValueFilter() {
+        // Boolean.TRUE.equals guards against an explicitly-null config value
+        if (Boolean.TRUE.equals(getConfig(EXCLUDE_BLANK))) {
+            return StringPredicates.isNonBlank();
+        } else {
+            return Predicates.alwaysTrue();
+        }
+    }
+
+    @Override
+    protected void onProducerRemoved(Entity producer) {
+        values.remove(producer);
+        onUpdated();
+    }
+
+    /**
+     * Records the new value for the event's source entity (if it is a known producer) and
+     * re-publishes the aggregate.
+     */
+    @Override
+    public void onEvent(SensorEvent<T> event) {
+        Entity e = event.getSource();
+        synchronized (values) {
+            if (values.containsKey(e)) {
+                values.put(e, event.getValue());
+            } else {
+                if (LOG.isDebugEnabled()) LOG.debug("{} received event for unknown producer ({}); presumably that producer has recently been removed", this, e);
+            }
+        }
+        onUpdated();
+    }
+
+    /** Computes the aggregate and emits it on the target sensor; failures are logged and rethrown. */
+    protected void onUpdated() {
+        try {
+            emit(targetSensor, compute());
+        } catch (Throwable t) {
+            LOG.warn("Error calculating and setting aggregate for enricher "+this, t);
+            throw Exceptions.propagate(t);
+        }
+    }
+
+    /**
+     * @return the transformation applied to the values accepted by {@code valueFilter},
+     *         or the list of those values if no transformation is set
+     */
+    @Override
+    protected Object compute() {
+        synchronized (values) {
+            // TODO Could avoid copying when filter not needed
+            List<T> vs = MutableList.copyOf(Iterables.filter(values.values(), valueFilter));
+            if (transformation == null) {
+                return vs;
+            }
+            return transformation.apply(vs);
+        }
+    }
+
+    /** @return an unmodifiable snapshot of {@link #values}, taken while holding its lock */
+    protected Map<Entity, T> copyOfValues() {
+        // Don't use ImmutableMap, as can contain null values
+        synchronized (values) {
+            return Collections.unmodifiableMap(MutableMap.copyOf(values));
+        }
+    }
+
+}