You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/20 00:54:15 UTC
[27/36] incubator-brooklyn git commit: Rename o.a.b.sensor.feed to
o.a.b.feed and o.a.b.core.feed
Rename o.a.b.sensor.feed to o.a.b.feed and o.a.b.core.feed
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/daf40919
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/daf40919
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/daf40919
Branch: refs/heads/master
Commit: daf40919b6fa20f16b6cd7d9eb1ad9f79baaa8f4
Parents: 2a78e27
Author: Aled Sage <al...@gmail.com>
Authored: Wed Aug 19 22:56:13 2015 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Wed Aug 19 22:56:13 2015 +0100
----------------------------------------------------------------------
.../brooklyn/core/entity/AbstractEntity.java | 4 +-
.../apache/brooklyn/core/feed/AbstractFeed.java | 240 ++++++
.../core/feed/AttributePollHandler.java | 248 +++++++
.../brooklyn/core/feed/ConfigToAttributes.java | 59 ++
.../core/feed/DelegatingPollHandler.java | 96 +++
.../apache/brooklyn/core/feed/FeedConfig.java | 297 ++++++++
.../apache/brooklyn/core/feed/PollConfig.java | 85 +++
.../apache/brooklyn/core/feed/PollHandler.java | 38 +
.../org/apache/brooklyn/core/feed/Poller.java | 205 ++++++
.../mgmt/rebind/BasicEntityRebindSupport.java | 2 +-
.../mgmt/rebind/BasicFeedRebindSupport.java | 2 +-
.../core/mgmt/rebind/RebindIteration.java | 2 +-
.../mgmt/rebind/dto/MementosGenerators.java | 2 +-
.../sensor/AttributeSensorAndConfigKey.java | 2 +-
.../brooklyn/core/sensor/HttpRequestSensor.java | 6 +-
.../entity/group/DynamicMultiGroupImpl.java | 4 +-
.../brooklyn/entity/stock/DataEntityImpl.java | 4 +-
.../brooklyn/feed/function/FunctionFeed.java | 208 ++++++
.../feed/function/FunctionPollConfig.java | 111 +++
.../org/apache/brooklyn/feed/http/HttpFeed.java | 382 ++++++++++
.../brooklyn/feed/http/HttpPollConfig.java | 160 ++++
.../brooklyn/feed/http/HttpPollValue.java | 40 +
.../apache/brooklyn/feed/http/HttpPolls.java | 39 +
.../brooklyn/feed/http/HttpValueFunctions.java | 154 ++++
.../brooklyn/feed/http/JsonFunctions.java | 235 ++++++
.../apache/brooklyn/feed/shell/ShellFeed.java | 273 +++++++
.../brooklyn/feed/shell/ShellPollConfig.java | 125 ++++
.../org/apache/brooklyn/feed/ssh/SshFeed.java | 290 ++++++++
.../apache/brooklyn/feed/ssh/SshPollConfig.java | 142 ++++
.../apache/brooklyn/feed/ssh/SshPollValue.java | 60 ++
.../brooklyn/feed/ssh/SshValueFunctions.java | 73 ++
.../windows/WindowsPerformanceCounterFeed.java | 412 +++++++++++
.../WindowsPerformanceCounterPollConfig.java | 53 ++
.../brooklyn/sensor/feed/AbstractFeed.java | 240 ------
.../sensor/feed/AttributePollHandler.java | 248 -------
.../sensor/feed/ConfigToAttributes.java | 59 --
.../sensor/feed/DelegatingPollHandler.java | 96 ---
.../apache/brooklyn/sensor/feed/FeedConfig.java | 297 --------
.../apache/brooklyn/sensor/feed/PollConfig.java | 85 ---
.../brooklyn/sensor/feed/PollHandler.java | 38 -
.../org/apache/brooklyn/sensor/feed/Poller.java | 205 ------
.../sensor/feed/function/FunctionFeed.java | 208 ------
.../feed/function/FunctionPollConfig.java | 111 ---
.../brooklyn/sensor/feed/http/HttpFeed.java | 382 ----------
.../sensor/feed/http/HttpPollConfig.java | 160 ----
.../sensor/feed/http/HttpPollValue.java | 40 -
.../brooklyn/sensor/feed/http/HttpPolls.java | 39 -
.../sensor/feed/http/HttpValueFunctions.java | 154 ----
.../sensor/feed/http/JsonFunctions.java | 235 ------
.../brooklyn/sensor/feed/shell/ShellFeed.java | 273 -------
.../sensor/feed/shell/ShellPollConfig.java | 125 ----
.../brooklyn/sensor/feed/ssh/SshFeed.java | 290 --------
.../brooklyn/sensor/feed/ssh/SshPollConfig.java | 142 ----
.../brooklyn/sensor/feed/ssh/SshPollValue.java | 60 --
.../sensor/feed/ssh/SshValueFunctions.java | 73 --
.../windows/WindowsPerformanceCounterFeed.java | 412 -----------
.../WindowsPerformanceCounterPollConfig.java | 53 --
.../util/core/http/HttpToolResponse.java | 2 +-
.../core/feed/ConfigToAttributesTest.java | 70 ++
.../apache/brooklyn/core/feed/PollerTest.java | 108 +++
.../core/location/TestPortSupplierLocation.java | 2 +-
.../core/mgmt/rebind/RebindFeedTest.java | 16 +-
.../feed/function/FunctionFeedTest.java | 315 ++++++++
.../feed/http/HttpFeedIntegrationTest.java | 160 ++++
.../apache/brooklyn/feed/http/HttpFeedTest.java | 392 ++++++++++
.../feed/http/HttpValueFunctionsTest.java | 94 +++
.../brooklyn/feed/http/JsonFunctionsTest.java | 130 ++++
.../feed/shell/ShellFeedIntegrationTest.java | 226 ++++++
.../feed/ssh/SshFeedIntegrationTest.java | 264 +++++++
.../WindowsPerformanceCounterFeedLiveTest.java | 104 +++
.../WindowsPerformanceCounterFeedTest.java | 132 ++++
.../sensor/feed/ConfigToAttributesTest.java | 70 --
.../apache/brooklyn/sensor/feed/PollerTest.java | 108 ---
.../sensor/feed/function/FunctionFeedTest.java | 315 --------
.../feed/http/HttpFeedIntegrationTest.java | 160 ----
.../brooklyn/sensor/feed/http/HttpFeedTest.java | 392 ----------
.../feed/http/HttpValueFunctionsTest.java | 94 ---
.../sensor/feed/http/JsonFunctionsTest.java | 130 ----
.../feed/shell/ShellFeedIntegrationTest.java | 226 ------
.../sensor/feed/ssh/SshFeedIntegrationTest.java | 264 -------
.../WindowsPerformanceCounterFeedLiveTest.java | 104 ---
.../WindowsPerformanceCounterFeedTest.java | 132 ----
.../policy/enricher/HttpLatencyDetector.java | 6 +-
.../entity/database/derby/DerbyDatabase.java | 2 +-
.../entity/database/derby/DerbySchema.java | 6 +-
.../postgresql/PostgreSqlNodeSaltImpl.java | 4 +-
.../entity/salt/SaltStackMasterImpl.java | 3 +-
.../entity/monitoring/zabbix/ZabbixFeed.java | 10 +-
.../monitoring/zabbix/ZabbixPollConfig.java | 6 +-
.../monitoring/zabbix/ZabbixServerImpl.java | 6 +-
.../nosql/hazelcast/HazelcastNodeImpl.java | 6 +-
.../brooklynnode/BrooklynClusterImpl.java | 4 +-
.../brooklynnode/BrooklynEntityMirrorImpl.java | 4 +-
.../entity/brooklynnode/BrooklynNodeImpl.java | 10 +-
.../SetHighAvailabilityModeEffectorBody.java | 4 +-
.../brooklyn/entity/chef/ChefAttributeFeed.java | 8 +-
.../entity/chef/ChefAttributePollConfig.java | 2 +-
.../brooklyn/entity/java/JavaAppUtils.java | 6 +-
.../entity/java/JmxAttributeSensor.java | 6 +-
.../apache/brooklyn/entity/java/JmxSupport.java | 2 +-
.../entity/java/VanillaJavaAppImpl.java | 2 +-
.../entity/machine/MachineEntityImpl.java | 6 +-
.../base/AbstractSoftwareProcessSshDriver.java | 2 +-
.../software/base/SoftwareProcessImpl.java | 4 +-
.../MachineLifecycleEffectorTasks.java | 2 +-
.../feed/jmx/JmxAttributePollConfig.java | 74 ++
.../org/apache/brooklyn/feed/jmx/JmxFeed.java | 423 +++++++++++
.../org/apache/brooklyn/feed/jmx/JmxHelper.java | 724 +++++++++++++++++++
.../feed/jmx/JmxNotificationFilters.java | 64 ++
.../jmx/JmxNotificationSubscriptionConfig.java | 95 +++
.../feed/jmx/JmxOperationPollConfig.java | 121 ++++
.../brooklyn/feed/jmx/JmxValueFunctions.java | 95 +++
.../sensor/feed/jmx/JmxAttributePollConfig.java | 74 --
.../brooklyn/sensor/feed/jmx/JmxFeed.java | 423 -----------
.../brooklyn/sensor/feed/jmx/JmxHelper.java | 724 -------------------
.../sensor/feed/jmx/JmxNotificationFilters.java | 64 --
.../jmx/JmxNotificationSubscriptionConfig.java | 95 ---
.../sensor/feed/jmx/JmxOperationPollConfig.java | 121 ----
.../sensor/feed/jmx/JmxValueFunctions.java | 95 ---
.../brooklyn/sensor/ssh/SshCommandSensor.java | 6 +-
.../winrm/WindowsPerformanceCounterSensors.java | 2 +-
.../BrooklynNodeIntegrationTest.java | 2 +-
.../entity/brooklynnode/BrooklynNodeTest.java | 2 +-
.../brooklynnode/SameBrooklynNodeImpl.java | 6 +-
.../brooklynnode/SelectMasterEffectorTest.java | 6 +-
.../mysql/ChefSoloDriverToyMySqlEntity.java | 4 +-
.../brooklyn/entity/java/EntityPollingTest.java | 4 +-
.../entity/java/VanillaJavaAppTest.java | 2 +-
.../base/lifecycle/ScriptHelperTest.java | 4 +-
.../software/base/test/jmx/JmxService.java | 2 +-
.../apache/brooklyn/feed/jmx/JmxFeedTest.java | 422 +++++++++++
.../apache/brooklyn/feed/jmx/JmxHelperTest.java | 311 ++++++++
.../brooklyn/feed/jmx/RebindJmxFeedTest.java | 148 ++++
.../brooklyn/sensor/feed/jmx/JmxFeedTest.java | 422 -----------
.../brooklyn/sensor/feed/jmx/JmxHelperTest.java | 311 --------
.../sensor/feed/jmx/RebindJmxFeedTest.java | 148 ----
.../entity/database/crate/CrateNodeImpl.java | 8 +-
.../database/mariadb/MariaDbNodeImpl.java | 6 +-
.../entity/database/mysql/MySqlClusterImpl.java | 4 +-
.../entity/database/mysql/MySqlNodeImpl.java | 6 +-
.../PostgreSqlNodeChefImplFromScratch.java | 4 +-
.../messaging/activemq/ActiveMQBrokerImpl.java | 4 +-
.../activemq/ActiveMQDestinationImpl.java | 4 +-
.../messaging/activemq/ActiveMQQueueImpl.java | 4 +-
.../entity/messaging/kafka/KafkaBrokerImpl.java | 6 +-
.../messaging/kafka/KafkaClusterImpl.java | 2 +-
.../entity/messaging/qpid/QpidBrokerImpl.java | 6 +-
.../messaging/qpid/QpidDestinationImpl.java | 4 +-
.../entity/messaging/qpid/QpidQueueImpl.java | 4 +-
.../entity/messaging/rabbit/RabbitQueue.java | 6 +-
.../entity/messaging/storm/StormImpl.java | 4 +-
.../entity/zookeeper/AbstractZooKeeperImpl.java | 6 +-
.../entity/monitoring/monit/MonitNodeImpl.java | 6 +-
.../nosql/cassandra/CassandraNodeImpl.java | 12 +-
.../nosql/couchbase/CouchbaseClusterImpl.java | 8 +-
.../nosql/couchbase/CouchbaseNodeImpl.java | 8 +-
.../nosql/couchbase/CouchbaseNodeSshDriver.java | 2 +-
.../couchbase/CouchbaseSyncGatewayImpl.java | 6 +-
.../entity/nosql/couchdb/CouchDBNodeImpl.java | 6 +-
.../elasticsearch/ElasticSearchNodeImpl.java | 8 +-
.../entity/nosql/mongodb/MongoDBServerImpl.java | 4 +-
.../mongodb/sharding/MongoDBRouterImpl.java | 4 +-
.../entity/nosql/redis/RedisStoreImpl.java | 8 +-
.../entity/nosql/riak/RiakNodeImpl.java | 6 +-
.../entity/nosql/solr/SolrServerImpl.java | 6 +-
.../ElasticSearchClusterIntegrationTest.java | 2 +-
.../ElasticSearchNodeIntegrationTest.java | 2 +-
.../entity/osgi/karaf/KarafContainerImpl.java | 10 +-
.../entity/proxy/AbstractControllerImpl.java | 2 +-
.../AbstractNonProvisionedControllerImpl.java | 2 +-
.../entity/proxy/nginx/NginxControllerImpl.java | 8 +-
.../ControlledDynamicWebAppClusterImpl.java | 2 +-
.../entity/webapp/jboss/JBoss6ServerImpl.java | 4 +-
.../entity/webapp/jboss/JBoss7ServerImpl.java | 6 +-
.../entity/webapp/jetty/Jetty6ServerImpl.java | 4 +-
.../webapp/nodejs/NodeJsWebAppServiceImpl.java | 8 +-
.../entity/webapp/tomcat/TomcatServerImpl.java | 4 +-
.../qa/load/SimulatedJBoss7ServerImpl.java | 10 +-
.../qa/load/SimulatedMySqlNodeImpl.java | 4 +-
.../qa/load/SimulatedNginxControllerImpl.java | 10 +-
.../brooklynnode/DeployBlueprintTest.java | 2 +-
181 files changed, 8717 insertions(+), 8718 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
index 994961c..0ec5903 100644
--- a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
+++ b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
@@ -63,6 +63,8 @@ import org.apache.brooklyn.core.entity.internal.EntityConfigMap;
import org.apache.brooklyn.core.entity.lifecycle.PolicyDescriptor;
import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic;
import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic.ServiceNotUpLogic;
+import org.apache.brooklyn.core.feed.AbstractFeed;
+import org.apache.brooklyn.core.feed.ConfigToAttributes;
import org.apache.brooklyn.core.internal.BrooklynInitialization;
import org.apache.brooklyn.core.internal.storage.BrooklynStorage;
import org.apache.brooklyn.core.internal.storage.Reference;
@@ -82,8 +84,6 @@ import org.apache.brooklyn.core.sensor.AttributeSensorAndConfigKey;
import org.apache.brooklyn.core.sensor.BasicNotificationSensor;
import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.sensor.enricher.AbstractEnricher;
-import org.apache.brooklyn.sensor.feed.AbstractFeed;
-import org.apache.brooklyn.sensor.feed.ConfigToAttributes;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.collections.MutableSet;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java b/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java
new file mode 100644
index 0000000..a31b73e
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.feed;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.mgmt.rebind.RebindSupport;
+import org.apache.brooklyn.api.mgmt.rebind.mementos.FeedMemento;
+import org.apache.brooklyn.api.sensor.Feed;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.BrooklynFeatureEnablement;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.mgmt.rebind.BasicFeedRebindSupport;
+import org.apache.brooklyn.core.objs.AbstractEntityAdjunct;
+import org.apache.brooklyn.util.javalang.JavaClassNames;
+import org.apache.brooklyn.util.text.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Captures common fields and processes for sensor feeds.
+ * These generally poll or subscribe to get sensor values for an entity.
+ * They make it easy to poll over http, jmx, etc.
+ */
+public abstract class AbstractFeed extends AbstractEntityAdjunct implements Feed {
+
+ private static final Logger log = LoggerFactory.getLogger(AbstractFeed.class);
+
+ public static final ConfigKey<Boolean> ONLY_IF_SERVICE_UP = ConfigKeys.newBooleanConfigKey("feed.onlyIfServiceUp", "", false);
+
+ private final Object pollerStateMutex = new Object();
+ private transient volatile Poller<?> poller;
+ private transient volatile boolean activated;
+ private transient volatile boolean suspended;
+
+ public AbstractFeed() {
+ }
+
+ /**
+ * @deprecated since 0.7.0; use no-arg constructor; call {@link #setEntity(EntityLocal)}
+ */
+ @Deprecated
+ public AbstractFeed(EntityLocal entity) {
+ this(entity, false);
+ }
+
+ /**
+ * @deprecated since 0.7.0; use no-arg constructor; call {@link #setEntity(EntityLocal)} and {@code setConfig(ONLY_IF_SERVICE_UP, onlyIfServiceUp)}
+ */
+ @Deprecated
+ public AbstractFeed(EntityLocal entity, boolean onlyIfServiceUp) {
+ this.entity = checkNotNull(entity, "entity");
+ setConfig(ONLY_IF_SERVICE_UP, onlyIfServiceUp);
+ }
+
+ // Ensure idempotent, as called in builders (in case not registered with entity), and also called
+ // when registering with entity
+ @Override
+ public void setEntity(EntityLocal entity) {
+ super.setEntity(entity);
+ if (BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_FEED_REGISTRATION_PROPERTY)) {
+ ((EntityInternal)entity).feeds().addFeed(this);
+ }
+ }
+
+ protected void initUniqueTag(String uniqueTag, Object ...valsForDefault) {
+ if (Strings.isNonBlank(uniqueTag)) this.uniqueTag = uniqueTag;
+ else this.uniqueTag = getDefaultUniqueTag(valsForDefault);
+ }
+
+ protected String getDefaultUniqueTag(Object ...valsForDefault) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(JavaClassNames.simpleClassName(this));
+ if (valsForDefault.length==0) {
+ sb.append("@");
+ sb.append(hashCode());
+ } else if (valsForDefault.length==1 && valsForDefault[0] instanceof Collection){
+ sb.append(Strings.toUniqueString(valsForDefault[0], 80));
+ } else {
+ sb.append("[");
+ boolean first = true;
+ for (Object x: valsForDefault) {
+ if (!first) sb.append(";");
+ else first = false;
+ sb.append(Strings.toUniqueString(x, 80));
+ }
+ sb.append("]");
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public void start() {
+ if (log.isDebugEnabled()) log.debug("Starting feed {} for {}", this, entity);
+ if (activated) {
+ throw new IllegalStateException(String.format("Attempt to start feed %s of entity %s when already running",
+ this, entity));
+ }
+ if (poller != null) {
+ throw new IllegalStateException(String.format("Attempt to re-start feed %s of entity %s", this, entity));
+ }
+
+ poller = new Poller<Object>(entity, getConfig(ONLY_IF_SERVICE_UP));
+ activated = true;
+ preStart();
+ synchronized (pollerStateMutex) {
+ // don't start poller if we are suspended
+ if (!suspended) {
+ poller.start();
+ }
+ }
+ }
+
+ @Override
+ public void suspend() {
+ synchronized (pollerStateMutex) {
+ if (activated && !suspended) {
+ poller.stop();
+ }
+ suspended = true;
+ }
+ }
+
+ @Override
+ public void resume() {
+ synchronized (pollerStateMutex) {
+ if (activated && suspended) {
+ poller.start();
+ }
+ suspended = false;
+ }
+ }
+
+ @Override
+ public void destroy() {
+ stop();
+ }
+
+ @Override
+ public void stop() {
+ if (!activated) {
+ log.debug("Ignoring attempt to stop feed {} of entity {} when not running", this, entity);
+ return;
+ }
+ if (log.isDebugEnabled()) log.debug("stopping feed {} for {}", this, entity);
+
+ activated = false;
+ preStop();
+ synchronized (pollerStateMutex) {
+ if (!suspended) {
+ poller.stop();
+ }
+ }
+ postStop();
+ super.destroy();
+ }
+
+ @Override
+ public boolean isActivated() {
+ return activated;
+ }
+
+ public EntityLocal getEntity() {
+ return entity;
+ }
+
+ protected boolean isConnected() {
+ // TODO Default impl will result in multiple logs for same error if becomes unreachable
+ // (e.g. if ssh gets NoRouteToHostException, then every AttributePollHandler for that
+ // feed will log.warn - so if polling for 10 sensors/attributes will get 10 log messages).
+ // Would be nice if reduced this logging duplication.
+ // (You can reduce it by providing a better 'isConnected' implementation of course.)
+ return isRunning() && entity!=null && !((EntityInternal)entity).getManagementSupport().isNoLongerManaged();
+ }
+
+ @Override
+ public boolean isSuspended() {
+ return suspended;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return isActivated() && !isSuspended() && !isDestroyed() && getPoller()!=null && getPoller().isRunning();
+ }
+
+ @Override
+ public RebindSupport<FeedMemento> getRebindSupport() {
+ return new BasicFeedRebindSupport(this);
+ }
+
+ @Override
+ protected void onChanged() {
+ // TODO Auto-generated method stub
+ }
+
+ /**
+ * For overriding.
+ */
+ protected void preStart() {
+ }
+
+ /**
+ * For overriding.
+ */
+ protected void preStop() {
+ }
+
+ /**
+ * For overriding.
+ */
+ protected void postStop() {
+ }
+
+ /**
+ * For overriding, where sub-class can change return-type generics!
+ */
+ protected Poller<?> getPoller() {
+ return poller;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java b/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java
new file mode 100644
index 0000000..c266836
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.feed;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.entity.Attributes;
+import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
+import org.apache.brooklyn.core.entity.lifecycle.Lifecycle.Transition;
+import org.apache.brooklyn.util.core.flags.TypeCoercions;
+import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Objects;
+
+/**
+ * Handler for when polling an entity's attribute. On each poll result the entity's attribute is set.
+ *
+ * Calls to onSuccess and onError will happen sequentially, but may be called from different threads
+ * each time. Note that no guarantees of a synchronized block exist, so additional synchronization
+ * would be required for the Java memory model's "happens before" relationship.
+ *
+ * @author aled
+ */
+public class AttributePollHandler<V> implements PollHandler<V> {
+
+ public static final Logger log = LoggerFactory.getLogger(AttributePollHandler.class);
+
+ private final FeedConfig<V,?,?> config;
+ private final EntityLocal entity;
+ @SuppressWarnings("rawtypes")
+ private final AttributeSensor sensor;
+ private final AbstractFeed feed;
+ private final boolean suppressDuplicates;
+
+ // allow 30 seconds before logging at WARN, if there has been no success yet;
+ // after success WARN immediately
+ // TODO these should both be configurable
+ private Duration logWarningGraceTimeOnStartup = Duration.THIRTY_SECONDS;
+ private Duration logWarningGraceTime = Duration.millis(0);
+
+ // internal state to look after whether to log warnings
+ private volatile Long lastSuccessTime = null;
+ private volatile Long currentProblemStartTime = null;
+ private volatile boolean currentProblemLoggedAsWarning = false;
+ private volatile boolean lastWasProblem = false;
+
+
+ public AttributePollHandler(FeedConfig<V,?,?> config, EntityLocal entity, AbstractFeed feed) {
+ this.config = checkNotNull(config, "config");
+ this.entity = checkNotNull(entity, "entity");
+ this.sensor = checkNotNull(config.getSensor(), "sensor");
+ this.feed = checkNotNull(feed, "feed");
+ this.suppressDuplicates = config.getSupressDuplicates();
+ }
+
+ @Override
+ public boolean checkSuccess(V val) {
+ // Always true if no checkSuccess predicate was configured.
+ return !config.hasCheckSuccessHandler() || config.getCheckSuccess().apply(val);
+ }
+
+ @Override
+ public void onSuccess(V val) {
+ if (lastWasProblem) {
+ if (currentProblemLoggedAsWarning) {
+ log.info("Success (following previous problem) reading "+getBriefDescription());
+ } else {
+ log.debug("Success (following previous problem) reading "+getBriefDescription());
+ }
+ lastWasProblem = false;
+ currentProblemStartTime = null;
+ currentProblemLoggedAsWarning = false;
+ }
+ lastSuccessTime = System.currentTimeMillis();
+ if (log.isTraceEnabled()) log.trace("poll for {} got: {}", new Object[] {getBriefDescription(), val});
+
+ try {
+ setSensor(transformValueOnSuccess(val));
+ } catch (Exception e) {
+ if (feed.isConnected()) {
+ log.warn("unable to compute "+getBriefDescription()+"; on val="+val, e);
+ } else {
+ if (log.isDebugEnabled()) log.debug("unable to compute "+getBriefDescription()+"; val="+val+" (when inactive)", e);
+ }
+ }
+ }
+
+ /** allows post-processing, such as applying a success handler;
+ * default applies the onSuccess handler (which is recommended) */
+ protected Object transformValueOnSuccess(V val) {
+ return config.hasSuccessHandler() ? config.getOnSuccess().apply(val) : val;
+ }
+
+ @Override
+ public void onFailure(V val) {
+ if (!config.hasFailureHandler()) {
+ onException(new Exception("checkSuccess of "+this+" for "+getBriefDescription()+" was false but poller has no failure handler"));
+ } else {
+ logProblem("failure", val);
+
+ try {
+ setSensor(config.hasFailureHandler() ? config.getOnFailure().apply((V)val) : val);
+ } catch (Exception e) {
+ if (feed.isConnected()) {
+ log.warn("Error computing " + getBriefDescription() + "; val=" + val+": "+ e, e);
+ } else {
+ if (log.isDebugEnabled())
+ log.debug("Error computing " + getBriefDescription() + "; val=" + val + " (when inactive)", e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void onException(Exception exception) {
+ if (!feed.isConnected()) {
+ if (log.isTraceEnabled()) log.trace("Read of {} in {} gave exception (while not connected or not yet connected): {}", new Object[] {this, getBriefDescription(), exception});
+ } else {
+ logProblem("exception", exception);
+ }
+
+ if (config.hasExceptionHandler()) {
+ try {
+ setSensor( config.getOnException().apply(exception) );
+ } catch (Exception e) {
+ if (feed.isConnected()) {
+ log.warn("unable to compute "+getBriefDescription()+"; on exception="+exception, e);
+ } else {
+ if (log.isDebugEnabled()) log.debug("unable to compute "+getBriefDescription()+"; exception="+exception+" (when inactive)", e);
+ }
+ }
+ }
+ }
+
+ protected void logProblem(String type, Object val) {
+ if (lastWasProblem && currentProblemLoggedAsWarning) {
+ if (log.isTraceEnabled())
+ log.trace("Recurring {} reading {} in {}: {}", new Object[] {type, this, getBriefDescription(), val});
+ } else {
+ long nowTime = System.currentTimeMillis();
+ // get a non-volatile value
+ Long currentProblemStartTimeCache = currentProblemStartTime;
+ long expiryTime =
+ (lastSuccessTime!=null && !isTransitioningOrStopped()) ? lastSuccessTime+logWarningGraceTime.toMilliseconds() :
+ currentProblemStartTimeCache!=null ? currentProblemStartTimeCache+logWarningGraceTimeOnStartup.toMilliseconds() :
+ nowTime+logWarningGraceTimeOnStartup.toMilliseconds();
+ if (!lastWasProblem) {
+ if (expiryTime <= nowTime) {
+ currentProblemLoggedAsWarning = true;
+ if (entity==null || !Entities.isNoLongerManaged(entity)) {
+ log.warn("Read of " + getBriefDescription() + " gave " + type + ": " + val);
+ } else {
+ log.debug("Read of " + getBriefDescription() + " gave " + type + ": " + val);
+ }
+ if (log.isDebugEnabled() && val instanceof Throwable)
+ log.debug("Trace for "+type+" reading "+getBriefDescription()+": "+val, (Throwable)val);
+ } else {
+ if (log.isDebugEnabled())
+ log.debug("Read of " + getBriefDescription() + " gave " + type + " (in grace period): " + val);
+ }
+ lastWasProblem = true;
+ currentProblemStartTime = nowTime;
+ } else {
+ if (expiryTime <= nowTime) {
+ currentProblemLoggedAsWarning = true;
+ log.warn("Read of " + getBriefDescription() + " gave " + type +
+ " (grace period expired, occurring for "+Duration.millis(nowTime - currentProblemStartTimeCache)+
+ (config.hasExceptionHandler() ? "" : ", no exception handler set for sensor")+
+ ")"+
+ ": " + val);
+ if (log.isDebugEnabled() && val instanceof Throwable)
+ log.debug("Trace for "+type+" reading "+getBriefDescription()+": "+val, (Throwable)val);
+ } else {
+ if (log.isDebugEnabled())
+ log.debug("Recurring {} reading {} in {} (still in grace period): {}", new Object[] {type, this, getBriefDescription(), val});
+ }
+ }
+ }
+ }
+
+ protected boolean isTransitioningOrStopped() {
+ if (entity==null) return false;
+ Transition expected = entity.getAttribute(Attributes.SERVICE_STATE_EXPECTED);
+ if (expected==null) return false;
+ return (expected.getState()==Lifecycle.STARTING || expected.getState()==Lifecycle.STOPPING || expected.getState()==Lifecycle.STOPPED);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected void setSensor(Object v) {
+ if (Entities.isNoLongerManaged(entity)) {
+ if (Tasks.isInterrupted()) return;
+ log.warn(""+entity+" is not managed; feed "+this+" setting "+sensor+" to "+v+" at this time is not supported ("+Tasks.current()+")");
+ }
+
+ if (v == FeedConfig.UNCHANGED) {
+ // nothing
+ } else if (v == FeedConfig.REMOVE) {
+ ((EntityInternal)entity).removeAttribute(sensor);
+ } else if (sensor == FeedConfig.NO_SENSOR) {
+ // nothing
+ } else {
+ Object coercedV = TypeCoercions.coerce(v, sensor.getType());
+ if (suppressDuplicates && Objects.equal(coercedV, entity.getAttribute(sensor))) {
+ // no change; nothing
+ } else {
+ entity.setAttribute(sensor, coercedV);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return super.toString()+"["+getDescription()+"]";
+ }
+
+ @Override
+ public String getDescription() {
+ return sensor.getName()+" @ "+entity.getId()+" <- "+config;
+ }
+
+ protected String getBriefDescription() {
+ return ""+entity+"->"+(sensor==FeedConfig.NO_SENSOR ? "(dynamic sensors)" : ""+sensor);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/feed/ConfigToAttributes.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/ConfigToAttributes.java b/core/src/main/java/org/apache/brooklyn/core/feed/ConfigToAttributes.java
new file mode 100644
index 0000000..d455e80
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/ConfigToAttributes.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.feed;
+
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.mgmt.ManagementContext;
+import org.apache.brooklyn.api.sensor.Sensor;
+import org.apache.brooklyn.core.sensor.AttributeSensorAndConfigKey;
+import org.apache.brooklyn.core.sensor.TemplatedStringAttributeSensorAndConfigKey;
+
+
+/** Simple config adapter for setting {@link AttributeSensorAndConfigKey} sensor values from the config value or config default */
+public class ConfigToAttributes {
+
+ //normally just applied once, statically, not registered...
+ public static void apply(EntityLocal entity) {
+ for (Sensor<?> it : entity.getEntityType().getSensors()) {
+ if (it instanceof AttributeSensorAndConfigKey) {
+ apply(entity, (AttributeSensorAndConfigKey<?,?>)it);
+ }
+ }
+ }
+
+ /**
+ * Convenience for ensuring an individual sensor is set from its config key
+ * (e.g. sub-classes of DynamicWebAppCluster that don't want to set HTTP_PORT etc!)
+ */
+ public static <T> T apply(EntityLocal entity, AttributeSensorAndConfigKey<?,T> key) {
+ T v = entity.getAttribute(key);
+ if (v!=null) return v;
+ v = key.getAsSensorValue(entity);
+ if (v!=null) entity.setAttribute(key, v);
+ return v;
+ }
+
+ /**
+ * Convenience for transforming a config value (e.g. processing a {@link TemplatedStringAttributeSensorAndConfigKey}),
+ * outside of the context of an entity.
+ */
+ public static <T> T transform(ManagementContext managementContext, AttributeSensorAndConfigKey<?,T> key) {
+ return key.getAsSensorValue(managementContext);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/feed/DelegatingPollHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/DelegatingPollHandler.java b/core/src/main/java/org/apache/brooklyn/core/feed/DelegatingPollHandler.java
new file mode 100644
index 0000000..fae7dd6
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/DelegatingPollHandler.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.feed;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * A poll handler that delegates each call to a set of poll handlers.
+ *
+ * @author aled
+ */
+public class DelegatingPollHandler<V> implements PollHandler<V> {
+
+ private final List<AttributePollHandler<? super V>> delegates;
+
+ public DelegatingPollHandler(Iterable<AttributePollHandler<? super V>> delegates) {
+ super();
+ this.delegates = ImmutableList.copyOf(delegates);
+ }
+
+ @Override
+ public boolean checkSuccess(V val) {
+ for (AttributePollHandler<? super V> delegate : delegates) {
+ if (!delegate.checkSuccess(val))
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void onSuccess(V val) {
+ for (AttributePollHandler<? super V> delegate : delegates) {
+ delegate.onSuccess(val);
+ }
+ }
+
+ @Override
+ public void onFailure(V val) {
+ for (AttributePollHandler<? super V> delegate : delegates) {
+ delegate.onFailure(val);
+ }
+ }
+
+ @Override
+ public void onException(Exception exception) {
+ for (AttributePollHandler<? super V> delegate : delegates) {
+ delegate.onException(exception);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return super.toString()+"["+getDescription()+"]";
+ }
+
+ @Override
+ public String getDescription() {
+ if (delegates.isEmpty())
+ return "(empty delegate list)";
+ if (delegates.size()==1)
+ return delegates.get(0).getDescription();
+ StringBuilder sb = new StringBuilder();
+ sb.append("[");
+ int count = 0;
+ for (AttributePollHandler<? super V> delegate : delegates) {
+ if (count>0) sb.append("; ");
+ sb.append(delegate.getDescription());
+ if (count>2) {
+ sb.append("; ...");
+ break;
+ }
+ count++;
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/feed/FeedConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/FeedConfig.java b/core/src/main/java/org/apache/brooklyn/core/feed/FeedConfig.java
new file mode 100644
index 0000000..4d06680
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/FeedConfig.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.feed;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.feed.http.HttpPollConfig;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.guava.Functionals;
+import org.apache.brooklyn.util.javalang.JavaClassNames;
+import org.apache.brooklyn.util.text.Strings;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Objects;
+import com.google.common.base.Predicate;
+
+/**
+ * Configuration for a poll, or a subscription etc, that is being added to a feed.
+ *
+ * @author aled
+ */
+public class FeedConfig<V, T, F extends FeedConfig<V, T, F>> {
+
+ /** The onSuccess or onError functions can return this value to indicate that the sensor should not change.
+ * @deprecated since 0.7.0 use UNCHANGED */
+ public static final Object UNSET = Entities.UNCHANGED;
+ /** The onSuccess or onError functions can return this value to indicate that the sensor should not change. */
+ public static final Object UNCHANGED = Entities.UNCHANGED;
+ /** The onSuccess or onError functions can return this value to indicate that the sensor value should be removed
+ * (cf 'null', but useful in dynamic situations) */
+ public static final Object REMOVE = Entities.REMOVE;
+
+ /** Indicates that no sensor is being used here. This sensor is suppressed,
+ * but is useful where you want to use the feeds with custom success/exception/failure functions
+ * which directly set multiple sensors, e.g. dynamically based on the poll response.
+ * <p>
+ * See {@link HttpPollConfig#forMultiple()} and its usages.
+ * (It can work for any poll config, but conveniences have not been supplied for others.) */
+ public static final AttributeSensor<Void> NO_SENSOR = Sensors.newSensor(Void.class, "brooklyn.no.sensor");
+
+ private final AttributeSensor<T> sensor;
+ private Function<? super V, T> onsuccess;
+ private Function<? super V, T> onfailure;
+ private Function<? super Exception, T> onexception;
+ private Predicate<? super V> checkSuccess;
+ private boolean suppressDuplicates;
+ private boolean enabled = true;
+
+ public FeedConfig(AttributeSensor<T> sensor) {
+ this.sensor = checkNotNull(sensor, "sensor");
+ }
+
+ public FeedConfig(FeedConfig<V, T, F> other) {
+ this.sensor = other.sensor;
+ this.onsuccess = other.onsuccess;
+ this.onfailure = other.onfailure;
+ this.onexception = other.onexception;
+ this.checkSuccess = other.checkSuccess;
+ this.suppressDuplicates = other.suppressDuplicates;
+ this.enabled = other.enabled;
+ }
+
+ @SuppressWarnings("unchecked")
+ protected F self() {
+ return (F) this;
+ }
+
+ public AttributeSensor<T> getSensor() {
+ return sensor;
+ }
+
+ public Predicate<? super V> getCheckSuccess() {
+ return checkSuccess;
+ }
+
+ public Function<? super V, T> getOnSuccess() {
+ return onsuccess;
+ }
+
+ public Function<? super V, T> getOnFailure() {
+ return onfailure;
+ }
+
+ public Function<? super Exception, T> getOnException() {
+ return onexception;
+ }
+
+ public boolean getSupressDuplicates() {
+ return suppressDuplicates;
+ }
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ /** sets the predicate used to check whether a feed run is successful */
+ public F checkSuccess(Predicate<? super V> val) {
+ this.checkSuccess = checkNotNull(val, "checkSuccess");
+ return self();
+ }
+ /** as {@link #checkSuccess(Predicate)} */
+ public F checkSuccess(final Function<? super V,Boolean> val) {
+ return checkSuccess(Functionals.predicate(val));
+ }
+ @SuppressWarnings("unused")
+ /** @deprecated since 0.7.0, kept for rebind */ @Deprecated
+ private F checkSuccessLegacy(final Function<? super V,Boolean> val) {
+ return checkSuccess(new Predicate<V>() {
+ @Override
+ public boolean apply(V input) {
+ return val.apply(input);
+ }
+ });
+ }
+
+ public F onSuccess(Function<? super V,T> val) {
+ this.onsuccess = checkNotNull(val, "onSuccess");
+ return self();
+ }
+
+ public F setOnSuccess(T val) {
+ return onSuccess(Functions.constant(val));
+ }
+
+ /** a failure is when the connection is fine (no exception) but the other end returns a result object V
+ * which the feed can tell indicates a failure (e.g. HTTP code 404) */
+ public F onFailure(Function<? super V,T> val) {
+ this.onfailure = checkNotNull(val, "onFailure");
+ return self();
+ }
+
+ public F setOnFailure(T val) {
+ return onFailure(Functions.constant(val));
+ }
+
+ /** registers a callback to be used {@link #onSuccess(Function)} and {@link #onFailure(Function)},
+ * i.e. whenever a result comes back, but not in case of exceptions being thrown (ie problems communicating) */
+ public F onResult(Function<? super V, T> val) {
+ onSuccess(val);
+ return onFailure(val);
+ }
+
+ public F setOnResult(T val) {
+ return onResult(Functions.constant(val));
+ }
+
+ /** an exception is when there is an error in the communication */
+ public F onException(Function<? super Exception,T> val) {
+ this.onexception = checkNotNull(val, "onException");
+ return self();
+ }
+
+ public F setOnException(T val) {
+ return onException(Functions.constant(val));
+ }
+
+ /** convenience for indicating a behaviour to occur for both
+ * {@link #onException(Function)}
+ * (error connecting) and
+ * {@link #onFailure(Function)}
+ * (successful communication but failure report from remote end) */
+ public F onFailureOrException(Function<Object,T> val) {
+ onFailure(val);
+ return onException(val);
+ }
+
+ public F setOnFailureOrException(T val) {
+ return onFailureOrException(Functions.constant(val));
+ }
+
+ public F suppressDuplicates(boolean val) {
+ suppressDuplicates = val;
+ return self();
+ }
+
+ /**
+ * Whether this feed is enabled (defaulting to true).
+ */
+ public F enabled(boolean val) {
+ enabled = val;
+ return self();
+ }
+
+ public boolean hasSuccessHandler() {
+ return this.onsuccess != null;
+ }
+
+ public boolean hasFailureHandler() {
+ return this.onfailure != null;
+ }
+
+ public boolean hasExceptionHandler() {
+ return this.onexception != null;
+ }
+
+ public boolean hasCheckSuccessHandler() {
+ return this.checkSuccess != null;
+ }
+
+
+ @Override
+ public String toString() {
+ StringBuilder result = new StringBuilder();
+ result.append(toStringBaseName());
+ result.append("[");
+ boolean contents = false;
+ Object source = toStringPollSource();
+ AttributeSensor<T> s = getSensor();
+ if (Strings.isNonBlank(Strings.toString(source))) {
+ result.append(Strings.toUniqueString(source, 40));
+ if (s!=null) {
+ result.append("->");
+ result.append(s.getName());
+ }
+ contents = true;
+ } else if (s!=null) {
+ result.append(s.getName());
+ contents = true;
+ }
+ MutableList<Object> fields = toStringOtherFields();
+ if (fields!=null) {
+ for (Object field: fields) {
+ if (Strings.isNonBlank(Strings.toString(field))) {
+ if (contents) result.append(";");
+ contents = true;
+ result.append(field);
+ }
+ }
+ }
+ result.append("]");
+ return result.toString();
+ }
+
+ /** can be overridden to supply a simpler base name than the class name */
+ protected String toStringBaseName() {
+ return JavaClassNames.simpleClassName(this);
+ }
+ /** can be overridden to supply add'l info for the {@link #toString()}; subclasses can add to the returned value */
+ protected MutableList<Object> toStringOtherFields() {
+ return MutableList.<Object>of();
+ }
+ /** can be overridden to supply add'l info for the {@link #toString()}, placed before the sensor with -> */
+ protected Object toStringPollSource() {
+ return null;
+ }
+ /** all configs should supply a unique tag element, inserted into the feed */
+ protected String getUniqueTag() {
+ return toString();
+ }
+
+ /** returns fields which should be used for equality, including by default {@link #toStringOtherFields()} and {@link #toStringPollSource()};
+ * subclasses can add to the returned value */
+ protected MutableList<Object> equalsFields() {
+ MutableList<Object> result = MutableList.of().appendIfNotNull(getSensor()).appendIfNotNull(toStringPollSource());
+ for (Object field: toStringOtherFields()) result.appendIfNotNull(field);
+ return result;
+ }
+
+ @Override
+ public int hashCode() {
+ int hc = super.hashCode();
+ for (Object f: equalsFields())
+ hc = Objects.hashCode(hc, f);
+ return hc;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (!super.equals(obj)) return false;
+ PollConfig<?,?,?> other = (PollConfig<?,?,?>) obj;
+ if (!Objects.equal(getUniqueTag(), other.getUniqueTag())) return false;
+ if (!Objects.equal(equalsFields(), other.equalsFields())) return false;
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java b/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java
new file mode 100644
index 0000000..133431b
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.feed;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.time.Duration;
+
+/**
+ * Configuration for polling, which is being added to a feed (e.g. to poll a given URL over http).
+ *
+ * @author aled
+ */
+public class PollConfig<V, T, F extends PollConfig<V, T, F>> extends FeedConfig<V, T, F> {
+
+ private long period = -1;
+ private String description;
+
+ public PollConfig(AttributeSensor<T> sensor) {
+ super(sensor);
+ }
+
+ public PollConfig(PollConfig<V,T,F> other) {
+ super(other);
+ this.period = other.period;
+ }
+
+ public long getPeriod() {
+ return period;
+ }
+
+ public F period(Duration val) {
+ checkArgument(val.toMilliseconds() >= 0, "period must be greater than or equal to zero");
+ this.period = val.toMilliseconds();
+ return self();
+ }
+
+ public F period(long val) {
+ checkArgument(val >= 0, "period must be greater than or equal to zero");
+ this.period = val; return self();
+ }
+
+ public F period(long val, TimeUnit units) {
+ checkArgument(val >= 0, "period must be greater than or equal to zero");
+ return period(units.toMillis(val));
+ }
+
+ public F description(String description) {
+ this.description = description;
+ return self();
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ @Override protected MutableList<Object> toStringOtherFields() {
+ return super.toStringOtherFields().appendIfNotNull(description);
+ }
+
+ @Override
+ protected MutableList<Object> equalsFields() {
+ return super.equalsFields().appendIfNotNull(period);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/feed/PollHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/PollHandler.java b/core/src/main/java/org/apache/brooklyn/core/feed/PollHandler.java
new file mode 100644
index 0000000..a63ebde
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/PollHandler.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.feed;
+
+/**
+ * Notified by the Poller of the result for each job, on each poll.
+ *
+ * @author aled
+ */
+public interface PollHandler<V> {
+
+ public boolean checkSuccess(V val);
+
+ public void onSuccess(V val);
+
+ public void onFailure(V val);
+
+ public void onException(Exception exception);
+
+ public String getDescription();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
new file mode 100644
index 0000000..fd50ebd
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.feed;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.core.entity.Attributes;
+import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.task.DynamicSequentialTask;
+import org.apache.brooklyn.util.core.task.ScheduledTask;
+import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Objects;
+
+
+/**
+ * For executing periodic polls.
+ * Jobs are added to the schedule, and then the poller is started.
+ * The jobs will then be executed periodically, and the handler called for the result/failure.
+ *
+ * Assumes the schedule+start will be done single threaded, and that stop will not be done concurrently.
+ */
+public class Poller<V> {
+ public static final Logger log = LoggerFactory.getLogger(Poller.class);
+
+ private final EntityLocal entity;
+ private final boolean onlyIfServiceUp;
+ private final Set<Callable<?>> oneOffJobs = new LinkedHashSet<Callable<?>>();
+ private final Set<PollJob<V>> pollJobs = new LinkedHashSet<PollJob<V>>();
+ private final Set<Task<?>> oneOffTasks = new LinkedHashSet<Task<?>>();
+ private final Set<ScheduledTask> tasks = new LinkedHashSet<ScheduledTask>();
+ private volatile boolean started = false;
+
+ private static class PollJob<V> {
+ final PollHandler<? super V> handler;
+ final Duration pollPeriod;
+ final Runnable wrappedJob;
+ private boolean loggedPreviousException = false;
+
+ PollJob(final Callable<V> job, final PollHandler<? super V> handler, Duration period) {
+ this.handler = handler;
+ this.pollPeriod = period;
+
+ wrappedJob = new Runnable() {
+ public void run() {
+ try {
+ V val = job.call();
+ loggedPreviousException = false;
+ if (handler.checkSuccess(val)) {
+ handler.onSuccess(val);
+ } else {
+ handler.onFailure(val);
+ }
+ } catch (Exception e) {
+ if (loggedPreviousException) {
+ if (log.isTraceEnabled()) log.trace("PollJob for {}, repeated consecutive failures, handling {} using {}", new Object[] {job, e, handler});
+ } else {
+ if (log.isDebugEnabled()) log.debug("PollJob for {} handling {} using {}", new Object[] {job, e, handler});
+ loggedPreviousException = true;
+ }
+ handler.onException(e);
+ }
+ }
+ };
+ }
+ }
+
+ /** @deprecated since 0.7.0, pass in whether should run onlyIfServiceUp */
+ @Deprecated
+ public Poller(EntityLocal entity) {
+ this(entity, false);
+ }
+ public Poller(EntityLocal entity, boolean onlyIfServiceUp) {
+ this.entity = entity;
+ this.onlyIfServiceUp = onlyIfServiceUp;
+ }
+
+ /** Submits a one-off poll job; recommended that callers supply to-String so that task has a decent description */
+ public void submit(Callable<?> job) {
+ if (started) {
+ throw new IllegalStateException("Cannot submit additional tasks after poller has started");
+ }
+ oneOffJobs.add(job);
+ }
+
+ public void scheduleAtFixedRate(Callable<V> job, PollHandler<? super V> handler, long period) {
+ scheduleAtFixedRate(job, handler, Duration.millis(period));
+ }
+ public void scheduleAtFixedRate(Callable<V> job, PollHandler<? super V> handler, Duration period) {
+ if (started) {
+ throw new IllegalStateException("Cannot schedule additional tasks after poller has started");
+ }
+ PollJob<V> foo = new PollJob<V>(job, handler, period);
+ pollJobs.add(foo);
+ }
+
+ @SuppressWarnings({ "unchecked" })
+ public void start() {
+ // TODO Previous incarnation of this logged this logged polledSensors.keySet(), but we don't know that anymore
+ // Is that ok, are can we do better?
+
+ if (log.isDebugEnabled()) log.debug("Starting poll for {} (using {})", new Object[] {entity, this});
+ if (started) {
+ throw new IllegalStateException(String.format("Attempt to start poller %s of entity %s when already running",
+ this, entity));
+ }
+
+ started = true;
+
+ for (final Callable<?> oneOffJob : oneOffJobs) {
+ Task<?> task = Tasks.builder().dynamic(false).body((Callable<Object>) oneOffJob).name("Poll").description("One-time poll job "+oneOffJob).build();
+ oneOffTasks.add(((EntityInternal)entity).getExecutionContext().submit(task));
+ }
+
+ for (final PollJob<V> pollJob : pollJobs) {
+ final String scheduleName = pollJob.handler.getDescription();
+ if (pollJob.pollPeriod.compareTo(Duration.ZERO) > 0) {
+ Callable<Task<?>> pollingTaskFactory = new Callable<Task<?>>() {
+ public Task<?> call() {
+ DynamicSequentialTask<Void> task = new DynamicSequentialTask<Void>(MutableMap.of("displayName", scheduleName, "entity", entity),
+ new Callable<Void>() { public Void call() {
+ if (onlyIfServiceUp && !Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP))) {
+ return null;
+ }
+ pollJob.wrappedJob.run();
+ return null;
+ } } );
+ BrooklynTaskTags.setTransient(task);
+ return task;
+ }
+ };
+ ScheduledTask task = new ScheduledTask(MutableMap.of("period", pollJob.pollPeriod, "displayName", "scheduled:"+scheduleName), pollingTaskFactory);
+ tasks.add((ScheduledTask)Entities.submit(entity, task));
+ } else {
+ if (log.isDebugEnabled()) log.debug("Activating poll (but leaving off, as period {}) for {} (using {})", new Object[] {pollJob.pollPeriod, entity, this});
+ }
+ }
+ }
+
+ public void stop() {
+ if (log.isDebugEnabled()) log.debug("Stopping poll for {} (using {})", new Object[] {entity, this});
+ if (!started) {
+ throw new IllegalStateException(String.format("Attempt to stop poller %s of entity %s when not running",
+ this, entity));
+ }
+
+ started = false;
+ for (Task<?> task : oneOffTasks) {
+ if (task != null) task.cancel(true);
+ }
+ for (ScheduledTask task : tasks) {
+ if (task != null) task.cancel();
+ }
+ oneOffTasks.clear();
+ tasks.clear();
+ }
+
+ public boolean isRunning() {
+ boolean hasActiveTasks = false;
+ for (Task<?> task: tasks) {
+ if (task.isBegun() && !task.isDone()) {
+ hasActiveTasks = true;
+ break;
+ }
+ }
+ if (!started && hasActiveTasks) {
+ log.warn("Poller should not be running, but has active tasks, tasks: "+tasks);
+ }
+ return started && hasActiveTasks;
+ }
+
+ protected boolean isEmpty() {
+ return pollJobs.isEmpty();
+ }
+
+ public String toString() {
+ return Objects.toStringHelper(this).add("entity", entity).toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java
index 2e4a971..2a5e92e 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java
@@ -34,11 +34,11 @@ import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.entity.AbstractEntity;
import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.feed.AbstractFeed;
import org.apache.brooklyn.core.mgmt.rebind.dto.MementosGenerators;
import org.apache.brooklyn.core.policy.AbstractPolicy;
import org.apache.brooklyn.entity.group.AbstractGroupImpl;
import org.apache.brooklyn.sensor.enricher.AbstractEnricher;
-import org.apache.brooklyn.sensor.feed.AbstractFeed;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicFeedRebindSupport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicFeedRebindSupport.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicFeedRebindSupport.java
index 479fbbf..4630be1 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicFeedRebindSupport.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicFeedRebindSupport.java
@@ -20,7 +20,7 @@ package org.apache.brooklyn.core.mgmt.rebind;
import org.apache.brooklyn.api.mgmt.rebind.RebindContext;
import org.apache.brooklyn.api.mgmt.rebind.mementos.FeedMemento;
-import org.apache.brooklyn.sensor.feed.AbstractFeed;
+import org.apache.brooklyn.core.feed.AbstractFeed;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.core.flags.FlagUtils;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
index c3e8030..e9478ef 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
@@ -67,6 +67,7 @@ import org.apache.brooklyn.core.catalog.internal.CatalogUtils;
import org.apache.brooklyn.core.entity.AbstractApplication;
import org.apache.brooklyn.core.entity.AbstractEntity;
import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.feed.AbstractFeed;
import org.apache.brooklyn.core.location.AbstractLocation;
import org.apache.brooklyn.core.location.internal.LocationInternal;
import org.apache.brooklyn.core.mgmt.classloading.BrooklynClassLoadingContext;
@@ -86,7 +87,6 @@ import org.apache.brooklyn.core.objs.proxy.InternalLocationFactory;
import org.apache.brooklyn.core.objs.proxy.InternalPolicyFactory;
import org.apache.brooklyn.core.policy.AbstractPolicy;
import org.apache.brooklyn.sensor.enricher.AbstractEnricher;
-import org.apache.brooklyn.sensor.feed.AbstractFeed;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.flags.FlagUtils;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java
index 761341b..929b63c 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java
@@ -51,6 +51,7 @@ import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.catalog.internal.CatalogItemDo;
import org.apache.brooklyn.core.entity.EntityDynamicType;
import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.feed.AbstractFeed;
import org.apache.brooklyn.core.location.internal.LocationInternal;
import org.apache.brooklyn.core.mgmt.persist.BrooklynPersistenceUtils;
import org.apache.brooklyn.core.mgmt.rebind.AbstractBrooklynObjectRebindSupport;
@@ -58,7 +59,6 @@ import org.apache.brooklyn.core.mgmt.rebind.TreeUtils;
import org.apache.brooklyn.core.objs.BrooklynTypes;
import org.apache.brooklyn.core.policy.AbstractPolicy;
import org.apache.brooklyn.sensor.enricher.AbstractEnricher;
-import org.apache.brooklyn.sensor.feed.AbstractFeed;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.core.flags.FlagUtils;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeSensorAndConfigKey.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeSensorAndConfigKey.java b/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeSensorAndConfigKey.java
index 940d949..f76baaa 100644
--- a/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeSensorAndConfigKey.java
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeSensorAndConfigKey.java
@@ -27,7 +27,7 @@ import org.apache.brooklyn.core.config.BasicConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.entity.AbstractEntity;
import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
-import org.apache.brooklyn.sensor.feed.ConfigToAttributes;
+import org.apache.brooklyn.core.feed.ConfigToAttributes;
import org.apache.brooklyn.util.core.flags.TypeCoercions;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/core/sensor/HttpRequestSensor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/HttpRequestSensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/HttpRequestSensor.java
index 79660ce..542fc01 100644
--- a/core/src/main/java/org/apache/brooklyn/core/sensor/HttpRequestSensor.java
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/HttpRequestSensor.java
@@ -26,9 +26,9 @@ import org.apache.brooklyn.api.entity.EntityLocal;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.effector.AddSensor;
-import org.apache.brooklyn.sensor.feed.http.HttpFeed;
-import org.apache.brooklyn.sensor.feed.http.HttpPollConfig;
-import org.apache.brooklyn.sensor.feed.http.HttpValueFunctions;
+import org.apache.brooklyn.feed.http.HttpFeed;
+import org.apache.brooklyn.feed.http.HttpPollConfig;
+import org.apache.brooklyn.feed.http.HttpValueFunctions;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java
index a6880b7..e914bd2 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java
@@ -30,8 +30,8 @@ import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.entity.Group;
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.sensor.feed.function.FunctionFeed;
-import org.apache.brooklyn.sensor.feed.function.FunctionPollConfig;
+import org.apache.brooklyn.feed.function.FunctionFeed;
+import org.apache.brooklyn.feed.function.FunctionPollConfig;
import org.apache.brooklyn.util.collections.MutableMap;
import com.google.common.base.Function;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/entity/stock/DataEntityImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/entity/stock/DataEntityImpl.java b/core/src/main/java/org/apache/brooklyn/entity/stock/DataEntityImpl.java
index af34e0d..7f10d5b 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/stock/DataEntityImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/stock/DataEntityImpl.java
@@ -24,8 +24,8 @@ import java.util.Map;
import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.core.entity.AbstractEntity;
-import org.apache.brooklyn.sensor.feed.function.FunctionFeed;
-import org.apache.brooklyn.sensor.feed.function.FunctionPollConfig;
+import org.apache.brooklyn.feed.function.FunctionFeed;
+import org.apache.brooklyn.feed.function.FunctionPollConfig;
import com.google.common.base.Functions;
import com.google.common.base.Supplier;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java b/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java
new file mode 100644
index 0000000..55db890
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.function;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.feed.AbstractFeed;
+import org.apache.brooklyn.core.feed.AttributePollHandler;
+import org.apache.brooklyn.core.feed.DelegatingPollHandler;
+import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+import com.google.common.reflect.TypeToken;
+
+/**
+ * Provides a feed of attribute values, by periodically invoking functions.
+ *
+ * Example usage (e.g. in an entity that extends SoftwareProcessImpl):
+ * <pre>
+ * {@code
+ * private FunctionFeed feed;
+ *
+ * //@Override
+ * protected void connectSensors() {
+ * super.connectSensors();
+ *
+ * feed = FunctionFeed.builder()
+ * .entity(this)
+ * .poll(new FunctionPollConfig<Object, Boolean>(SERVICE_UP)
+ * .period(500, TimeUnit.MILLISECONDS)
+ * .callable(new Callable<Boolean>() {
+ * public Boolean call() throws Exception {
+ * return getDriver().isRunning();
+ * }
+ * })
+ * .onExceptionOrFailure(Functions.constant(Boolan.FALSE))
+ * .build();
+ * }
+ *
+ * {@literal @}Override
+ * protected void disconnectSensors() {
+ * super.disconnectSensors();
+ * if (feed != null) feed.stop();
+ * }
+ * }
+ * </pre>
+ *
+ * @author aled
+ */
+public class FunctionFeed extends AbstractFeed {
+
+ private static final Logger log = LoggerFactory.getLogger(FunctionFeed.class);
+
+ // Treat as immutable once built
+ @SuppressWarnings("serial")
+ public static final ConfigKey<SetMultimap<FunctionPollIdentifier, FunctionPollConfig<?,?>>> POLLS = ConfigKeys.newConfigKey(
+ new TypeToken<SetMultimap<FunctionPollIdentifier, FunctionPollConfig<?,?>>>() {},
+ "polls");
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static Builder builder(String uniqueTag) {
+ return new Builder().uniqueTag(uniqueTag);
+ }
+
+ public static class Builder {
+ private EntityLocal entity;
+ private boolean onlyIfServiceUp = false;
+ private long period = 500;
+ private TimeUnit periodUnits = TimeUnit.MILLISECONDS;
+ private List<FunctionPollConfig<?,?>> polls = Lists.newArrayList();
+ private String uniqueTag;
+ private volatile boolean built;
+
+ public Builder entity(EntityLocal val) {
+ this.entity = val;
+ return this;
+ }
+ public Builder onlyIfServiceUp() { return onlyIfServiceUp(true); }
+ public Builder onlyIfServiceUp(boolean onlyIfServiceUp) {
+ this.onlyIfServiceUp = onlyIfServiceUp;
+ return this;
+ }
+ public Builder period(Duration d) {
+ return period(d.toMilliseconds(), TimeUnit.MILLISECONDS);
+ }
+ public Builder period(long millis) {
+ return period(millis, TimeUnit.MILLISECONDS);
+ }
+ public Builder period(long val, TimeUnit units) {
+ this.period = val;
+ this.periodUnits = units;
+ return this;
+ }
+ public Builder poll(FunctionPollConfig<?,?> config) {
+ polls.add(config);
+ return this;
+ }
+ public Builder uniqueTag(String uniqueTag) {
+ this.uniqueTag = uniqueTag;
+ return this;
+ }
+ public FunctionFeed build() {
+ built = true;
+ FunctionFeed result = new FunctionFeed(this);
+ result.setEntity(checkNotNull(entity, "entity"));
+ result.start();
+ return result;
+ }
+ @Override
+ protected void finalize() {
+ if (!built) log.warn("FunctionFeed.Builder created, but build() never called");
+ }
+ }
+
+ private static class FunctionPollIdentifier {
+ final Callable<?> job;
+
+ private FunctionPollIdentifier(Callable<?> job) {
+ this.job = checkNotNull(job, "job");
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(job);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ return (other instanceof FunctionPollIdentifier) && Objects.equal(job, ((FunctionPollIdentifier)other).job);
+ }
+ }
+
+ /**
+ * For rebind; do not call directly; use builder
+ */
+ public FunctionFeed() {
+ }
+
+ protected FunctionFeed(Builder builder) {
+ setConfig(ONLY_IF_SERVICE_UP, builder.onlyIfServiceUp);
+
+ SetMultimap<FunctionPollIdentifier, FunctionPollConfig<?,?>> polls = HashMultimap.<FunctionPollIdentifier,FunctionPollConfig<?,?>>create();
+ for (FunctionPollConfig<?,?> config : builder.polls) {
+ if (!config.isEnabled()) continue;
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ FunctionPollConfig<?,?> configCopy = new FunctionPollConfig(config);
+ if (configCopy.getPeriod() < 0) configCopy.period(builder.period, builder.periodUnits);
+ Callable<?> job = config.getCallable();
+ polls.put(new FunctionPollIdentifier(job), configCopy);
+ }
+ setConfig(POLLS, polls);
+ initUniqueTag(builder.uniqueTag, polls.values());
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Override
+ protected void preStart() {
+ SetMultimap<FunctionPollIdentifier, FunctionPollConfig<?, ?>> polls = getConfig(POLLS);
+ for (final FunctionPollIdentifier pollInfo : polls.keySet()) {
+ Set<FunctionPollConfig<?,?>> configs = polls.get(pollInfo);
+ long minPeriod = Integer.MAX_VALUE;
+ Set<AttributePollHandler<?>> handlers = Sets.newLinkedHashSet();
+
+ for (FunctionPollConfig<?,?> config : configs) {
+ handlers.add(new AttributePollHandler(config, entity, this));
+ if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod());
+ }
+
+ getPoller().scheduleAtFixedRate(
+ (Callable)pollInfo.job,
+ new DelegatingPollHandler(handlers),
+ minPeriod);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/function/FunctionPollConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/function/FunctionPollConfig.java b/core/src/main/java/org/apache/brooklyn/feed/function/FunctionPollConfig.java
new file mode 100644
index 0000000..4951868
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/function/FunctionPollConfig.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.function;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import groovy.lang.Closure;
+
+import java.util.concurrent.Callable;
+
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.feed.FeedConfig;
+import org.apache.brooklyn.core.feed.PollConfig;
+import org.apache.brooklyn.util.groovy.GroovyJavaMethods;
+import org.apache.brooklyn.util.guava.Functionals;
+import org.apache.brooklyn.util.javalang.JavaClassNames;
+
+import com.google.common.base.Supplier;
+
+public class FunctionPollConfig<S, T> extends PollConfig<S, T, FunctionPollConfig<S, T>> {
+
+ private Callable<?> callable;
+
+ public static <T> FunctionPollConfig<?, T> forSensor(AttributeSensor<T> sensor) {
+ return new FunctionPollConfig<Object, T>(sensor);
+ }
+
+ public FunctionPollConfig(AttributeSensor<T> sensor) {
+ super(sensor);
+ }
+
+ public FunctionPollConfig(FunctionPollConfig<S, T> other) {
+ super(other);
+ callable = other.callable;
+ }
+
+ public Callable<? extends Object> getCallable() {
+ return callable;
+ }
+
+ /**
+ * The {@link Callable} to be invoked on each poll.
+ * <p>
+ * Note this <em>must</em> use generics, otherwise the return type of subsequent chained
+ * calls will (e.g. to {@link FeedConfig#onException(com.google.common.base.Function)} will
+ * return the wrong type.
+ */
+ @SuppressWarnings("unchecked")
+ public <newS> FunctionPollConfig<newS, T> callable(Callable<? extends newS> val) {
+ this.callable = checkNotNull(val, "callable");
+ return (FunctionPollConfig<newS, T>) this;
+ }
+
+ /**
+ * Supplies the value to be returned by each poll.
+ * <p>
+ * Note this <em>must</em> use generics, otherwise the return type of subsequent chained
+ * calls will (e.g. to {@link FeedConfig#onException(com.google.common.base.Function)} will
+ * return the wrong type.
+ */
+ @SuppressWarnings("unchecked")
+ public <newS> FunctionPollConfig<newS, T> supplier(final Supplier<? extends newS> val) {
+ this.callable = Functionals.callable( checkNotNull(val, "supplier") );
+ return (FunctionPollConfig<newS, T>) this;
+ }
+
+ /** @deprecated since 0.7.0, kept for legacy compatibility when deserializing */
+ @SuppressWarnings({ "unchecked", "unused" })
+ private <newS> FunctionPollConfig<newS, T> supplierLegacy(final Supplier<? extends newS> val) {
+ checkNotNull(val, "supplier");
+ this.callable = new Callable<newS>() {
+ @Override
+ public newS call() throws Exception {
+ return val.get();
+ }
+ };
+ return (FunctionPollConfig<newS, T>) this;
+ }
+
+ public FunctionPollConfig<S, T> closure(Closure<?> val) {
+ this.callable = GroovyJavaMethods.callableFromClosure(checkNotNull(val, "closure"));
+ return this;
+ }
+
+ @Override protected String toStringBaseName() { return "fn"; }
+ @Override protected String toStringPollSource() {
+ if (callable==null) return null;
+ String cs = callable.toString();
+ if (!cs.contains( ""+Integer.toHexString(callable.hashCode()) )) {
+ return cs;
+ }
+ // if hashcode is in callable it's probably a custom internal; return class name
+ return JavaClassNames.simpleClassName(callable);
+ }
+
+}