You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/20 00:54:13 UTC
[25/36] incubator-brooklyn git commit: Rename o.a.b.sensor.feed to
o.a.b.feed and o.a.b.core.feed
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java b/core/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java
new file mode 100644
index 0000000..d34d0a5
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.windows;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import io.cloudsoft.winrm4j.winrm.WinRmToolResponse;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nullable;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.mgmt.ExecutionContext;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.effector.EffectorTasks;
+import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.feed.AbstractFeed;
+import org.apache.brooklyn.core.feed.PollHandler;
+import org.apache.brooklyn.core.feed.Poller;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.brooklyn.location.winrm.WinRmMachineLocation;
+import org.apache.brooklyn.util.core.flags.TypeCoercions;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.time.Duration;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.reflect.TypeToken;
+
+/**
+ * A sensor feed that retrieves performance counters from a Windows host and posts the values to sensors.
+ *
+ * <p>To use this feed, you must provide the entity, and a collection of mappings between Windows performance counter
+ * names and Brooklyn attribute sensors.</p>
+ *
+ * <p>This feed uses WinRM to run the PowerShell cmdlet <tt>Get-Counter</tt> to query for a specific set of performance
+ * counters, by name. The values are extracted from the response, and published to the entity's sensors.</p>
+ *
+ * <p>Example:</p>
+ *
+ * {@code
+ * @Override
+ * protected void connectSensors() {
+ * WindowsPerformanceCounterFeed feed = WindowsPerformanceCounterFeed.builder()
+ * .entity(entity)
+ * .addSensor("\\Processor(_total)\\% Idle Time", CPU_IDLE_TIME)
+ * .addSensor("\\Memory\\Available MBytes", AVAILABLE_MEMORY)
+ * .build();
+ * }
+ * }
+ *
+ * @since 0.6.0
+ * @author richardcloudsoft
+ */
+public class WindowsPerformanceCounterFeed extends AbstractFeed {
+
+ private static final Logger log = LoggerFactory.getLogger(WindowsPerformanceCounterFeed.class);
+
+ // This pattern matches CSV line(s) with the date in the first field, and at least one further field.
+ protected static final Pattern lineWithPerfData = Pattern.compile("^\"[\\d:/\\-. ]+\",\".*\"$", Pattern.MULTILINE);
+ private static final Joiner JOINER_ON_SPACE = Joiner.on(' ');
+ private static final Joiner JOINER_ON_COMMA = Joiner.on(',');
+ private static final int OUTPUT_COLUMN_WIDTH = 100;
+
+ @SuppressWarnings("serial")
+ public static final ConfigKey<Collection<WindowsPerformanceCounterPollConfig<?>>> POLLS = ConfigKeys.newConfigKey(
+ new TypeToken<Collection<WindowsPerformanceCounterPollConfig<?>>>() {},
+ "polls");
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private EntityLocal entity;
+ private Set<WindowsPerformanceCounterPollConfig<?>> polls = Sets.newLinkedHashSet();
+ private Duration period = Duration.of(30, TimeUnit.SECONDS);
+ private String uniqueTag;
+ private volatile boolean built;
+
+ public Builder entity(EntityLocal val) {
+ this.entity = checkNotNull(val, "entity");
+ return this;
+ }
+ public Builder addSensor(WindowsPerformanceCounterPollConfig<?> config) {
+ polls.add(config);
+ return this;
+ }
+ public Builder addSensor(String performanceCounterName, AttributeSensor<?> sensor) {
+ return addSensor(new WindowsPerformanceCounterPollConfig(sensor).performanceCounterName(checkNotNull(performanceCounterName, "performanceCounterName")));
+ }
+ public Builder addSensors(Map<String, AttributeSensor> sensors) {
+ for (Map.Entry<String, AttributeSensor> entry : sensors.entrySet()) {
+ addSensor(entry.getKey(), entry.getValue());
+ }
+ return this;
+ }
+ public Builder period(Duration period) {
+ this.period = checkNotNull(period, "period");
+ return this;
+ }
+ public Builder period(long millis) {
+ return period(millis, TimeUnit.MILLISECONDS);
+ }
+ public Builder period(long val, TimeUnit units) {
+ return period(Duration.of(val, units));
+ }
+ public Builder uniqueTag(String uniqueTag) {
+ this.uniqueTag = uniqueTag;
+ return this;
+ }
+ public WindowsPerformanceCounterFeed build() {
+ built = true;
+ WindowsPerformanceCounterFeed result = new WindowsPerformanceCounterFeed(this);
+ result.setEntity(checkNotNull(entity, "entity"));
+ result.start();
+ return result;
+ }
+ @Override
+ protected void finalize() {
+ if (!built) log.warn("WindowsPerformanceCounterFeed.Builder created, but build() never called");
+ }
+ }
+
+ /**
+ * For rebind; do not call directly; use builder
+ */
+ public WindowsPerformanceCounterFeed() {
+ }
+
+ protected WindowsPerformanceCounterFeed(Builder builder) {
+ List<WindowsPerformanceCounterPollConfig<?>> polls = Lists.newArrayList();
+ for (WindowsPerformanceCounterPollConfig<?> config : builder.polls) {
+ if (!config.isEnabled()) continue;
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ WindowsPerformanceCounterPollConfig<?> configCopy = new WindowsPerformanceCounterPollConfig(config);
+ if (configCopy.getPeriod() < 0) configCopy.period(builder.period);
+ polls.add(configCopy);
+ }
+ config().set(POLLS, polls);
+ initUniqueTag(builder.uniqueTag, polls);
+ }
+
+ /**
+ * Builds a single PowerShell <tt>Get-Counter</tt> command covering every configured counter and schedules it,
+ * wrapped so that it runs in the entity's execution context, at the shortest period of any poll.
+ */
+ @Override
+ protected void preStart() {
+ Collection<WindowsPerformanceCounterPollConfig<?>> polls = getConfig(POLLS);
+
+ // One command serves all polls, so it must run as often as the most frequent of them.
+ long minPeriod = Integer.MAX_VALUE;
+ List<String> performanceCounterNames = Lists.newArrayList();
+ for (WindowsPerformanceCounterPollConfig<?> config : polls) {
+ minPeriod = Math.min(minPeriod, config.getPeriod());
+ performanceCounterNames.add(config.getPerformanceCounterName());
+ }
+
+ // Path and value are each formatted to OUTPUT_COLUMN_WIDTH characters, giving a fixed-width two-column table.
+ Iterable<String> allParams = ImmutableList.<String>builder()
+ .add("(Get-Counter", "-Counter")
+ .add(JOINER_ON_COMMA.join(Iterables.transform(performanceCounterNames, QuoteStringFunction.INSTANCE)))
+ .add("-SampleInterval", "2") // TODO: extract SampleInterval as a config key
+ .add(").CounterSamples", "|", "Format-Table")
+ .add(String.format("@{Expression={$_.Path};width=%d},@{Expression={$_.CookedValue};width=%<d}", OUTPUT_COLUMN_WIDTH))
+ .add("-HideTableHeaders", "|", "Out-String", "-Width")
+ .add(String.valueOf(OUTPUT_COLUMN_WIDTH * 2))
+ .build();
+ String command = JOINER_ON_SPACE.join(allParams);
+ log.debug("Windows performance counter poll command for {} will be: {}", entity, command);
+
+ // Explicit type arguments (rather than raw types) keep the job, wrapper and poller type-checked together.
+ GetPerformanceCountersJob<WinRmToolResponse> job = new GetPerformanceCountersJob<WinRmToolResponse>(getEntity(), command);
+ getPoller().scheduleAtFixedRate(
+ new CallInEntityExecutionContext<WinRmToolResponse>(entity, job),
+ new SendPerfCountersToSensors(getEntity(), polls),
+ minPeriod);
+ }
+
+ private static class GetPerformanceCountersJob<T> implements Callable<T> {
+
+ private final Entity entity;
+ private final String command;
+
+ GetPerformanceCountersJob(Entity entity, String command) {
+ this.entity = entity;
+ this.command = command;
+ }
+
+ @Override
+ public T call() throws Exception {
+ WinRmMachineLocation machine = EffectorTasks.getWinRmMachine(entity);
+ WinRmToolResponse response = machine.executePsScript(command);
+ return (T)response;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ protected Poller<WinRmToolResponse> getPoller() {
+ return (Poller<WinRmToolResponse>) super.getPoller();
+ }
+
+ /**
+ * A {@link java.util.concurrent.Callable} that wraps another {@link java.util.concurrent.Callable}, where the
+ * inner {@link java.util.concurrent.Callable} is executed in the context of a
+ * specific entity.
+ *
+ * @param <T> The type of the {@link java.util.concurrent.Callable}.
+ */
+ private static class CallInEntityExecutionContext<T> implements Callable<T> {
+ private final Callable<T> job;
+ private EntityLocal entity;
+
+ private CallInEntityExecutionContext(EntityLocal entity, Callable<T> job) {
+ this.job = job;
+ this.entity = entity;
+ }
+
+ @Override
+ public T call() throws Exception {
+ ExecutionContext executionContext = ((EntityInternal) entity).getManagementSupport().getExecutionContext();
+ return executionContext.submit(Maps.newHashMap(), job).get();
+ }
+ }
+
+ @VisibleForTesting
+ static class SendPerfCountersToSensors implements PollHandler<WinRmToolResponse> {
+ private final EntityLocal entity;
+ private final List<WindowsPerformanceCounterPollConfig<?>> polls;
+ private final Set<AttributeSensor<?>> failedAttributes = Sets.newLinkedHashSet();
+ private static final Pattern MACHINE_NAME_LOOKBACK_PATTERN = Pattern.compile(String.format("(?<=\\\\\\\\.{0,%d})\\\\.*", OUTPUT_COLUMN_WIDTH));
+
+ public SendPerfCountersToSensors(EntityLocal entity, Collection<WindowsPerformanceCounterPollConfig<?>> polls) {
+ this.entity = entity;
+ this.polls = ImmutableList.copyOf(polls);
+ }
+
+ @Override
+ public boolean checkSuccess(WinRmToolResponse val) {
+ // TODO not just using statusCode; also looking at absence of stderr.
+ // Status code is (empirically) unreliable: it returns 0 sometimes even when failed
+ // (but never returns non-zero on success).
+ if (val.getStatusCode() != 0) return false;
+ String stderr = val.getStdErr();
+ if (stderr == null || stderr.length() != 0) return false;
+ String out = val.getStdOut();
+ if (out == null || out.length() == 0) return false;
+ return true;
+ }
+
+ /**
+ * Parses the fixed-width counter table in stdout and publishes each value to its configured sensor,
+ * skipping lines that are too short to hold both columns or that contain no counter path.
+ */
+ @Override
+ public void onSuccess(WinRmToolResponse val) {
+ for (String pollResponse : val.getStdOut().split("\r\n")) {
+ if (pollResponse.length() < OUTPUT_COLUMN_WIDTH) continue; // blank or truncated line: cannot be split at the column boundary
+ String path = pollResponse.substring(0, OUTPUT_COLUMN_WIDTH - 1);
+ // The performance counter output prepends the sensor name with "\\<machinename>" so we need to remove it
+ Matcher machineNameLookbackMatcher = MACHINE_NAME_LOOKBACK_PATTERN.matcher(path);
+ if (!machineNameLookbackMatcher.find()) continue;
+ String name = machineNameLookbackMatcher.group(0).trim();
+ String rawValue = pollResponse.substring(OUTPUT_COLUMN_WIDTH).replaceAll("^\\s+", "");
+ WindowsPerformanceCounterPollConfig<?> config = getPollConfig(name);
+ Class<?> clazz = config.getSensor().getType();
+ @SuppressWarnings("unchecked") // the sensor is created from clazz, and the value is coerced to clazz below
+ AttributeSensor<Object> attribute = (AttributeSensor<Object>) Sensors.newSensor(clazz, config.getSensor().getName(), config.getDescription());
+ try {
+ Object value = TypeCoercions.coerce(rawValue, TypeToken.of(clazz));
+ entity.setAttribute(attribute, value);
+ } catch (Exception e) {
+ Exceptions.propagateIfFatal(e);
+ if (failedAttributes.add(attribute)) { // warn only on the first failure for each sensor
+ log.warn("Failed to coerce value '{}' to {} for {} -> {}", new Object[] {rawValue, clazz, entity, attribute});
+ } else {
+ if (log.isTraceEnabled()) log.trace("Failed (repeatedly) to coerce value '{}' to {} for {} -> {}", new Object[] {rawValue, clazz, entity, attribute});
+ }
+ }
+ }
+ }
+
+ @Override
+ public void onFailure(WinRmToolResponse val) {
+ log.error("Windows Performance Counter query did not respond as expected. exitcode={} stdout={} stderr={}",
+ new Object[]{val.getStatusCode(), val.getStdOut(), val.getStdErr()});
+ for (WindowsPerformanceCounterPollConfig<?> config : polls) {
+ Class<?> clazz = config.getSensor().getType();
+ AttributeSensor<?> attribute = Sensors.newSensor(clazz, config.getSensor().getName(), config.getDescription());
+ entity.setAttribute(attribute, null);
+ }
+ }
+
+ @Override
+ public void onException(Exception exception) {
+ log.error("Detected exception while retrieving Windows Performance Counters from entity " +
+ entity.getDisplayName(), exception);
+ for (WindowsPerformanceCounterPollConfig<?> config : polls) {
+ entity.setAttribute(Sensors.newSensor(config.getSensor().getClass(), config.getPerformanceCounterName(), config.getDescription()), null);
+ }
+ }
+
+ @Override
+ public String getDescription() {
+ return "" + polls;
+ }
+
+ @Override
+ public String toString() {
+ return super.toString()+"["+getDescription()+"]";
+ }
+
+ private WindowsPerformanceCounterPollConfig<?> getPollConfig(String sensorName) {
+ for (WindowsPerformanceCounterPollConfig<?> poll : polls) {
+ if (poll.getPerformanceCounterName().equalsIgnoreCase(sensorName)) {
+ return poll;
+ }
+ }
+ throw new IllegalStateException(String.format("%s not found in configured polls: %s", sensorName, polls));
+ }
+ }
+
+ /** Iterates over the quoted, comma-separated fields of one line of counter data, skipping the first (timestamp) field. */
+ static class PerfCounterValueIterator implements Iterator<String> {
+ // This pattern matches the contents of the first field, and optionally matches the rest of the line as
+ // further fields. Feed the second match back into the pattern again to get the next field, and repeat until
+ // all fields are discovered.
+ protected static final Pattern splitPerfData = Pattern.compile("^\"([^\\\"]*)\"((,\"[^\\\"]*\")*)$");
+ private Matcher matcher;
+
+ public PerfCounterValueIterator(String input) {
+ matcher = splitPerfData.matcher(input);
+ // Throw away the first element (the timestamp) (and also confirm that we have a pattern match)
+ checkArgument(hasNext(), "input "+input+" does not match expected pattern "+splitPerfData.pattern());
+ next();
+ }
+
+ /** Idempotent: the match is re-run from the start each time, so repeated calls do not consume the input. */
+ @Override
+ public boolean hasNext() {
+ return matcher != null && matcher.reset().find();
+ }
+
+ /** Returns the next field and re-targets the matcher at the remaining fields, if any. */
+ @Override
+ public String next() {
+ if (!hasNext()) throw new java.util.NoSuchElementException();
+ String next = matcher.group(1);
+ String remainder = matcher.group(2);
+ if (!Strings.isNullOrEmpty(remainder)) {
+ assert remainder.startsWith(",");
+ remainder = remainder.substring(1);
+ matcher = splitPerfData.matcher(remainder);
+ } else {
+ matcher = null;
+ }
+ return next;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+}
+
+ private static enum QuoteStringFunction implements Function<String, String> {
+ INSTANCE;
+
+ @Nullable
+ @Override
+ public String apply(@Nullable String input) {
+ return input != null ? "\"" + input + "\"" : null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterPollConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterPollConfig.java b/core/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterPollConfig.java
new file mode 100644
index 0000000..1391c3e
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterPollConfig.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.feed.windows;
+
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.feed.PollConfig;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+
+/** Poll configuration that pairs a Windows performance counter name with the attribute sensor given at construction. */
+public class WindowsPerformanceCounterPollConfig<T> extends PollConfig<Object, T, WindowsPerformanceCounterPollConfig<T>>{
+ private String performanceCounterName;
+
+ /** Creates a config for {@code sensor}, taking its description and using the identity function as the success handler. */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public WindowsPerformanceCounterPollConfig(AttributeSensor<T> sensor) {
+ super(sensor);
+ description(sensor.getDescription());
+ onSuccess((Function)Functions.identity());
+ }
+
+ /** Copy constructor; also copies the performance counter name of {@code other}. */
+ public WindowsPerformanceCounterPollConfig(WindowsPerformanceCounterPollConfig<T> other) {
+ super(other);
+ this.performanceCounterName = other.performanceCounterName;
+ }
+
+ public String getPerformanceCounterName() {
+ return performanceCounterName;
+ }
+
+ public WindowsPerformanceCounterPollConfig<T> performanceCounterName(String val) {
+ this.performanceCounterName = val; return this;
+ }
+ @Override protected String toStringPollSource() { return performanceCounterName; }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/AbstractFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/AbstractFeed.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/AbstractFeed.java
deleted file mode 100644
index 327181a..0000000
--- a/core/src/main/java/org/apache/brooklyn/sensor/feed/AbstractFeed.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.sensor.feed;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Collection;
-
-import org.apache.brooklyn.api.entity.EntityLocal;
-import org.apache.brooklyn.api.mgmt.rebind.RebindSupport;
-import org.apache.brooklyn.api.mgmt.rebind.mementos.FeedMemento;
-import org.apache.brooklyn.api.sensor.Feed;
-import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.BrooklynFeatureEnablement;
-import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.core.entity.EntityInternal;
-import org.apache.brooklyn.core.mgmt.rebind.BasicFeedRebindSupport;
-import org.apache.brooklyn.core.objs.AbstractEntityAdjunct;
-import org.apache.brooklyn.util.javalang.JavaClassNames;
-import org.apache.brooklyn.util.text.Strings;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Captures common fields and processes for sensor feeds.
- * These generally poll or subscribe to get sensor values for an entity.
- * They make it easy to poll over http, jmx, etc.
- */
-public abstract class AbstractFeed extends AbstractEntityAdjunct implements Feed {
-
- private static final Logger log = LoggerFactory.getLogger(AbstractFeed.class);
-
- public static final ConfigKey<Boolean> ONLY_IF_SERVICE_UP = ConfigKeys.newBooleanConfigKey("feed.onlyIfServiceUp", "", false);
-
- private final Object pollerStateMutex = new Object();
- private transient volatile Poller<?> poller;
- private transient volatile boolean activated;
- private transient volatile boolean suspended;
-
- public AbstractFeed() {
- }
-
- /**
- * @deprecated since 0.7.0; use no-arg constructor; call {@link #setEntity(EntityLocal)}
- */
- @Deprecated
- public AbstractFeed(EntityLocal entity) {
- this(entity, false);
- }
-
- /**
- * @deprecated since 0.7.0; use no-arg constructor; call {@link #setEntity(EntityLocal)} and {@code setConfig(ONLY_IF_SERVICE_UP, onlyIfServiceUp)}
- */
- @Deprecated
- public AbstractFeed(EntityLocal entity, boolean onlyIfServiceUp) {
- this.entity = checkNotNull(entity, "entity");
- setConfig(ONLY_IF_SERVICE_UP, onlyIfServiceUp);
- }
-
- // Ensure idempotent, as called in builders (in case not registered with entity), and also called
- // when registering with entity
- @Override
- public void setEntity(EntityLocal entity) {
- super.setEntity(entity);
- if (BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_FEED_REGISTRATION_PROPERTY)) {
- ((EntityInternal)entity).feeds().addFeed(this);
- }
- }
-
- protected void initUniqueTag(String uniqueTag, Object ...valsForDefault) {
- if (Strings.isNonBlank(uniqueTag)) this.uniqueTag = uniqueTag;
- else this.uniqueTag = getDefaultUniqueTag(valsForDefault);
- }
-
- protected String getDefaultUniqueTag(Object ...valsForDefault) {
- StringBuilder sb = new StringBuilder();
- sb.append(JavaClassNames.simpleClassName(this));
- if (valsForDefault.length==0) {
- sb.append("@");
- sb.append(hashCode());
- } else if (valsForDefault.length==1 && valsForDefault[0] instanceof Collection){
- sb.append(Strings.toUniqueString(valsForDefault[0], 80));
- } else {
- sb.append("[");
- boolean first = true;
- for (Object x: valsForDefault) {
- if (!first) sb.append(";");
- else first = false;
- sb.append(Strings.toUniqueString(x, 80));
- }
- sb.append("]");
- }
- return sb.toString();
- }
-
- @Override
- public void start() {
- if (log.isDebugEnabled()) log.debug("Starting feed {} for {}", this, entity);
- if (activated) {
- throw new IllegalStateException(String.format("Attempt to start feed %s of entity %s when already running",
- this, entity));
- }
- if (poller != null) {
- throw new IllegalStateException(String.format("Attempt to re-start feed %s of entity %s", this, entity));
- }
-
- poller = new Poller<Object>(entity, getConfig(ONLY_IF_SERVICE_UP));
- activated = true;
- preStart();
- synchronized (pollerStateMutex) {
- // don't start poller if we are suspended
- if (!suspended) {
- poller.start();
- }
- }
- }
-
- @Override
- public void suspend() {
- synchronized (pollerStateMutex) {
- if (activated && !suspended) {
- poller.stop();
- }
- suspended = true;
- }
- }
-
- @Override
- public void resume() {
- synchronized (pollerStateMutex) {
- if (activated && suspended) {
- poller.start();
- }
- suspended = false;
- }
- }
-
- @Override
- public void destroy() {
- stop();
- }
-
- @Override
- public void stop() {
- if (!activated) {
- log.debug("Ignoring attempt to stop feed {} of entity {} when not running", this, entity);
- return;
- }
- if (log.isDebugEnabled()) log.debug("stopping feed {} for {}", this, entity);
-
- activated = false;
- preStop();
- synchronized (pollerStateMutex) {
- if (!suspended) {
- poller.stop();
- }
- }
- postStop();
- super.destroy();
- }
-
- @Override
- public boolean isActivated() {
- return activated;
- }
-
- public EntityLocal getEntity() {
- return entity;
- }
-
- protected boolean isConnected() {
- // TODO Default impl will result in multiple logs for same error if becomes unreachable
- // (e.g. if ssh gets NoRouteToHostException, then every AttributePollHandler for that
- // feed will log.warn - so if polling for 10 sensors/attributes will get 10 log messages).
- // Would be nice if reduced this logging duplication.
- // (You can reduce it by providing a better 'isConnected' implementation of course.)
- return isRunning() && entity!=null && !((EntityInternal)entity).getManagementSupport().isNoLongerManaged();
- }
-
- @Override
- public boolean isSuspended() {
- return suspended;
- }
-
- @Override
- public boolean isRunning() {
- return isActivated() && !isSuspended() && !isDestroyed() && getPoller()!=null && getPoller().isRunning();
- }
-
- @Override
- public RebindSupport<FeedMemento> getRebindSupport() {
- return new BasicFeedRebindSupport(this);
- }
-
- @Override
- protected void onChanged() {
- // TODO Auto-generated method stub
- }
-
- /**
- * For overriding.
- */
- protected void preStart() {
- }
-
- /**
- * For overriding.
- */
- protected void preStop() {
- }
-
- /**
- * For overriding.
- */
- protected void postStop() {
- }
-
- /**
- * For overriding, where sub-class can change return-type generics!
- */
- protected Poller<?> getPoller() {
- return poller;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/AttributePollHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/AttributePollHandler.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/AttributePollHandler.java
deleted file mode 100644
index eac972e..0000000
--- a/core/src/main/java/org/apache/brooklyn/sensor/feed/AttributePollHandler.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.sensor.feed;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.brooklyn.api.entity.EntityLocal;
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.EntityInternal;
-import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
-import org.apache.brooklyn.core.entity.lifecycle.Lifecycle.Transition;
-import org.apache.brooklyn.util.core.flags.TypeCoercions;
-import org.apache.brooklyn.util.core.task.Tasks;
-import org.apache.brooklyn.util.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Objects;
-
-/**
- * Handler for when polling an entity's attribute. On each poll result the entity's attribute is set.
- *
- * Calls to onSuccess and onError will happen sequentially, but may be called from different threads
- * each time. Note that no guarantees of a synchronized block exist, so additional synchronization
- * would be required for the Java memory model's "happens before" relationship.
- *
- * @author aled
- */
-public class AttributePollHandler<V> implements PollHandler<V> {
-
- public static final Logger log = LoggerFactory.getLogger(AttributePollHandler.class);
-
- private final FeedConfig<V,?,?> config;
- private final EntityLocal entity;
- @SuppressWarnings("rawtypes")
- private final AttributeSensor sensor;
- private final AbstractFeed feed;
- private final boolean suppressDuplicates;
-
- // allow 30 seconds before logging at WARN, if there has been no success yet;
- // after success WARN immediately
- // TODO these should both be configurable
- private Duration logWarningGraceTimeOnStartup = Duration.THIRTY_SECONDS;
- private Duration logWarningGraceTime = Duration.millis(0);
-
- // internal state to look after whether to log warnings
- private volatile Long lastSuccessTime = null;
- private volatile Long currentProblemStartTime = null;
- private volatile boolean currentProblemLoggedAsWarning = false;
- private volatile boolean lastWasProblem = false;
-
-
- public AttributePollHandler(FeedConfig<V,?,?> config, EntityLocal entity, AbstractFeed feed) {
- this.config = checkNotNull(config, "config");
- this.entity = checkNotNull(entity, "entity");
- this.sensor = checkNotNull(config.getSensor(), "sensor");
- this.feed = checkNotNull(feed, "feed");
- this.suppressDuplicates = config.getSupressDuplicates();
- }
-
- @Override
- public boolean checkSuccess(V val) {
- // Always true if no checkSuccess predicate was configured.
- return !config.hasCheckSuccessHandler() || config.getCheckSuccess().apply(val);
- }
-
- @Override
- public void onSuccess(V val) {
- if (lastWasProblem) {
- if (currentProblemLoggedAsWarning) {
- log.info("Success (following previous problem) reading "+getBriefDescription());
- } else {
- log.debug("Success (following previous problem) reading "+getBriefDescription());
- }
- lastWasProblem = false;
- currentProblemStartTime = null;
- currentProblemLoggedAsWarning = false;
- }
- lastSuccessTime = System.currentTimeMillis();
- if (log.isTraceEnabled()) log.trace("poll for {} got: {}", new Object[] {getBriefDescription(), val});
-
- try {
- setSensor(transformValueOnSuccess(val));
- } catch (Exception e) {
- if (feed.isConnected()) {
- log.warn("unable to compute "+getBriefDescription()+"; on val="+val, e);
- } else {
- if (log.isDebugEnabled()) log.debug("unable to compute "+getBriefDescription()+"; val="+val+" (when inactive)", e);
- }
- }
- }
-
- /** allows post-processing, such as applying a success handler;
- * default applies the onSuccess handler (which is recommended) */
- protected Object transformValueOnSuccess(V val) {
- return config.hasSuccessHandler() ? config.getOnSuccess().apply(val) : val;
- }
-
- @Override
- public void onFailure(V val) {
- if (!config.hasFailureHandler()) {
- onException(new Exception("checkSuccess of "+this+" for "+getBriefDescription()+" was false but poller has no failure handler"));
- } else {
- logProblem("failure", val);
-
- try {
- setSensor(config.hasFailureHandler() ? config.getOnFailure().apply((V)val) : val);
- } catch (Exception e) {
- if (feed.isConnected()) {
- log.warn("Error computing " + getBriefDescription() + "; val=" + val+": "+ e, e);
- } else {
- if (log.isDebugEnabled())
- log.debug("Error computing " + getBriefDescription() + "; val=" + val + " (when inactive)", e);
- }
- }
- }
- }
-
- @Override
- public void onException(Exception exception) {
- if (!feed.isConnected()) {
- if (log.isTraceEnabled()) log.trace("Read of {} in {} gave exception (while not connected or not yet connected): {}", new Object[] {this, getBriefDescription(), exception});
- } else {
- logProblem("exception", exception);
- }
-
- if (config.hasExceptionHandler()) {
- try {
- setSensor( config.getOnException().apply(exception) );
- } catch (Exception e) {
- if (feed.isConnected()) {
- log.warn("unable to compute "+getBriefDescription()+"; on exception="+exception, e);
- } else {
- if (log.isDebugEnabled()) log.debug("unable to compute "+getBriefDescription()+"; exception="+exception+" (when inactive)", e);
- }
- }
- }
- }
-
- protected void logProblem(String type, Object val) {
- if (lastWasProblem && currentProblemLoggedAsWarning) {
- if (log.isTraceEnabled())
- log.trace("Recurring {} reading {} in {}: {}", new Object[] {type, this, getBriefDescription(), val});
- } else {
- long nowTime = System.currentTimeMillis();
- // get a non-volatile value
- Long currentProblemStartTimeCache = currentProblemStartTime;
- long expiryTime =
- (lastSuccessTime!=null && !isTransitioningOrStopped()) ? lastSuccessTime+logWarningGraceTime.toMilliseconds() :
- currentProblemStartTimeCache!=null ? currentProblemStartTimeCache+logWarningGraceTimeOnStartup.toMilliseconds() :
- nowTime+logWarningGraceTimeOnStartup.toMilliseconds();
- if (!lastWasProblem) {
- if (expiryTime <= nowTime) {
- currentProblemLoggedAsWarning = true;
- if (entity==null || !Entities.isNoLongerManaged(entity)) {
- log.warn("Read of " + getBriefDescription() + " gave " + type + ": " + val);
- } else {
- log.debug("Read of " + getBriefDescription() + " gave " + type + ": " + val);
- }
- if (log.isDebugEnabled() && val instanceof Throwable)
- log.debug("Trace for "+type+" reading "+getBriefDescription()+": "+val, (Throwable)val);
- } else {
- if (log.isDebugEnabled())
- log.debug("Read of " + getBriefDescription() + " gave " + type + " (in grace period): " + val);
- }
- lastWasProblem = true;
- currentProblemStartTime = nowTime;
- } else {
- if (expiryTime <= nowTime) {
- currentProblemLoggedAsWarning = true;
- log.warn("Read of " + getBriefDescription() + " gave " + type +
- " (grace period expired, occurring for "+Duration.millis(nowTime - currentProblemStartTimeCache)+
- (config.hasExceptionHandler() ? "" : ", no exception handler set for sensor")+
- ")"+
- ": " + val);
- if (log.isDebugEnabled() && val instanceof Throwable)
- log.debug("Trace for "+type+" reading "+getBriefDescription()+": "+val, (Throwable)val);
- } else {
- if (log.isDebugEnabled())
- log.debug("Recurring {} reading {} in {} (still in grace period): {}", new Object[] {type, this, getBriefDescription(), val});
- }
- }
- }
- }
-
- protected boolean isTransitioningOrStopped() {
- if (entity==null) return false;
- Transition expected = entity.getAttribute(Attributes.SERVICE_STATE_EXPECTED);
- if (expected==null) return false;
- return (expected.getState()==Lifecycle.STARTING || expected.getState()==Lifecycle.STOPPING || expected.getState()==Lifecycle.STOPPED);
- }
-
- @SuppressWarnings("unchecked")
- protected void setSensor(Object v) {
- if (Entities.isNoLongerManaged(entity)) {
- if (Tasks.isInterrupted()) return;
- log.warn(""+entity+" is not managed; feed "+this+" setting "+sensor+" to "+v+" at this time is not supported ("+Tasks.current()+")");
- }
-
- if (v == FeedConfig.UNCHANGED) {
- // nothing
- } else if (v == FeedConfig.REMOVE) {
- ((EntityInternal)entity).removeAttribute(sensor);
- } else if (sensor == FeedConfig.NO_SENSOR) {
- // nothing
- } else {
- Object coercedV = TypeCoercions.coerce(v, sensor.getType());
- if (suppressDuplicates && Objects.equal(coercedV, entity.getAttribute(sensor))) {
- // no change; nothing
- } else {
- entity.setAttribute(sensor, coercedV);
- }
- }
- }
-
- @Override
- public String toString() {
- return super.toString()+"["+getDescription()+"]";
- }
-
- @Override
- public String getDescription() {
- return sensor.getName()+" @ "+entity.getId()+" <- "+config;
- }
-
- protected String getBriefDescription() {
- return ""+entity+"->"+(sensor==FeedConfig.NO_SENSOR ? "(dynamic sensors)" : ""+sensor);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/ConfigToAttributes.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/ConfigToAttributes.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/ConfigToAttributes.java
deleted file mode 100644
index 7938cc4..0000000
--- a/core/src/main/java/org/apache/brooklyn/sensor/feed/ConfigToAttributes.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.sensor.feed;
-
-import org.apache.brooklyn.api.entity.EntityLocal;
-import org.apache.brooklyn.api.mgmt.ManagementContext;
-import org.apache.brooklyn.api.sensor.Sensor;
-import org.apache.brooklyn.core.sensor.AttributeSensorAndConfigKey;
-import org.apache.brooklyn.core.sensor.TemplatedStringAttributeSensorAndConfigKey;
-
-
-/** Simple config adapter for setting {@link AttributeSensorAndConfigKey} sensor values from the config value or config default */
-public class ConfigToAttributes {
-
- //normally just applied once, statically, not registered...
- public static void apply(EntityLocal entity) {
- for (Sensor<?> it : entity.getEntityType().getSensors()) {
- if (it instanceof AttributeSensorAndConfigKey) {
- apply(entity, (AttributeSensorAndConfigKey<?,?>)it);
- }
- }
- }
-
- /**
- * Convenience for ensuring an individual sensor is set from its config key
- * (e.g. sub-classes of DynamicWebAppCluster that don't want to set HTTP_PORT etc!)
- */
- public static <T> T apply(EntityLocal entity, AttributeSensorAndConfigKey<?,T> key) {
- T v = entity.getAttribute(key);
- if (v!=null) return v;
- v = key.getAsSensorValue(entity);
- if (v!=null) entity.setAttribute(key, v);
- return v;
- }
-
- /**
- * Convenience for transforming a config value (e.g. processing a {@link TemplatedStringAttributeSensorAndConfigKey}),
- * outside of the context of an entity.
- */
- public static <T> T transform(ManagementContext managementContext, AttributeSensorAndConfigKey<?,T> key) {
- return key.getAsSensorValue(managementContext);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/DelegatingPollHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/DelegatingPollHandler.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/DelegatingPollHandler.java
deleted file mode 100644
index 4433e83..0000000
--- a/core/src/main/java/org/apache/brooklyn/sensor/feed/DelegatingPollHandler.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.sensor.feed;
-
-import java.util.List;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * A poll handler that delegates each call to a set of poll handlers.
- *
- * @author aled
- */
-public class DelegatingPollHandler<V> implements PollHandler<V> {
-
- private final List<AttributePollHandler<? super V>> delegates;
-
- public DelegatingPollHandler(Iterable<AttributePollHandler<? super V>> delegates) {
- super();
- this.delegates = ImmutableList.copyOf(delegates);
- }
-
- @Override
- public boolean checkSuccess(V val) {
- for (AttributePollHandler<? super V> delegate : delegates) {
- if (!delegate.checkSuccess(val))
- return false;
- }
- return true;
- }
-
- @Override
- public void onSuccess(V val) {
- for (AttributePollHandler<? super V> delegate : delegates) {
- delegate.onSuccess(val);
- }
- }
-
- @Override
- public void onFailure(V val) {
- for (AttributePollHandler<? super V> delegate : delegates) {
- delegate.onFailure(val);
- }
- }
-
- @Override
- public void onException(Exception exception) {
- for (AttributePollHandler<? super V> delegate : delegates) {
- delegate.onException(exception);
- }
- }
-
- @Override
- public String toString() {
- return super.toString()+"["+getDescription()+"]";
- }
-
- @Override
- public String getDescription() {
- if (delegates.isEmpty())
- return "(empty delegate list)";
- if (delegates.size()==1)
- return delegates.get(0).getDescription();
- StringBuilder sb = new StringBuilder();
- sb.append("[");
- int count = 0;
- for (AttributePollHandler<? super V> delegate : delegates) {
- if (count>0) sb.append("; ");
- sb.append(delegate.getDescription());
- if (count>2) {
- sb.append("; ...");
- break;
- }
- count++;
- }
- sb.append("]");
- return sb.toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/FeedConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/FeedConfig.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/FeedConfig.java
deleted file mode 100644
index 32ea2ab..0000000
--- a/core/src/main/java/org/apache/brooklyn/sensor/feed/FeedConfig.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.sensor.feed;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.sensor.Sensors;
-import org.apache.brooklyn.sensor.feed.http.HttpPollConfig;
-import org.apache.brooklyn.util.collections.MutableList;
-import org.apache.brooklyn.util.guava.Functionals;
-import org.apache.brooklyn.util.javalang.JavaClassNames;
-import org.apache.brooklyn.util.text.Strings;
-
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.base.Objects;
-import com.google.common.base.Predicate;
-
-/**
- * Configuration for a poll, or a subscription etc, that is being added to a feed.
- *
- * @author aled
- */
-public class FeedConfig<V, T, F extends FeedConfig<V, T, F>> {
-
- /** The onSuccess or onError functions can return this value to indicate that the sensor should not change.
- * @deprecated since 0.7.0 use UNCHANGED */
- public static final Object UNSET = Entities.UNCHANGED;
- /** The onSuccess or onError functions can return this value to indicate that the sensor should not change. */
- public static final Object UNCHANGED = Entities.UNCHANGED;
- /** The onSuccess or onError functions can return this value to indicate that the sensor value should be removed
- * (cf 'null', but useful in dynamic situations) */
- public static final Object REMOVE = Entities.REMOVE;
-
- /** Indicates that no sensor is being used here. This sensor is suppressed,
- * but is useful where you want to use the feeds with custom success/exception/failure functions
- * which directly set multiple sensors, e.g. dynamically based on the poll response.
- * <p>
- * See {@link HttpPollConfig#forMultiple()} and its usages.
- * (It can work for any poll config, but conveniences have not been supplied for others.) */
- public static final AttributeSensor<Void> NO_SENSOR = Sensors.newSensor(Void.class, "brooklyn.no.sensor");
-
- private final AttributeSensor<T> sensor;
- private Function<? super V, T> onsuccess;
- private Function<? super V, T> onfailure;
- private Function<? super Exception, T> onexception;
- private Predicate<? super V> checkSuccess;
- private boolean suppressDuplicates;
- private boolean enabled = true;
-
- public FeedConfig(AttributeSensor<T> sensor) {
- this.sensor = checkNotNull(sensor, "sensor");
- }
-
- public FeedConfig(FeedConfig<V, T, F> other) {
- this.sensor = other.sensor;
- this.onsuccess = other.onsuccess;
- this.onfailure = other.onfailure;
- this.onexception = other.onexception;
- this.checkSuccess = other.checkSuccess;
- this.suppressDuplicates = other.suppressDuplicates;
- this.enabled = other.enabled;
- }
-
- @SuppressWarnings("unchecked")
- protected F self() {
- return (F) this;
- }
-
- public AttributeSensor<T> getSensor() {
- return sensor;
- }
-
- public Predicate<? super V> getCheckSuccess() {
- return checkSuccess;
- }
-
- public Function<? super V, T> getOnSuccess() {
- return onsuccess;
- }
-
- public Function<? super V, T> getOnFailure() {
- return onfailure;
- }
-
- public Function<? super Exception, T> getOnException() {
- return onexception;
- }
-
- public boolean getSupressDuplicates() {
- return suppressDuplicates;
- }
-
- public boolean isEnabled() {
- return enabled;
- }
-
- /** sets the predicate used to check whether a feed run is successful */
- public F checkSuccess(Predicate<? super V> val) {
- this.checkSuccess = checkNotNull(val, "checkSuccess");
- return self();
- }
- /** as {@link #checkSuccess(Predicate)} */
- public F checkSuccess(final Function<? super V,Boolean> val) {
- return checkSuccess(Functionals.predicate(val));
- }
- @SuppressWarnings("unused")
- /** @deprecated since 0.7.0, kept for rebind */ @Deprecated
- private F checkSuccessLegacy(final Function<? super V,Boolean> val) {
- return checkSuccess(new Predicate<V>() {
- @Override
- public boolean apply(V input) {
- return val.apply(input);
- }
- });
- }
-
- public F onSuccess(Function<? super V,T> val) {
- this.onsuccess = checkNotNull(val, "onSuccess");
- return self();
- }
-
- public F setOnSuccess(T val) {
- return onSuccess(Functions.constant(val));
- }
-
- /** a failure is when the connection is fine (no exception) but the other end returns a result object V
- * which the feed can tell indicates a failure (e.g. HTTP code 404) */
- public F onFailure(Function<? super V,T> val) {
- this.onfailure = checkNotNull(val, "onFailure");
- return self();
- }
-
- public F setOnFailure(T val) {
- return onFailure(Functions.constant(val));
- }
-
- /** registers a callback to be used {@link #onSuccess(Function)} and {@link #onFailure(Function)},
- * i.e. whenever a result comes back, but not in case of exceptions being thrown (ie problems communicating) */
- public F onResult(Function<? super V, T> val) {
- onSuccess(val);
- return onFailure(val);
- }
-
- public F setOnResult(T val) {
- return onResult(Functions.constant(val));
- }
-
- /** an exception is when there is an error in the communication */
- public F onException(Function<? super Exception,T> val) {
- this.onexception = checkNotNull(val, "onException");
- return self();
- }
-
- public F setOnException(T val) {
- return onException(Functions.constant(val));
- }
-
- /** convenience for indicating a behaviour to occur for both
- * {@link #onException(Function)}
- * (error connecting) and
- * {@link #onFailure(Function)}
- * (successful communication but failure report from remote end) */
- public F onFailureOrException(Function<Object,T> val) {
- onFailure(val);
- return onException(val);
- }
-
- public F setOnFailureOrException(T val) {
- return onFailureOrException(Functions.constant(val));
- }
-
- public F suppressDuplicates(boolean val) {
- suppressDuplicates = val;
- return self();
- }
-
- /**
- * Whether this feed is enabled (defaulting to true).
- */
- public F enabled(boolean val) {
- enabled = val;
- return self();
- }
-
- public boolean hasSuccessHandler() {
- return this.onsuccess != null;
- }
-
- public boolean hasFailureHandler() {
- return this.onfailure != null;
- }
-
- public boolean hasExceptionHandler() {
- return this.onexception != null;
- }
-
- public boolean hasCheckSuccessHandler() {
- return this.checkSuccess != null;
- }
-
-
- @Override
- public String toString() {
- StringBuilder result = new StringBuilder();
- result.append(toStringBaseName());
- result.append("[");
- boolean contents = false;
- Object source = toStringPollSource();
- AttributeSensor<T> s = getSensor();
- if (Strings.isNonBlank(Strings.toString(source))) {
- result.append(Strings.toUniqueString(source, 40));
- if (s!=null) {
- result.append("->");
- result.append(s.getName());
- }
- contents = true;
- } else if (s!=null) {
- result.append(s.getName());
- contents = true;
- }
- MutableList<Object> fields = toStringOtherFields();
- if (fields!=null) {
- for (Object field: fields) {
- if (Strings.isNonBlank(Strings.toString(field))) {
- if (contents) result.append(";");
- contents = true;
- result.append(field);
- }
- }
- }
- result.append("]");
- return result.toString();
- }
-
- /** can be overridden to supply a simpler base name than the class name */
- protected String toStringBaseName() {
- return JavaClassNames.simpleClassName(this);
- }
- /** can be overridden to supply add'l info for the {@link #toString()}; subclasses can add to the returned value */
- protected MutableList<Object> toStringOtherFields() {
- return MutableList.<Object>of();
- }
- /** can be overridden to supply add'l info for the {@link #toString()}, placed before the sensor with -> */
- protected Object toStringPollSource() {
- return null;
- }
- /** all configs should supply a unique tag element, inserted into the feed */
- protected String getUniqueTag() {
- return toString();
- }
-
- /** returns fields which should be used for equality, including by default {@link #toStringOtherFields()} and {@link #toStringPollSource()};
- * subclasses can add to the returned value */
- protected MutableList<Object> equalsFields() {
- MutableList<Object> result = MutableList.of().appendIfNotNull(getSensor()).appendIfNotNull(toStringPollSource());
- for (Object field: toStringOtherFields()) result.appendIfNotNull(field);
- return result;
- }
-
- @Override
- public int hashCode() {
- int hc = super.hashCode();
- for (Object f: equalsFields())
- hc = Objects.hashCode(hc, f);
- return hc;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) return true;
- if (!super.equals(obj)) return false;
- PollConfig<?,?,?> other = (PollConfig<?,?,?>) obj;
- if (!Objects.equal(getUniqueTag(), other.getUniqueTag())) return false;
- if (!Objects.equal(equalsFields(), other.equalsFields())) return false;
- return true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/PollConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/PollConfig.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/PollConfig.java
deleted file mode 100644
index 01a561b..0000000
--- a/core/src/main/java/org/apache/brooklyn/sensor/feed/PollConfig.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.sensor.feed;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.util.collections.MutableList;
-import org.apache.brooklyn.util.time.Duration;
-
-/**
- * Configuration for polling, which is being added to a feed (e.g. to poll a given URL over http).
- *
- * @author aled
- */
-public class PollConfig<V, T, F extends PollConfig<V, T, F>> extends FeedConfig<V, T, F> {
-
- private long period = -1;
- private String description;
-
- public PollConfig(AttributeSensor<T> sensor) {
- super(sensor);
- }
-
- public PollConfig(PollConfig<V,T,F> other) {
- super(other);
- this.period = other.period;
- }
-
- public long getPeriod() {
- return period;
- }
-
- public F period(Duration val) {
- checkArgument(val.toMilliseconds() >= 0, "period must be greater than or equal to zero");
- this.period = val.toMilliseconds();
- return self();
- }
-
- public F period(long val) {
- checkArgument(val >= 0, "period must be greater than or equal to zero");
- this.period = val; return self();
- }
-
- public F period(long val, TimeUnit units) {
- checkArgument(val >= 0, "period must be greater than or equal to zero");
- return period(units.toMillis(val));
- }
-
- public F description(String description) {
- this.description = description;
- return self();
- }
-
- public String getDescription() {
- return description;
- }
-
- @Override protected MutableList<Object> toStringOtherFields() {
- return super.toStringOtherFields().appendIfNotNull(description);
- }
-
- @Override
- protected MutableList<Object> equalsFields() {
- return super.equalsFields().appendIfNotNull(period);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/PollHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/PollHandler.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/PollHandler.java
deleted file mode 100644
index 175c76f..0000000
--- a/core/src/main/java/org/apache/brooklyn/sensor/feed/PollHandler.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.sensor.feed;
-
-/**
- * Notified by the Poller of the result for each job, on each poll.
- *
- * @author aled
- */
-public interface PollHandler<V> {
-
- public boolean checkSuccess(V val);
-
- public void onSuccess(V val);
-
- public void onFailure(V val);
-
- public void onException(Exception exception);
-
- public String getDescription();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/Poller.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/Poller.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/Poller.java
deleted file mode 100644
index f6e8e24..0000000
--- a/core/src/main/java/org/apache/brooklyn/sensor/feed/Poller.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.sensor.feed;
-
-import java.util.LinkedHashSet;
-import java.util.Set;
-import java.util.concurrent.Callable;
-
-import org.apache.brooklyn.api.entity.EntityLocal;
-import org.apache.brooklyn.api.mgmt.Task;
-import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.EntityInternal;
-import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
-import org.apache.brooklyn.util.collections.MutableMap;
-import org.apache.brooklyn.util.core.task.DynamicSequentialTask;
-import org.apache.brooklyn.util.core.task.ScheduledTask;
-import org.apache.brooklyn.util.core.task.Tasks;
-import org.apache.brooklyn.util.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Objects;
-
-
-/**
- * For executing periodic polls.
- * Jobs are added to the schedule, and then the poller is started.
- * The jobs will then be executed periodically, and the handler called for the result/failure.
- *
- * Assumes the schedule+start will be done single threaded, and that stop will not be done concurrently.
- */
-public class Poller<V> {
- public static final Logger log = LoggerFactory.getLogger(Poller.class);
-
- private final EntityLocal entity;
- private final boolean onlyIfServiceUp;
- private final Set<Callable<?>> oneOffJobs = new LinkedHashSet<Callable<?>>();
- private final Set<PollJob<V>> pollJobs = new LinkedHashSet<PollJob<V>>();
- private final Set<Task<?>> oneOffTasks = new LinkedHashSet<Task<?>>();
- private final Set<ScheduledTask> tasks = new LinkedHashSet<ScheduledTask>();
- private volatile boolean started = false;
-
- private static class PollJob<V> {
- final PollHandler<? super V> handler;
- final Duration pollPeriod;
- final Runnable wrappedJob;
- private boolean loggedPreviousException = false;
-
- PollJob(final Callable<V> job, final PollHandler<? super V> handler, Duration period) {
- this.handler = handler;
- this.pollPeriod = period;
-
- wrappedJob = new Runnable() {
- public void run() {
- try {
- V val = job.call();
- loggedPreviousException = false;
- if (handler.checkSuccess(val)) {
- handler.onSuccess(val);
- } else {
- handler.onFailure(val);
- }
- } catch (Exception e) {
- if (loggedPreviousException) {
- if (log.isTraceEnabled()) log.trace("PollJob for {}, repeated consecutive failures, handling {} using {}", new Object[] {job, e, handler});
- } else {
- if (log.isDebugEnabled()) log.debug("PollJob for {} handling {} using {}", new Object[] {job, e, handler});
- loggedPreviousException = true;
- }
- handler.onException(e);
- }
- }
- };
- }
- }
-
- /** @deprecated since 0.7.0, pass in whether should run onlyIfServiceUp */
- @Deprecated
- public Poller(EntityLocal entity) {
- this(entity, false);
- }
- public Poller(EntityLocal entity, boolean onlyIfServiceUp) {
- this.entity = entity;
- this.onlyIfServiceUp = onlyIfServiceUp;
- }
-
- /** Submits a one-off poll job; recommended that callers supply to-String so that task has a decent description */
- public void submit(Callable<?> job) {
- if (started) {
- throw new IllegalStateException("Cannot submit additional tasks after poller has started");
- }
- oneOffJobs.add(job);
- }
-
- public void scheduleAtFixedRate(Callable<V> job, PollHandler<? super V> handler, long period) {
- scheduleAtFixedRate(job, handler, Duration.millis(period));
- }
- public void scheduleAtFixedRate(Callable<V> job, PollHandler<? super V> handler, Duration period) {
- if (started) {
- throw new IllegalStateException("Cannot schedule additional tasks after poller has started");
- }
- PollJob<V> foo = new PollJob<V>(job, handler, period);
- pollJobs.add(foo);
- }
-
- @SuppressWarnings({ "unchecked" })
- public void start() {
- // TODO Previous incarnation of this logged this logged polledSensors.keySet(), but we don't know that anymore
- // Is that ok, are can we do better?
-
- if (log.isDebugEnabled()) log.debug("Starting poll for {} (using {})", new Object[] {entity, this});
- if (started) {
- throw new IllegalStateException(String.format("Attempt to start poller %s of entity %s when already running",
- this, entity));
- }
-
- started = true;
-
- for (final Callable<?> oneOffJob : oneOffJobs) {
- Task<?> task = Tasks.builder().dynamic(false).body((Callable<Object>) oneOffJob).name("Poll").description("One-time poll job "+oneOffJob).build();
- oneOffTasks.add(((EntityInternal)entity).getExecutionContext().submit(task));
- }
-
- for (final PollJob<V> pollJob : pollJobs) {
- final String scheduleName = pollJob.handler.getDescription();
- if (pollJob.pollPeriod.compareTo(Duration.ZERO) > 0) {
- Callable<Task<?>> pollingTaskFactory = new Callable<Task<?>>() {
- public Task<?> call() {
- DynamicSequentialTask<Void> task = new DynamicSequentialTask<Void>(MutableMap.of("displayName", scheduleName, "entity", entity),
- new Callable<Void>() { public Void call() {
- if (onlyIfServiceUp && !Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP))) {
- return null;
- }
- pollJob.wrappedJob.run();
- return null;
- } } );
- BrooklynTaskTags.setTransient(task);
- return task;
- }
- };
- ScheduledTask task = new ScheduledTask(MutableMap.of("period", pollJob.pollPeriod, "displayName", "scheduled:"+scheduleName), pollingTaskFactory);
- tasks.add((ScheduledTask)Entities.submit(entity, task));
- } else {
- if (log.isDebugEnabled()) log.debug("Activating poll (but leaving off, as period {}) for {} (using {})", new Object[] {pollJob.pollPeriod, entity, this});
- }
- }
- }
-
- public void stop() {
- if (log.isDebugEnabled()) log.debug("Stopping poll for {} (using {})", new Object[] {entity, this});
- if (!started) {
- throw new IllegalStateException(String.format("Attempt to stop poller %s of entity %s when not running",
- this, entity));
- }
-
- started = false;
- for (Task<?> task : oneOffTasks) {
- if (task != null) task.cancel(true);
- }
- for (ScheduledTask task : tasks) {
- if (task != null) task.cancel();
- }
- oneOffTasks.clear();
- tasks.clear();
- }
-
- public boolean isRunning() {
- boolean hasActiveTasks = false;
- for (Task<?> task: tasks) {
- if (task.isBegun() && !task.isDone()) {
- hasActiveTasks = true;
- break;
- }
- }
- if (!started && hasActiveTasks) {
- log.warn("Poller should not be running, but has active tasks, tasks: "+tasks);
- }
- return started && hasActiveTasks;
- }
-
- protected boolean isEmpty() {
- return pollJobs.isEmpty();
- }
-
- public String toString() {
- return Objects.toStringHelper(this).add("entity", entity).toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/function/FunctionFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/function/FunctionFeed.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/function/FunctionFeed.java
deleted file mode 100644
index 1cb6861..0000000
--- a/core/src/main/java/org/apache/brooklyn/sensor/feed/function/FunctionFeed.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.sensor.feed.function;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.brooklyn.api.entity.EntityLocal;
-import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.sensor.feed.AbstractFeed;
-import org.apache.brooklyn.sensor.feed.AttributePollHandler;
-import org.apache.brooklyn.sensor.feed.DelegatingPollHandler;
-import org.apache.brooklyn.util.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Objects;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.SetMultimap;
-import com.google.common.collect.Sets;
-import com.google.common.reflect.TypeToken;
-
-/**
- * Provides a feed of attribute values, by periodically invoking functions.
- *
- * Example usage (e.g. in an entity that extends SoftwareProcessImpl):
- * <pre>
- * {@code
- * private FunctionFeed feed;
- *
- * //@Override
- * protected void connectSensors() {
- * super.connectSensors();
- *
- * feed = FunctionFeed.builder()
- * .entity(this)
- * .poll(new FunctionPollConfig<Object, Boolean>(SERVICE_UP)
- * .period(500, TimeUnit.MILLISECONDS)
- * .callable(new Callable<Boolean>() {
- * public Boolean call() throws Exception {
- * return getDriver().isRunning();
- * }
- * })
- * .onExceptionOrFailure(Functions.constant(Boolan.FALSE))
- * .build();
- * }
- *
- * {@literal @}Override
- * protected void disconnectSensors() {
- * super.disconnectSensors();
- * if (feed != null) feed.stop();
- * }
- * }
- * </pre>
- *
- * @author aled
- */
-public class FunctionFeed extends AbstractFeed {
-
- private static final Logger log = LoggerFactory.getLogger(FunctionFeed.class);
-
- // Treat as immutable once built
- @SuppressWarnings("serial")
- public static final ConfigKey<SetMultimap<FunctionPollIdentifier, FunctionPollConfig<?,?>>> POLLS = ConfigKeys.newConfigKey(
- new TypeToken<SetMultimap<FunctionPollIdentifier, FunctionPollConfig<?,?>>>() {},
- "polls");
-
- public static Builder builder() {
- return new Builder();
- }
-
- public static Builder builder(String uniqueTag) {
- return new Builder().uniqueTag(uniqueTag);
- }
-
- public static class Builder {
- private EntityLocal entity;
- private boolean onlyIfServiceUp = false;
- private long period = 500;
- private TimeUnit periodUnits = TimeUnit.MILLISECONDS;
- private List<FunctionPollConfig<?,?>> polls = Lists.newArrayList();
- private String uniqueTag;
- private volatile boolean built;
-
- public Builder entity(EntityLocal val) {
- this.entity = val;
- return this;
- }
- public Builder onlyIfServiceUp() { return onlyIfServiceUp(true); }
- public Builder onlyIfServiceUp(boolean onlyIfServiceUp) {
- this.onlyIfServiceUp = onlyIfServiceUp;
- return this;
- }
- public Builder period(Duration d) {
- return period(d.toMilliseconds(), TimeUnit.MILLISECONDS);
- }
- public Builder period(long millis) {
- return period(millis, TimeUnit.MILLISECONDS);
- }
- public Builder period(long val, TimeUnit units) {
- this.period = val;
- this.periodUnits = units;
- return this;
- }
- public Builder poll(FunctionPollConfig<?,?> config) {
- polls.add(config);
- return this;
- }
- public Builder uniqueTag(String uniqueTag) {
- this.uniqueTag = uniqueTag;
- return this;
- }
- public FunctionFeed build() {
- built = true;
- FunctionFeed result = new FunctionFeed(this);
- result.setEntity(checkNotNull(entity, "entity"));
- result.start();
- return result;
- }
- @Override
- protected void finalize() {
- if (!built) log.warn("FunctionFeed.Builder created, but build() never called");
- }
- }
-
- private static class FunctionPollIdentifier {
- final Callable<?> job;
-
- private FunctionPollIdentifier(Callable<?> job) {
- this.job = checkNotNull(job, "job");
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(job);
- }
-
- @Override
- public boolean equals(Object other) {
- return (other instanceof FunctionPollIdentifier) && Objects.equal(job, ((FunctionPollIdentifier)other).job);
- }
- }
-
- /**
- * For rebind; do not call directly; use builder
- */
- public FunctionFeed() {
- }
-
- protected FunctionFeed(Builder builder) {
- setConfig(ONLY_IF_SERVICE_UP, builder.onlyIfServiceUp);
-
- SetMultimap<FunctionPollIdentifier, FunctionPollConfig<?,?>> polls = HashMultimap.<FunctionPollIdentifier,FunctionPollConfig<?,?>>create();
- for (FunctionPollConfig<?,?> config : builder.polls) {
- if (!config.isEnabled()) continue;
- @SuppressWarnings({ "rawtypes", "unchecked" })
- FunctionPollConfig<?,?> configCopy = new FunctionPollConfig(config);
- if (configCopy.getPeriod() < 0) configCopy.period(builder.period, builder.periodUnits);
- Callable<?> job = config.getCallable();
- polls.put(new FunctionPollIdentifier(job), configCopy);
- }
- setConfig(POLLS, polls);
- initUniqueTag(builder.uniqueTag, polls.values());
- }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- @Override
- protected void preStart() {
- SetMultimap<FunctionPollIdentifier, FunctionPollConfig<?, ?>> polls = getConfig(POLLS);
- for (final FunctionPollIdentifier pollInfo : polls.keySet()) {
- Set<FunctionPollConfig<?,?>> configs = polls.get(pollInfo);
- long minPeriod = Integer.MAX_VALUE;
- Set<AttributePollHandler<?>> handlers = Sets.newLinkedHashSet();
-
- for (FunctionPollConfig<?,?> config : configs) {
- handlers.add(new AttributePollHandler(config, entity, this));
- if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod());
- }
-
- getPoller().scheduleAtFixedRate(
- (Callable)pollInfo.job,
- new DelegatingPollHandler(handlers),
- minPeriod);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/function/FunctionPollConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/function/FunctionPollConfig.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/function/FunctionPollConfig.java
deleted file mode 100644
index 7b91988..0000000
--- a/core/src/main/java/org/apache/brooklyn/sensor/feed/function/FunctionPollConfig.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.sensor.feed.function;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import groovy.lang.Closure;
-
-import java.util.concurrent.Callable;
-
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.sensor.feed.FeedConfig;
-import org.apache.brooklyn.sensor.feed.PollConfig;
-import org.apache.brooklyn.util.groovy.GroovyJavaMethods;
-import org.apache.brooklyn.util.guava.Functionals;
-import org.apache.brooklyn.util.javalang.JavaClassNames;
-
-import com.google.common.base.Supplier;
-
-public class FunctionPollConfig<S, T> extends PollConfig<S, T, FunctionPollConfig<S, T>> {
-
- private Callable<?> callable;
-
- public static <T> FunctionPollConfig<?, T> forSensor(AttributeSensor<T> sensor) {
- return new FunctionPollConfig<Object, T>(sensor);
- }
-
- public FunctionPollConfig(AttributeSensor<T> sensor) {
- super(sensor);
- }
-
- public FunctionPollConfig(FunctionPollConfig<S, T> other) {
- super(other);
- callable = other.callable;
- }
-
- public Callable<? extends Object> getCallable() {
- return callable;
- }
-
- /**
- * The {@link Callable} to be invoked on each poll.
- * <p>
- * Note this <em>must</em> use generics, otherwise the return type of subsequent chained
- * calls will (e.g. to {@link FeedConfig#onException(com.google.common.base.Function)} will
- * return the wrong type.
- */
- @SuppressWarnings("unchecked")
- public <newS> FunctionPollConfig<newS, T> callable(Callable<? extends newS> val) {
- this.callable = checkNotNull(val, "callable");
- return (FunctionPollConfig<newS, T>) this;
- }
-
- /**
- * Supplies the value to be returned by each poll.
- * <p>
- * Note this <em>must</em> use generics, otherwise the return type of subsequent chained
- * calls will (e.g. to {@link FeedConfig#onException(com.google.common.base.Function)} will
- * return the wrong type.
- */
- @SuppressWarnings("unchecked")
- public <newS> FunctionPollConfig<newS, T> supplier(final Supplier<? extends newS> val) {
- this.callable = Functionals.callable( checkNotNull(val, "supplier") );
- return (FunctionPollConfig<newS, T>) this;
- }
-
- /** @deprecated since 0.7.0, kept for legacy compatibility when deserializing */
- @SuppressWarnings({ "unchecked", "unused" })
- private <newS> FunctionPollConfig<newS, T> supplierLegacy(final Supplier<? extends newS> val) {
- checkNotNull(val, "supplier");
- this.callable = new Callable<newS>() {
- @Override
- public newS call() throws Exception {
- return val.get();
- }
- };
- return (FunctionPollConfig<newS, T>) this;
- }
-
- public FunctionPollConfig<S, T> closure(Closure<?> val) {
- this.callable = GroovyJavaMethods.callableFromClosure(checkNotNull(val, "closure"));
- return this;
- }
-
- @Override protected String toStringBaseName() { return "fn"; }
- @Override protected String toStringPollSource() {
- if (callable==null) return null;
- String cs = callable.toString();
- if (!cs.contains( ""+Integer.toHexString(callable.hashCode()) )) {
- return cs;
- }
- // if hashcode is in callable it's probably a custom internal; return class name
- return JavaClassNames.simpleClassName(callable);
- }
-
-}