You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2015/04/13 02:51:21 UTC

[1/4] incubator-brooklyn git commit: new policies: SshConnectionFailure, ConditionalSuspendPolicy

Repository: incubator-brooklyn
Updated Branches:
  refs/heads/master 6153b0a8f -> 04fc801d0


new policies: SshConnectionFailure, ConditionalSuspendPolicy

  * SshMachineFailureDetector emits a connection-failure event if it can't make an ssh connection to the entity's machine
  * ConditionalSuspendPolicy suspends a target policy when it receives a sensor event (CONNECTION_FAILED by default) and resumes it on another (CONNECTION_RECOVERED by default)


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/dd0c2348
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/dd0c2348
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/dd0c2348

Branch: refs/heads/master
Commit: dd0c2348a2677526f29866b72aaa8cbe351c503a
Parents: 0ff4216
Author: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Authored: Tue Mar 24 15:17:10 2015 +0200
Committer: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Committed: Tue Apr 7 12:13:40 2015 +0300

----------------------------------------------------------------------
 .../policy/ha/AbstractFailureDetector.java      | 359 +++++++++++++++++++
 .../policy/ha/ConditionalSuspendPolicy.java     | 101 ++++++
 .../policy/ha/ConnectionFailureDetector.java    | 290 ++-------------
 .../policy/ha/SshMachineFailureDetector.java    |  99 +++++
 4 files changed, 597 insertions(+), 252 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/dd0c2348/policy/src/main/java/brooklyn/policy/ha/AbstractFailureDetector.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/AbstractFailureDetector.java b/policy/src/main/java/brooklyn/policy/ha/AbstractFailureDetector.java
new file mode 100644
index 0000000..6a1324c
--- /dev/null
+++ b/policy/src/main/java/brooklyn/policy/ha/AbstractFailureDetector.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.policy.ha;
+
+import static brooklyn.util.time.Time.makeTimeStringRounded;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.BrooklynTaskTags;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.EntityInternal;
+import brooklyn.entity.basic.EntityLocal;
+import brooklyn.event.Sensor;
+import brooklyn.management.Task;
+import brooklyn.policy.basic.AbstractPolicy;
+import brooklyn.policy.ha.HASensors.FailureDescriptor;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.flags.SetFromFlag;
+import brooklyn.util.task.BasicTask;
+import brooklyn.util.task.ScheduledTask;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.reflect.TypeToken;
+
+public abstract class AbstractFailureDetector extends AbstractPolicy {
+
+    // TODO Remove duplication from ServiceFailureDetector, particularly for the stabilisation delays.
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractFailureDetector.class);
+
+    private static final long MIN_PERIOD_BETWEEN_EXECS_MILLIS = 100;
+
+    public static final ConfigKey<Duration> POLL_PERIOD = ConfigKeys.newDurationConfigKey(
+            "failureDetector.pollPeriod", "", Duration.ONE_SECOND);
+
+    @SetFromFlag("failedStabilizationDelay")
+    public static final ConfigKey<Duration> FAILED_STABILIZATION_DELAY = ConfigKeys.newDurationConfigKey(
+            "failureDetector.serviceFailedStabilizationDelay",
+            "Time period for which the health check consistently fails "
+                    + "(e.g. doesn't report failed-ok-faled) before concluding failure.",
+            Duration.ZERO);
+
+    @SetFromFlag("recoveredStabilizationDelay")
+    public static final ConfigKey<Duration> RECOVERED_STABILIZATION_DELAY = ConfigKeys.newDurationConfigKey(
+            "failureDetector.serviceRecoveredStabilizationDelay",
+            "Time period for which the health check succeeds continiually " +
+                    "(e.g. doesn't report ok-failed-ok) before concluding recovered",
+            Duration.ZERO);
+
+    @SuppressWarnings("serial")
+    public static final ConfigKey<Sensor<FailureDescriptor>> SENSOR_FAILED = ConfigKeys.newConfigKey(new TypeToken<Sensor<FailureDescriptor>>() {},
+            "failureDetector.sensor.fail", "A sensor which will indicate failure when set", HASensors.ENTITY_FAILED);
+
+    @SuppressWarnings("serial")
+    public static final ConfigKey<Sensor<FailureDescriptor>> SENSOR_RECOVERED = ConfigKeys.newConfigKey(new TypeToken<Sensor<FailureDescriptor>>() {},
+            "failureDetector.sensor.recover", "A sensor which will indicate recovery from failure when set", HASensors.ENTITY_RECOVERED);
+
+    public interface CalculatedStatus {
+        boolean isHealthy();
+        String getDescription();
+    }
+
+    private final class PublishJob implements Runnable {
+        @Override public void run() {
+            try {
+                executorTime = System.currentTimeMillis();
+                executorQueued.set(false);
+
+                publishNow();
+
+            } catch (Exception e) {
+                if (isRunning()) {
+                    LOG.error("Problem resizing: "+e, e);
+                } else {
+                    if (LOG.isDebugEnabled()) LOG.debug("Problem resizing, but no longer running: "+e, e);
+                }
+            } catch (Throwable t) {
+                LOG.error("Problem in service-failure-detector: "+t, t);
+                throw Exceptions.propagate(t);
+            }
+        }
+    }
+
+    private final class HealthPoller implements Runnable {
+        @Override
+        public void run() {
+            checkHealth();
+        }
+    }
+
+    private final class HealthPollingTaskFactory implements Callable<Task<?>> {
+        @Override
+        public Task<?> call() {
+            BasicTask<Void> task = new BasicTask<Void>(new HealthPoller());
+            BrooklynTaskTags.setTransient(task);
+            return task;
+        }
+    }
+
+    protected static class BasicCalculatedStatus implements CalculatedStatus {
+        private boolean healthy;
+        private String description;
+
+        public BasicCalculatedStatus(boolean healthy, String description) {
+            this.healthy = healthy;
+            this.description = description;
+        }
+
+        @Override
+        public boolean isHealthy() {
+            return healthy;
+        }
+
+        @Override
+        public String getDescription() {
+            return description;
+        }
+    }
+
+    public enum LastPublished {
+        NONE,
+        FAILED,
+        RECOVERED;
+    }
+
+    protected final AtomicReference<Long> stateLastGood = new AtomicReference<Long>();
+    protected final AtomicReference<Long> stateLastFail = new AtomicReference<Long>();
+
+    protected Long currentFailureStartTime = null;
+    protected Long currentRecoveryStartTime = null;
+
+    protected LastPublished lastPublished = LastPublished.NONE;
+
+    private final AtomicBoolean executorQueued = new AtomicBoolean(false);
+    private volatile long executorTime = 0;
+
+    private Callable<Task<?>> pollingTaskFactory = new HealthPollingTaskFactory();
+
+    private Task<?> scheduledTask;
+
+    protected abstract CalculatedStatus calculateStatus();
+
+    @Override
+    public void setEntity(EntityLocal entity) {
+        super.setEntity(entity);
+
+        if (isRunning()) {
+            doStartPolling();
+        }
+    }
+
+    @Override
+    public void suspend() {
+        scheduledTask.cancel(true);
+        super.suspend();
+    }
+
+    @Override
+    public void resume() {
+        currentFailureStartTime = null;
+        currentRecoveryStartTime = null;
+        lastPublished = LastPublished.NONE;
+        executorQueued.set(false);
+        executorTime = 0;
+
+        super.resume();
+        doStartPolling();
+    }
+
+    protected void doStartPolling() {
+        if (scheduledTask == null || scheduledTask.isDone()) {
+            ScheduledTask task = new ScheduledTask(MutableMap.of("period", getPollPeriod(), "displayName", getTaskName()), pollingTaskFactory);
+            scheduledTask = ((EntityInternal)entity).getExecutionContext().submit(task);;
+        }
+    }
+
+    private String getTaskName() {
+        return getDisplayName();
+    }
+
+    protected Duration getPollPeriod() {
+        return getConfig(POLL_PERIOD);
+    }
+
+    protected Duration getFailedStabilizationDelay() {
+        return getConfig(FAILED_STABILIZATION_DELAY);
+    }
+
+    protected Duration getRecoveredStabilizationDelay() {
+        return getConfig(RECOVERED_STABILIZATION_DELAY);
+    }
+
+    protected Sensor<FailureDescriptor> getSensorFailed() {
+        return getConfig(SENSOR_FAILED);
+    }
+
+    protected Sensor<FailureDescriptor> getSensorRecovered() {
+        return getConfig(SENSOR_RECOVERED);
+    }
+
+    private synchronized void checkHealth() {
+        CalculatedStatus status = calculateStatus();
+        boolean healthy = status.isHealthy();
+        long now = System.currentTimeMillis();
+
+        if (healthy) {
+            stateLastGood.set(now);
+            if (lastPublished == LastPublished.FAILED) {
+                if (currentRecoveryStartTime == null) {
+                    LOG.info("{} check for {}, now recovering: {}", new Object[] {this, entity, getDescription(status)});
+                    currentRecoveryStartTime = now;
+                    schedulePublish();
+                } else {
+                    if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, continuing recovering: {}", new Object[] {this, entity, getDescription(status)});
+                }
+            } else {
+                if (currentFailureStartTime != null) {
+                    LOG.info("{} check for {}, now healthy: {}", new Object[] {this, entity, getDescription(status)});
+                    currentFailureStartTime = null;
+                } else {
+                    if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, still healthy: {}", new Object[] {this, entity, getDescription(status)});
+                }
+            }
+        } else {
+            stateLastFail.set(now);
+            if (lastPublished != LastPublished.FAILED) {
+                if (currentFailureStartTime == null) {
+                    LOG.info("{} check for {}, now failing: {}", new Object[] {this, entity, getDescription(status)});
+                    currentFailureStartTime = now;
+                    schedulePublish();
+                } else {
+                    if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, continuing failing: {}", new Object[] {this, entity, getDescription(status)});
+                }
+            } else {
+                if (currentRecoveryStartTime != null) {
+                    LOG.info("{} check for {}, now failing: {}", new Object[] {this, entity, getDescription(status)});
+                    currentRecoveryStartTime = null;
+                } else {
+                    if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, still failed: {}", new Object[] {this, entity, getDescription(status)});
+                }
+            }
+        }
+    }
+
+    protected void schedulePublish() {
+        schedulePublish(0);
+    }
+
+    protected void schedulePublish(long delay) {
+        if (isRunning() && executorQueued.compareAndSet(false, true)) {
+            long now = System.currentTimeMillis();
+            delay = Math.max(0, Math.max(delay, (executorTime + MIN_PERIOD_BETWEEN_EXECS_MILLIS) - now));
+            if (LOG.isTraceEnabled()) LOG.trace("{} scheduling publish in {}ms", this, delay);
+
+            Runnable job = new PublishJob();
+
+            ScheduledTask task = new ScheduledTask(MutableMap.of("delay", Duration.of(delay, TimeUnit.MILLISECONDS)), new BasicTask<Void>(job));
+            ((EntityInternal)entity).getExecutionContext().submit(task);
+        }
+    }
+
+    private synchronized void publishNow() {
+        if (!isRunning()) return;
+
+        CalculatedStatus calculatedStatus = calculateStatus();
+        boolean healthy = calculatedStatus.isHealthy();
+
+        Long lastUpTime = stateLastGood.get();
+        Long lastDownTime = stateLastFail.get();
+        long serviceFailedStabilizationDelay = getFailedStabilizationDelay().toMilliseconds();
+        long serviceRecoveredStabilizationDelay = getRecoveredStabilizationDelay().toMilliseconds();
+        long now = System.currentTimeMillis();
+
+        if (healthy) {
+            if (lastPublished == LastPublished.FAILED) {
+                // only publish if consistently up for serviceRecoveredStabilizationDelay
+                long currentRecoveryPeriod = getTimeDiff(now, currentRecoveryStartTime);
+                long sinceLastDownPeriod = getTimeDiff(now, lastDownTime);
+                if (currentRecoveryPeriod > serviceRecoveredStabilizationDelay && sinceLastDownPeriod > serviceRecoveredStabilizationDelay) {
+                    String description = getDescription(calculatedStatus);
+                    LOG.warn("{} check for {}, publishing recovered: {}", new Object[] {this, entity, description});
+                    entity.emit(getSensorRecovered(), new HASensors.FailureDescriptor(entity, description));
+                    lastPublished = LastPublished.RECOVERED;
+                    currentFailureStartTime = null;
+                } else {
+                    long nextAttemptTime = Math.max(serviceRecoveredStabilizationDelay - currentRecoveryPeriod, serviceRecoveredStabilizationDelay - sinceLastDownPeriod);
+                    schedulePublish(nextAttemptTime);
+                }
+            }
+        } else {
+            if (lastPublished != LastPublished.FAILED) {
+                // only publish if consistently down for serviceFailedStabilizationDelay
+                long currentFailurePeriod = getTimeDiff(now, currentFailureStartTime);
+                long sinceLastUpPeriod = getTimeDiff(now, lastUpTime);
+                if (currentFailurePeriod > serviceFailedStabilizationDelay && sinceLastUpPeriod > serviceFailedStabilizationDelay) {
+                    String description = getDescription(calculatedStatus);
+                    LOG.warn("{} connectivity-check for {}, publishing failed: {}", new Object[] {this, entity, description});
+                    entity.emit(getSensorFailed(), new HASensors.FailureDescriptor(entity, description));
+                    lastPublished = LastPublished.FAILED;
+                    currentRecoveryStartTime = null;
+                } else {
+                    long nextAttemptTime = Math.max(serviceFailedStabilizationDelay - currentFailurePeriod, serviceFailedStabilizationDelay - sinceLastUpPeriod);
+                    schedulePublish(nextAttemptTime);
+                }
+            }
+        }
+    }
+
+    protected String getDescription(CalculatedStatus status) {
+        Long lastUpTime = stateLastGood.get();
+        Long lastDownTime = stateLastGood.get();
+        Duration serviceFailedStabilizationDelay = getFailedStabilizationDelay();
+        Duration serviceRecoveredStabilizationDelay = getRecoveredStabilizationDelay();
+
+        return String.format("%s; healthy=%s; timeNow=%s; lastUp=%s; lastDown=%s; lastPublished=%s; "+
+                    "currentFailurePeriod=%s; currentRecoveryPeriod=%s",
+                status.getDescription(),
+                status.isHealthy(),
+                Time.makeDateString(System.currentTimeMillis()),
+                (lastUpTime != null ? Time.makeDateString(lastUpTime) : "<never>"),
+                (lastDownTime != null ? Time.makeDateString(lastDownTime) : "<never>"),
+                lastPublished,
+                (currentFailureStartTime != null ? getTimeStringSince(currentFailureStartTime) : "<none>") + " (stabilization "+makeTimeStringRounded(serviceFailedStabilizationDelay) + ")",
+                (currentRecoveryStartTime != null ? getTimeStringSince(currentRecoveryStartTime) : "<none>") + " (stabilization "+makeTimeStringRounded(serviceRecoveredStabilizationDelay) + ")");
+    }
+
+    private long getTimeDiff(Long recent, Long previous) {
+        return (previous == null) ? recent : (recent - previous);
+    }
+
+    private String getTimeStringSince(Long time) {
+        return time == null ? null : Time.makeTimeStringRounded(System.currentTimeMillis() - time);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/dd0c2348/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java b/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java
new file mode 100644
index 0000000..6e37ef9
--- /dev/null
+++ b/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.policy.ha;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.EntityLocal;
+import brooklyn.event.Sensor;
+import brooklyn.event.SensorEvent;
+import brooklyn.event.SensorEventListener;
+import brooklyn.policy.Policy;
+import brooklyn.policy.basic.AbstractPolicy;
+import brooklyn.util.flags.SetFromFlag;
+
+import com.google.common.base.Preconditions;
+
+public class ConditionalSuspendPolicy extends AbstractPolicy {
+    private static final Logger LOG = LoggerFactory.getLogger(ConditionalSuspendPolicy.class);
+
+    @SetFromFlag("suppressSensor")
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static final ConfigKey<Sensor<?>> SUSPEND_SENSOR = (ConfigKey) ConfigKeys.newConfigKey(Sensor.class,
+            "suppressSensor", "Sensor which will suppress the target policy", HASensors.CONNECTION_FAILED); 
+
+    @SetFromFlag("resetSensor")
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static final ConfigKey<Sensor<?>> RESUME_SENSOR = (ConfigKey) ConfigKeys.newConfigKey(Sensor.class,
+            "resetSensor", "Resume target policy when this sensor is observed", HASensors.CONNECTION_RECOVERED);
+
+    @SetFromFlag("target")
+    public static final ConfigKey<Object> SUSPEND_TARGET = ConfigKeys.newConfigKey(Object.class,
+            "target", "The target policy to suspend. Either direct reference or the value of the suspendTarget config on a policy from the same entity.");
+
+    @Override
+    public void setEntity(EntityLocal entity) {
+        super.setEntity(entity);
+        Object target = config().get(SUSPEND_TARGET);
+        Preconditions.checkNotNull(target, "Suspend target required");
+        Preconditions.checkNotNull(getTargetPolicy(), "Can't find target policy set in " + SUSPEND_TARGET.getName() + ": " + target);
+        subscribe();
+    }
+
+    private void subscribe() {
+        subscribe(entity, getConfig(SUSPEND_SENSOR), new SensorEventListener<Object>() {
+            @Override public void onEvent(final SensorEvent<Object> event) {
+                if (isRunning()) {
+                    Policy target = getTargetPolicy();
+                    target.suspend();
+                    LOG.debug("Suspended policy " + target + ", triggered by " + event.getSensor() + " = " + event.getValue());
+                }
+            }
+
+        });
+        subscribe(entity, getConfig(RESUME_SENSOR), new SensorEventListener<Object>() {
+            @Override public void onEvent(final SensorEvent<Object> event) {
+                if (isRunning()) {
+                    Policy target = getTargetPolicy();
+                    target.resume();
+                    LOG.debug("Resumed policy " + target + ", triggered by " + event.getSensor() + " = " + event.getValue());
+                }
+            }
+        });
+    }
+
+    private Policy getTargetPolicy() {
+        Object target = config().get(SUSPEND_TARGET);
+        if (target instanceof Policy) {
+            return (Policy)target;
+        } else if (target instanceof String) {
+            for (Policy policy : entity.getPolicies()) {
+                // No way to set config values for keys NOT declared in the policy,
+                // so must use displayName as a generally available config value.
+                if (target.equals(policy.getDisplayName()) || target.equals(policy.getClass().getName())) {
+                    return policy;
+                }
+            }
+        } else {
+            throw new IllegalStateException("Unexpected type " + target.getClass() + " for target " + target);
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/dd0c2348/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java b/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java
index acd371d..a92b8a8 100644
--- a/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java
+++ b/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java
@@ -18,35 +18,17 @@
  */
 package brooklyn.policy.ha;
 
-import static brooklyn.util.time.Time.makeTimeStringRounded;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import brooklyn.catalog.Catalog;
 import brooklyn.config.ConfigKey;
-import brooklyn.entity.basic.BrooklynTaskTags;
 import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.entity.basic.EntityInternal;
-import brooklyn.entity.basic.EntityLocal;
+import brooklyn.event.Sensor;
 import brooklyn.event.basic.BasicConfigKey;
 import brooklyn.event.basic.BasicNotificationSensor;
-import brooklyn.management.Task;
-import brooklyn.policy.basic.AbstractPolicy;
 import brooklyn.policy.ha.HASensors.FailureDescriptor;
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.exceptions.Exceptions;
 import brooklyn.util.flags.SetFromFlag;
+import brooklyn.util.guava.Maybe;
 import brooklyn.util.net.Networking;
-import brooklyn.util.task.BasicTask;
-import brooklyn.util.task.ScheduledTask;
 import brooklyn.util.time.Duration;
-import brooklyn.util.time.Time;
 
 import com.google.common.net.HostAndPort;
 
@@ -56,24 +38,12 @@ import com.google.common.net.HostAndPort;
  */
 @Catalog(name="Connection Failure Detector", description="HA policy for monitoring a host:port, "
         + "emitting an event if the connection is lost/restored")
-public class ConnectionFailureDetector extends AbstractPolicy {
-
-    // TODO Remove duplication from ServiceFailureDetector, particularly for the stabilisation delays.
-
-    public enum LastPublished {
-        NONE,
-        FAILED,
-        RECOVERED;
-    }
-
-    private static final Logger LOG = LoggerFactory.getLogger(ConnectionFailureDetector.class);
-
-    private static final long MIN_PERIOD_BETWEEN_EXECS_MILLIS = 100;
+public class ConnectionFailureDetector extends AbstractFailureDetector {
 
     public static final ConfigKey<HostAndPort> ENDPOINT = ConfigKeys.newConfigKey(HostAndPort.class, "connectionFailureDetector.endpoint");
-    
+
     public static final ConfigKey<Duration> POLL_PERIOD = ConfigKeys.newConfigKey(Duration.class, "connectionFailureDetector.pollPeriod", "", Duration.ONE_SECOND);
-    
+
     public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_FAILED = HASensors.CONNECTION_FAILED;
 
     public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_RECOVERED = HASensors.CONNECTION_RECOVERED;
@@ -95,245 +65,61 @@ public class ConnectionFailureDetector extends AbstractPolicy {
             .defaultValue(Duration.ZERO)
             .build();
 
-    protected final AtomicReference<Long> connectionLastUp = new AtomicReference<Long>();
-    protected final AtomicReference<Long> connectionLastDown = new AtomicReference<Long>();
-    
-    protected Long currentFailureStartTime = null;
-    protected Long currentRecoveryStartTime = null;
-
-    protected LastPublished lastPublished = LastPublished.NONE;
-
-    private final AtomicBoolean executorQueued = new AtomicBoolean(false);
-    private volatile long executorTime = 0;
-
-    private Callable<Task<?>> pollingTaskFactory;
-
-    private Task<?> scheduledTask;
-    
-    public ConnectionFailureDetector() {
-    }
-    
     @Override
     public void init() {
+        super.init();
         getRequiredConfig(ENDPOINT); // just to confirm it's set, failing fast
-
-        pollingTaskFactory = new Callable<Task<?>>() {
-            @Override public Task<?> call() {
-                BasicTask<Void> task = new BasicTask<Void>(new Runnable() {
-                    @Override public void run() {
-                        checkHealth();
-                    }});
-                BrooklynTaskTags.setTransient(task);
-                return task;
-            }
-        };
-    }
-    
-    @Override
-    public void setEntity(EntityLocal entity) {
-        super.setEntity(entity);
-
-        if (isRunning()) {
-            doStartPolling();
+        if (config().getRaw(SENSOR_FAILED).isAbsent()) {
+            config().set(SENSOR_FAILED, CONNECTION_FAILED);
+        }
+        if (config().getRaw(SENSOR_RECOVERED).isAbsent()) {
+            config().set(SENSOR_RECOVERED, CONNECTION_RECOVERED);
         }
     }
 
     @Override
-    public void suspend() {
-        scheduledTask.cancel(true);
-        super.suspend();
+    protected CalculatedStatus calculateStatus() {
+        HostAndPort endpoint = getConfig(ENDPOINT);
+        boolean isHealthy = Networking.isReachable(endpoint);
+        return new BasicCalculatedStatus(isHealthy, "endpoint=" + endpoint);
     }
-    
+
+    //Persistence compatibility overrides
     @Override
-    public void resume() {
-        currentFailureStartTime = null;
-        currentRecoveryStartTime = null;
-        lastPublished = LastPublished.NONE;
-        executorQueued.set(false);
-        executorTime = 0;
-        
-        super.resume();
-        doStartPolling();
-    }
-    
-    protected void doStartPolling() {
-        if (scheduledTask == null || scheduledTask.isDone()) {
-            ScheduledTask task = new ScheduledTask(MutableMap.of("period", getConfig(POLL_PERIOD)), pollingTaskFactory);
-            scheduledTask = ((EntityInternal)entity).getExecutionContext().submit(task);
-        }
+    protected Duration getPollPeriod() {
+        return getConfig(POLL_PERIOD);
     }
-    
-    private Duration getConnectionFailedStabilizationDelay() {
+
+    @Override
+    protected Duration getFailedStabilizationDelay() {
         return getConfig(CONNECTION_FAILED_STABILIZATION_DELAY);
     }
 
-    private Duration getConnectionRecoveredStabilizationDelay() {
+    @Override
+    protected Duration getRecoveredStabilizationDelay() {
         return getConfig(CONNECTION_RECOVERED_STABILIZATION_DELAY);
     }
 
-    private synchronized void checkHealth() {
-        CalculatedStatus status = calculateStatus();
-        boolean connected = status.connected;
-        long now = System.currentTimeMillis();
-        
-        if (connected) {
-            connectionLastUp.set(now);
-            if (lastPublished == LastPublished.FAILED) {
-                if (currentRecoveryStartTime == null) {
-                    LOG.info("{} connectivity-check for {}, now recovering: {}", new Object[] {this, entity, status.getDescription()});
-                    currentRecoveryStartTime = now;
-                    schedulePublish();
-                } else {
-                    if (LOG.isTraceEnabled()) LOG.trace("{} connectivity-check for {}, continuing recovering: {}", new Object[] {this, entity, status.getDescription()});
-                }
-            } else {
-                if (currentFailureStartTime != null) {
-                    LOG.info("{} connectivity-check for {}, now healthy: {}", new Object[] {this, entity, status.getDescription()});
-                    currentFailureStartTime = null;
-                } else {
-                    if (LOG.isTraceEnabled()) LOG.trace("{} connectivity-check for {}, still healthy: {}", new Object[] {this, entity, status.getDescription()});
-                }
-            }
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Sensor<FailureDescriptor> getSensorFailed() {
+        Maybe<Object> sensorFailed = config().getRaw(SENSOR_FAILED);
+        if (sensorFailed.isPresent()) {
+            return (Sensor<FailureDescriptor>)sensorFailed.get();
         } else {
-            connectionLastDown.set(now);
-            if (lastPublished != LastPublished.FAILED) {
-                if (currentFailureStartTime == null) {
-                    LOG.info("{} connectivity-check for {}, now failing: {}", new Object[] {this, entity, status.getDescription()});
-                    currentFailureStartTime = now;
-                    schedulePublish();
-                } else {
-                    if (LOG.isTraceEnabled()) LOG.trace("{} connectivity-check for {}, continuing failing: {}", new Object[] {this, entity, status.getDescription()});
-                }
-            } else {
-                if (currentRecoveryStartTime != null) {
-                    LOG.info("{} connectivity-check for {}, now failing: {}", new Object[] {this, entity, status.getDescription()});
-                    currentRecoveryStartTime = null;
-                } else {
-                    if (LOG.isTraceEnabled()) LOG.trace("{} connectivity-check for {}, still failed: {}", new Object[] {this, entity, status.getDescription()});
-                }
-            }
+            return CONNECTION_FAILED;
         }
     }
-    
-    protected CalculatedStatus calculateStatus() {
-        return new CalculatedStatus();
-    }
-
-    protected void schedulePublish() {
-        schedulePublish(0);
-    }
-    
-    protected void schedulePublish(long delay) {
-        if (isRunning() && executorQueued.compareAndSet(false, true)) {
-            long now = System.currentTimeMillis();
-            delay = Math.max(0, Math.max(delay, (executorTime + MIN_PERIOD_BETWEEN_EXECS_MILLIS) - now));
-            if (LOG.isTraceEnabled()) LOG.trace("{} scheduling publish in {}ms", this, delay);
-            
-            Runnable job = new Runnable() {
-                @Override public void run() {
-                    try {
-                        executorTime = System.currentTimeMillis();
-                        executorQueued.set(false);
 
-                        publishNow();
-                        
-                    } catch (Exception e) {
-                        if (isRunning()) {
-                            LOG.error("Problem resizing: "+e, e);
-                        } else {
-                            if (LOG.isDebugEnabled()) LOG.debug("Problem resizing, but no longer running: "+e, e);
-                        }
-                    } catch (Throwable t) {
-                        LOG.error("Problem in service-failure-detector: "+t, t);
-                        throw Exceptions.propagate(t);
-                    }
-                }
-            };
-            
-            ScheduledTask task = new ScheduledTask(MutableMap.of("delay", Duration.of(delay, TimeUnit.MILLISECONDS)), new BasicTask(job));
-            ((EntityInternal)entity).getExecutionContext().submit(task);
-        }
-    }
-    
-    private synchronized void publishNow() {
-        if (!isRunning()) return;
-        
-        CalculatedStatus calculatedStatus = calculateStatus();
-        boolean connected = calculatedStatus.connected;
-        
-        Long lastUpTime = connectionLastUp.get();
-        Long lastDownTime = connectionLastDown.get();
-        long serviceFailedStabilizationDelay = getConnectionFailedStabilizationDelay().toMilliseconds();
-        long serviceRecoveredStabilizationDelay = getConnectionRecoveredStabilizationDelay().toMilliseconds();
-        long now = System.currentTimeMillis();
-        
-        if (connected) {
-            if (lastPublished == LastPublished.FAILED) {
-                // only publish if consistently up for serviceRecoveredStabilizationDelay
-                long currentRecoveryPeriod = getTimeDiff(now, currentRecoveryStartTime);
-                long sinceLastDownPeriod = getTimeDiff(now, lastDownTime);
-                if (currentRecoveryPeriod > serviceRecoveredStabilizationDelay && sinceLastDownPeriod > serviceRecoveredStabilizationDelay) {
-                    String description = calculatedStatus.getDescription();
-                    LOG.warn("{} connectivity-check for {}, publishing recovered: {}", new Object[] {this, entity, description});
-                    entity.emit(CONNECTION_RECOVERED, new HASensors.FailureDescriptor(entity, description));
-                    lastPublished = LastPublished.RECOVERED;
-                    currentFailureStartTime = null;
-                } else {
-                    long nextAttemptTime = Math.max(serviceRecoveredStabilizationDelay - currentRecoveryPeriod, serviceRecoveredStabilizationDelay - sinceLastDownPeriod);
-                    schedulePublish(nextAttemptTime);
-                }
-            }
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Sensor<FailureDescriptor> getSensorRecovered() {
+        Maybe<Object> sensorRecovered = config().getRaw(SENSOR_RECOVERED);
+        if (sensorRecovered.isPresent()) {
+            return (Sensor<FailureDescriptor>)sensorRecovered.get();
         } else {
-            if (lastPublished != LastPublished.FAILED) {
-                // only publish if consistently down for serviceFailedStabilizationDelay
-                long currentFailurePeriod = getTimeDiff(now, currentFailureStartTime);
-                long sinceLastUpPeriod = getTimeDiff(now, lastUpTime);
-                if (currentFailurePeriod > serviceFailedStabilizationDelay && sinceLastUpPeriod > serviceFailedStabilizationDelay) {
-                    String description = calculatedStatus.getDescription();
-                    LOG.warn("{} connectivity-check for {}, publishing failed: {}", new Object[] {this, entity, description});
-                    entity.emit(CONNECTION_FAILED, new HASensors.FailureDescriptor(entity, description));
-                    lastPublished = LastPublished.FAILED;
-                    currentRecoveryStartTime = null;
-                } else {
-                    long nextAttemptTime = Math.max(serviceFailedStabilizationDelay - currentFailurePeriod, serviceFailedStabilizationDelay - sinceLastUpPeriod);
-                    schedulePublish(nextAttemptTime);
-                }
-            }
+            return CONNECTION_RECOVERED;
         }
     }
 
-    public class CalculatedStatus {
-        public final boolean connected;
-        
-        public CalculatedStatus() {
-            HostAndPort endpoint = getConfig(ENDPOINT);
-            connected = Networking.isReachable(endpoint);
-        }
-        
-        public String getDescription() {
-            Long lastUpTime = connectionLastUp.get();
-            Long lastDownTime = connectionLastDown.get();
-            Duration serviceFailedStabilizationDelay = getConnectionFailedStabilizationDelay();
-            Duration serviceRecoveredStabilizationDelay = getConnectionRecoveredStabilizationDelay();
-
-            return String.format("endpoint=%s; connected=%s; timeNow=%s; lastUp=%s; lastDown=%s; lastPublished=%s; "+
-                        "currentFailurePeriod=%s; currentRecoveryPeriod=%s",
-                    getConfig(ENDPOINT), 
-                    connected,
-                    Time.makeDateString(System.currentTimeMillis()),
-                    (lastUpTime != null ? Time.makeDateString(lastUpTime) : "<never>"),
-                    (lastDownTime != null ? Time.makeDateString(lastDownTime) : "<never>"),
-                    lastPublished,
-                    (currentFailureStartTime != null ? getTimeStringSince(currentFailureStartTime) : "<none>") + " (stabilization "+makeTimeStringRounded(serviceFailedStabilizationDelay) + ")",
-                    (currentRecoveryStartTime != null ? getTimeStringSince(currentRecoveryStartTime) : "<none>") + " (stabilization "+makeTimeStringRounded(serviceRecoveredStabilizationDelay) + ")");
-        }
-    }
-    
-    private long getTimeDiff(Long recent, Long previous) {
-        return (previous == null) ? recent : (recent - previous);
-    }
-    
-    private String getTimeStringSince(Long time) {
-        return time == null ? null : Time.makeTimeStringRounded(System.currentTimeMillis() - time);
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/dd0c2348/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java b/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java
new file mode 100644
index 0000000..435592e
--- /dev/null
+++ b/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.policy.ha;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.catalog.Catalog;
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.event.basic.BasicNotificationSensor;
+import brooklyn.location.basic.Machines;
+import brooklyn.location.basic.SshMachineLocation;
+import brooklyn.policy.ha.AbstractFailureDetector.LastPublished;
+import brooklyn.policy.ha.HASensors.FailureDescriptor;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.guava.Maybe;
+import brooklyn.util.internal.ssh.SshTool;
+import brooklyn.util.time.Duration;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+@Catalog(name="Ssh Connectivity Failure Detector", description="HA policy for monitoring an SshMachine, "
+        + "emitting an event if the connection is lost/restored")
+public class SshMachineFailureDetector extends AbstractFailureDetector {
+    private static final Logger LOG = LoggerFactory.getLogger(SshMachineFailureDetector.class);
+    public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_FAILED = HASensors.CONNECTION_FAILED;
+    public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_RECOVERED = HASensors.CONNECTION_RECOVERED;
+    /** Used as both the SSH connect timeout and the SSH session timeout of each connectivity probe. */
+    public static final ConfigKey<Duration> CONNECT_TIMEOUT = ConfigKeys.newDurationConfigKey(
+            "ha.sshConnection.timeout", "How long to wait for connection before declaring failure", Duration.TEN_SECONDS);
+
+    /** Defaults SENSOR_FAILED/SENSOR_RECOVERED to CONNECTION_FAILED/CONNECTION_RECOVERED and POLL_PERIOD to one minute, unless already configured. */
+    @Override
+    public void init() {
+        super.init();
+        if (config().getRaw(SENSOR_FAILED).isAbsent()) {
+            config().set(SENSOR_FAILED, CONNECTION_FAILED);
+        }
+        if (config().getRaw(SENSOR_RECOVERED).isAbsent()) {
+            config().set(SENSOR_RECOVERED, CONNECTION_RECOVERED);
+        }
+        if (config().getRaw(POLL_PERIOD).isAbsent()) {
+            config().set(POLL_PERIOD, Duration.ONE_MINUTE);
+        }
+    }
+
+    /** Runs {@code exit} once over SSH on the entity's unique SshMachineLocation: connected iff it exits 0; with no such machine, reports connected. */
+    @Override
+    protected CalculatedStatus calculateStatus() {
+        Maybe<SshMachineLocation> sshMachineOption = Machines.findUniqueSshMachineLocation(entity.getLocations());
+        if (sshMachineOption.isPresent()) {
+            SshMachineLocation sshMachine = sshMachineOption.get();
+            try {
+                Duration timeout = config().get(CONNECT_TIMEOUT);
+                Map<String, ?> flags = ImmutableMap.of(
+                        SshTool.PROP_CONNECT_TIMEOUT.getName(), timeout.toMilliseconds(),
+                        SshTool.PROP_SESSION_TIMEOUT.getName(), timeout.toMilliseconds(),
+                        SshTool.PROP_SSH_TRIES.getName(), 1);
+                int exitCode = sshMachine.execCommands(flags, SshMachineFailureDetector.class.getName(), ImmutableList.of("exit"));
+                return new BasicCalculatedStatus(exitCode == 0, sshMachine.toString());
+            } catch (Exception e) {
+                Exceptions.propagateIfFatal(e);
+                boolean isFirstFailure = lastPublished != LastPublished.FAILED && currentFailureStartTime == null; // first failure logged at debug, later ones at trace
+                if (isFirstFailure) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Failed connecting to machine " + sshMachine, e);
+                    }
+                } else {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Failed connecting to machine " + sshMachine, e);
+                    }
+                }
+                return new BasicCalculatedStatus(false, e.getMessage() != null ? e.getMessage() : e.toString()); // getMessage() may be null
+            }
+        } else {
+            return new BasicCalculatedStatus(true, "no machine started, not complaining");
+        }
+    }
+}


[2/4] incubator-brooklyn git commit: Add option to ServiceFailureDetector to republish failed events

Posted by al...@apache.org.
Add option to ServiceFailureDetector to republish failed events


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/75f00025
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/75f00025
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/75f00025

Branch: refs/heads/master
Commit: 75f00025980c240c8bd37c5ed1b74198bda76d99
Parents: dd0c234
Author: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Authored: Fri Mar 27 19:10:29 2015 +0200
Committer: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Committed: Tue Apr 7 14:38:11 2015 +0300

----------------------------------------------------------------------
 .../policy/ha/ServiceFailureDetector.java       | 14 +++-
 .../policy/ha/ServiceFailureDetectorTest.java   | 71 +++++++++++++++++++-
 2 files changed, 83 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/75f00025/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java b/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java
index f3264e3..7f5c142 100644
--- a/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java
+++ b/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java
@@ -102,6 +102,12 @@ public class ServiceFailureDetector extends ServiceStateLogic.ComputeServiceStat
             .defaultValue(Duration.ZERO)
             .build();
 
+    /** When non-null, ENTITY_FAILED is re-published at this interval while the entity stays failed; null (no default is set) disables republishing. */
+    @SetFromFlag("entityFailedRepublishTime")
+    public static final ConfigKey<Duration> ENTITY_FAILED_REPUBLISH_TIME = BasicConfigKey.builder(Duration.class)
+            .name("entityFailed.republishTime")
+            .description("Publish failed state periodically at the specified intervals, null to disable.")
+            .build();
     protected Long firstUpTime;
     
     protected Long currentFailureStartTime = null;
@@ -215,7 +221,13 @@ public class ServiceFailureDetector extends ServiceStateLogic.ComputeServiceStat
                 if (delayBeforeCheck<=0) {
                     if (LOG.isDebugEnabled()) LOG.debug("{} publishing failed (state={}; currentFailureStartTime={}; now={}", 
                             new Object[] {this, state, Time.makeDateString(currentFailureStartTime), Time.makeDateString(now)});
-                    publishEntityFailedTime = null;
+                    Duration republishDelay = getConfig(ENTITY_FAILED_REPUBLISH_TIME);
+                    if (republishDelay == null) {
+                        publishEntityFailedTime = null;
+                    } else {
+                        publishEntityFailedTime = now + republishDelay.toMilliseconds();
+                        recomputeIn = Math.min(recomputeIn, republishDelay.toMilliseconds());
+                    }
                     lastPublished = LastPublished.FAILED;
                     entity.emit(HASensors.ENTITY_FAILED, new HASensors.FailureDescriptor(entity, getFailureDescription(now)));
                 } else {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/75f00025/policy/src/test/java/brooklyn/policy/ha/ServiceFailureDetectorTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/brooklyn/policy/ha/ServiceFailureDetectorTest.java b/policy/src/test/java/brooklyn/policy/ha/ServiceFailureDetectorTest.java
index c31bbda..7250e8e 100644
--- a/policy/src/test/java/brooklyn/policy/ha/ServiceFailureDetectorTest.java
+++ b/policy/src/test/java/brooklyn/policy/ha/ServiceFailureDetectorTest.java
@@ -24,7 +24,10 @@ import static org.testng.Assert.fail;
 
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -56,6 +59,7 @@ import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableMap;
 
 public class ServiceFailureDetectorTest {
+    private static final Logger log = LoggerFactory.getLogger(ServiceFailureDetectorTest.class); // reports republish-timing deviations (logged, not asserted)
 
     private static final int TIMEOUT_MS = 10*1000;
 
@@ -245,7 +249,7 @@ public class ServiceFailureDetectorTest {
             .configure(ServiceFailureDetector.SERVICE_ON_FIRE_STABILIZATION_DELAY, Duration.ONE_SECOND)
             .configure(ServiceFailureDetector.ENTITY_RECOVERED_STABILIZATION_DELAY, Duration.ONE_SECOND));
         
-        // Set the entit to healthy
+        // Set the entity to healthy
         e1.setAttribute(TestEntity.SERVICE_UP, true);
         ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
         EntityTestUtils.assertAttributeEqualsEventually(e1, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
@@ -312,6 +316,71 @@ public class ServiceFailureDetectorTest {
         assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
     }
     
+    /** Checks that ENTITY_FAILED keeps being re-published (100ms period) while failed, and that nothing follows ENTITY_RECOVERED. */
+    @Test(groups="Integration") // Has a ~1.5 second wait: ~1s of republish events plus a 500ms quiet-period check
+    public void testRepublishedFailure() throws Exception {
+        Duration republishPeriod = Duration.millis(100);
+
+        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+                .configure(ServiceFailureDetector.ENTITY_FAILED_REPUBLISH_TIME, republishPeriod));
+        // Set the entity to healthy
+        e1.setAttribute(TestEntity.SERVICE_UP, true);
+        ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+        EntityTestUtils.assertAttributeEqualsEventually(e1, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+        // Make the entity fail
+        ServiceStateLogic.ServiceProblemsLogic.updateProblemsIndicator(e1, "test", "foo");
+        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
+        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+
+        // Wait until at least 10 events have been recorded (~1 sec at the 100ms republish period)
+        assertEventsSizeEventually(10);
+
+        // Now recover
+        ServiceStateLogic.ServiceProblemsLogic.clearProblemsIndicator(e1, "test");
+        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+        assertHasEventEventually(HASensors.ENTITY_RECOVERED, Predicates.<Object>equalTo(e1), null);
+
+        // Once recovered, check that no more failed events are emitted periodically
+        assertEventsSizeContinually(events.size());
+
+        SensorEvent<FailureDescriptor> prevEvent = null;
+        for (SensorEvent<FailureDescriptor> event : events) {
+            if (prevEvent != null) {
+                long repeatOffset = event.getTimestamp() - prevEvent.getTimestamp();
+                long deviation = Math.abs(repeatOffset - republishPeriod.toMilliseconds());
+                if (deviation > republishPeriod.toMilliseconds()/10 &&
+                        // for ENTITY_RECOVERED, only warn if it came more than one period after the last failure
+                        (!event.getSensor().equals(HASensors.ENTITY_RECOVERED) ||
+                        repeatOffset > republishPeriod.toMilliseconds())) {
+                    log.error("The time between failure republish (" + repeatOffset + "ms) deviates too much from the expected " + republishPeriod + ". prevEvent=" + prevEvent + ", event=" + event);
+                }
+            }
+            prevEvent = event;
+        }
+        // Make sure no republish took place after recovery: the last event must be ENTITY_RECOVERED
+        assertEquals(prevEvent.getSensor(), HASensors.ENTITY_RECOVERED);
+    }
+
+    /** Asserts that exactly {@code size} events stay recorded throughout a 500ms window. */
+    private void assertEventsSizeContinually(final int size) {
+        Asserts.succeedsContinually(MutableMap.of("timeout", 500), new Runnable() {
+            @Override
+            public void run() {
+                assertTrue(events.size() == size, "assertEventsSizeContinually expects " + size + " events but found " + events.size() + ": " + events);
+            }
+        });
+    }
+
+    /** Asserts that at least {@code size} events are recorded within TIMEOUT_MS. */
+    private void assertEventsSizeEventually(final int size) {
+        Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() {
+            @Override
+            public void run() {
+                assertTrue(events.size() >= size, "assertEventsSizeEventually expects at least " + size + " events but found " + events.size() + ": " + events);
+            }
+        });
+    }
+
     private void assertHasEvent(Sensor<?> sensor, Predicate<Object> componentPredicate, Predicate<? super CharSequence> descriptionPredicate) {
         for (SensorEvent<FailureDescriptor> event : events) {
             if (event.getSensor().equals(sensor) && 


[3/4] incubator-brooklyn git commit: Add uniqueTag for policies

Posted by al...@apache.org.
Add uniqueTag for policies


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/68464399
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/68464399
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/68464399

Branch: refs/heads/master
Commit: 68464399e2ee455e2d8beff37330f1e6cedd575e
Parents: 75f0002
Author: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Authored: Tue Mar 31 14:20:18 2015 +0300
Committer: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Committed: Tue Apr 7 14:38:18 2015 +0300

----------------------------------------------------------------------
 .../src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java | 2 ++
 policy/src/main/java/brooklyn/policy/ha/ServiceRestarter.java      | 2 ++
 .../main/java/brooklyn/policy/ha/SshMachineFailureDetector.java    | 2 ++
 3 files changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/68464399/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java b/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java
index 6e37ef9..cc2ab3a 100644
--- a/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java
+++ b/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java
@@ -30,6 +30,7 @@ import brooklyn.event.SensorEventListener;
 import brooklyn.policy.Policy;
 import brooklyn.policy.basic.AbstractPolicy;
 import brooklyn.util.flags.SetFromFlag;
+import brooklyn.util.javalang.JavaClassNames;
 
 import com.google.common.base.Preconditions;
 
@@ -57,6 +58,7 @@ public class ConditionalSuspendPolicy extends AbstractPolicy {
         Preconditions.checkNotNull(target, "Suspend target required");
         Preconditions.checkNotNull(getTargetPolicy(), "Can't find target policy set in " + SUSPEND_TARGET.getName() + ": " + target);
         subscribe();
+        uniqueTag = JavaClassNames.simpleClassName(getClass())+":"+getConfig(SUSPEND_SENSOR).getName()+":"+getConfig(RESUME_SENSOR).getName();
     }
 
     private void subscribe() {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/68464399/policy/src/main/java/brooklyn/policy/ha/ServiceRestarter.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/ServiceRestarter.java b/policy/src/main/java/brooklyn/policy/ha/ServiceRestarter.java
index 3a0b6d4..b71d6d6 100644
--- a/policy/src/main/java/brooklyn/policy/ha/ServiceRestarter.java
+++ b/policy/src/main/java/brooklyn/policy/ha/ServiceRestarter.java
@@ -43,6 +43,7 @@ import brooklyn.policy.ha.HASensors.FailureDescriptor;
 import brooklyn.util.collections.MutableMap;
 import brooklyn.util.config.ConfigBag;
 import brooklyn.util.flags.SetFromFlag;
+import brooklyn.util.javalang.JavaClassNames;
 import brooklyn.util.time.Duration;
 import brooklyn.util.time.Time;
 
@@ -91,6 +92,7 @@ public class ServiceRestarter extends AbstractPolicy {
     public ServiceRestarter(ConfigBag configBag) {
         // TODO hierarchy should use ConfigBag, and not change flags
         super(configBag.getAllConfigMutable());
+        uniqueTag = JavaClassNames.simpleClassName(getClass())+":"+getConfig(FAILURE_SENSOR_TO_MONITOR).getName();
     }
     
     public ServiceRestarter(Sensor<?> failureSensorToMonitor) {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/68464399/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java b/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java
index 435592e..ee1c6ee 100644
--- a/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java
+++ b/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java
@@ -43,6 +43,7 @@ import com.google.common.collect.ImmutableMap;
         + "emitting an event if the connection is lost/restored")
 public class SshMachineFailureDetector extends AbstractFailureDetector {
     private static final Logger LOG = LoggerFactory.getLogger(SshMachineFailureDetector.class);
+    public static final String DEFAULT_UNIQUE_TAG = "failureDetector.sshMachine.tag"; // assigned to uniqueTag in init()
 
     public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_FAILED = HASensors.CONNECTION_FAILED;
 
@@ -63,6 +64,7 @@ public class SshMachineFailureDetector extends AbstractFailureDetector {
         if (config().getRaw(POLL_PERIOD).isAbsent()) {
             config().set(POLL_PERIOD, Duration.ONE_MINUTE);
         }
+        uniqueTag = DEFAULT_UNIQUE_TAG;
     }
 
     @Override


[4/4] incubator-brooklyn git commit: This closes #571

Posted by al...@apache.org.
This closes #571


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/04fc801d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/04fc801d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/04fc801d

Branch: refs/heads/master
Commit: 04fc801d005ac6bf8e852dfa74b571b5c34cd693
Parents: 6153b0a 6846439
Author: Aled Sage <al...@gmail.com>
Authored: Sun Apr 12 19:50:36 2015 -0500
Committer: Aled Sage <al...@gmail.com>
Committed: Sun Apr 12 19:50:36 2015 -0500

----------------------------------------------------------------------
 .../policy/ha/AbstractFailureDetector.java      | 359 +++++++++++++++++++
 .../policy/ha/ConditionalSuspendPolicy.java     | 103 ++++++
 .../policy/ha/ConnectionFailureDetector.java    | 290 ++-------------
 .../policy/ha/ServiceFailureDetector.java       |  14 +-
 .../brooklyn/policy/ha/ServiceRestarter.java    |   2 +
 .../policy/ha/SshMachineFailureDetector.java    | 101 ++++++
 .../policy/ha/ServiceFailureDetectorTest.java   |  71 +++-
 7 files changed, 686 insertions(+), 254 deletions(-)
----------------------------------------------------------------------