You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2015/04/13 02:51:22 UTC
[2/4] incubator-brooklyn git commit: Add option to ServiceFailureDetector to republish failed events
Add option to ServiceFailureDetector to republish failed events
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/75f00025
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/75f00025
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/75f00025
Branch: refs/heads/master
Commit: 75f00025980c240c8bd37c5ed1b74198bda76d99
Parents: dd0c234
Author: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Authored: Fri Mar 27 19:10:29 2015 +0200
Committer: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Committed: Tue Apr 7 14:38:11 2015 +0300
----------------------------------------------------------------------
.../policy/ha/ServiceFailureDetector.java | 14 +++-
.../policy/ha/ServiceFailureDetectorTest.java | 71 +++++++++++++++++++-
2 files changed, 83 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/75f00025/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java b/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java
index f3264e3..7f5c142 100644
--- a/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java
+++ b/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java
@@ -102,6 +102,12 @@ public class ServiceFailureDetector extends ServiceStateLogic.ComputeServiceStat
.defaultValue(Duration.ZERO)
.build();
+ /**
+  * Interval at which the failed state ({@link HASensors#ENTITY_FAILED}) is re-published while the
+  * entity remains failed. No default value is set, so it is {@code null} unless configured;
+  * {@code null} disables republishing.
+  * NOTE(review): a zero or negative duration is not rejected here; confirm how the recompute
+  * scheduling behaves in that case before relying on it.
+  */
+ @SetFromFlag("entityFailedRepublishTime")
+ public static final ConfigKey<Duration> ENTITY_FAILED_REPUBLISH_TIME = BasicConfigKey.builder(Duration.class)
+ .name("entityFailed.republishTime")
+ .description("Publish failed state periodically at the specified intervals, null to disable.")
+ .build();
+
protected Long firstUpTime;
protected Long currentFailureStartTime = null;
@@ -215,7 +221,13 @@ public class ServiceFailureDetector extends ServiceStateLogic.ComputeServiceStat
if (delayBeforeCheck<=0) {
if (LOG.isDebugEnabled()) LOG.debug("{} publishing failed (state={}; currentFailureStartTime={}; now={}",
new Object[] {this, state, Time.makeDateString(currentFailureStartTime), Time.makeDateString(now)});
- publishEntityFailedTime = null;
+ Duration republishDelay = getConfig(ENTITY_FAILED_REPUBLISH_TIME);
+ if (republishDelay == null) {
+ publishEntityFailedTime = null;
+ } else {
+ publishEntityFailedTime = now + republishDelay.toMilliseconds();
+ recomputeIn = Math.min(recomputeIn, republishDelay.toMilliseconds());
+ }
lastPublished = LastPublished.FAILED;
entity.emit(HASensors.ENTITY_FAILED, new HASensors.FailureDescriptor(entity, getFailureDescription(now)));
} else {
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/75f00025/policy/src/test/java/brooklyn/policy/ha/ServiceFailureDetectorTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/brooklyn/policy/ha/ServiceFailureDetectorTest.java b/policy/src/test/java/brooklyn/policy/ha/ServiceFailureDetectorTest.java
index c31bbda..7250e8e 100644
--- a/policy/src/test/java/brooklyn/policy/ha/ServiceFailureDetectorTest.java
+++ b/policy/src/test/java/brooklyn/policy/ha/ServiceFailureDetectorTest.java
@@ -24,7 +24,10 @@ import static org.testng.Assert.fail;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -56,6 +59,7 @@ import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableMap;
public class ServiceFailureDetectorTest {
+ private static final Logger log = LoggerFactory.getLogger(ServiceFailureDetectorTest.class);
private static final int TIMEOUT_MS = 10*1000;
@@ -245,7 +249,7 @@ public class ServiceFailureDetectorTest {
.configure(ServiceFailureDetector.SERVICE_ON_FIRE_STABILIZATION_DELAY, Duration.ONE_SECOND)
.configure(ServiceFailureDetector.ENTITY_RECOVERED_STABILIZATION_DELAY, Duration.ONE_SECOND));
- // Set the entit to healthy
+ // Set the entity to healthy
e1.setAttribute(TestEntity.SERVICE_UP, true);
ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
EntityTestUtils.assertAttributeEqualsEventually(e1, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
@@ -312,6 +316,71 @@ public class ServiceFailureDetectorTest {
assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
}
+ /**
+  * Verifies that, with {@link ServiceFailureDetector#ENTITY_FAILED_REPUBLISH_TIME} configured, the
+  * detector keeps emitting {@link HASensors#ENTITY_FAILED} while the entity stays on fire, and stops
+  * once the entity has recovered (the last recorded event must be {@link HASensors#ENTITY_RECOVERED}).
+  * <p>
+  * The spacing between consecutive events is only logged, not asserted: scheduling jitter on a
+  * loaded build machine would otherwise make this test flaky.
+  */
+ @Test(groups="Integration") // Has a 1.5 second wait
+ public void testRepublishedFailure() throws Exception {
+     Duration republishPeriod = Duration.millis(100);
+     long republishMillis = republishPeriod.toMilliseconds();
+     // Allowed jitter between consecutive events before a warning is logged.
+     long toleranceMillis = republishMillis / 10;
+
+     e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+             .configure(ServiceFailureDetector.ENTITY_FAILED_REPUBLISH_TIME, republishPeriod));
+
+     // Set the entity to healthy
+     e1.setAttribute(TestEntity.SERVICE_UP, true);
+     ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+     EntityTestUtils.assertAttributeEqualsEventually(e1, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+
+     // Make the entity fail
+     ServiceStateLogic.ServiceProblemsLogic.updateProblemsIndicator(e1, "test", "foo");
+     EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
+     assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+
+     // Wait for at least 10 recorded events (initial failure plus republishes, ~1 sec at a 100ms period)
+     assertEventsSizeEventually(10);
+
+     // Now recover
+     ServiceStateLogic.ServiceProblemsLogic.clearProblemsIndicator(e1, "test");
+     EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+     assertHasEventEventually(HASensors.ENTITY_RECOVERED, Predicates.<Object>equalTo(e1), null);
+
+     // Once recovered, no more failed events should be emitted periodically
+     assertEventsSizeContiniually(events.size());
+
+     SensorEvent<FailureDescriptor> prevEvent = null;
+     for (SensorEvent<FailureDescriptor> event : events) {
+         if (prevEvent != null) {
+             long repeatOffset = event.getTimestamp() - prevEvent.getTimestamp();
+             long deviation = Math.abs(repeatOffset - republishMillis);
+             // The RECOVERED event can legitimately arrive sooner than a full period after the last
+             // FAILED event, so it is only flagged when it arrives later than one period.
+             boolean isRecovered = event.getSensor().equals(HASensors.ENTITY_RECOVERED);
+             if (deviation > toleranceMillis && (!isRecovered || repeatOffset > republishMillis)) {
+                 log.warn("The time between failure republish ({}ms) deviates too much from the expected {}. prevEvent={}, event={}",
+                         new Object[] {repeatOffset, republishPeriod, prevEvent, event});
+             }
+         }
+         prevEvent = event;
+     }
+
+     // Make sure no republish took place after recovery: the last recorded event must be RECOVERED
+     assertEquals(prevEvent.getSensor(), HASensors.ENTITY_RECOVERED);
+ }
+
+ private void assertEventsSizeContiniually(final int size) {
+ Asserts.succeedsContinually(MutableMap.of("timeout", 500), new Runnable() {
+ @Override
+ public void run() {
+ assertTrue(events.size() == size, "assertEventsSizeContiniually expects " + size + " events but found " + events.size() + ": " + events);
+ }
+ });
+ }
+
+ private void assertEventsSizeEventually(final int size) {
+ Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() {
+ @Override
+ public void run() {
+ assertTrue(events.size() >= size, "assertEventsSizeContiniually expects at least " + size + " events but found " + events.size() + ": " + events);
+ }
+ });
+ }
+
private void assertHasEvent(Sensor<?> sensor, Predicate<Object> componentPredicate, Predicate<? super CharSequence> descriptionPredicate) {
for (SensorEvent<FailureDescriptor> event : events) {
if (event.getSensor().equals(sensor) &&