You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by dr...@apache.org on 2018/02/05 10:57:48 UTC

[2/3] brooklyn-server git commit: Sensor feeds: avoid repeated log.warn on failure

Sensor feeds: avoid repeated log.warn on failure


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/a088f468
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/a088f468
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/a088f468

Branch: refs/heads/master
Commit: a088f468d6968de2ab8352019c64e654f42782ba
Parents: f6a2ed1
Author: Aled Sage <al...@gmail.com>
Authored: Mon Jan 29 16:43:27 2018 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Mon Feb 5 09:43:16 2018 +0000

----------------------------------------------------------------------
 .../camp/brooklyn/FunctionSensorYamlTest.java   | 82 ++++++++++++++++++++
 .../core/feed/AttributePollHandler.java         | 32 +++-----
 .../apache/brooklyn/core/feed/FeedConfig.java   | 42 +++++++++-
 .../core/sensor/AbstractAddSensorFeed.java      | 56 +++++++++++++
 .../core/sensor/function/FunctionSensor.java    | 17 ++--
 .../core/sensor/http/HttpRequestSensor.java     | 17 ++--
 .../core/sensor/ssh/SshCommandSensor.java       | 16 ++--
 .../entity/java/JmxAttributeSensor.java         | 16 ++--
 .../core/sensor/windows/WinRmCommandSensor.java | 16 ++--
 .../org/apache/brooklyn/test/LogWatcher.java    | 61 ++++++++++++++-
 10 files changed, 290 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/a088f468/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/FunctionSensorYamlTest.java
----------------------------------------------------------------------
diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/FunctionSensorYamlTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/FunctionSensorYamlTest.java
index ba6e669..33102f0 100644
--- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/FunctionSensorYamlTest.java
+++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/FunctionSensorYamlTest.java
@@ -18,22 +18,41 @@
  */
 package org.apache.brooklyn.camp.brooklyn;
 
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.brooklyn.api.entity.Application;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.core.entity.EntityAsserts;
+import org.apache.brooklyn.core.feed.AttributePollHandler;
+import org.apache.brooklyn.core.feed.Poller;
 import org.apache.brooklyn.core.sensor.Sensors;
 import org.apache.brooklyn.core.sensor.function.FunctionSensor;
 import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.entity.stock.BasicApplication;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.test.LogWatcher;
+import org.apache.brooklyn.test.LogWatcher.EventPredicates;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+
 public class FunctionSensorYamlTest extends AbstractYamlRebindTest {
     private static final Logger log = LoggerFactory.getLogger(FunctionSensorYamlTest.class);
 
@@ -42,12 +61,25 @@ public class FunctionSensorYamlTest extends AbstractYamlRebindTest {
 
     public static class MyCallable implements Callable<Object> {
         public static AtomicReference<Object> val = new AtomicReference<>();
+        public static AtomicInteger callCounter = new AtomicInteger();
 
+        public static void clear() {
+            callCounter.set(0);
+            val.set(null);
+        }
         @Override public Object call() throws Exception {
+            callCounter.incrementAndGet();
             return val.get();
         }
     }
 
+    @BeforeMethod(alwaysRun = true)
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        MyCallable.clear();
+    }
+
     @Test
     public void testFunctionSensor() throws Exception {
         MyCallable.val.set("first");
@@ -116,6 +148,56 @@ public class FunctionSensorYamlTest extends AbstractYamlRebindTest {
         EntityAsserts.assertAttributeEqualsEventually(newEntity, SENSOR_INT, 3);
     }
 
+    @Test
+    public void testWarnOnlyOnceOnRepeatedCoercionException() throws Exception {
+        MyCallable.val.set("my-not-a-number");
+        
+        List<String> loggerNames = ImmutableList.of(
+                AttributePollHandler.class.getName(), 
+                Poller.class.getName());
+        ch.qos.logback.classic.Level logLevel = ch.qos.logback.classic.Level.TRACE;
+        Predicate<ILoggingEvent> filter = Predicates.alwaysTrue();
+        LogWatcher watcher = new LogWatcher(loggerNames, logLevel, filter);
+
+        watcher.start();
+        try {
+            Entity app = createAndStartApplication(
+                    "services:",
+                    "- type: " + TestEntity.class.getName(),
+                    "  brooklyn.config:",
+                    "    onbox.base.dir.skipResolution: true",
+                    "  brooklyn.initializers:",
+                    "  - type: "+FunctionSensor.class.getName(),
+                    "    brooklyn.config:",
+                    "      "+FunctionSensor.SENSOR_PERIOD.getName()+": 1ms",
+                    "      "+FunctionSensor.SENSOR_NAME.getName()+": mysensor",
+                    "      "+FunctionSensor.SENSOR_TYPE.getName()+": int",
+                    "      "+FunctionSensor.LOG_WARNING_GRACE_TIME_ON_STARTUP.getName()+": 0s",
+                    "      "+FunctionSensor.SENSOR_TYPE.getName()+": int",
+                    "      "+FunctionSensor.FUNCTION.getName()+":",
+                    "        $brooklyn:object:",
+                    "          type: "+MyCallable.class.getName());
+            waitForApplicationTasks(app);
+
+            // Wait until we've polled (and thus presumably tried to handle the response) more than 3 times,
+            // then shut down the app so we don't risk flooding the log too much if it's going wrong!
+            Asserts.succeedsEventually(() -> assertTrue(MyCallable.callCounter.get() > 3));
+            ((BasicApplication)app).stop();
+            
+            // Ensure we log.warn only once
+            Iterable<ILoggingEvent> warnEvents = Iterables.filter(watcher.getEvents(), EventPredicates.levelGeaterOrEqual(Level.WARN));
+            assertTrue(Iterables.tryFind(warnEvents, EventPredicates.containsMessages("Read of", "gave exception", "Cannot coerce ")).isPresent(), "warnEvents="+warnEvents);
+            assertEquals(Iterables.size(warnEvents), 1, "warnEvents="+warnEvents);
+
+            // Ensure we log the stacktrace only once
+            Iterable<ILoggingEvent> exceptionEvents = Iterables.filter(watcher.getEvents(), EventPredicates.containsException());
+            assertTrue(Iterables.tryFind(exceptionEvents, EventPredicates.containsExceptionMessage("Cannot coerce ")).isPresent(), "exceptionEvents="+exceptionEvents);
+            assertEquals(Iterables.size(exceptionEvents), 1, "exceptionEvents="+exceptionEvents);
+        } finally {
+            watcher.close();
+        }
+    }
+
     @Override
     protected Logger getLogger() {
         return log;

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/a088f468/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java b/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java
index 903b6e0..0377eed 100644
--- a/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java
@@ -55,12 +55,6 @@ public class AttributePollHandler<V> implements PollHandler<V> {
     private final AbstractFeed feed;
     private final boolean suppressDuplicates;
     
-    // allow 30 seconds before logging at WARN, if there has been no success yet;
-    // after success WARN immediately
-    // TODO these should both be configurable
-    private Duration logWarningGraceTimeOnStartup = Duration.THIRTY_SECONDS;
-    private Duration logWarningGraceTime = Duration.millis(0);
-    
     // internal state to look after whether to log warnings
     private volatile Long lastSuccessTime = null;
     private volatile Long currentProblemStartTime = null;
@@ -84,6 +78,9 @@ public class AttributePollHandler<V> implements PollHandler<V> {
 
     @Override
     public void onSuccess(V val) {
+        if (log.isTraceEnabled()) log.trace("poll for "+getBriefDescription()+" got: "+val);
+        setSensor(transformValueOnSuccess(val));
+        
         if (lastWasProblem) {
             if (currentProblemLoggedAsWarning) { 
                 log.info("Success (following previous problem) reading "+getBriefDescription());
@@ -95,17 +92,6 @@ public class AttributePollHandler<V> implements PollHandler<V> {
             currentProblemLoggedAsWarning = false;
         }
         lastSuccessTime = System.currentTimeMillis();
-        if (log.isTraceEnabled()) log.trace("poll for {} got: {}", new Object[] {getBriefDescription(), val});
-        
-        try {
-            setSensor(transformValueOnSuccess(val));
-        } catch (Exception e) {
-            if (feed.isConnected()) {
-                log.warn("unable to compute "+getBriefDescription()+"; on val="+val, e);
-            } else {
-                if (log.isDebugEnabled()) log.debug("unable to compute "+getBriefDescription()+"; val="+val+" (when inactive)", e);
-            }
-        }
     }
 
     /** allows post-processing, such as applying a success handler; 
@@ -137,7 +123,7 @@ public class AttributePollHandler<V> implements PollHandler<V> {
     @Override
     public void onException(Exception exception) {
         if (!feed.isConnected()) {
-            if (log.isTraceEnabled()) log.trace("Read of {} in {} gave exception (while not connected or not yet connected): {}", new Object[] {this, getBriefDescription(), exception});
+            if (log.isTraceEnabled()) log.trace("Read of "+this+" in "+getBriefDescription()+" gave exception (while not connected or not yet connected): "+ exception);
         } else {
             logProblem("exception", exception);
         }
@@ -158,15 +144,17 @@ public class AttributePollHandler<V> implements PollHandler<V> {
     protected void logProblem(String type, Object val) {
         if (lastWasProblem && currentProblemLoggedAsWarning) {
             if (log.isTraceEnabled())
-                log.trace("Recurring {} reading {} in {}: {}", new Object[] {type, this, getBriefDescription(), val});
+                log.trace("Recurring "+type+" reading "+this+" in "+getBriefDescription()+": "+val);
         } else {
             long nowTime = System.currentTimeMillis();
             // get a non-volatile value
             Long currentProblemStartTimeCache = currentProblemStartTime;
             long expiryTime = 
-                    (lastSuccessTime!=null && !isTransitioningOrStopped()) ? lastSuccessTime+logWarningGraceTime.toMilliseconds() :
-                    currentProblemStartTimeCache!=null ? currentProblemStartTimeCache+logWarningGraceTimeOnStartup.toMilliseconds() :
-                    nowTime+logWarningGraceTimeOnStartup.toMilliseconds();
+                    (lastSuccessTime!=null && !isTransitioningOrStopped()) 
+                            ? lastSuccessTime+config.getLogWarningGraceTime().toMilliseconds() 
+                            : (currentProblemStartTimeCache != null) 
+                                    ? currentProblemStartTimeCache+config.getLogWarningGraceTimeOnStartup().toMilliseconds() 
+                                    : nowTime+config.getLogWarningGraceTimeOnStartup().toMilliseconds();
             if (!lastWasProblem) {
                 if (expiryTime <= nowTime) {
                     currentProblemLoggedAsWarning = true;

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/a088f468/core/src/main/java/org/apache/brooklyn/core/feed/FeedConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/FeedConfig.java b/core/src/main/java/org/apache/brooklyn/core/feed/FeedConfig.java
index 91f6f7c..7e7a5b0 100644
--- a/core/src/main/java/org/apache/brooklyn/core/feed/FeedConfig.java
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/FeedConfig.java
@@ -28,6 +28,7 @@ import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.guava.Functionals;
 import org.apache.brooklyn.util.javalang.JavaClassNames;
 import org.apache.brooklyn.util.text.Strings;
+import org.apache.brooklyn.util.time.Duration;
 
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
@@ -63,6 +64,26 @@ public class FeedConfig<V, T, F extends FeedConfig<V, T, F>> {
     private boolean suppressDuplicates;
     private boolean enabled = true;
     
+    // allow 30 seconds before logging at WARN, if there has been no success yet;
+    // after success WARN immediately
+    // (both are configurable via logWarningGraceTimeOnStartup(Duration) and logWarningGraceTime(Duration))
+    /**
+     * On startup, the length of time before which a failure can be logged at WARN.
+     * This grace period is useful to avoid flooding the logs if the feed is expected
+     * to sometimes be unavailable for a few seconds while the process-under-management
+     * initialises.
+     * 
+     * Defaults to 30 seconds.
+     */
+    private Duration logWarningGraceTimeOnStartup = Duration.THIRTY_SECONDS;
+
+    /**
+     * Length of time, after a successful poll, before a subsequent failure can be logged at WARN.
+     * 
+     * Defaults to 0.
+     */
+    private Duration logWarningGraceTime = Duration.millis(0);
+
     public FeedConfig(AttributeSensor<T> sensor) {
         this.sensor = checkNotNull(sensor, "sensor");
     }
@@ -74,6 +95,8 @@ public class FeedConfig<V, T, F extends FeedConfig<V, T, F>> {
         this.onexception = other.onexception;
         this.checkSuccess = other.checkSuccess;
         this.suppressDuplicates = other.suppressDuplicates;
+        this.logWarningGraceTimeOnStartup = other.logWarningGraceTimeOnStartup;
+        this.logWarningGraceTime = other.logWarningGraceTime;
         this.enabled = other.enabled;
     }
 
@@ -201,6 +224,24 @@ public class FeedConfig<V, T, F extends FeedConfig<V, T, F>> {
         return self();
     }
 
+    public F logWarningGraceTimeOnStartup(Duration val) {
+        this.logWarningGraceTimeOnStartup = checkNotNull(val);
+        return self();
+    }
+
+    public F logWarningGraceTime(Duration val) {
+        this.logWarningGraceTime = checkNotNull(val);
+        return self();
+    }
+    
+    public Duration getLogWarningGraceTimeOnStartup() {
+        return logWarningGraceTimeOnStartup;
+    }
+
+    public Duration getLogWarningGraceTime() {
+        return logWarningGraceTime;
+    }
+    
     /**
      * Whether this feed is enabled (defaulting to true).
      */
@@ -300,5 +341,4 @@ public class FeedConfig<V, T, F extends FeedConfig<V, T, F>> {
         if (!Objects.equal(equalsFields(), other.equalsFields())) return false;
         return true;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/a088f468/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddSensorFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddSensorFeed.java b/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddSensorFeed.java
new file mode 100644
index 0000000..5ff73ce
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddSensorFeed.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.sensor;
+
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.effector.AddSensor;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.time.Duration;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Super-class for entity initializers that add feeds.
+ */
+@Beta
+public abstract class AbstractAddSensorFeed<T> extends AddSensor<T> {
+
+    public static final ConfigKey<Boolean> SUPPRESS_DUPLICATES = ConfigKeys.newBooleanConfigKey(
+            "suppressDuplicates", 
+            "Whether to publish the sensor value again, if it is the same as the previous value",
+            Boolean.FALSE);
+    
+    public static final ConfigKey<Duration> LOG_WARNING_GRACE_TIME_ON_STARTUP = ConfigKeys.newDurationConfigKey(
+            "logWarningGraceTimeOnStartup",
+            "On startup, the length of time before which a failure can be logged at WARN. "
+                    + "This grace period is useful to avoid flooding the logs if the feed is expected " 
+                    + "to sometimes be unavailable for a few seconds while the process-under-management "
+                    + "initialises.",
+            Duration.millis(0));
+
+    public static final ConfigKey<Duration> LOG_WARNING_GRACE_TIME = ConfigKeys.newDurationConfigKey(
+            "logWarningGraceTime",
+            "Length of time, after a successful poll, before a subsequent failure can be logged at WARN.",
+            Duration.millis(0));
+
+    public AbstractAddSensorFeed(final ConfigBag params) {
+        super(params);
+    }
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/a088f468/core/src/main/java/org/apache/brooklyn/core/sensor/function/FunctionSensor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/function/FunctionSensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/function/FunctionSensor.java
index 4b32451..62e785c 100644
--- a/core/src/main/java/org/apache/brooklyn/core/sensor/function/FunctionSensor.java
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/function/FunctionSensor.java
@@ -23,11 +23,12 @@ import java.util.concurrent.Callable;
 import org.apache.brooklyn.api.entity.EntityLocal;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.core.effector.AddSensor;
 import org.apache.brooklyn.core.entity.EntityInitializers;
+import org.apache.brooklyn.core.sensor.AbstractAddSensorFeed;
 import org.apache.brooklyn.feed.function.FunctionFeed;
 import org.apache.brooklyn.feed.function.FunctionPollConfig;
 import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,21 +43,17 @@ import com.google.common.reflect.TypeToken;
  * @see FunctionFeed
  */
 @Beta
-public final class FunctionSensor<T> extends AddSensor<T> {
+public final class FunctionSensor<T> extends AbstractAddSensorFeed<T> {
 
     private static final Logger LOG = LoggerFactory.getLogger(FunctionSensor.class);
 
-    public static final ConfigKey<Boolean> SUPPRESS_DUPLICATES = ConfigKeys.newBooleanConfigKey(
-            "suppressDuplicates", 
-            "Whether to publish the sensor value again, if it is the same as the previous value",
-            Boolean.FALSE);
-    
+    @SuppressWarnings("serial")
     public static final ConfigKey<Callable<?>> FUNCTION = ConfigKeys.newConfigKey(
             new TypeToken<Callable<?>>() {},
             "function",
             "The callable to be executed periodically",
             null);
-    
+
     public FunctionSensor(final ConfigBag params) {
         super(params);
     }
@@ -73,11 +70,15 @@ public final class FunctionSensor<T> extends AddSensor<T> {
         
         final Callable<?> function = EntityInitializers.resolve(allConfig, FUNCTION);
         final Boolean suppressDuplicates = EntityInitializers.resolve(allConfig, SUPPRESS_DUPLICATES);
+        final Duration logWarningGraceTimeOnStartup = EntityInitializers.resolve(allConfig, LOG_WARNING_GRACE_TIME_ON_STARTUP);
+        final Duration logWarningGraceTime = EntityInitializers.resolve(allConfig, LOG_WARNING_GRACE_TIME);
 
         FunctionPollConfig<?, T> pollConfig = new FunctionPollConfig<Object, T>(sensor)
                 .callable(function)
                 .onFailureOrException(Functions.constant((T) null))
                 .suppressDuplicates(Boolean.TRUE.equals(suppressDuplicates))
+                .logWarningGraceTimeOnStartup(logWarningGraceTimeOnStartup)
+                .logWarningGraceTime(logWarningGraceTime)
                 .period(period);
 
         FunctionFeed feed = FunctionFeed.builder().entity(entity)

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/a088f468/core/src/main/java/org/apache/brooklyn/core/sensor/http/HttpRequestSensor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/http/HttpRequestSensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/http/HttpRequestSensor.java
index 842bcb4..a6c73e4 100644
--- a/core/src/main/java/org/apache/brooklyn/core/sensor/http/HttpRequestSensor.java
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/http/HttpRequestSensor.java
@@ -25,17 +25,16 @@ import org.apache.brooklyn.api.entity.EntityLocal;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.config.MapConfigKey;
-import org.apache.brooklyn.core.effector.AddSensor;
 import org.apache.brooklyn.core.entity.EntityInitializers;
-import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.sensor.AbstractAddSensorFeed;
 import org.apache.brooklyn.core.sensor.ssh.SshCommandSensor;
 import org.apache.brooklyn.feed.http.HttpFeed;
 import org.apache.brooklyn.feed.http.HttpPollConfig;
 import org.apache.brooklyn.feed.http.HttpValueFunctions;
 import org.apache.brooklyn.util.core.config.ConfigBag;
-import org.apache.brooklyn.util.core.config.ResolvingConfigBag;
 import org.apache.brooklyn.util.http.HttpToolResponse;
 import org.apache.brooklyn.util.text.Strings;
+import org.apache.brooklyn.util.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,7 +52,7 @@ import net.minidev.json.JSONObject;
  * @see SshCommandSensor
  */
 @Beta
-public final class HttpRequestSensor<T> extends AddSensor<T> {
+public final class HttpRequestSensor<T> extends AbstractAddSensorFeed<T> {
 
     private static final Logger LOG = LoggerFactory.getLogger(HttpRequestSensor.class);
 
@@ -62,16 +61,12 @@ public final class HttpRequestSensor<T> extends AddSensor<T> {
     public static final ConfigKey<String> USERNAME = ConfigKeys.newStringConfigKey("username", "Username for HTTP request, if required");
     public static final ConfigKey<String> PASSWORD = ConfigKeys.newStringConfigKey("password", "Password for HTTP request, if required");
     public static final ConfigKey<Map<String, String>> HEADERS = new MapConfigKey<>(String.class, "headers");
-    public static final ConfigKey<Boolean> SUPPRESS_DUPLICATES = ConfigKeys.newBooleanConfigKey(
-            "suppressDuplicates", 
-            "Whether to publish the sensor value again, if it is the same as the previous value",
-            Boolean.FALSE);
     
     public static final ConfigKey<Boolean> PREEMPTIVE_BASIC_AUTH = ConfigKeys.newBooleanConfigKey(
             "preemptiveBasicAuth",
             "Whether to pre-emptively including a basic-auth header of the username:password (rather than waiting for a challenge)",
             Boolean.FALSE);
-    
+
     public HttpRequestSensor(final ConfigBag params) {
         super(params);
     }
@@ -101,6 +96,8 @@ public final class HttpRequestSensor<T> extends AddSensor<T> {
         final Map<String, String> headers = EntityInitializers.resolve(allConfig, HEADERS);
         final Boolean preemptiveBasicAuth = EntityInitializers.resolve(allConfig, PREEMPTIVE_BASIC_AUTH);
         final Boolean suppressDuplicates = EntityInitializers.resolve(allConfig, SUPPRESS_DUPLICATES);
+        final Duration logWarningGraceTimeOnStartup = EntityInitializers.resolve(allConfig, LOG_WARNING_GRACE_TIME_ON_STARTUP);
+        final Duration logWarningGraceTime = EntityInitializers.resolve(allConfig, LOG_WARNING_GRACE_TIME);
         
         Function<? super HttpToolResponse, T> successFunction;
         if (Strings.isBlank(jsonPath)) {
@@ -115,6 +112,8 @@ public final class HttpRequestSensor<T> extends AddSensor<T> {
                 .onFailureOrException(Functions.constant((T) null))
                 .onSuccess(successFunction)
                 .suppressDuplicates(Boolean.TRUE.equals(suppressDuplicates))
+                .logWarningGraceTimeOnStartup(logWarningGraceTimeOnStartup)
+                .logWarningGraceTime(logWarningGraceTime)
                 .period(period);
 
         HttpFeed.Builder httpRequestBuilder = HttpFeed.builder().entity(entity)

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/a088f468/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java
index 2fd3f3f..84b7322 100644
--- a/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java
@@ -27,10 +27,10 @@ import org.apache.brooklyn.api.entity.EntityLocal;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.config.MapConfigKey;
-import org.apache.brooklyn.core.effector.AddSensor;
 import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
 import org.apache.brooklyn.core.entity.EntityInitializers;
 import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.sensor.AbstractAddSensorFeed;
 import org.apache.brooklyn.core.sensor.http.HttpRequestSensor;
 import org.apache.brooklyn.feed.CommandPollConfig;
 import org.apache.brooklyn.feed.ssh.SshFeed;
@@ -43,6 +43,7 @@ import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.os.Os;
 import org.apache.brooklyn.util.text.Strings;
+import org.apache.brooklyn.util.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,7 +62,7 @@ import com.google.common.collect.ImmutableMap;
  * @see HttpRequestSensor
  */
 @Beta
-public final class SshCommandSensor<T> extends AddSensor<T> {
+public final class SshCommandSensor<T> extends AbstractAddSensorFeed<T> {
 
     private static final Logger LOG = LoggerFactory.getLogger(SshCommandSensor.class);
 
@@ -71,11 +72,6 @@ public final class SshCommandSensor<T> extends AddSensor<T> {
         + "use '~' to always execute in the home dir, or 'custom-feed/' to execute in a custom-feed dir relative to the run dir");
     public static final MapConfigKey<Object> SENSOR_SHELL_ENVIRONMENT = BrooklynConfigKeys.SHELL_ENVIRONMENT;
 
-    public static final ConfigKey<Boolean> SUPPRESS_DUPLICATES = ConfigKeys.newBooleanConfigKey(
-            "suppressDuplicates", 
-            "Whether to publish the sensor value again, if it is the same as the previous value",
-            Boolean.FALSE);
-
     protected final String command;
     protected final String executionDir;
     protected final Map<String,Object> sensorEnv;
@@ -98,6 +94,8 @@ public final class SshCommandSensor<T> extends AddSensor<T> {
         }
 
         final Boolean suppressDuplicates = EntityInitializers.resolve(params, SUPPRESS_DUPLICATES);
+        final Duration logWarningGraceTimeOnStartup = EntityInitializers.resolve(params, LOG_WARNING_GRACE_TIME_ON_STARTUP);
+        final Duration logWarningGraceTime = EntityInitializers.resolve(params, LOG_WARNING_GRACE_TIME);
 
         Supplier<Map<String,String>> envSupplier = new Supplier<Map<String,String>>() {
             @Override
@@ -143,7 +141,9 @@ public final class SshCommandSensor<T> extends AddSensor<T> {
                         @Override
                         public T apply(String input) {
                             return TypeCoercions.coerce(Strings.trimEnd(input), (Class<T>) sensor.getType());
-                        }}, SshValueFunctions.stdout()));
+                        }}, SshValueFunctions.stdout()))
+                .logWarningGraceTimeOnStartup(logWarningGraceTimeOnStartup)
+                .logWarningGraceTime(logWarningGraceTime);
 
         SshFeed feed = SshFeed.builder()
                 .entity(entity)

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/a088f468/software/base/src/main/java/org/apache/brooklyn/entity/java/JmxAttributeSensor.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/java/JmxAttributeSensor.java b/software/base/src/main/java/org/apache/brooklyn/entity/java/JmxAttributeSensor.java
index 21046ad..61f753e 100644
--- a/software/base/src/main/java/org/apache/brooklyn/entity/java/JmxAttributeSensor.java
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/java/JmxAttributeSensor.java
@@ -27,8 +27,8 @@ import org.apache.brooklyn.api.entity.EntityLocal;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.core.effector.AddSensor;
 import org.apache.brooklyn.core.entity.EntityInitializers;
+import org.apache.brooklyn.core.sensor.AbstractAddSensorFeed;
 import org.apache.brooklyn.core.sensor.DependentConfiguration;
 import org.apache.brooklyn.core.sensor.http.HttpRequestSensor;
 import org.apache.brooklyn.core.sensor.ssh.SshCommandSensor;
@@ -38,6 +38,7 @@ import org.apache.brooklyn.feed.jmx.JmxHelper;
 import org.apache.brooklyn.util.core.config.ConfigBag;
 import org.apache.brooklyn.util.core.task.DynamicTasks;
 import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,7 +54,7 @@ import com.google.common.base.Preconditions;
  * @see HttpRequestSensor
  */
 @Beta
-public final class JmxAttributeSensor<T> extends AddSensor<T> {
+public final class JmxAttributeSensor<T> extends AbstractAddSensorFeed<T> {
 
     private static final Logger LOG = LoggerFactory.getLogger(JmxAttributeSensor.class);
 
@@ -61,11 +62,6 @@ public final class JmxAttributeSensor<T> extends AddSensor<T> {
     public static final ConfigKey<String> ATTRIBUTE = ConfigKeys.newStringConfigKey("attribute", "JMX attribute to poll in object");
     public static final ConfigKey<Object> DEFAULT_VALUE = ConfigKeys.newConfigKey(Object.class, "defaultValue", "Default value for sensor; normally null");
 
-    public static final ConfigKey<Boolean> SUPPRESS_DUPLICATES = ConfigKeys.newBooleanConfigKey(
-            "suppressDuplicates", 
-            "Whether to publish the sensor value again, if it is the same as the previous value",
-            Boolean.FALSE);
-
     protected final String objectName;
     protected final String attribute;
     protected final Object defaultValue;
@@ -89,6 +85,8 @@ public final class JmxAttributeSensor<T> extends AddSensor<T> {
         super.apply(entity);
 
         final Boolean suppressDuplicates = EntityInitializers.resolve(params, SUPPRESS_DUPLICATES);
+        final Duration logWarningGraceTimeOnStartup = EntityInitializers.resolve(params, LOG_WARNING_GRACE_TIME_ON_STARTUP);
+        final Duration logWarningGraceTime = EntityInitializers.resolve(params, LOG_WARNING_GRACE_TIME);
 
         if (entity instanceof UsesJmx) {
             if (LOG.isDebugEnabled()) {
@@ -111,7 +109,9 @@ public final class JmxAttributeSensor<T> extends AddSensor<T> {
                                             .objectName(objectName)
                                             .attributeName(attribute)
                                             .suppressDuplicates(Boolean.TRUE.equals(suppressDuplicates))
-                                            .onFailureOrException(Functions.<T>constant((T) defaultValue)))
+                                            .onFailureOrException(Functions.<T>constant((T) defaultValue))
+                                            .logWarningGraceTimeOnStartup(logWarningGraceTimeOnStartup)
+                                            .logWarningGraceTime(logWarningGraceTime))
                                     .build();
                             entity.addFeed(feed);
                             return feed;

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/a088f468/software/winrm/src/main/java/org/apache/brooklyn/core/sensor/windows/WinRmCommandSensor.java
----------------------------------------------------------------------
diff --git a/software/winrm/src/main/java/org/apache/brooklyn/core/sensor/windows/WinRmCommandSensor.java b/software/winrm/src/main/java/org/apache/brooklyn/core/sensor/windows/WinRmCommandSensor.java
index 2fcc315..3ee5e24 100644
--- a/software/winrm/src/main/java/org/apache/brooklyn/core/sensor/windows/WinRmCommandSensor.java
+++ b/software/winrm/src/main/java/org/apache/brooklyn/core/sensor/windows/WinRmCommandSensor.java
@@ -26,9 +26,9 @@ import org.apache.brooklyn.api.entity.EntityInitializer;
 import org.apache.brooklyn.api.entity.EntityLocal;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.core.effector.AddSensor;
 import org.apache.brooklyn.core.entity.EntityInitializers;
 import org.apache.brooklyn.core.entity.EntityInternal;
+import org.apache.brooklyn.core.sensor.AbstractAddSensorFeed;
 import org.apache.brooklyn.core.sensor.http.HttpRequestSensor;
 import org.apache.brooklyn.feed.CommandPollConfig;
 import org.apache.brooklyn.feed.ssh.SshValueFunctions;
@@ -41,6 +41,7 @@ import org.apache.brooklyn.util.core.json.ShellEnvironmentSerializer;
 import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.text.Strings;
+import org.apache.brooklyn.util.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,7 +61,7 @@ import com.google.common.base.Supplier;
  * @see HttpRequestSensor
  */
 @Beta
-public final class WinRmCommandSensor<T> extends AddSensor<T> {
+public final class WinRmCommandSensor<T> extends AbstractAddSensorFeed<T> {
 
     private static final Logger LOG = LoggerFactory.getLogger(WinRmCommandSensor.class);
 
@@ -70,11 +71,6 @@ public final class WinRmCommandSensor<T> extends AddSensor<T> {
         + "use '~' to always execute in the home dir, or 'custom-feed/' to execute in a custom-feed dir relative to the run dir");
     public static final ConfigKey<Map<String, String>> SENSOR_ENVIRONMENT = WinRmTool.ENVIRONMENT;
 
-    public static final ConfigKey<Boolean> SUPPRESS_DUPLICATES = ConfigKeys.newBooleanConfigKey(
-            "suppressDuplicates", 
-            "Whether to publish the sensor value again, if it is the same as the previous value",
-            Boolean.FALSE);
-
     protected final String command;
     protected final String executionDir;
     protected final Map<String,String> sensorEnv;
@@ -97,6 +93,8 @@ public final class WinRmCommandSensor<T> extends AddSensor<T> {
         }
 
         final Boolean suppressDuplicates = EntityInitializers.resolve(params, SUPPRESS_DUPLICATES);
+        final Duration logWarningGraceTimeOnStartup = EntityInitializers.resolve(params, LOG_WARNING_GRACE_TIME_ON_STARTUP);
+        final Duration logWarningGraceTime = EntityInitializers.resolve(params, LOG_WARNING_GRACE_TIME);
 
         Supplier<Map<String,String>> envSupplier = new Supplier<Map<String,String>>() {
             @Override
@@ -137,7 +135,9 @@ public final class WinRmCommandSensor<T> extends AddSensor<T> {
                         @Override
                         public T apply(String input) {
                             return TypeCoercions.coerce(Strings.trimEnd(input), (Class<T>) sensor.getType());
-                        }}, SshValueFunctions.stdout()));
+                        }}, SshValueFunctions.stdout()))
+                .logWarningGraceTimeOnStartup(logWarningGraceTimeOnStartup)
+                .logWarningGraceTime(logWarningGraceTime);
 
         CmdFeed feed = CmdFeed.builder()
                 .entity(entity)

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/a088f468/test-support/src/main/java/org/apache/brooklyn/test/LogWatcher.java
----------------------------------------------------------------------
diff --git a/test-support/src/main/java/org/apache/brooklyn/test/LogWatcher.java b/test-support/src/main/java/org/apache/brooklyn/test/LogWatcher.java
index 0f630bb..15fd245 100644
--- a/test-support/src/main/java/org/apache/brooklyn/test/LogWatcher.java
+++ b/test-support/src/main/java/org/apache/brooklyn/test/LogWatcher.java
@@ -23,12 +23,14 @@ import static com.google.common.base.Preconditions.checkState;
 import static org.testng.Assert.assertFalse;
 
 import java.io.Closeable;
+import java.io.PrintStream;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.brooklyn.util.time.Time;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -78,6 +80,7 @@ public class LogWatcher implements Closeable {
                 }
             };
         }
+        
         public static Predicate<ILoggingEvent> containsExceptionStackLine(final Class<?> clazz, final String methodName) {
             return new Predicate<ILoggingEvent>() {
                 @Override public boolean apply(ILoggingEvent input) {
@@ -94,6 +97,42 @@ public class LogWatcher implements Closeable {
                 }
             };
         }
+
+        public static Predicate<ILoggingEvent> containsException() {
+            return new Predicate<ILoggingEvent>() {
+                @Override public boolean apply(ILoggingEvent input) {
+                    return (input != null) && (input.getThrowableProxy() != null);
+                }
+            };
+        }
+
+        public static Predicate<ILoggingEvent> containsExceptionMessage(final String expected) {
+            return containsExceptionMessages(expected);
+        }
+
+        public static Predicate<ILoggingEvent> containsExceptionMessages(final String... expecteds) {
+            return new Predicate<ILoggingEvent>() {
+                @Override public boolean apply(ILoggingEvent input) {
+                    IThrowableProxy throwable = (input != null) ? input.getThrowableProxy() : null;
+                    String msg = (throwable != null) ? throwable.getMessage() : null;
+                    if (msg == null) return false;
+                    for (String expected : expecteds) {
+                        if (!msg.contains(expected)) return false;
+                    }
+                    return true;
+                }
+            };
+        }
+        
+        public static Predicate<ILoggingEvent> levelGeaterOrEqual(final Level expectedLevel) {
+            return new Predicate<ILoggingEvent>() {
+                @Override public boolean apply(ILoggingEvent input) {
+                    if (input == null) return false;
+                    Level level = input.getLevel();
+                    return level.isGreaterOrEqual(expectedLevel);
+                }
+            };
+        }
     }
     
     private final List<ILoggingEvent> events = Collections.synchronizedList(Lists.<ILoggingEvent>newLinkedList());
@@ -191,13 +230,33 @@ public class LogWatcher implements Closeable {
             return ImmutableList.copyOf(events);
         }
     }
-    
+
     public List<ILoggingEvent> getEvents(Predicate<? super ILoggingEvent> filter) {
         synchronized (events) {
             return ImmutableList.copyOf(Iterables.filter(events, filter));
         }
     }
+
+    public void printEvents() {
+        printEvents(System.out, getEvents());
+    }
     
+    public void printEvents(PrintStream stream, Iterable<? extends ILoggingEvent> events) {
+        for (ILoggingEvent event : events) {
+            stream.println(Time.makeDateString(event.getTimeStamp()) + ": " + event.getThreadName() 
+                    + ": " + event.getLevel() + ": " + event.getMessage());
+            IThrowableProxy throwable = event.getThrowableProxy();
+            if (throwable != null) {
+                stream.println("\t" + throwable.getMessage());
+                if (throwable.getStackTraceElementProxyArray() != null) {
+                    for (StackTraceElementProxy element : throwable.getStackTraceElementProxyArray()) {
+                        stream.println("\t\t" + "at " + element);
+                    }
+                }
+            }
+        }
+    }
+
     public void clearEvents() {
         synchronized (events) {
             events.clear();