You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2015/04/13 02:51:21 UTC
[1/4] incubator-brooklyn git commit: new policies:
SshConnectionFailure, ConditionalSuspendPolicy
Repository: incubator-brooklyn
Updated Branches:
refs/heads/master 6153b0a8f -> 04fc801d0
new policies: SshConnectionFailure, ConditionalSuspendPolicy
* SshConnectionFailure emits CONNECTION_FAILED if it can't make an ssh connection to the entity's machine
* ConditionalSuspendPolicy suspends a target policy when it receives a given sensor event (CONNECTION_FAILED by default)
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/dd0c2348
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/dd0c2348
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/dd0c2348
Branch: refs/heads/master
Commit: dd0c2348a2677526f29866b72aaa8cbe351c503a
Parents: 0ff4216
Author: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Authored: Tue Mar 24 15:17:10 2015 +0200
Committer: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Committed: Tue Apr 7 12:13:40 2015 +0300
----------------------------------------------------------------------
.../policy/ha/AbstractFailureDetector.java | 359 +++++++++++++++++++
.../policy/ha/ConditionalSuspendPolicy.java | 101 ++++++
.../policy/ha/ConnectionFailureDetector.java | 290 ++-------------
.../policy/ha/SshMachineFailureDetector.java | 99 +++++
4 files changed, 597 insertions(+), 252 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/dd0c2348/policy/src/main/java/brooklyn/policy/ha/AbstractFailureDetector.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/AbstractFailureDetector.java b/policy/src/main/java/brooklyn/policy/ha/AbstractFailureDetector.java
new file mode 100644
index 0000000..6a1324c
--- /dev/null
+++ b/policy/src/main/java/brooklyn/policy/ha/AbstractFailureDetector.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.policy.ha;
+
+import static brooklyn.util.time.Time.makeTimeStringRounded;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.BrooklynTaskTags;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.EntityInternal;
+import brooklyn.entity.basic.EntityLocal;
+import brooklyn.event.Sensor;
+import brooklyn.management.Task;
+import brooklyn.policy.basic.AbstractPolicy;
+import brooklyn.policy.ha.HASensors.FailureDescriptor;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.flags.SetFromFlag;
+import brooklyn.util.task.BasicTask;
+import brooklyn.util.task.ScheduledTask;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.reflect.TypeToken;
+
+/**
+ * Base class for policies that periodically run a health check ({@link #calculateStatus()}) in the
+ * entity's execution context, and emit a "failed" or "recovered" sensor on the entity once the new
+ * state has held for the configured stabilization delay.
+ * <p>
+ * Subclasses implement {@link #calculateStatus()}. The poll period, stabilization delays and the
+ * sensors to emit are read from config through protected getters, which subclasses may override.
+ */
+public abstract class AbstractFailureDetector extends AbstractPolicy {
+
+    // TODO Remove duplication from ServiceFailureDetector, particularly for the stabilisation delays.
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractFailureDetector.class);
+
+    /** Minimum time between the start of one publish job and the start of the next (see {@link #schedulePublish(long)}). */
+    private static final long MIN_PERIOD_BETWEEN_EXECS_MILLIS = 100;
+
+    public static final ConfigKey<Duration> POLL_PERIOD = ConfigKeys.newDurationConfigKey(
+            "failureDetector.pollPeriod", "", Duration.ONE_SECOND);
+
+    @SetFromFlag("failedStabilizationDelay")
+    public static final ConfigKey<Duration> FAILED_STABILIZATION_DELAY = ConfigKeys.newDurationConfigKey(
+            "failureDetector.serviceFailedStabilizationDelay",
+            "Time period for which the health check consistently fails "
+                    + "(e.g. doesn't report failed-ok-failed) before concluding failure.",
+            Duration.ZERO);
+
+    @SetFromFlag("recoveredStabilizationDelay")
+    public static final ConfigKey<Duration> RECOVERED_STABILIZATION_DELAY = ConfigKeys.newDurationConfigKey(
+            "failureDetector.serviceRecoveredStabilizationDelay",
+            "Time period for which the health check succeeds continually " +
+                    "(e.g. doesn't report ok-failed-ok) before concluding recovered",
+            Duration.ZERO);
+
+    @SuppressWarnings("serial")
+    public static final ConfigKey<Sensor<FailureDescriptor>> SENSOR_FAILED = ConfigKeys.newConfigKey(new TypeToken<Sensor<FailureDescriptor>>() {},
+            "failureDetector.sensor.fail", "A sensor which will indicate failure when set", HASensors.ENTITY_FAILED);
+
+    @SuppressWarnings("serial")
+    public static final ConfigKey<Sensor<FailureDescriptor>> SENSOR_RECOVERED = ConfigKeys.newConfigKey(new TypeToken<Sensor<FailureDescriptor>>() {},
+            "failureDetector.sensor.recover", "A sensor which will indicate recovery from failure when set", HASensors.ENTITY_RECOVERED);
+
+    /** Result of a single health check, as returned by {@link AbstractFailureDetector#calculateStatus()}. */
+    public interface CalculatedStatus {
+        /** @return true if the check passed */
+        boolean isHealthy();
+
+        /** @return human-readable details, included in log messages and in the emitted {@link FailureDescriptor} */
+        String getDescription();
+    }
+
+    /** Runs {@link #publishNow()}; exceptions are logged, other throwables are logged and rethrown. */
+    private final class PublishJob implements Runnable {
+        @Override public void run() {
+            try {
+                executorTime = System.currentTimeMillis();
+                // Clear the flag before publishing so that publishNow() can queue a follow-up job.
+                executorQueued.set(false);
+
+                publishNow();
+
+            } catch (Exception e) {
+                if (isRunning()) {
+                    LOG.error("Problem publishing failure-detector state: "+e, e);
+                } else {
+                    if (LOG.isDebugEnabled()) LOG.debug("Problem publishing failure-detector state, but no longer running: "+e, e);
+                }
+            } catch (Throwable t) {
+                LOG.error("Problem in failure-detector: "+t, t);
+                throw Exceptions.propagate(t);
+            }
+        }
+    }
+
+    /** A single poll: runs {@link #checkHealth()}. */
+    private final class HealthPoller implements Runnable {
+        @Override
+        public void run() {
+            checkHealth();
+        }
+    }
+
+    /** Creates a new transient task for each poll; handed to the periodic {@link ScheduledTask}. */
+    private final class HealthPollingTaskFactory implements Callable<Task<?>> {
+        @Override
+        public Task<?> call() {
+            BasicTask<Void> task = new BasicTask<Void>(new HealthPoller());
+            BrooklynTaskTags.setTransient(task);
+            return task;
+        }
+    }
+
+    /** Simple immutable {@link CalculatedStatus}. */
+    protected static class BasicCalculatedStatus implements CalculatedStatus {
+        private final boolean healthy;
+        private final String description;
+
+        public BasicCalculatedStatus(boolean healthy, String description) {
+            this.healthy = healthy;
+            this.description = description;
+        }
+
+        @Override
+        public boolean isHealthy() {
+            return healthy;
+        }
+
+        @Override
+        public String getDescription() {
+            return description;
+        }
+    }
+
+    /** Which sensor, if any, this detector last emitted. */
+    public enum LastPublished {
+        NONE,
+        FAILED,
+        RECOVERED;
+    }
+
+    /** Time (epoch millis) of the most recent healthy check, or null if there has been none. */
+    protected final AtomicReference<Long> stateLastGood = new AtomicReference<Long>();
+    /** Time (epoch millis) of the most recent unhealthy check, or null if there has been none. */
+    protected final AtomicReference<Long> stateLastFail = new AtomicReference<Long>();
+
+    /** Start of the current (not yet published) failure period; see {@link #checkHealth()}. */
+    protected Long currentFailureStartTime = null;
+    /** Start of the current (not yet published) recovery period; see {@link #checkHealth()}. */
+    protected Long currentRecoveryStartTime = null;
+
+    protected LastPublished lastPublished = LastPublished.NONE;
+
+    /** True while a publish job is queued but has not started running. */
+    private final AtomicBoolean executorQueued = new AtomicBoolean(false);
+    /** Start time (epoch millis) of the most recent publish job, used to rate-limit publish jobs. */
+    private volatile long executorTime = 0;
+
+    private Callable<Task<?>> pollingTaskFactory = new HealthPollingTaskFactory();
+
+    /** The periodic polling task; null until {@link #doStartPolling()} has run. */
+    private Task<?> scheduledTask;
+
+    /**
+     * Runs the health check. Called (while holding this policy's lock) on every poll, and again
+     * from the publish job before a sensor is emitted.
+     *
+     * @return the current status; must not be null
+     */
+    protected abstract CalculatedStatus calculateStatus();
+
+    /** Attaches to the entity and, if this policy is running, starts polling. */
+    @Override
+    public void setEntity(EntityLocal entity) {
+        super.setEntity(entity);
+
+        if (isRunning()) {
+            doStartPolling();
+        }
+    }
+
+    /** Stops polling (if it was started) and suspends the policy. */
+    @Override
+    public void suspend() {
+        // scheduledTask is only assigned by doStartPolling(), so it is null if polling never started.
+        if (scheduledTask != null) {
+            scheduledTask.cancel(true);
+        }
+        super.suspend();
+    }
+
+    /** Clears all in-flight failure/recovery tracking, resumes the policy and restarts polling. */
+    @Override
+    public void resume() {
+        currentFailureStartTime = null;
+        currentRecoveryStartTime = null;
+        lastPublished = LastPublished.NONE;
+        executorQueued.set(false);
+        executorTime = 0;
+
+        super.resume();
+        doStartPolling();
+    }
+
+    /** Submits the periodic health-check task, unless one is already scheduled and not yet done. */
+    protected void doStartPolling() {
+        if (scheduledTask == null || scheduledTask.isDone()) {
+            ScheduledTask task = new ScheduledTask(MutableMap.of("period", getPollPeriod(), "displayName", getTaskName()), pollingTaskFactory);
+            scheduledTask = ((EntityInternal)entity).getExecutionContext().submit(task);
+        }
+    }
+
+    private String getTaskName() {
+        return getDisplayName();
+    }
+
+    protected Duration getPollPeriod() {
+        return getConfig(POLL_PERIOD);
+    }
+
+    protected Duration getFailedStabilizationDelay() {
+        return getConfig(FAILED_STABILIZATION_DELAY);
+    }
+
+    protected Duration getRecoveredStabilizationDelay() {
+        return getConfig(RECOVERED_STABILIZATION_DELAY);
+    }
+
+    protected Sensor<FailureDescriptor> getSensorFailed() {
+        return getConfig(SENSOR_FAILED);
+    }
+
+    protected Sensor<FailureDescriptor> getSensorRecovered() {
+        return getConfig(SENSOR_RECOVERED);
+    }
+
+    /**
+     * Records the outcome of one health check. On a transition away from the last published state,
+     * starts the stabilization period and schedules a publish; on a transition back, abandons it.
+     */
+    private synchronized void checkHealth() {
+        CalculatedStatus status = calculateStatus();
+        boolean healthy = status.isHealthy();
+        long now = System.currentTimeMillis();
+
+        if (healthy) {
+            stateLastGood.set(now);
+            if (lastPublished == LastPublished.FAILED) {
+                // Failure was published: start (or continue) the recovery stabilization period.
+                if (currentRecoveryStartTime == null) {
+                    LOG.info("{} check for {}, now recovering: {}", new Object[] {this, entity, getDescription(status)});
+                    currentRecoveryStartTime = now;
+                    schedulePublish();
+                } else {
+                    if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, continuing recovering: {}", new Object[] {this, entity, getDescription(status)});
+                }
+            } else {
+                // No failure published: abandon any failure period that has not yet been published.
+                if (currentFailureStartTime != null) {
+                    LOG.info("{} check for {}, now healthy: {}", new Object[] {this, entity, getDescription(status)});
+                    currentFailureStartTime = null;
+                } else {
+                    if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, still healthy: {}", new Object[] {this, entity, getDescription(status)});
+                }
+            }
+        } else {
+            stateLastFail.set(now);
+            if (lastPublished != LastPublished.FAILED) {
+                // Failure not yet published: start (or continue) the failure stabilization period.
+                if (currentFailureStartTime == null) {
+                    LOG.info("{} check for {}, now failing: {}", new Object[] {this, entity, getDescription(status)});
+                    currentFailureStartTime = now;
+                    schedulePublish();
+                } else {
+                    if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, continuing failing: {}", new Object[] {this, entity, getDescription(status)});
+                }
+            } else {
+                // Failure already published: abandon any recovery period that has not yet been published.
+                if (currentRecoveryStartTime != null) {
+                    LOG.info("{} check for {}, now failing: {}", new Object[] {this, entity, getDescription(status)});
+                    currentRecoveryStartTime = null;
+                } else {
+                    if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, still failed: {}", new Object[] {this, entity, getDescription(status)});
+                }
+            }
+        }
+    }
+
+    /** Schedules a publish job as soon as the rate limit allows. */
+    protected void schedulePublish() {
+        schedulePublish(0);
+    }
+
+    /**
+     * Schedules a publish job to run after at least {@code delay} ms, and no sooner than
+     * {@link #MIN_PERIOD_BETWEEN_EXECS_MILLIS} after the previous job started. Does nothing if the
+     * policy is not running or a job is already queued.
+     *
+     * @param delay minimum delay in milliseconds; negative values are treated as 0
+     */
+    protected void schedulePublish(long delay) {
+        if (isRunning() && executorQueued.compareAndSet(false, true)) {
+            long now = System.currentTimeMillis();
+            delay = Math.max(0, Math.max(delay, (executorTime + MIN_PERIOD_BETWEEN_EXECS_MILLIS) - now));
+            if (LOG.isTraceEnabled()) LOG.trace("{} scheduling publish in {}ms", this, delay);
+
+            Runnable job = new PublishJob();
+
+            ScheduledTask task = new ScheduledTask(MutableMap.of("delay", Duration.of(delay, TimeUnit.MILLISECONDS)), new BasicTask<Void>(job));
+            ((EntityInternal)entity).getExecutionContext().submit(task);
+        }
+    }
+
+    /**
+     * Re-runs the health check and emits the failed/recovered sensor if the state differs from the
+     * last published one and has been stable for the relevant stabilization delay; otherwise
+     * reschedules itself for the earliest time at which that could be true.
+     */
+    private synchronized void publishNow() {
+        if (!isRunning()) return;
+
+        CalculatedStatus calculatedStatus = calculateStatus();
+        boolean healthy = calculatedStatus.isHealthy();
+
+        Long lastUpTime = stateLastGood.get();
+        Long lastDownTime = stateLastFail.get();
+        long serviceFailedStabilizationDelay = getFailedStabilizationDelay().toMilliseconds();
+        long serviceRecoveredStabilizationDelay = getRecoveredStabilizationDelay().toMilliseconds();
+        long now = System.currentTimeMillis();
+
+        if (healthy) {
+            if (lastPublished == LastPublished.FAILED) {
+                // only publish if consistently up for serviceRecoveredStabilizationDelay
+                long currentRecoveryPeriod = getTimeDiff(now, currentRecoveryStartTime);
+                long sinceLastDownPeriod = getTimeDiff(now, lastDownTime);
+                if (currentRecoveryPeriod > serviceRecoveredStabilizationDelay && sinceLastDownPeriod > serviceRecoveredStabilizationDelay) {
+                    String description = getDescription(calculatedStatus);
+                    LOG.warn("{} check for {}, publishing recovered: {}", new Object[] {this, entity, description});
+                    entity.emit(getSensorRecovered(), new HASensors.FailureDescriptor(entity, description));
+                    lastPublished = LastPublished.RECOVERED;
+                    currentFailureStartTime = null;
+                } else {
+                    long nextAttemptTime = Math.max(serviceRecoveredStabilizationDelay - currentRecoveryPeriod, serviceRecoveredStabilizationDelay - sinceLastDownPeriod);
+                    schedulePublish(nextAttemptTime);
+                }
+            }
+        } else {
+            if (lastPublished != LastPublished.FAILED) {
+                // only publish if consistently down for serviceFailedStabilizationDelay
+                long currentFailurePeriod = getTimeDiff(now, currentFailureStartTime);
+                long sinceLastUpPeriod = getTimeDiff(now, lastUpTime);
+                if (currentFailurePeriod > serviceFailedStabilizationDelay && sinceLastUpPeriod > serviceFailedStabilizationDelay) {
+                    String description = getDescription(calculatedStatus);
+                    LOG.warn("{} check for {}, publishing failed: {}", new Object[] {this, entity, description});
+                    entity.emit(getSensorFailed(), new HASensors.FailureDescriptor(entity, description));
+                    lastPublished = LastPublished.FAILED;
+                    currentRecoveryStartTime = null;
+                } else {
+                    long nextAttemptTime = Math.max(serviceFailedStabilizationDelay - currentFailurePeriod, serviceFailedStabilizationDelay - sinceLastUpPeriod);
+                    schedulePublish(nextAttemptTime);
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds a diagnostic description: the status's own description followed by the detector's
+     * timing state (last up/down times, last published state, current failure/recovery periods).
+     */
+    protected String getDescription(CalculatedStatus status) {
+        Long lastUpTime = stateLastGood.get();
+        Long lastDownTime = stateLastFail.get();
+        Duration serviceFailedStabilizationDelay = getFailedStabilizationDelay();
+        Duration serviceRecoveredStabilizationDelay = getRecoveredStabilizationDelay();
+
+        return String.format("%s; healthy=%s; timeNow=%s; lastUp=%s; lastDown=%s; lastPublished=%s; "+
+                "currentFailurePeriod=%s; currentRecoveryPeriod=%s",
+                status.getDescription(),
+                status.isHealthy(),
+                Time.makeDateString(System.currentTimeMillis()),
+                (lastUpTime != null ? Time.makeDateString(lastUpTime) : "<never>"),
+                (lastDownTime != null ? Time.makeDateString(lastDownTime) : "<never>"),
+                lastPublished,
+                (currentFailureStartTime != null ? getTimeStringSince(currentFailureStartTime) : "<none>") + " (stabilization "+makeTimeStringRounded(serviceFailedStabilizationDelay) + ")",
+                (currentRecoveryStartTime != null ? getTimeStringSince(currentRecoveryStartTime) : "<none>") + " (stabilization "+makeTimeStringRounded(serviceRecoveredStabilizationDelay) + ")");
+    }
+
+    /**
+     * Returns {@code recent - previous}. When {@code previous} is null, returns {@code recent} itself,
+     * i.e. a period long enough to exceed any realistic stabilization delay.
+     */
+    private long getTimeDiff(Long recent, Long previous) {
+        return (previous == null) ? recent : (recent - previous);
+    }
+
+    /** Rounded human-readable time elapsed since {@code time} (epoch millis), or null if {@code time} is null. */
+    private String getTimeStringSince(Long time) {
+        return time == null ? null : Time.makeTimeStringRounded(System.currentTimeMillis() - time);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/dd0c2348/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java b/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java
new file mode 100644
index 0000000..6e37ef9
--- /dev/null
+++ b/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.policy.ha;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.EntityLocal;
+import brooklyn.event.Sensor;
+import brooklyn.event.SensorEvent;
+import brooklyn.event.SensorEventListener;
+import brooklyn.policy.Policy;
+import brooklyn.policy.basic.AbstractPolicy;
+import brooklyn.util.flags.SetFromFlag;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Suspends a target policy on the same entity whenever {@link #SUSPEND_SENSOR} is emitted, and
+ * resumes it whenever {@link #RESUME_SENSOR} is emitted. Events are ignored while this policy
+ * itself is not running.
+ * <p>
+ * The target is given by {@link #SUSPEND_TARGET}: either a {@link Policy} instance, or a string
+ * matched against the display name or class name of the entity's policies.
+ */
+public class ConditionalSuspendPolicy extends AbstractPolicy {
+    private static final Logger LOG = LoggerFactory.getLogger(ConditionalSuspendPolicy.class);
+
+    @SetFromFlag("suppressSensor")
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static final ConfigKey<Sensor<?>> SUSPEND_SENSOR = (ConfigKey) ConfigKeys.newConfigKey(Sensor.class,
+            "suppressSensor", "Sensor which will suppress the target policy", HASensors.CONNECTION_FAILED);
+
+    @SetFromFlag("resetSensor")
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static final ConfigKey<Sensor<?>> RESUME_SENSOR = (ConfigKey) ConfigKeys.newConfigKey(Sensor.class,
+            "resetSensor", "Resume target policy when this sensor is observed", HASensors.CONNECTION_RECOVERED);
+
+    @SetFromFlag("target")
+    public static final ConfigKey<Object> SUSPEND_TARGET = ConfigKeys.newConfigKey(Object.class,
+            "target", "The target policy to suspend. Either a direct reference, or the display name or class name of a policy on the same entity.");
+
+    /**
+     * Attaches to the entity, validates that the target policy can be resolved, and subscribes to
+     * the suspend/resume sensors.
+     *
+     * @throws NullPointerException if no target is configured or it matches no policy on the entity
+     */
+    @Override
+    public void setEntity(EntityLocal entity) {
+        super.setEntity(entity);
+        Object target = config().get(SUSPEND_TARGET);
+        Preconditions.checkNotNull(target, "Suspend target required");
+        Preconditions.checkNotNull(getTargetPolicy(), "Can't find target policy set in " + SUSPEND_TARGET.getName() + ": " + target);
+        subscribeToSensors();
+    }
+
+    /** Subscribes to the suspend and resume sensors on the entity. */
+    private void subscribeToSensors() {
+        subscribe(entity, getConfig(SUSPEND_SENSOR), new SensorEventListener<Object>() {
+            @Override public void onEvent(final SensorEvent<Object> event) {
+                applyToTarget(event, true);
+            }
+        });
+        subscribe(entity, getConfig(RESUME_SENSOR), new SensorEventListener<Object>() {
+            @Override public void onEvent(final SensorEvent<Object> event) {
+                applyToTarget(event, false);
+            }
+        });
+    }
+
+    /**
+     * Suspends ({@code suspend == true}) or resumes the target policy in response to {@code event}.
+     * Does nothing if this policy is not running.
+     */
+    private void applyToTarget(SensorEvent<?> event, boolean suspend) {
+        if (!isRunning()) return;
+
+        Policy target = getTargetPolicy();
+        if (target == null) {
+            // The target was validated in setEntity(), but may no longer be found, e.g. if it was
+            // removed from the entity since; avoid an NPE inside the subscription callback.
+            LOG.warn("Can't find target policy " + config().get(SUSPEND_TARGET) + " on " + entity
+                    + ", ignoring " + event.getSensor() + " = " + event.getValue());
+            return;
+        }
+
+        if (suspend) {
+            target.suspend();
+            LOG.debug("Suspended policy " + target + ", triggered by " + event.getSensor() + " = " + event.getValue());
+        } else {
+            target.resume();
+            LOG.debug("Resumed policy " + target + ", triggered by " + event.getSensor() + " = " + event.getValue());
+        }
+    }
+
+    /**
+     * Resolves {@link #SUSPEND_TARGET} to a policy.
+     *
+     * @return the configured policy instance, or the first of the entity's policies whose display
+     *         name or class name equals the configured string; null if no target is configured or
+     *         no policy matches
+     * @throws IllegalStateException if the configured target is neither a Policy nor a String
+     */
+    private Policy getTargetPolicy() {
+        Object target = config().get(SUSPEND_TARGET);
+        if (target == null) {
+            return null;
+        } else if (target instanceof Policy) {
+            return (Policy)target;
+        } else if (target instanceof String) {
+            for (Policy policy : entity.getPolicies()) {
+                // No way to set config values for keys NOT declared in the policy,
+                // so must use displayName as a generally available config value.
+                if (target.equals(policy.getDisplayName()) || target.equals(policy.getClass().getName())) {
+                    return policy;
+                }
+            }
+            return null;
+        } else {
+            throw new IllegalStateException("Unexpected type " + target.getClass() + " for target " + target);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/dd0c2348/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java b/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java
index acd371d..a92b8a8 100644
--- a/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java
+++ b/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java
@@ -18,35 +18,17 @@
*/
package brooklyn.policy.ha;
-import static brooklyn.util.time.Time.makeTimeStringRounded;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import brooklyn.catalog.Catalog;
import brooklyn.config.ConfigKey;
-import brooklyn.entity.basic.BrooklynTaskTags;
import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.entity.basic.EntityInternal;
-import brooklyn.entity.basic.EntityLocal;
+import brooklyn.event.Sensor;
import brooklyn.event.basic.BasicConfigKey;
import brooklyn.event.basic.BasicNotificationSensor;
-import brooklyn.management.Task;
-import brooklyn.policy.basic.AbstractPolicy;
import brooklyn.policy.ha.HASensors.FailureDescriptor;
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.exceptions.Exceptions;
import brooklyn.util.flags.SetFromFlag;
+import brooklyn.util.guava.Maybe;
import brooklyn.util.net.Networking;
-import brooklyn.util.task.BasicTask;
-import brooklyn.util.task.ScheduledTask;
import brooklyn.util.time.Duration;
-import brooklyn.util.time.Time;
import com.google.common.net.HostAndPort;
@@ -56,24 +38,12 @@ import com.google.common.net.HostAndPort;
*/
@Catalog(name="Connection Failure Detector", description="HA policy for monitoring a host:port, "
+ "emitting an event if the connection is lost/restored")
-public class ConnectionFailureDetector extends AbstractPolicy {
-
- // TODO Remove duplication from ServiceFailureDetector, particularly for the stabilisation delays.
-
- public enum LastPublished {
- NONE,
- FAILED,
- RECOVERED;
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(ConnectionFailureDetector.class);
-
- private static final long MIN_PERIOD_BETWEEN_EXECS_MILLIS = 100;
+public class ConnectionFailureDetector extends AbstractFailureDetector {
public static final ConfigKey<HostAndPort> ENDPOINT = ConfigKeys.newConfigKey(HostAndPort.class, "connectionFailureDetector.endpoint");
-
+
public static final ConfigKey<Duration> POLL_PERIOD = ConfigKeys.newConfigKey(Duration.class, "connectionFailureDetector.pollPeriod", "", Duration.ONE_SECOND);
-
+
public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_FAILED = HASensors.CONNECTION_FAILED;
public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_RECOVERED = HASensors.CONNECTION_RECOVERED;
@@ -95,245 +65,61 @@ public class ConnectionFailureDetector extends AbstractPolicy {
.defaultValue(Duration.ZERO)
.build();
- protected final AtomicReference<Long> connectionLastUp = new AtomicReference<Long>();
- protected final AtomicReference<Long> connectionLastDown = new AtomicReference<Long>();
-
- protected Long currentFailureStartTime = null;
- protected Long currentRecoveryStartTime = null;
-
- protected LastPublished lastPublished = LastPublished.NONE;
-
- private final AtomicBoolean executorQueued = new AtomicBoolean(false);
- private volatile long executorTime = 0;
-
- private Callable<Task<?>> pollingTaskFactory;
-
- private Task<?> scheduledTask;
-
- public ConnectionFailureDetector() {
- }
-
@Override
public void init() {
+ super.init();
getRequiredConfig(ENDPOINT); // just to confirm it's set, failing fast
-
- pollingTaskFactory = new Callable<Task<?>>() {
- @Override public Task<?> call() {
- BasicTask<Void> task = new BasicTask<Void>(new Runnable() {
- @Override public void run() {
- checkHealth();
- }});
- BrooklynTaskTags.setTransient(task);
- return task;
- }
- };
- }
-
- @Override
- public void setEntity(EntityLocal entity) {
- super.setEntity(entity);
-
- if (isRunning()) {
- doStartPolling();
+ if (config().getRaw(SENSOR_FAILED).isAbsent()) {
+ config().set(SENSOR_FAILED, CONNECTION_FAILED);
+ }
+ if (config().getRaw(SENSOR_RECOVERED).isAbsent()) {
+ config().set(SENSOR_RECOVERED, CONNECTION_RECOVERED);
}
}
@Override
- public void suspend() {
- scheduledTask.cancel(true);
- super.suspend();
+ protected CalculatedStatus calculateStatus() {
+ HostAndPort endpoint = getConfig(ENDPOINT);
+ boolean isHealthy = Networking.isReachable(endpoint);
+ return new BasicCalculatedStatus(isHealthy, "endpoint=" + endpoint);
}
-
+
+ //Persistence compatibility overrides
@Override
- public void resume() {
- currentFailureStartTime = null;
- currentRecoveryStartTime = null;
- lastPublished = LastPublished.NONE;
- executorQueued.set(false);
- executorTime = 0;
-
- super.resume();
- doStartPolling();
- }
-
- protected void doStartPolling() {
- if (scheduledTask == null || scheduledTask.isDone()) {
- ScheduledTask task = new ScheduledTask(MutableMap.of("period", getConfig(POLL_PERIOD)), pollingTaskFactory);
- scheduledTask = ((EntityInternal)entity).getExecutionContext().submit(task);
- }
+ protected Duration getPollPeriod() {
+ return getConfig(POLL_PERIOD);
}
-
- private Duration getConnectionFailedStabilizationDelay() {
+
+ @Override
+ protected Duration getFailedStabilizationDelay() {
return getConfig(CONNECTION_FAILED_STABILIZATION_DELAY);
}
- private Duration getConnectionRecoveredStabilizationDelay() {
+ @Override
+ protected Duration getRecoveredStabilizationDelay() {
return getConfig(CONNECTION_RECOVERED_STABILIZATION_DELAY);
}
- private synchronized void checkHealth() {
- CalculatedStatus status = calculateStatus();
- boolean connected = status.connected;
- long now = System.currentTimeMillis();
-
- if (connected) {
- connectionLastUp.set(now);
- if (lastPublished == LastPublished.FAILED) {
- if (currentRecoveryStartTime == null) {
- LOG.info("{} connectivity-check for {}, now recovering: {}", new Object[] {this, entity, status.getDescription()});
- currentRecoveryStartTime = now;
- schedulePublish();
- } else {
- if (LOG.isTraceEnabled()) LOG.trace("{} connectivity-check for {}, continuing recovering: {}", new Object[] {this, entity, status.getDescription()});
- }
- } else {
- if (currentFailureStartTime != null) {
- LOG.info("{} connectivity-check for {}, now healthy: {}", new Object[] {this, entity, status.getDescription()});
- currentFailureStartTime = null;
- } else {
- if (LOG.isTraceEnabled()) LOG.trace("{} connectivity-check for {}, still healthy: {}", new Object[] {this, entity, status.getDescription()});
- }
- }
+ @SuppressWarnings("unchecked")
+ @Override
+ protected Sensor<FailureDescriptor> getSensorFailed() {
+ Maybe<Object> sensorFailed = config().getRaw(SENSOR_FAILED);
+ if (sensorFailed.isPresent()) {
+ return (Sensor<FailureDescriptor>)sensorFailed.get();
} else {
- connectionLastDown.set(now);
- if (lastPublished != LastPublished.FAILED) {
- if (currentFailureStartTime == null) {
- LOG.info("{} connectivity-check for {}, now failing: {}", new Object[] {this, entity, status.getDescription()});
- currentFailureStartTime = now;
- schedulePublish();
- } else {
- if (LOG.isTraceEnabled()) LOG.trace("{} connectivity-check for {}, continuing failing: {}", new Object[] {this, entity, status.getDescription()});
- }
- } else {
- if (currentRecoveryStartTime != null) {
- LOG.info("{} connectivity-check for {}, now failing: {}", new Object[] {this, entity, status.getDescription()});
- currentRecoveryStartTime = null;
- } else {
- if (LOG.isTraceEnabled()) LOG.trace("{} connectivity-check for {}, still failed: {}", new Object[] {this, entity, status.getDescription()});
- }
- }
+ return CONNECTION_FAILED;
}
}
-
- protected CalculatedStatus calculateStatus() {
- return new CalculatedStatus();
- }
-
- protected void schedulePublish() {
- schedulePublish(0);
- }
-
- protected void schedulePublish(long delay) {
- if (isRunning() && executorQueued.compareAndSet(false, true)) {
- long now = System.currentTimeMillis();
- delay = Math.max(0, Math.max(delay, (executorTime + MIN_PERIOD_BETWEEN_EXECS_MILLIS) - now));
- if (LOG.isTraceEnabled()) LOG.trace("{} scheduling publish in {}ms", this, delay);
-
- Runnable job = new Runnable() {
- @Override public void run() {
- try {
- executorTime = System.currentTimeMillis();
- executorQueued.set(false);
- publishNow();
-
- } catch (Exception e) {
- if (isRunning()) {
- LOG.error("Problem resizing: "+e, e);
- } else {
- if (LOG.isDebugEnabled()) LOG.debug("Problem resizing, but no longer running: "+e, e);
- }
- } catch (Throwable t) {
- LOG.error("Problem in service-failure-detector: "+t, t);
- throw Exceptions.propagate(t);
- }
- }
- };
-
- ScheduledTask task = new ScheduledTask(MutableMap.of("delay", Duration.of(delay, TimeUnit.MILLISECONDS)), new BasicTask(job));
- ((EntityInternal)entity).getExecutionContext().submit(task);
- }
- }
-
- private synchronized void publishNow() {
- if (!isRunning()) return;
-
- CalculatedStatus calculatedStatus = calculateStatus();
- boolean connected = calculatedStatus.connected;
-
- Long lastUpTime = connectionLastUp.get();
- Long lastDownTime = connectionLastDown.get();
- long serviceFailedStabilizationDelay = getConnectionFailedStabilizationDelay().toMilliseconds();
- long serviceRecoveredStabilizationDelay = getConnectionRecoveredStabilizationDelay().toMilliseconds();
- long now = System.currentTimeMillis();
-
- if (connected) {
- if (lastPublished == LastPublished.FAILED) {
- // only publish if consistently up for serviceRecoveredStabilizationDelay
- long currentRecoveryPeriod = getTimeDiff(now, currentRecoveryStartTime);
- long sinceLastDownPeriod = getTimeDiff(now, lastDownTime);
- if (currentRecoveryPeriod > serviceRecoveredStabilizationDelay && sinceLastDownPeriod > serviceRecoveredStabilizationDelay) {
- String description = calculatedStatus.getDescription();
- LOG.warn("{} connectivity-check for {}, publishing recovered: {}", new Object[] {this, entity, description});
- entity.emit(CONNECTION_RECOVERED, new HASensors.FailureDescriptor(entity, description));
- lastPublished = LastPublished.RECOVERED;
- currentFailureStartTime = null;
- } else {
- long nextAttemptTime = Math.max(serviceRecoveredStabilizationDelay - currentRecoveryPeriod, serviceRecoveredStabilizationDelay - sinceLastDownPeriod);
- schedulePublish(nextAttemptTime);
- }
- }
+ @SuppressWarnings("unchecked")
+ @Override
+ protected Sensor<FailureDescriptor> getSensorRecovered() {
+ Maybe<Object> sensorRecovered = config().getRaw(SENSOR_RECOVERED);
+ if (sensorRecovered.isPresent()) {
+ return (Sensor<FailureDescriptor>)sensorRecovered.get();
} else {
- if (lastPublished != LastPublished.FAILED) {
- // only publish if consistently down for serviceFailedStabilizationDelay
- long currentFailurePeriod = getTimeDiff(now, currentFailureStartTime);
- long sinceLastUpPeriod = getTimeDiff(now, lastUpTime);
- if (currentFailurePeriod > serviceFailedStabilizationDelay && sinceLastUpPeriod > serviceFailedStabilizationDelay) {
- String description = calculatedStatus.getDescription();
- LOG.warn("{} connectivity-check for {}, publishing failed: {}", new Object[] {this, entity, description});
- entity.emit(CONNECTION_FAILED, new HASensors.FailureDescriptor(entity, description));
- lastPublished = LastPublished.FAILED;
- currentRecoveryStartTime = null;
- } else {
- long nextAttemptTime = Math.max(serviceFailedStabilizationDelay - currentFailurePeriod, serviceFailedStabilizationDelay - sinceLastUpPeriod);
- schedulePublish(nextAttemptTime);
- }
- }
+ return CONNECTION_RECOVERED;
}
}
- public class CalculatedStatus {
- public final boolean connected;
-
- public CalculatedStatus() {
- HostAndPort endpoint = getConfig(ENDPOINT);
- connected = Networking.isReachable(endpoint);
- }
-
- public String getDescription() {
- Long lastUpTime = connectionLastUp.get();
- Long lastDownTime = connectionLastDown.get();
- Duration serviceFailedStabilizationDelay = getConnectionFailedStabilizationDelay();
- Duration serviceRecoveredStabilizationDelay = getConnectionRecoveredStabilizationDelay();
-
- return String.format("endpoint=%s; connected=%s; timeNow=%s; lastUp=%s; lastDown=%s; lastPublished=%s; "+
- "currentFailurePeriod=%s; currentRecoveryPeriod=%s",
- getConfig(ENDPOINT),
- connected,
- Time.makeDateString(System.currentTimeMillis()),
- (lastUpTime != null ? Time.makeDateString(lastUpTime) : "<never>"),
- (lastDownTime != null ? Time.makeDateString(lastDownTime) : "<never>"),
- lastPublished,
- (currentFailureStartTime != null ? getTimeStringSince(currentFailureStartTime) : "<none>") + " (stabilization "+makeTimeStringRounded(serviceFailedStabilizationDelay) + ")",
- (currentRecoveryStartTime != null ? getTimeStringSince(currentRecoveryStartTime) : "<none>") + " (stabilization "+makeTimeStringRounded(serviceRecoveredStabilizationDelay) + ")");
- }
- }
-
- private long getTimeDiff(Long recent, Long previous) {
- return (previous == null) ? recent : (recent - previous);
- }
-
- private String getTimeStringSince(Long time) {
- return time == null ? null : Time.makeTimeStringRounded(System.currentTimeMillis() - time);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/dd0c2348/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java b/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java
new file mode 100644
index 0000000..435592e
--- /dev/null
+++ b/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.policy.ha;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.catalog.Catalog;
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.event.basic.BasicNotificationSensor;
+import brooklyn.location.basic.Machines;
+import brooklyn.location.basic.SshMachineLocation;
+import brooklyn.policy.ha.AbstractFailureDetector.LastPublished;
+import brooklyn.policy.ha.HASensors.FailureDescriptor;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.guava.Maybe;
+import brooklyn.util.internal.ssh.SshTool;
+import brooklyn.util.time.Duration;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+@Catalog(name="Ssh Connectivity Failure Detector", description="HA policy for monitoring an SshMachine, "
+ + "emitting an event if the connection is lost/restored")
+public class SshMachineFailureDetector extends AbstractFailureDetector {
+ private static final Logger LOG = LoggerFactory.getLogger(SshMachineFailureDetector.class);
+
+ public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_FAILED = HASensors.CONNECTION_FAILED;
+
+ public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_RECOVERED = HASensors.CONNECTION_RECOVERED;
+
+ public static final ConfigKey<Duration> CONNECT_TIMEOUT = ConfigKeys.newDurationConfigKey(
+ "ha.sshConnection.timeout", "How long to wait for conneciton before declaring failure", Duration.TEN_SECONDS);
+
+ @Override
+ public void init() {
+ super.init();
+ if (config().getRaw(SENSOR_FAILED).isAbsent()) {
+ config().set(SENSOR_FAILED, CONNECTION_FAILED);
+ }
+ if (config().getRaw(SENSOR_RECOVERED).isAbsent()) {
+ config().set(SENSOR_RECOVERED, CONNECTION_RECOVERED);
+ }
+ if (config().getRaw(POLL_PERIOD).isAbsent()) {
+ config().set(POLL_PERIOD, Duration.ONE_MINUTE);
+ }
+ }
+
+ @Override
+ protected CalculatedStatus calculateStatus() {
+ Maybe<SshMachineLocation> sshMachineOption = Machines.findUniqueSshMachineLocation(entity.getLocations());
+ if (sshMachineOption.isPresent()) {
+ SshMachineLocation sshMachine = sshMachineOption.get();
+ try {
+ Duration timeout = config().get(CONNECT_TIMEOUT);
+ Map<String, ?> flags = ImmutableMap.of(
+ SshTool.PROP_CONNECT_TIMEOUT.getName(), timeout.toMilliseconds(),
+ SshTool.PROP_SESSION_TIMEOUT.getName(), timeout.toMilliseconds(),
+ SshTool.PROP_SSH_TRIES.getName(), 1);
+ int exitCode = sshMachine.execCommands(flags, SshMachineFailureDetector.class.getName(), ImmutableList.of("exit"));
+ return new BasicCalculatedStatus(exitCode == 0, sshMachine.toString());
+ } catch (Exception e) {
+ Exceptions.propagateIfFatal(e);
+ boolean isFirstFailure = lastPublished != LastPublished.FAILED && currentFailureStartTime == null;
+ if (isFirstFailure) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed connecting to machine " + sshMachine, e);
+ }
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Failed connecting to machine " + sshMachine, e);
+ }
+ }
+ return new BasicCalculatedStatus(false, e.getMessage());
+ }
+ } else {
+ return new BasicCalculatedStatus(true, "no machine started, not complaining");
+ }
+ }
+}
[2/4] incubator-brooklyn git commit: Add option to
ServiceFailureDetector to republish failed events
Posted by al...@apache.org.
Add option to ServiceFailureDetector to republish failed events
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/75f00025
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/75f00025
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/75f00025
Branch: refs/heads/master
Commit: 75f00025980c240c8bd37c5ed1b74198bda76d99
Parents: dd0c234
Author: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Authored: Fri Mar 27 19:10:29 2015 +0200
Committer: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Committed: Tue Apr 7 14:38:11 2015 +0300
----------------------------------------------------------------------
.../policy/ha/ServiceFailureDetector.java | 14 +++-
.../policy/ha/ServiceFailureDetectorTest.java | 71 +++++++++++++++++++-
2 files changed, 83 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/75f00025/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java b/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java
index f3264e3..7f5c142 100644
--- a/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java
+++ b/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java
@@ -102,6 +102,12 @@ public class ServiceFailureDetector extends ServiceStateLogic.ComputeServiceStat
.defaultValue(Duration.ZERO)
.build();
+ @SetFromFlag("entityFailedRepublishTime")
+ public static final ConfigKey<Duration> ENTITY_FAILED_REPUBLISH_TIME = BasicConfigKey.builder(Duration.class)
+ .name("entityFailed.republishTime")
+ .description("Publish failed state periodically at the specified intervals, null to disable.")
+ .build();
+
protected Long firstUpTime;
protected Long currentFailureStartTime = null;
@@ -215,7 +221,13 @@ public class ServiceFailureDetector extends ServiceStateLogic.ComputeServiceStat
if (delayBeforeCheck<=0) {
if (LOG.isDebugEnabled()) LOG.debug("{} publishing failed (state={}; currentFailureStartTime={}; now={}",
new Object[] {this, state, Time.makeDateString(currentFailureStartTime), Time.makeDateString(now)});
- publishEntityFailedTime = null;
+ Duration republishDelay = getConfig(ENTITY_FAILED_REPUBLISH_TIME);
+ if (republishDelay == null) {
+ publishEntityFailedTime = null;
+ } else {
+ publishEntityFailedTime = now + republishDelay.toMilliseconds();
+ recomputeIn = Math.min(recomputeIn, republishDelay.toMilliseconds());
+ }
lastPublished = LastPublished.FAILED;
entity.emit(HASensors.ENTITY_FAILED, new HASensors.FailureDescriptor(entity, getFailureDescription(now)));
} else {
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/75f00025/policy/src/test/java/brooklyn/policy/ha/ServiceFailureDetectorTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/brooklyn/policy/ha/ServiceFailureDetectorTest.java b/policy/src/test/java/brooklyn/policy/ha/ServiceFailureDetectorTest.java
index c31bbda..7250e8e 100644
--- a/policy/src/test/java/brooklyn/policy/ha/ServiceFailureDetectorTest.java
+++ b/policy/src/test/java/brooklyn/policy/ha/ServiceFailureDetectorTest.java
@@ -24,7 +24,10 @@ import static org.testng.Assert.fail;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -56,6 +59,7 @@ import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableMap;
public class ServiceFailureDetectorTest {
+ private static final Logger log = LoggerFactory.getLogger(ServiceFailureDetectorTest.class);
private static final int TIMEOUT_MS = 10*1000;
@@ -245,7 +249,7 @@ public class ServiceFailureDetectorTest {
.configure(ServiceFailureDetector.SERVICE_ON_FIRE_STABILIZATION_DELAY, Duration.ONE_SECOND)
.configure(ServiceFailureDetector.ENTITY_RECOVERED_STABILIZATION_DELAY, Duration.ONE_SECOND));
- // Set the entit to healthy
+ // Set the entity to healthy
e1.setAttribute(TestEntity.SERVICE_UP, true);
ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
EntityTestUtils.assertAttributeEqualsEventually(e1, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
@@ -312,6 +316,71 @@ public class ServiceFailureDetectorTest {
assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
}
+ @Test(groups="Integration") // Has a 1.5 second wait
+ public void testRepublishedFailure() throws Exception {
+ Duration republishPeriod = Duration.millis(100);
+
+ e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+ .configure(ServiceFailureDetector.ENTITY_FAILED_REPUBLISH_TIME, republishPeriod));
+
+ // Set the entity to healthy
+ e1.setAttribute(TestEntity.SERVICE_UP, true);
+ ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+ EntityTestUtils.assertAttributeEqualsEventually(e1, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+
+ // Make the entity fail;
+ ServiceStateLogic.ServiceProblemsLogic.updateProblemsIndicator(e1, "test", "foo");
+ EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
+ assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+
+ //wait for at least 10 republish events (~1 sec)
+ assertEventsSizeEventually(10);
+
+ // Now recover
+ ServiceStateLogic.ServiceProblemsLogic.clearProblemsIndicator(e1, "test");
+ EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+ assertHasEventEventually(HASensors.ENTITY_RECOVERED, Predicates.<Object>equalTo(e1), null);
+
+ //once recovered check no more failed events emitted periodically
+ assertEventsSizeContiniually(events.size());
+
+ SensorEvent<FailureDescriptor> prevEvent = null;
+ for (SensorEvent<FailureDescriptor> event : events) {
+ if (prevEvent != null) {
+ long repeatOffset = event.getTimestamp() - prevEvent.getTimestamp();
+ long deviation = Math.abs(repeatOffset - republishPeriod.toMilliseconds());
+ if (deviation > republishPeriod.toMilliseconds()/10 &&
+ //warn only if recovered is too far away from the last failure
+ (!event.getSensor().equals(HASensors.ENTITY_RECOVERED) ||
+ repeatOffset > republishPeriod.toMilliseconds())) {
+ log.error("The time between failure republish (" + repeatOffset + "ms) deviates too much from the expected " + republishPeriod + ". prevEvent=" + prevEvent + ", event=" + event);
+ }
+ }
+ prevEvent = event;
+ }
+
+ //make sure no republish takes place after recovered
+ assertEquals(prevEvent.getSensor(), HASensors.ENTITY_RECOVERED);
+ }
+
+ private void assertEventsSizeContiniually(final int size) {
+ Asserts.succeedsContinually(MutableMap.of("timeout", 500), new Runnable() {
+ @Override
+ public void run() {
+ assertTrue(events.size() == size, "assertEventsSizeContiniually expects " + size + " events but found " + events.size() + ": " + events);
+ }
+ });
+ }
+
+ private void assertEventsSizeEventually(final int size) {
+ Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() {
+ @Override
+ public void run() {
+ assertTrue(events.size() >= size, "assertEventsSizeContiniually expects at least " + size + " events but found " + events.size() + ": " + events);
+ }
+ });
+ }
+
private void assertHasEvent(Sensor<?> sensor, Predicate<Object> componentPredicate, Predicate<? super CharSequence> descriptionPredicate) {
for (SensorEvent<FailureDescriptor> event : events) {
if (event.getSensor().equals(sensor) &&
[3/4] incubator-brooklyn git commit: Add uniqueTag for policies
Posted by al...@apache.org.
Add uniqueTag for policies
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/68464399
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/68464399
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/68464399
Branch: refs/heads/master
Commit: 68464399e2ee455e2d8beff37330f1e6cedd575e
Parents: 75f0002
Author: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Authored: Tue Mar 31 14:20:18 2015 +0300
Committer: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Committed: Tue Apr 7 14:38:18 2015 +0300
----------------------------------------------------------------------
.../src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java | 2 ++
policy/src/main/java/brooklyn/policy/ha/ServiceRestarter.java | 2 ++
.../main/java/brooklyn/policy/ha/SshMachineFailureDetector.java | 2 ++
3 files changed, 6 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/68464399/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java b/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java
index 6e37ef9..cc2ab3a 100644
--- a/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java
+++ b/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java
@@ -30,6 +30,7 @@ import brooklyn.event.SensorEventListener;
import brooklyn.policy.Policy;
import brooklyn.policy.basic.AbstractPolicy;
import brooklyn.util.flags.SetFromFlag;
+import brooklyn.util.javalang.JavaClassNames;
import com.google.common.base.Preconditions;
@@ -57,6 +58,7 @@ public class ConditionalSuspendPolicy extends AbstractPolicy {
Preconditions.checkNotNull(target, "Suspend target required");
Preconditions.checkNotNull(getTargetPolicy(), "Can't find target policy set in " + SUSPEND_TARGET.getName() + ": " + target);
subscribe();
+ uniqueTag = JavaClassNames.simpleClassName(getClass())+":"+getConfig(SUSPEND_SENSOR).getName()+":"+getConfig(RESUME_SENSOR).getName();
}
private void subscribe() {
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/68464399/policy/src/main/java/brooklyn/policy/ha/ServiceRestarter.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/ServiceRestarter.java b/policy/src/main/java/brooklyn/policy/ha/ServiceRestarter.java
index 3a0b6d4..b71d6d6 100644
--- a/policy/src/main/java/brooklyn/policy/ha/ServiceRestarter.java
+++ b/policy/src/main/java/brooklyn/policy/ha/ServiceRestarter.java
@@ -43,6 +43,7 @@ import brooklyn.policy.ha.HASensors.FailureDescriptor;
import brooklyn.util.collections.MutableMap;
import brooklyn.util.config.ConfigBag;
import brooklyn.util.flags.SetFromFlag;
+import brooklyn.util.javalang.JavaClassNames;
import brooklyn.util.time.Duration;
import brooklyn.util.time.Time;
@@ -91,6 +92,7 @@ public class ServiceRestarter extends AbstractPolicy {
public ServiceRestarter(ConfigBag configBag) {
// TODO hierarchy should use ConfigBag, and not change flags
super(configBag.getAllConfigMutable());
+ uniqueTag = JavaClassNames.simpleClassName(getClass())+":"+getConfig(FAILURE_SENSOR_TO_MONITOR).getName();
}
public ServiceRestarter(Sensor<?> failureSensorToMonitor) {
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/68464399/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java b/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java
index 435592e..ee1c6ee 100644
--- a/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java
+++ b/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java
@@ -43,6 +43,7 @@ import com.google.common.collect.ImmutableMap;
+ "emitting an event if the connection is lost/restored")
public class SshMachineFailureDetector extends AbstractFailureDetector {
private static final Logger LOG = LoggerFactory.getLogger(SshMachineFailureDetector.class);
+ public static final String DEFAULT_UNIQUE_TAG = "failureDetector.sshMachine.tag";
public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_FAILED = HASensors.CONNECTION_FAILED;
@@ -63,6 +64,7 @@ public class SshMachineFailureDetector extends AbstractFailureDetector {
if (config().getRaw(POLL_PERIOD).isAbsent()) {
config().set(POLL_PERIOD, Duration.ONE_MINUTE);
}
+ uniqueTag = DEFAULT_UNIQUE_TAG;
}
@Override
[4/4] incubator-brooklyn git commit: This closes #571
Posted by al...@apache.org.
This closes #571
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/04fc801d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/04fc801d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/04fc801d
Branch: refs/heads/master
Commit: 04fc801d005ac6bf8e852dfa74b571b5c34cd693
Parents: 6153b0a 6846439
Author: Aled Sage <al...@gmail.com>
Authored: Sun Apr 12 19:50:36 2015 -0500
Committer: Aled Sage <al...@gmail.com>
Committed: Sun Apr 12 19:50:36 2015 -0500
----------------------------------------------------------------------
.../policy/ha/AbstractFailureDetector.java | 359 +++++++++++++++++++
.../policy/ha/ConditionalSuspendPolicy.java | 103 ++++++
.../policy/ha/ConnectionFailureDetector.java | 290 ++-------------
.../policy/ha/ServiceFailureDetector.java | 14 +-
.../brooklyn/policy/ha/ServiceRestarter.java | 2 +
.../policy/ha/SshMachineFailureDetector.java | 101 ++++++
.../policy/ha/ServiceFailureDetectorTest.java | 71 +++-
7 files changed, 686 insertions(+), 254 deletions(-)
----------------------------------------------------------------------