You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2015/04/13 02:51:22 UTC

[2/4] incubator-brooklyn git commit: Add option to ServiceFailureDetector to republish failed events

Add option to ServiceFailureDetector to republish failed events


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/75f00025
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/75f00025
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/75f00025

Branch: refs/heads/master
Commit: 75f00025980c240c8bd37c5ed1b74198bda76d99
Parents: dd0c234
Author: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Authored: Fri Mar 27 19:10:29 2015 +0200
Committer: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Committed: Tue Apr 7 14:38:11 2015 +0300

----------------------------------------------------------------------
 .../policy/ha/ServiceFailureDetector.java       | 14 +++-
 .../policy/ha/ServiceFailureDetectorTest.java   | 71 +++++++++++++++++++-
 2 files changed, 83 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/75f00025/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java b/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java
index f3264e3..7f5c142 100644
--- a/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java
+++ b/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java
@@ -102,6 +102,12 @@ public class ServiceFailureDetector extends ServiceStateLogic.ComputeServiceStat
             .defaultValue(Duration.ZERO)
             .build();
 
+    @SetFromFlag("entityFailedRepublishTime")
+    public static final ConfigKey<Duration> ENTITY_FAILED_REPUBLISH_TIME = BasicConfigKey.builder(Duration.class)
+            .name("entityFailed.republishTime")
+            .description("Publish failed state periodically at the specified intervals, null to disable.")
+            .build(); // no defaultValue(...) on this builder; presumably the key reads as null (republish disabled) unless configured - confirm
+
     protected Long firstUpTime;
     
     protected Long currentFailureStartTime = null;
@@ -215,7 +221,13 @@ public class ServiceFailureDetector extends ServiceStateLogic.ComputeServiceStat
                 if (delayBeforeCheck<=0) {
                     if (LOG.isDebugEnabled()) LOG.debug("{} publishing failed (state={}; currentFailureStartTime={}; now={}", 
                             new Object[] {this, state, Time.makeDateString(currentFailureStartTime), Time.makeDateString(now)});
-                    publishEntityFailedTime = null;
+                    Duration republishDelay = getConfig(ENTITY_FAILED_REPUBLISH_TIME);
+                    if (republishDelay == null) {
+                        publishEntityFailedTime = null;
+                    } else {
+                        publishEntityFailedTime = now + republishDelay.toMilliseconds();
+                        recomputeIn = Math.min(recomputeIn, republishDelay.toMilliseconds());
+                    }
                     lastPublished = LastPublished.FAILED;
                     entity.emit(HASensors.ENTITY_FAILED, new HASensors.FailureDescriptor(entity, getFailureDescription(now)));
                 } else {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/75f00025/policy/src/test/java/brooklyn/policy/ha/ServiceFailureDetectorTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/brooklyn/policy/ha/ServiceFailureDetectorTest.java b/policy/src/test/java/brooklyn/policy/ha/ServiceFailureDetectorTest.java
index c31bbda..7250e8e 100644
--- a/policy/src/test/java/brooklyn/policy/ha/ServiceFailureDetectorTest.java
+++ b/policy/src/test/java/brooklyn/policy/ha/ServiceFailureDetectorTest.java
@@ -24,7 +24,10 @@ import static org.testng.Assert.fail;
 
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -56,6 +59,7 @@ import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableMap;
 
 public class ServiceFailureDetectorTest {
+    private static final Logger log = LoggerFactory.getLogger(ServiceFailureDetectorTest.class);
 
     private static final int TIMEOUT_MS = 10*1000;
 
@@ -245,7 +249,7 @@ public class ServiceFailureDetectorTest {
             .configure(ServiceFailureDetector.SERVICE_ON_FIRE_STABILIZATION_DELAY, Duration.ONE_SECOND)
             .configure(ServiceFailureDetector.ENTITY_RECOVERED_STABILIZATION_DELAY, Duration.ONE_SECOND));
         
-        // Set the entit to healthy
+        // Set the entity to healthy
         e1.setAttribute(TestEntity.SERVICE_UP, true);
         ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
         EntityTestUtils.assertAttributeEqualsEventually(e1, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
@@ -312,6 +316,71 @@ public class ServiceFailureDetectorTest {
         assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
     }
     
+    @Test(groups="Integration") // Slow (~1.5 second wait): checks that failure events keep being published until the entity recovers
+    public void testRepublishedFailure() throws Exception {
+        Duration republishPeriod = Duration.millis(100);
+
+        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+                .configure(ServiceFailureDetector.ENTITY_FAILED_REPUBLISH_TIME, republishPeriod));
+            
+        // Start from a healthy entity: service up and expected state RUNNING
+        e1.setAttribute(TestEntity.SERVICE_UP, true);
+        ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+        EntityTestUtils.assertAttributeEqualsEventually(e1, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+        
+        // Make the entity fail by registering a problem indicator
+        ServiceStateLogic.ServiceProblemsLogic.updateProblemsIndicator(e1, "test", "foo");
+        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
+        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+
+        // Wait until at least 10 events have been recorded in total (~1 sec at the 100ms republish period)
+        assertEventsSizeEventually(10);
+
+        // Now recover by clearing the problem indicator
+        ServiceStateLogic.ServiceProblemsLogic.clearProblemsIndicator(e1, "test");
+        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+        assertHasEventEventually(HASensors.ENTITY_RECOVERED, Predicates.<Object>equalTo(e1), null);
+
+        // Once recovered, the recorded event count must stay unchanged, i.e. nothing more is published
+        assertEventsSizeContiniually(events.size());
+
+        SensorEvent<FailureDescriptor> prevEvent = null; // spacing between consecutive events is only logged below, never asserted
+        for (SensorEvent<FailureDescriptor> event : events) {
+            if (prevEvent != null) {
+                long repeatOffset = event.getTimestamp() - prevEvent.getTimestamp();
+                long deviation = Math.abs(repeatOffset - republishPeriod.toMilliseconds());
+                if (deviation > republishPeriod.toMilliseconds()/10 && // tolerate up to a tenth of the period
+                        // for the RECOVERED event, only complain if it came more than one period after the previous event
+                        (!event.getSensor().equals(HASensors.ENTITY_RECOVERED) ||
+                        repeatOffset > republishPeriod.toMilliseconds())) {
+                    log.error("The time between failure republish (" + repeatOffset + "ms) deviates too much from the expected " + republishPeriod + ". prevEvent=" + prevEvent + ", event=" + event);
+                }
+            }
+            prevEvent = event;
+        }
+        
+        // The last recorded event must be the recovery, i.e. no republish took place after it
+        assertEquals(prevEvent.getSensor(), HASensors.ENTITY_RECOVERED);
+    }
+    
+    private void assertEventsSizeContiniually(final int size) { // the recorded event count must stay at exactly `size` for the whole check
+        final Runnable sizeUnchanged = new Runnable() {
+            @Override public void run() {
+                assertTrue(events.size() == size, "assertEventsSizeContiniually expects " + size + " events but found " + events.size() + ": " + events);
+            }
+        };
+        Asserts.succeedsContinually(MutableMap.of("timeout", 500), sizeUnchanged);
+    }
+
+    private void assertEventsSizeEventually(final int size) { // passes once at least `size` events are recorded, retrying with the TIMEOUT_MS timeout
+        Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() {
+            @Override
+            public void run() {
+                assertTrue(events.size() >= size, "assertEventsSizeEventually expects at least " + size + " events but found " + events.size() + ": " + events); // message names this helper, not its sibling
+            }
+        });
+    }
+
     private void assertHasEvent(Sensor<?> sensor, Predicate<Object> componentPredicate, Predicate<? super CharSequence> descriptionPredicate) {
         for (SensorEvent<FailureDescriptor> event : events) {
             if (event.getSensor().equals(sensor) &&