You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/18 13:06:05 UTC

[07/24] incubator-brooklyn git commit: [BROOKLYN-162] Renaming package policy

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/ha/ConnectionFailureDetectorTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/ha/ConnectionFailureDetectorTest.java b/policy/src/test/java/org/apache/brooklyn/policy/ha/ConnectionFailureDetectorTest.java
new file mode 100644
index 0000000..e2d6998
--- /dev/null
+++ b/policy/src/test/java/org/apache/brooklyn/policy/ha/ConnectionFailureDetectorTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.ha;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.brooklyn.api.event.Sensor;
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.api.event.SensorEventListener;
+import org.apache.brooklyn.api.management.ManagementContext;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.test.entity.LocalManagementContextForTests;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.Entities;
+import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor;
+import brooklyn.test.Asserts;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.net.HostAndPort;
+
+public class ConnectionFailureDetectorTest {
+
+    private static final int TIMEOUT_MS = 30*1000;
+    private static final int OVERHEAD = 250;
+    private static final int POLL_PERIOD = 100;
+
+    private ManagementContext managementContext;
+    private TestApplication app;
+    
+    private List<SensorEvent<FailureDescriptor>> events;
+    
+    private ServerSocket serverSocket;
+    private HostAndPort serverSocketAddress;
+    
+    @BeforeMethod(alwaysRun=true)
+    public void setUp() throws Exception {
+        events = new CopyOnWriteArrayList<SensorEvent<FailureDescriptor>>();
+        
+        managementContext = new LocalManagementContextForTests();
+        app = ApplicationBuilder.newManagedApp(TestApplication.class, managementContext);
+        
+        app.getManagementContext().getSubscriptionManager().subscribe(
+                app, 
+                HASensors.CONNECTION_FAILED, 
+                new SensorEventListener<FailureDescriptor>() {
+                    @Override public void onEvent(SensorEvent<FailureDescriptor> event) {
+                        events.add(event);
+                    }
+                });
+        app.getManagementContext().getSubscriptionManager().subscribe(
+                app, 
+                HASensors.CONNECTION_RECOVERED, 
+                new SensorEventListener<FailureDescriptor>() {
+                    @Override public void onEvent(SensorEvent<FailureDescriptor> event) {
+                        events.add(event);
+                    }
+                });
+        
+        serverSocketAddress = startServerSocket();
+    }
+    
+    @AfterMethod(alwaysRun=true)
+    public void tearDown() throws Exception {
+        stopServerSocket();
+        if (managementContext != null) Entities.destroyAll(managementContext);
+    }
+    
+    private HostAndPort startServerSocket() throws Exception {
+        if (serverSocketAddress != null) {
+            serverSocket = new ServerSocket(serverSocketAddress.getPort());
+        } else {
+            for (int i = 40000; i < 40100; i++) {
+                try {
+                    serverSocket = new ServerSocket(i);
+                } catch (IOException e) {
+                    // try next port
+                }
+            }
+            assertNotNull(serverSocket, "Failed to create server socket; no ports free in range!");
+            serverSocketAddress = HostAndPort.fromParts(serverSocket.getInetAddress().getHostAddress(), serverSocket.getLocalPort());
+        }
+        return serverSocketAddress;
+    }
+
+    private void stopServerSocket() throws Exception {
+        if (serverSocket != null) serverSocket.close();
+    }
+
+    @Test(groups="Integration") // Has a 1 second wait
+    public void testNotNotifiedOfFailuresForHealthy() throws Exception {
+        // Create members before and after the policy is registered, to test both scenarios
+        
+        app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class)
+                .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress));
+        
+        assertNoEventsContinually();
+    }
+    
+    @Test
+    public void testNotifiedOfFailure() throws Exception {
+        app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class)
+                .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress));
+
+        stopServerSocket();
+
+        assertHasEventEventually(HASensors.CONNECTION_FAILED, Predicates.<Object>equalTo(app), null);
+        assertEquals(events.size(), 1, "events="+events);
+    }
+    
+    @Test
+    public void testNotifiedOfRecovery() throws Exception {
+        app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class)
+                .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress));
+        
+        stopServerSocket();
+        assertHasEventEventually(HASensors.CONNECTION_FAILED, Predicates.<Object>equalTo(app), null);
+
+        // make the connection recover
+        startServerSocket();
+        assertHasEventEventually(HASensors.CONNECTION_RECOVERED, Predicates.<Object>equalTo(app), null);
+        assertEquals(events.size(), 2, "events="+events);
+    }
+    
+    @Test
+    public void testReportsFailureWhenAlreadyDownOnRegisteringPolicy() throws Exception {
+        stopServerSocket();
+
+        app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class)
+                .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress));
+
+        assertHasEventEventually(HASensors.CONNECTION_FAILED, Predicates.<Object>equalTo(app), null);
+    }
+
+    @Test(groups="Integration") // Because slow
+    public void testNotNotifiedOfTemporaryFailuresDuringStabilisationDelay() throws Exception {
+        app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class)
+                .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress)
+                .configure(ConnectionFailureDetector.CONNECTION_FAILED_STABILIZATION_DELAY, Duration.ONE_MINUTE));
+        
+        stopServerSocket();
+        Thread.sleep(100);
+        startServerSocket();
+
+        assertNoEventsContinually();
+    }
+    
+    @Test(groups="Integration") // Because slow
+    public void testNotifiedOfFailureAfterStabilisationDelay() throws Exception {
+        final int stabilisationDelay = 1000;
+        
+        app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class)
+                .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress)
+                .configure(ConnectionFailureDetector.CONNECTION_FAILED_STABILIZATION_DELAY, Duration.of(stabilisationDelay)));
+        
+        stopServerSocket();
+
+        assertNoEventsContinually(Duration.of(stabilisationDelay - OVERHEAD));
+        assertHasEventEventually(HASensors.CONNECTION_FAILED, Predicates.<Object>equalTo(app), null);
+    }
+    
+    @Test(groups="Integration") // Because slow
+    public void testFailuresThenUpDownResetsStabilisationCount() throws Exception {
+        final long stabilisationDelay = 1000;
+        
+        app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class)
+                .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress)
+                .configure(ConnectionFailureDetector.CONNECTION_FAILED_STABILIZATION_DELAY, Duration.of(stabilisationDelay)));
+        
+        stopServerSocket();
+        assertNoEventsContinually(Duration.of(stabilisationDelay - OVERHEAD));
+
+        startServerSocket();
+        Thread.sleep(POLL_PERIOD+OVERHEAD);
+        stopServerSocket();
+        assertNoEventsContinually(Duration.of(stabilisationDelay - OVERHEAD));
+        
+        assertHasEventEventually(HASensors.CONNECTION_FAILED, Predicates.<Object>equalTo(app), null);
+    }
+    
+    @Test(groups="Integration") // Because slow
+    public void testNotNotifiedOfTemporaryRecoveryDuringStabilisationDelay() throws Exception {
+        final long stabilisationDelay = 1000;
+        
+        app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class)
+                .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress)
+                .configure(ConnectionFailureDetector.CONNECTION_RECOVERED_STABILIZATION_DELAY, Duration.of(stabilisationDelay)));
+        
+        stopServerSocket();
+        assertHasEventEventually(HASensors.CONNECTION_FAILED, Predicates.<Object>equalTo(app), null);
+        events.clear();
+        
+        startServerSocket();
+        Thread.sleep(POLL_PERIOD+OVERHEAD);
+        stopServerSocket();
+
+        assertNoEventsContinually(Duration.of(stabilisationDelay + OVERHEAD));
+    }
+    
+    @Test(groups="Integration") // Because slow
+    public void testNotifiedOfRecoveryAfterStabilisationDelay() throws Exception {
+        final int stabilisationDelay = 1000;
+        
+        app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class)
+                .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress)
+                .configure(ConnectionFailureDetector.CONNECTION_RECOVERED_STABILIZATION_DELAY, Duration.of(stabilisationDelay)));
+        
+        stopServerSocket();
+        assertHasEventEventually(HASensors.CONNECTION_FAILED, Predicates.<Object>equalTo(app), null);
+        events.clear();
+
+        startServerSocket();
+        assertNoEventsContinually(Duration.of(stabilisationDelay - OVERHEAD));
+        assertHasEventEventually(HASensors.CONNECTION_RECOVERED, Predicates.<Object>equalTo(app), null);
+    }
+    
+    @Test(groups="Integration") // Because slow
+    public void testRecoversThenDownUpResetsStabilisationCount() throws Exception {
+        final long stabilisationDelay = 1000;
+        
+        app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class)
+                .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress)
+                .configure(ConnectionFailureDetector.CONNECTION_RECOVERED_STABILIZATION_DELAY, Duration.of(stabilisationDelay)));
+        
+        stopServerSocket();
+        assertHasEventEventually(HASensors.CONNECTION_FAILED, Predicates.<Object>equalTo(app), null);
+        events.clear();
+        
+        startServerSocket();
+        assertNoEventsContinually(Duration.of(stabilisationDelay - OVERHEAD));
+        
+        stopServerSocket();
+        Thread.sleep(POLL_PERIOD+OVERHEAD);
+        startServerSocket();
+        assertNoEventsContinually(Duration.of(stabilisationDelay - OVERHEAD));
+
+        assertHasEventEventually(HASensors.CONNECTION_RECOVERED, Predicates.<Object>equalTo(app), null);
+    }
+
+    private void assertHasEvent(Sensor<?> sensor, Predicate<Object> componentPredicate, Predicate<? super CharSequence> descriptionPredicate) {
+        for (SensorEvent<FailureDescriptor> event : events) {
+            if (event.getSensor().equals(sensor) && 
+                    (componentPredicate == null || componentPredicate.apply(event.getValue().getComponent())) &&
+                    (descriptionPredicate == null || descriptionPredicate.apply(event.getValue().getDescription()))) {
+                return;
+            }
+        }
+        fail("No matching "+sensor+" event found; events="+events);
+    }
+    
+    private void assertHasEventEventually(final Sensor<?> sensor, final Predicate<Object> componentPredicate, final Predicate<? super CharSequence> descriptionPredicate) {
+        Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() {
+            @Override public void run() {
+                assertHasEvent(sensor, componentPredicate, descriptionPredicate);
+            }});
+    }
+    
+    private void assertNoEventsContinually(Duration duration) {
+        Asserts.succeedsContinually(ImmutableMap.of("timeout", duration), new Runnable() {
+            @Override public void run() {
+                assertTrue(events.isEmpty(), "events="+events);
+            }});
+    }
+    
+    private void assertNoEventsContinually() {
+        Asserts.succeedsContinually(new Runnable() {
+            @Override public void run() {
+                assertTrue(events.isEmpty(), "events="+events);
+            }});
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/ha/HaPolicyRebindTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/ha/HaPolicyRebindTest.java b/policy/src/test/java/org/apache/brooklyn/policy/ha/HaPolicyRebindTest.java
new file mode 100644
index 0000000..3782ccf
--- /dev/null
+++ b/policy/src/test/java/org/apache/brooklyn/policy/ha/HaPolicyRebindTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.ha;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.fail;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.event.Sensor;
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.api.event.SensorEventListener;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.location.LocationSpec;
+import org.apache.brooklyn.api.policy.EnricherSpec;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.apache.brooklyn.test.entity.TestEntity;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.Lifecycle;
+import brooklyn.entity.basic.ServiceStateLogic;
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.rebind.RebindTestFixtureWithApp;
+
+import org.apache.brooklyn.location.basic.SimulatedLocation;
+
+import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor;
+import brooklyn.test.Asserts;
+import brooklyn.util.collections.MutableMap;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * Checks that HA policies and enrichers ({@link ServiceRestarter}, {@link ServiceReplacer},
+ * {@link ServiceFailureDetector}) added before a rebind still react to failures afterwards.
+ */
+public class HaPolicyRebindTest extends RebindTestFixtureWithApp {
+
+    /** A {@link TestEntity} child of the original (pre-rebind) app, created in {@link #setUp()}. */
+    private TestEntity origEntity;
+    /** Appends each received event to {@link #events}. */
+    private SensorEventListener<FailureDescriptor> eventListener;
+    /** Events recorded by {@link #eventListener}; a copy-on-write list because it is appended to concurrently. */
+    private List<SensorEvent<FailureDescriptor>> events;
+
+    @Override
+    @BeforeMethod(alwaysRun=true)
+    public void setUp() throws Exception {
+        super.setUp();
+        origEntity = origApp.createAndManageChild(EntitySpec.create(TestEntity.class));
+        events = Lists.newCopyOnWriteArrayList();
+        eventListener = new SensorEventListener<FailureDescriptor>() {
+            @Override public void onEvent(SensorEvent<FailureDescriptor> event) {
+                events.add(event);
+            }
+        };
+    }
+
+    @Test
+    public void testServiceRestarterWorksAfterRebind() throws Exception {
+        origEntity.addPolicy(PolicySpec.create(ServiceRestarter.class)
+                .configure(ServiceRestarter.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED));
+
+        TestApplication newApp = rebind();
+        final TestEntity newEntity = (TestEntity) Iterables.find(newApp.getChildren(), Predicates.instanceOf(TestEntity.class));
+
+        // Describe the failure with the rebound entity. origEntity is the stale pre-rebind instance.
+        newEntity.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(newEntity, "simulate failure"));
+
+        Asserts.succeedsEventually(new Runnable() {
+            @Override public void run() {
+                assertEquals(newEntity.getCallHistory(), ImmutableList.of("restart"));
+            }});
+    }
+
+    @Test
+    public void testServiceReplacerWorksAfterRebind() throws Exception {
+        Location origLoc = origManagementContext.getLocationManager().createLocation(LocationSpec.create(SimulatedLocation.class));
+        DynamicCluster origCluster = origApp.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+                .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(TestEntity.class))
+                .configure(DynamicCluster.INITIAL_SIZE, 3));
+        origApp.start(ImmutableList.<Location>of(origLoc));
+
+        origCluster.addPolicy(PolicySpec.create(ServiceReplacer.class)
+                .configure(ServiceReplacer.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED));
+
+        // rebind
+        TestApplication newApp = rebind();
+        final DynamicCluster newCluster = (DynamicCluster) Iterables.find(newApp.getChildren(), Predicates.instanceOf(DynamicCluster.class));
+
+        // stimulate the policy by reporting one of the rebound members as failed
+        final Set<Entity> initialMembers = ImmutableSet.copyOf(newCluster.getMembers());
+        final TestEntity e1 = (TestEntity) Iterables.get(initialMembers, 1);
+
+        newApp.getManagementContext().getSubscriptionManager().subscribe(e1, HASensors.ENTITY_FAILED, eventListener);
+        newApp.getManagementContext().getSubscriptionManager().subscribe(e1, HASensors.ENTITY_RECOVERED, eventListener);
+
+        e1.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(e1, "simulate failure"));
+
+        // Expect e1 to be replaced
+        Asserts.succeedsEventually(new Runnable() {
+            @Override public void run() {
+                Set<Entity> newMembers = Sets.difference(ImmutableSet.copyOf(newCluster.getMembers()), initialMembers);
+                Set<Entity> removedMembers = Sets.difference(initialMembers, ImmutableSet.copyOf(newCluster.getMembers()));
+                assertEquals(removedMembers, ImmutableSet.of(e1));
+                assertEquals(newMembers.size(), 1);
+                assertEquals(((TestEntity)Iterables.getOnlyElement(newMembers)).getCallHistory(), ImmutableList.of("start"));
+
+                // TODO e1 not reporting "start" after rebind because callHistory is a field rather than an attribute, so was not persisted
+                Asserts.assertEqualsIgnoringOrder(e1.getCallHistory(), ImmutableList.of("stop"));
+                assertFalse(Entities.isManaged(e1));
+            }});
+    }
+
+    @Test
+    public void testServiceFailureDetectorWorksAfterRebind() throws Exception {
+        origEntity.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));
+
+        // rebind
+        TestApplication newApp = rebind();
+        final TestEntity newEntity = (TestEntity) Iterables.find(newApp.getChildren(), Predicates.instanceOf(TestEntity.class));
+
+        newApp.getManagementContext().getSubscriptionManager().subscribe(newEntity, HASensors.ENTITY_FAILED, eventListener);
+
+        // bring the entity to a healthy running state first
+        newEntity.setAttribute(TestEntity.SERVICE_UP, true);
+        ServiceStateLogic.setExpectedState(newEntity, Lifecycle.RUNNING);
+
+        // trigger the failure
+        newEntity.setAttribute(TestEntity.SERVICE_UP, false);
+
+        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(newEntity), null);
+        assertEquals(events.size(), 1, "events="+events);
+    }
+
+    /** Retries {@link #assertHasEvent} until it passes or the timeout elapses. */
+    private void assertHasEventEventually(final Sensor<?> sensor, final Predicate<Object> componentPredicate, final Predicate<? super CharSequence> descriptionPredicate) {
+        // TIMEOUT_MS is not declared in this class; presumably inherited from the rebind test fixture.
+        Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() {
+            @Override public void run() {
+                assertHasEvent(sensor, componentPredicate, descriptionPredicate);
+            }});
+    }
+
+    /**
+     * Fails unless {@link #events} contains an event for {@code sensor} whose descriptor matches both
+     * predicates (a {@code null} predicate matches anything).
+     */
+    private void assertHasEvent(Sensor<?> sensor, Predicate<Object> componentPredicate, Predicate<? super CharSequence> descriptionPredicate) {
+        for (SensorEvent<FailureDescriptor> event : events) {
+            if (event.getSensor().equals(sensor) &&
+                    (componentPredicate == null || componentPredicate.apply(event.getValue().getComponent())) &&
+                    (descriptionPredicate == null || descriptionPredicate.apply(event.getValue().getDescription()))) {
+                return;
+            }
+        }
+        fail("No matching "+sensor+" event found; events="+events);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceFailureDetectorStabilizationTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceFailureDetectorStabilizationTest.java b/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceFailureDetectorStabilizationTest.java
new file mode 100644
index 0000000..ca84592
--- /dev/null
+++ b/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceFailureDetectorStabilizationTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.ha;
+
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.event.Sensor;
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.api.event.SensorEventListener;
+import org.apache.brooklyn.api.management.ManagementContext;
+import org.apache.brooklyn.api.policy.EnricherSpec;
+import org.apache.brooklyn.test.entity.LocalManagementContextForTests;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.apache.brooklyn.test.entity.TestEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.Lifecycle;
+import brooklyn.entity.basic.ServiceStateLogic;
+import brooklyn.entity.basic.ServiceStateLogicTest;
+import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor;
+import brooklyn.test.Asserts;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Stabilisation-delay tests for {@link ServiceFailureDetector}: a child {@link TestEntity} has its
+ * SERVICE_UP attribute toggled, and the tests check when ENTITY_FAILED / ENTITY_RECOVERED events are
+ * (or are not) emitted.
+ * <p>
+ * See also the more primitive tests in {@link ServiceStateLogicTest}.
+ */
+public class ServiceFailureDetectorStabilizationTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ServiceFailureDetectorStabilizationTest.class);
+
+    /** Maximum time (ms) to wait for an expected event. */
+    private static final int TIMEOUT_MS = 10*1000;
+    /** Slack (ms) applied around stabilisation delays to tolerate scheduling jitter. */
+    private static final int OVERHEAD = 250;
+
+    private ManagementContext managementContext;
+    private TestApplication app;
+    /** The monitored entity; starts each test up and in the RUNNING expected state. */
+    private TestEntity e1;
+
+    /** ENTITY_FAILED / ENTITY_RECOVERED events published by {@link #e1}. */
+    private List<SensorEvent<FailureDescriptor>> events;
+
+    @BeforeMethod(alwaysRun=true)
+    public void setUp() throws Exception {
+        events = new CopyOnWriteArrayList<SensorEvent<FailureDescriptor>>();
+        SensorEventListener<FailureDescriptor> recordingListener = new SensorEventListener<FailureDescriptor>() {
+            @Override public void onEvent(SensorEvent<FailureDescriptor> event) {
+                events.add(event);
+            }
+        };
+
+        managementContext = new LocalManagementContextForTests();
+        app = ApplicationBuilder.newManagedApp(TestApplication.class, managementContext);
+
+        // Start from a healthy, running entity
+        e1 = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+        e1.setAttribute(TestEntity.SERVICE_UP, true);
+        ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+
+        app.getManagementContext().getSubscriptionManager().subscribe(e1, HASensors.ENTITY_FAILED, recordingListener);
+        app.getManagementContext().getSubscriptionManager().subscribe(e1, HASensors.ENTITY_RECOVERED, recordingListener);
+    }
+
+    @AfterMethod(alwaysRun=true)
+    public void tearDown() throws Exception {
+        if (managementContext == null) return;
+        Entities.destroyAll(managementContext);
+    }
+
+    @Test(groups="Integration") // Because slow
+    public void testNotNotifiedOfTemporaryFailuresDuringStabilisationDelay() throws Exception {
+        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+                .configure(ServiceFailureDetector.ENTITY_FAILED_STABILIZATION_DELAY, Duration.ONE_MINUTE));
+
+        // A blip far shorter than the one-minute delay must not be reported
+        e1.setAttribute(TestEntity.SERVICE_UP, false);
+        Thread.sleep(100);
+        e1.setAttribute(TestEntity.SERVICE_UP, true);
+
+        assertNoEventsContinually();
+    }
+
+    @Test(groups="Integration") // Because slow
+    public void testNotifiedOfFailureAfterStabilisationDelay() throws Exception {
+        final int delayMs = 1000;
+
+        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+                .configure(ServiceFailureDetector.ENTITY_FAILED_STABILIZATION_DELAY, Duration.of(delayMs)));
+
+        e1.setAttribute(TestEntity.SERVICE_UP, false);
+
+        assertNoEventsContinually(Duration.of(delayMs - OVERHEAD));
+        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+    }
+
+    @Test(groups="Integration") // Because slow
+    public void testFailuresThenUpDownResetsStabilisationCount() throws Exception {
+        LOG.debug("Running testFailuresThenUpDownResetsStabilisationCount");
+        final long delayMs = 1000;
+
+        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+                .configure(ServiceFailureDetector.ENTITY_FAILED_STABILIZATION_DELAY, Duration.of(delayMs)));
+
+        e1.setAttribute(TestEntity.SERVICE_UP, false);
+        assertNoEventsContinually(Duration.of(delayMs - OVERHEAD));
+
+        // Going briefly up again should restart the failure window
+        e1.setAttribute(TestEntity.SERVICE_UP, true);
+        Thread.sleep(OVERHEAD);
+        e1.setAttribute(TestEntity.SERVICE_UP, false);
+        assertNoEventsContinually(Duration.of(delayMs - OVERHEAD));
+
+        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+    }
+
+    @Test(groups="Integration") // Because slow
+    public void testNotNotifiedOfTemporaryRecoveryDuringStabilisationDelay() throws Exception {
+        final long delayMs = 1000;
+
+        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+                .configure(ServiceFailureDetector.ENTITY_RECOVERED_STABILIZATION_DELAY, Duration.of(delayMs)));
+
+        e1.setAttribute(TestEntity.SERVICE_UP, false);
+        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+        events.clear();
+
+        // A short recovery followed by another failure must not be reported
+        e1.setAttribute(TestEntity.SERVICE_UP, true);
+        Thread.sleep(100);
+        e1.setAttribute(TestEntity.SERVICE_UP, false);
+
+        assertNoEventsContinually(Duration.of(delayMs + OVERHEAD));
+    }
+
+    @Test(groups="Integration") // Because slow
+    public void testNotifiedOfRecoveryAfterStabilisationDelay() throws Exception {
+        final int delayMs = 1000;
+
+        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+                .configure(ServiceFailureDetector.ENTITY_RECOVERED_STABILIZATION_DELAY, Duration.of(delayMs)));
+
+        e1.setAttribute(TestEntity.SERVICE_UP, false);
+        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+        events.clear();
+
+        e1.setAttribute(TestEntity.SERVICE_UP, true);
+        assertNoEventsContinually(Duration.of(delayMs - OVERHEAD));
+        assertHasEventEventually(HASensors.ENTITY_RECOVERED, Predicates.<Object>equalTo(e1), null);
+    }
+
+    @Test(groups="Integration") // Because slow
+    public void testRecoversThenDownUpResetsStabilisationCount() throws Exception {
+        final long delayMs = 1000;
+
+        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+                .configure(ServiceFailureDetector.ENTITY_RECOVERED_STABILIZATION_DELAY, Duration.of(delayMs)));
+
+        e1.setAttribute(TestEntity.SERVICE_UP, false);
+        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+        events.clear();
+
+        e1.setAttribute(TestEntity.SERVICE_UP, true);
+        assertNoEventsContinually(Duration.of(delayMs - OVERHEAD));
+
+        // Going briefly down again should restart the recovery window
+        e1.setAttribute(TestEntity.SERVICE_UP, false);
+        Thread.sleep(OVERHEAD);
+        e1.setAttribute(TestEntity.SERVICE_UP, true);
+        assertNoEventsContinually(Duration.of(delayMs - OVERHEAD));
+
+        assertHasEventEventually(HASensors.ENTITY_RECOVERED, Predicates.<Object>equalTo(e1), null);
+    }
+
+    /** Fails unless some recorded event satisfies {@link #matches}. */
+    private void assertHasEvent(Sensor<?> sensor, Predicate<Object> componentPredicate, Predicate<? super CharSequence> descriptionPredicate) {
+        boolean found = false;
+        for (SensorEvent<FailureDescriptor> candidate : events) {
+            if (matches(candidate, sensor, componentPredicate, descriptionPredicate)) {
+                found = true;
+                break;
+            }
+        }
+        if (!found) {
+            fail("No matching "+sensor+" event found; events="+events);
+        }
+    }
+
+    /**
+     * Returns whether {@code candidate} was published on {@code sensor} and its descriptor satisfies both
+     * predicates. A {@code null} predicate accepts anything.
+     */
+    private static boolean matches(SensorEvent<FailureDescriptor> candidate, Sensor<?> sensor,
+            Predicate<Object> componentPredicate, Predicate<? super CharSequence> descriptionPredicate) {
+        if (!candidate.getSensor().equals(sensor)) {
+            return false;
+        }
+        if (componentPredicate != null && !componentPredicate.apply(candidate.getValue().getComponent())) {
+            return false;
+        }
+        return descriptionPredicate == null || descriptionPredicate.apply(candidate.getValue().getDescription());
+    }
+
+    /** Retries {@link #assertHasEvent} until it passes or {@link #TIMEOUT_MS} elapses. */
+    private void assertHasEventEventually(final Sensor<?> sensor, final Predicate<Object> componentPredicate, final Predicate<? super CharSequence> descriptionPredicate) {
+        Runnable check = new Runnable() {
+            @Override public void run() {
+                assertHasEvent(sensor, componentPredicate, descriptionPredicate);
+            }
+        };
+        Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), check);
+    }
+
+    /** Asserts that no events are recorded at any point during {@code duration}. */
+    private void assertNoEventsContinually(Duration duration) {
+        Asserts.succeedsContinually(ImmutableMap.of("timeout", duration), newNoEventsCheck());
+    }
+
+    /** Asserts that no events are recorded, using {@link Asserts#succeedsContinually}'s default period. */
+    private void assertNoEventsContinually() {
+        Asserts.succeedsContinually(newNoEventsCheck());
+    }
+
+    /** Builds a check that fails while any event has been recorded. */
+    private Runnable newNoEventsCheck() {
+        return new Runnable() {
+            @Override public void run() {
+                assertTrue(events.isEmpty(), "events="+events);
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceFailureDetectorTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceFailureDetectorTest.java b/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceFailureDetectorTest.java
new file mode 100644
index 0000000..2ad2d79
--- /dev/null
+++ b/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceFailureDetectorTest.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.ha;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.event.Sensor;
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.api.event.SensorEventListener;
+import org.apache.brooklyn.api.management.ManagementContext;
+import org.apache.brooklyn.api.policy.EnricherSpec;
+import org.apache.brooklyn.test.EntityTestUtils;
+import org.apache.brooklyn.test.entity.LocalManagementContextForTests;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.apache.brooklyn.test.entity.TestEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.Lifecycle;
+import brooklyn.entity.basic.ServiceStateLogic;
+import brooklyn.entity.basic.ServiceStateLogic.ServiceProblemsLogic;
+import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor;
+import brooklyn.test.Asserts;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Tests for {@link ServiceFailureDetector}.
+ *
+ * <p>Each test attaches the detector enricher to a {@link TestEntity} ({@code e1}) and then drives
+ * {@code SERVICE_UP}, the expected lifecycle state and/or service problems. It checks the
+ * {@link HASensors#ENTITY_FAILED} / {@link HASensors#ENTITY_RECOVERED} events published by {@code e1}
+ * and the resulting {@code SERVICE_STATE_ACTUAL}.
+ */
+public class ServiceFailureDetectorTest {
+    private static final Logger log = LoggerFactory.getLogger(ServiceFailureDetectorTest.class);
+
+    /** Timeout, in milliseconds, used by the "eventually" assertions in this class. */
+    private static final int TIMEOUT_MS = 10*1000;
+
+    private ManagementContext managementContext;
+    private TestApplication app;
+    private TestEntity e1;
+    
+    /** ENTITY_FAILED / ENTITY_RECOVERED events from {@code e1}, appended by {@link #eventListener} as received. */
+    private List<SensorEvent<FailureDescriptor>> events;
+    private SensorEventListener<FailureDescriptor> eventListener;
+    
+    @BeforeMethod(alwaysRun=true)
+    public void setUp() throws Exception {
+        events = new CopyOnWriteArrayList<SensorEvent<FailureDescriptor>>();
+        eventListener = new SensorEventListener<FailureDescriptor>() {
+            @Override public void onEvent(SensorEvent<FailureDescriptor> event) {
+                events.add(event);
+            }
+        };
+        
+        managementContext = new LocalManagementContextForTests();
+        app = ApplicationBuilder.newManagedApp(TestApplication.class, managementContext);
+        e1 = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+        e1.addEnricher(ServiceStateLogic.newEnricherForServiceStateFromProblemsAndUp());
+        
+        app.getManagementContext().getSubscriptionManager().subscribe(e1, HASensors.ENTITY_FAILED, eventListener);
+        app.getManagementContext().getSubscriptionManager().subscribe(e1, HASensors.ENTITY_RECOVERED, eventListener);
+    }
+    
+    @AfterMethod(alwaysRun=true)
+    public void tearDown() throws Exception {
+        if (managementContext != null) Entities.destroyAll(managementContext);
+    }
+    
+    @Test(groups="Integration") // Has a 1 second wait
+    public void testNotNotifiedOfFailuresForHealthy() throws Exception {
+        // Make the entity healthy before the detector is added; no failure should ever be reported
+        e1.setAttribute(TestEntity.SERVICE_UP, true);
+        ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+        
+        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));
+        
+        assertNoEventsContinually();
+        assertEquals(e1.getAttribute(TestEntity.SERVICE_STATE_ACTUAL), Lifecycle.RUNNING);
+    }
+    
+    @Test
+    public void testNotifiedOfFailure() throws Exception {
+        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));
+        
+        e1.setAttribute(TestEntity.SERVICE_UP, true);
+        ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+        
+        assertEquals(events.size(), 0, "events="+events);
+        
+        e1.setAttribute(TestEntity.SERVICE_UP, false);
+
+        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+        assertEquals(events.size(), 1, "events="+events);
+        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
+    }
+    
+    @Test
+    public void testNotifiedOfFailureOnProblem() throws Exception {
+        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));
+        
+        e1.setAttribute(TestEntity.SERVICE_UP, true);
+        ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+        
+        assertEquals(events.size(), 0, "events="+events);
+        
+        ServiceProblemsLogic.updateProblemsIndicator(e1, "test", "foo");
+
+        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+        assertEquals(events.size(), 1, "events="+events);
+        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
+    }
+    
+    @Test
+    public void testNotifiedOfFailureOnStateOnFire() throws Exception {
+        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));
+        e1.setAttribute(TestEntity.SERVICE_UP, true);
+        ServiceStateLogic.setExpectedState(e1, Lifecycle.ON_FIRE);
+
+        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+        assertEquals(events.size(), 1, "events="+events);
+        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
+    }
+    
+    @Test
+    public void testNotifiedOfRecovery() throws Exception {
+        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));
+        
+        e1.setAttribute(TestEntity.SERVICE_UP, true);
+        ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+        // Make the entity fail
+        e1.setAttribute(TestEntity.SERVICE_UP, false);
+
+        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
+
+        // And make the entity recover
+        e1.setAttribute(TestEntity.SERVICE_UP, true);
+        assertHasEventEventually(HASensors.ENTITY_RECOVERED, Predicates.<Object>equalTo(e1), null);
+        assertEquals(events.size(), 2, "events="+events);
+        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+    }
+    
+    @Test
+    public void testNotifiedOfRecoveryFromProblems() throws Exception {
+        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));
+        
+        e1.setAttribute(TestEntity.SERVICE_UP, true);
+        ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+        // Make the entity fail
+        ServiceProblemsLogic.updateProblemsIndicator(e1, "test", "foo");
+
+        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
+
+        // And make the entity recover
+        ServiceProblemsLogic.clearProblemsIndicator(e1, "test");
+        assertHasEventEventually(HASensors.ENTITY_RECOVERED, Predicates.<Object>equalTo(e1), null);
+        assertEquals(events.size(), 2, "events="+events);
+        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+    }
+    
+    
+    @Test(groups="Integration") // Has a 1 second wait
+    public void testEmitsEntityFailureOnlyIfPreviouslyUp() throws Exception {
+        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));
+        
+        // Make the entity fail
+        e1.setAttribute(TestEntity.SERVICE_UP, false);
+        ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+
+        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
+        assertNoEventsContinually();
+    }
+    
+    @Test
+    public void testDisablingPreviouslyUpRequirementForEntityFailed() throws Exception {
+        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+            .configure(ServiceFailureDetector.ENTITY_FAILED_ONLY_IF_PREVIOUSLY_UP, false));
+        
+        e1.setAttribute(TestEntity.SERVICE_UP, false);
+        ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+
+        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
+        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+    }
+    
+    @Test
+    public void testDisablingOnFire() throws Exception {
+        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+            .configure(ServiceFailureDetector.SERVICE_ON_FIRE_STABILIZATION_DELAY, Duration.PRACTICALLY_FOREVER));
+        
+        // Make the entity healthy, then make it fail
+        e1.setAttribute(TestEntity.SERVICE_UP, true);
+        ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+        EntityTestUtils.assertAttributeEqualsEventually(e1, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+        e1.setAttribute(TestEntity.SERVICE_UP, false);
+
+        // With an effectively-infinite on-fire stabilization delay the state must stay RUNNING.
+        // Check over a short window: a single immediate read would pass even if ON_FIRE were set a moment later.
+        EntityTestUtils.assertAttributeEqualsContinually(ImmutableMap.of("timeout", 100), e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+    }
+    
+    @Test(groups="Integration") // Has a 1 second wait
+    public void testOnFireAfterDelay() throws Exception {
+        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+            .configure(ServiceFailureDetector.SERVICE_ON_FIRE_STABILIZATION_DELAY, Duration.ONE_SECOND));
+        
+        // Make the entity fail
+        e1.setAttribute(TestEntity.SERVICE_UP, true);
+        ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+        EntityTestUtils.assertAttributeEqualsEventually(e1, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+        
+        e1.setAttribute(TestEntity.SERVICE_UP, false);
+
+        assertEquals(e1.getAttribute(TestEntity.SERVICE_STATE_ACTUAL), Lifecycle.RUNNING);
+        Time.sleep(Duration.millis(100));
+        assertEquals(e1.getAttribute(TestEntity.SERVICE_STATE_ACTUAL), Lifecycle.RUNNING);
+        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
+    }
+    
+    @Test(groups="Integration") // Has a 1 second wait
+    public void testOnFailureDelayFromProblemAndRecover() throws Exception {
+        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+            .configure(ServiceFailureDetector.SERVICE_ON_FIRE_STABILIZATION_DELAY, Duration.ONE_SECOND)
+            .configure(ServiceFailureDetector.ENTITY_RECOVERED_STABILIZATION_DELAY, Duration.ONE_SECOND));
+        
+        // Set the entity to healthy
+        e1.setAttribute(TestEntity.SERVICE_UP, true);
+        ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+        EntityTestUtils.assertAttributeEqualsEventually(e1, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+        
+        // Make the entity fail; won't set on-fire for 1s but will publish FAILED immediately.
+        ServiceStateLogic.ServiceProblemsLogic.updateProblemsIndicator(e1, "test", "foo");
+        EntityTestUtils.assertAttributeEqualsContinually(ImmutableMap.of("timeout", 100), e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+        assertEquals(e1.getAttribute(TestEntity.SERVICE_STATE_ACTUAL), Lifecycle.RUNNING);
+        
+        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
+        
+        // Now recover: will publish RUNNING immediately, but has 1s stabilisation for RECOVERED
+        ServiceStateLogic.ServiceProblemsLogic.clearProblemsIndicator(e1, "test");
+        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+        
+        assertEquals(events.size(), 1, "events="+events);
+        
+        assertHasEventEventually(HASensors.ENTITY_RECOVERED, Predicates.<Object>equalTo(e1), null);
+        assertEquals(events.size(), 2, "events="+events);
+    }
+    
+    @Test(groups="Integration") // Has a 1 second wait
+    public void testAttendsToServiceState() throws Exception {
+        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));
+        
+        e1.setAttribute(TestEntity.SERVICE_UP, true);
+        // not counted as failed because not expected to be running
+        e1.setAttribute(TestEntity.SERVICE_UP, false);
+
+        assertNoEventsContinually();
+    }
+
+    @Test(groups="Integration") // Has a 1 second wait
+    public void testOnlyReportsFailureIfRunning() throws Exception {
+        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));
+        
+        // Make the entity fail while it is only expected to be STARTING
+        ServiceStateLogic.setExpectedState(e1, Lifecycle.STARTING);
+        e1.setAttribute(TestEntity.SERVICE_UP, true);
+        e1.setAttribute(TestEntity.SERVICE_UP, false);
+
+        assertNoEventsContinually();
+    }
+    
+    @Test
+    public void testReportsFailureWhenAlreadyDownOnRegisteringPolicy() throws Exception {
+        ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+        e1.setAttribute(TestEntity.SERVICE_UP, false);
+
+        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+            .configure(ServiceFailureDetector.ENTITY_FAILED_ONLY_IF_PREVIOUSLY_UP, false));
+
+        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+    }
+    
+    @Test
+    public void testReportsFailureWhenAlreadyOnFireOnRegisteringPolicy() throws Exception {
+        ServiceStateLogic.setExpectedState(e1, Lifecycle.ON_FIRE);
+
+        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+            .configure(ServiceFailureDetector.ENTITY_FAILED_ONLY_IF_PREVIOUSLY_UP, false));
+
+        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+    }
+    
+    @Test(groups="Integration") // Has a 1.5 second wait
+    public void testRepublishedFailure() throws Exception {
+        Duration republishPeriod = Duration.millis(100);
+
+        e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+                .configure(ServiceFailureDetector.ENTITY_FAILED_REPUBLISH_TIME, republishPeriod));
+            
+        // Set the entity to healthy
+        e1.setAttribute(TestEntity.SERVICE_UP, true);
+        ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+        EntityTestUtils.assertAttributeEqualsEventually(e1, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+        
+        // Make the entity fail
+        ServiceStateLogic.ServiceProblemsLogic.updateProblemsIndicator(e1, "test", "foo");
+        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
+        assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+
+        // wait for at least 10 republish events (~1 sec)
+        assertEventsSizeEventually(10);
+
+        // Now recover
+        ServiceStateLogic.ServiceProblemsLogic.clearProblemsIndicator(e1, "test");
+        EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+        assertHasEventEventually(HASensors.ENTITY_RECOVERED, Predicates.<Object>equalTo(e1), null);
+
+        // once recovered, check no more failed events are emitted periodically
+        assertEventsSizeContinually(events.size());
+
+        // Timing is only logged, not asserted: deviations beyond 10% of the period are reported as errors
+        SensorEvent<FailureDescriptor> prevEvent = null;
+        for (SensorEvent<FailureDescriptor> event : events) {
+            if (prevEvent != null) {
+                long repeatOffset = event.getTimestamp() - prevEvent.getTimestamp();
+                long deviation = Math.abs(repeatOffset - republishPeriod.toMilliseconds());
+                if (deviation > republishPeriod.toMilliseconds()/10 &&
+                        //warn only if recovered is too far away from the last failure
+                        (!event.getSensor().equals(HASensors.ENTITY_RECOVERED) ||
+                        repeatOffset > republishPeriod.toMilliseconds())) {
+                    log.error("The time between failure republish (" + repeatOffset + "ms) deviates too much from the expected " + republishPeriod + ". prevEvent=" + prevEvent + ", event=" + event);
+                }
+            }
+            prevEvent = event;
+        }
+        
+        // make sure no republish takes place after recovered
+        assertEquals(prevEvent.getSensor(), HASensors.ENTITY_RECOVERED);
+    }
+    
+    /**
+     * Asserts that exactly {@code size} events stay recorded throughout a 500ms
+     * {@code Asserts.succeedsContinually} window.
+     */
+    private void assertEventsSizeContinually(final int size) {
+        Asserts.succeedsContinually(MutableMap.of("timeout", 500), new Runnable() {
+            @Override
+            public void run() {
+                assertEquals(events.size(), size, "assertEventsSizeContinually expects " + size + " events but found " + events.size() + ": " + events);
+            }
+        });
+    }
+
+    /**
+     * Waits (up to {@code TIMEOUT_MS}, via {@code Asserts.succeedsEventually}) until at least
+     * {@code size} events have been recorded.
+     */
+    private void assertEventsSizeEventually(final int size) {
+        Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() {
+            @Override
+            public void run() {
+                assertTrue(events.size() >= size, "assertEventsSizeEventually expects at least " + size + " events but found " + events.size() + ": " + events);
+            }
+        });
+    }
+
+    /**
+     * Fails unless {@code events} holds an event published on {@code sensor} whose descriptor matches both
+     * predicates; a {@code null} predicate accepts anything.
+     */
+    private void assertHasEvent(Sensor<?> sensor, Predicate<Object> componentPredicate, Predicate<? super CharSequence> descriptionPredicate) {
+        for (SensorEvent<FailureDescriptor> event : events) {
+            if (event.getSensor().equals(sensor) && 
+                    (componentPredicate == null || componentPredicate.apply(event.getValue().getComponent())) &&
+                    (descriptionPredicate == null || descriptionPredicate.apply(event.getValue().getDescription()))) {
+                return;
+            }
+        }
+        fail("No matching "+sensor+" event found; events="+events);
+    }
+    
+    /** Re-runs {@link #assertHasEvent} via {@code Asserts.succeedsEventually} with a {@code TIMEOUT_MS} timeout. */
+    private void assertHasEventEventually(final Sensor<?> sensor, final Predicate<Object> componentPredicate, final Predicate<? super CharSequence> descriptionPredicate) {
+        Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() {
+            @Override public void run() {
+                assertHasEvent(sensor, componentPredicate, descriptionPredicate);
+            }});
+    }
+    
+    /** Asserts, using the default {@code Asserts.succeedsContinually} timeout, that no events are recorded. */
+    private void assertNoEventsContinually() {
+        Asserts.succeedsContinually(new Runnable() {
+            @Override public void run() {
+                assertTrue(events.isEmpty(), "events="+events);
+            }});
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceReplacerTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceReplacerTest.java b/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceReplacerTest.java
new file mode 100644
index 0000000..f92603a
--- /dev/null
+++ b/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceReplacerTest.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.ha;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.api.event.SensorEventListener;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.location.LocationSpec;
+import org.apache.brooklyn.api.management.ManagementContext;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.core.util.config.ConfigBag;
+import org.apache.brooklyn.test.EntityTestUtils;
+import org.apache.brooklyn.test.entity.LocalManagementContextForTests;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.apache.brooklyn.test.entity.TestEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.EntityInternal;
+import brooklyn.entity.basic.Lifecycle;
+import brooklyn.entity.basic.QuorumCheck;
+import brooklyn.entity.basic.ServiceStateLogic.ComputeServiceIndicatorsFromChildrenAndMembers;
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.trait.FailingEntity;
+
+import org.apache.brooklyn.location.basic.SimulatedLocation;
+
+import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor;
+import brooklyn.test.Asserts;
+import brooklyn.util.javalang.JavaClassNames;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class ServiceReplacerTest {
+
+    private static final Logger log = LoggerFactory.getLogger(ServiceReplacerTest.class);
+    
+    // Per-test fixtures, (re)created in setUp and torn down via Entities.destroyAll in tearDown
+    private ManagementContext managementContext;
+    private TestApplication app;
+    private SimulatedLocation loc;
+    // Appends every event it receives to 'events'; the visible tests subscribe it to
+    // ServiceReplacer.ENTITY_REPLACEMENT_FAILED
+    private SensorEventListener<Object> eventListener;
+    private List<SensorEvent<?>> events;
+    
+    /**
+     * Builds a fresh management context, managed {@link TestApplication} and {@link SimulatedLocation},
+     * plus an event listener that records everything it receives into {@code events}.
+     */
+    @BeforeMethod(alwaysRun=true)
+    public void setUp() throws Exception {
+        managementContext = new LocalManagementContextForTests();
+        app = ApplicationBuilder.newManagedApp(TestApplication.class, managementContext);
+        LocationSpec<SimulatedLocation> locationSpec = LocationSpec.create(SimulatedLocation.class);
+        loc = managementContext.getLocationManager().createLocation(locationSpec);
+
+        events = Lists.newCopyOnWriteArrayList();
+        eventListener = new SensorEventListener<Object>() {
+            @Override
+            public void onEvent(SensorEvent<Object> received) {
+                events.add(received);
+            }
+        };
+    }
+    
+    /** Destroys everything managed by the test's context; tolerates a context that was never created. */
+    @AfterMethod(alwaysRun=true)
+    public void tearDown() throws Exception {
+        if (managementContext == null) {
+            return;
+        }
+        Entities.destroyAll(managementContext);
+    }
+    
+    /** A member emitting ENTITY_FAILED is stopped, unmanaged and replaced by a newly started member. */
+    @Test
+    public void testReplacesFailedMember() throws Exception {
+        EntitySpec<DynamicCluster> clusterSpec = EntitySpec.create(DynamicCluster.class)
+                .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(TestEntity.class))
+                .configure(DynamicCluster.INITIAL_SIZE, 3);
+        final DynamicCluster cluster = app.createAndManageChild(clusterSpec);
+        app.start(ImmutableList.<Location>of(loc));
+
+        ConfigBag replacerConfig = new ConfigBag().configure(ServiceReplacer.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED);
+        cluster.addPolicy(new ServiceReplacer(replacerConfig));
+
+        final Set<Entity> membersBefore = ImmutableSet.copyOf(cluster.getMembers());
+        final TestEntity failedMember = (TestEntity) Iterables.get(membersBefore, 1);
+        
+        failedMember.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(failedMember, "simulate failure"));
+        
+        // The failed member should be swapped for exactly one freshly started member
+        Asserts.succeedsEventually(new Runnable() {
+            @Override
+            public void run() {
+                Set<Entity> added = Sets.difference(ImmutableSet.copyOf(cluster.getMembers()), membersBefore);
+                Set<Entity> removed = Sets.difference(membersBefore, ImmutableSet.copyOf(cluster.getMembers()));
+                assertEquals(removed, ImmutableSet.of(failedMember));
+                assertEquals(added.size(), 1);
+                TestEntity replacement = (TestEntity) Iterables.getOnlyElement(added);
+                assertEquals(replacement.getCallHistory(), ImmutableList.of("start"));
+                assertEquals(failedMember.getCallHistory(), ImmutableList.of("start", "stop"));
+                assertFalse(Entities.isManaged(failedMember));
+            }
+        });
+    }
+
+    /**
+     * Stress variant that repeats {@link #testSetsOnFireWhenFailToReplaceMember()} 100 times,
+     * presumably to catch intermittent, timing-dependent failures. It is kept in the Integration group
+     * so the default unit-test run does not pay for 100 full cluster start/replace cycles.
+     */
+    @Test(groups="Integration", invocationCount=100)
+    public void testSetsOnFireWhenFailToReplaceMemberManyTimes() throws Exception {
+        testSetsOnFireWhenFailToReplaceMember();
+    }
+    
+    /**
+     * The replacement member's start() is made to fail (but not the original's), so the cluster should go
+     * on-fire, keep the failed replacement around in quarantine, and publish ENTITY_REPLACEMENT_FAILED.
+     */
+    @Test
+    public void testSetsOnFireWhenFailToReplaceMember() throws Exception {
+        app.subscribe(null, ServiceReplacer.ENTITY_REPLACEMENT_FAILED, eventListener);
+        
+        EntitySpec<DynamicCluster> clusterSpec = EntitySpec.create(DynamicCluster.class)
+                .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(FailingEntity.class)
+                        .configure(FailingEntity.FAIL_ON_START_CONDITION, predicateOnlyTrueForCallAtOrAfter(2)))
+                .configure(DynamicCluster.INITIAL_SIZE, 1)
+                .configure(DynamicCluster.QUARANTINE_FAILED_ENTITIES, true)
+                .configure(ComputeServiceIndicatorsFromChildrenAndMembers.UP_QUORUM_CHECK, QuorumCheck.QuorumChecks.alwaysTrue())
+                .configure(ComputeServiceIndicatorsFromChildrenAndMembers.RUNNING_QUORUM_CHECK, QuorumCheck.QuorumChecks.alwaysTrue());
+        final DynamicCluster cluster = app.createAndManageChild(clusterSpec);
+        app.start(ImmutableList.<Location>of(loc));
+        
+        // Not on fire right after start, and eventually running
+        assertNotEquals(cluster.getAttribute(Attributes.SERVICE_STATE_ACTUAL), Lifecycle.ON_FIRE);
+        EntityTestUtils.assertAttributeEqualsEventually(cluster, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+        
+        log.info("started "+app+" for "+JavaClassNames.niceClassAndMethod());
+        
+        ConfigBag replacerConfig = new ConfigBag().configure(ServiceReplacer.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED);
+        cluster.addPolicy(new ServiceReplacer(replacerConfig));
+        
+        final Set<Entity> membersAtStart = ImmutableSet.copyOf(cluster.getMembers());
+        final TestEntity e1 = (TestEntity) Iterables.get(membersAtStart, 0);
+        
+        e1.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(e1, "simulate failure"));
+
+        // Up-quorum and running-quorum are "alwaysTrue" so there is no transient on-fire while the failed
+        // replacement is still a member (before it is moved to quarantine); on-fire must come from the policy.
+        EntityTestUtils.assertAttributeEqualsEventually(cluster, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
+
+        // The failed replacement should still exist (in quarantine) alongside the original member
+        Iterable<Entity> failingEntities = Iterables.filter(managementContext.getEntityManager().getEntities(), Predicates.instanceOf(FailingEntity.class));
+        assertEquals(Iterables.size(failingEntities), 2);
+
+        // e2 never started successfully, so the original e1 was never stopped
+        TestEntity e2 = (TestEntity) Iterables.getOnlyElement(Sets.difference(ImmutableSet.copyOf(failingEntities), membersAtStart));
+        assertEquals(e1.getCallHistory(), ImmutableList.of("start"), "e1.history="+e1.getCallHistory());
+        assertEquals(e2.getCallHistory(), ImmutableList.of("start"), "e2.history="+e2.getCallHistory());
+
+        // And the replacement failure is reported as an event
+        assertEventuallyHasEntityReplacementFailedEvent(cluster);
+    }
+    
+    /**
+     * With SET_ON_FIRE_ON_FAILURE disabled, a failed replacement must not mark the cluster on-fire,
+     * but ENTITY_REPLACEMENT_FAILED is still published.
+     */
+    @Test(groups="Integration") // has a 1 second wait
+    public void testDoesNotOnFireWhenFailToReplaceMember() throws Exception {
+        app.subscribe(null, ServiceReplacer.ENTITY_REPLACEMENT_FAILED, eventListener);
+        
+        EntitySpec<DynamicCluster> clusterSpec = EntitySpec.create(DynamicCluster.class)
+                .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(FailingEntity.class)
+                        .configure(FailingEntity.FAIL_ON_START_CONDITION, predicateOnlyTrueForCallAtOrAfter(2)))
+                .configure(DynamicCluster.INITIAL_SIZE, 1)
+                .configure(DynamicCluster.QUARANTINE_FAILED_ENTITIES, true);
+        final DynamicCluster cluster = app.createAndManageChild(clusterSpec);
+        app.start(ImmutableList.<Location>of(loc));
+        
+        ConfigBag replacerConfig = new ConfigBag()
+                .configure(ServiceReplacer.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED)
+                .configure(ServiceReplacer.SET_ON_FIRE_ON_FAILURE, false);
+        cluster.addPolicy(new ServiceReplacer(replacerConfig));
+        
+        final Set<Entity> membersAtStart = ImmutableSet.copyOf(cluster.getMembers());
+        final TestEntity failedMember = (TestEntity) Iterables.get(membersAtStart, 0);
+        
+        failedMember.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(failedMember, "simulate failure"));
+
+        Runnable clusterNotOnFire = new Runnable() {
+            @Override
+            public void run() {
+                assertNotEquals(cluster.getAttribute(Attributes.SERVICE_STATE_ACTUAL), Lifecycle.ON_FIRE);
+            }
+        };
+        Asserts.succeedsContinually(clusterNotOnFire);
+        
+        assertEventuallyHasEntityReplacementFailedEvent(cluster);
+    }
+
+    /**
+     * The old member's stop() is made to fail; the member must still be replaced and unmanaged, and the
+     * cluster must not go on-fire because of the stop failure.
+     */
+    @Test(groups="Integration")  // 1s wait
+    public void testStopFailureOfOldEntityDoesNotSetClusterOnFire() throws Exception {
+        app.subscribe(null, ServiceReplacer.ENTITY_REPLACEMENT_FAILED, eventListener);
+        
+        EntitySpec<DynamicCluster> clusterSpec = EntitySpec.create(DynamicCluster.class)
+                .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(FailingEntity.class)
+                        .configure(FailingEntity.FAIL_ON_STOP_CONDITION, predicateOnlyTrueForCallAt(1)))
+                .configure(DynamicCluster.INITIAL_SIZE, 2);
+        final DynamicCluster cluster = app.createAndManageChild(clusterSpec);
+        app.start(ImmutableList.<Location>of(loc));
+        
+        PolicySpec<ServiceReplacer> replacerSpec = PolicySpec.create(ServiceReplacer.class)
+                .configure(ServiceReplacer.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED);
+        cluster.addPolicy(replacerSpec);
+        
+        final Set<Entity> membersBefore = ImmutableSet.copyOf(cluster.getMembers());
+        final TestEntity failedMember = (TestEntity) Iterables.get(membersBefore, 0);
+        
+        failedMember.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(failedMember, "simulate failure"));
+
+        Runnable replacedOnce = new Runnable() {
+            @Override
+            public void run() {
+                Set<Entity> added = Sets.difference(ImmutableSet.copyOf(cluster.getMembers()), membersBefore);
+                Set<Entity> removed = Sets.difference(membersBefore, ImmutableSet.copyOf(cluster.getMembers()));
+                assertEquals(removed, ImmutableSet.of(failedMember));
+                assertEquals(added.size(), 1);
+                TestEntity replacement = (TestEntity) Iterables.getOnlyElement(added);
+                assertEquals(replacement.getCallHistory(), ImmutableList.of("start"));
+                assertEquals(failedMember.getCallHistory(), ImmutableList.of("start", "stop"));
+                assertFalse(Entities.isManaged(failedMember));
+            }
+        };
+        Asserts.succeedsEventually(replacedOnce);
+
+        Runnable clusterNotOnFire = new Runnable() {
+            @Override
+            public void run() {
+                assertNotEquals(cluster.getAttribute(Attributes.SERVICE_STATE_ACTUAL), Lifecycle.ON_FIRE);
+            }
+        };
+        Asserts.succeedsContinually(clusterNotOnFire);
+    }
+
+    /**
+     * If we keep on getting failure reports, never managing to replace the failed node, then don't keep trying
+     * (i.e. avoid an infinite loop).
+     * 
+     * TODO This code + configuration needs some work; it's not testing quite the scenarios that I
+     * was thinking of!
+     * I saw a problem where a node failed, and the replacements failed, and we ended up trying thousands of times.
+     * (Describing this scenario is made more complex by my having temporarily disabled the cluster from 
+     * removing failed members, for debugging purposes!)
+     * Imagine these two scenarios:
+     * <ol>
+     *   <li>Entity fails during call to start().
+     *       Here, the cluster removes it as a member (either unmanages it or puts it in quarantine),
+     *       so the ENTITY_FAILED is ignored because the entity is not a member at that point.
+     *   <li>Entity returns from start(), but quickly goes to service-down.
+     *       Here we'll keep trying to replace that entity. Depending on how long that takes, we'll either 
+     *       enter a horrible infinite loop, or we'll just provision a huge number of VMs over a long 
+     *       time period.
+     *       Unfortunately this scenario is not catered for in the code yet.
+     * </ol>
+     * <p>
+     * What the test actually checks: the start condition is true from the 11th start call onwards,
+     * so the 10 initial members start but later replacements fail to start (and, with quarantine
+     * enabled, are expected not to be cluster members). Failures are then simulated on five initial
+     * members in turn. After each of the first four, one more replacement entity must exist and none
+     * of them may be a member. After the fifth, the replacement count must stay at four.
+     */
+    @Test(groups="Integration") // because takes 1.2 seconds
+    public void testAbandonsReplacementAfterNumFailures() throws Exception {
+        app.subscribe(null, ServiceReplacer.ENTITY_REPLACEMENT_FAILED, eventListener);
+        
+        final DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+                .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(FailingEntity.class)
+                        .configure(FailingEntity.FAIL_ON_START_CONDITION, predicateOnlyTrueForCallAtOrAfter(11)))
+                .configure(DynamicCluster.INITIAL_SIZE, 10)
+                .configure(DynamicCluster.QUARANTINE_FAILED_ENTITIES, true));
+        app.start(ImmutableList.<Location>of(loc));
+        
+        ServiceReplacer policy = new ServiceReplacer(new ConfigBag()
+                .configure(ServiceReplacer.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED)
+                .configure(ServiceReplacer.FAIL_ON_NUM_RECURRING_FAILURES, 3));
+        cluster.addPolicy(policy);
+
+        final Set<Entity> initialMembers = ImmutableSet.copyOf(cluster.getMembers());
+        for (int i = 0; i < 5; i++) {
+            final int counter = i+1;
+            EntityInternal entity = (EntityInternal) Iterables.get(initialMembers, i);
+            entity.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(entity, "simulate failure"));
+            if (i <= 3) {
+                // Each of the first four failures should produce one more replacement, none of which is a member
+                Asserts.succeedsEventually(new Runnable() {
+                    @Override public void run() {
+                        Set<FailingEntity> all = ImmutableSet.copyOf(Iterables.filter(managementContext.getEntityManager().getEntities(), FailingEntity.class));
+                        Set<FailingEntity> replacements = Sets.difference(all, initialMembers);
+                        // copyOf, not of: ImmutableSet.of(members) would be a one-element set holding the
+                        // collection itself, making this intersection always empty and the check vacuous.
+                        Set<?> replacementMembers = Sets.intersection(ImmutableSet.copyOf(cluster.getMembers()), replacements);
+                        assertTrue(replacementMembers.isEmpty(), "replacementMembers="+replacementMembers);
+                        assertEquals(replacements.size(), counter);
+                    }});
+            } else {
+                // The fifth failure must not lead to any further replacement being created
+                Asserts.succeedsContinually(new Runnable() {
+                    @Override public void run() {
+                        Set<FailingEntity> all = ImmutableSet.copyOf(Iterables.filter(managementContext.getEntityManager().getEntities(), FailingEntity.class));
+                        Set<FailingEntity> replacements = Sets.difference(all, initialMembers);
+                        assertEquals(replacements.size(), 4);
+                    }});
+            }
+        }
+    }
+
+
+    private Predicate<Object> predicateOnlyTrueForCallAt(final int callNumber) {
+        return predicateOnlyTrueForCallRange(callNumber, callNumber);
+    }
+
+    private Predicate<Object> predicateOnlyTrueForCallAtOrAfter(final int callLowerNumber) {
+        return predicateOnlyTrueForCallRange(callLowerNumber, Integer.MAX_VALUE);
+    }
+    
+    /**
+     * Builds a stateful predicate that counts its own invocations (starting at 1) and answers
+     * {@code true} only while that count lies in {@code [callLowerNumber, callUpperNumber]}.
+     * The input argument is ignored. Each returned predicate has its own independent counter.
+     */
+    private Predicate<Object> predicateOnlyTrueForCallRange(final int callLowerNumber, final int callUpperNumber) {
+        final AtomicInteger invocations = new AtomicInteger();
+        return new Predicate<Object>() {
+            @Override public boolean apply(Object input) {
+                int thisCall = invocations.incrementAndGet();
+                if (thisCall < callLowerNumber) {
+                    return false;
+                }
+                return thisCall <= callUpperNumber;
+            }
+        };
+    }
+
+    /**
+     * Retries (via {@code Asserts.succeedsEventually}) until {@code events} holds exactly one event,
+     * and that event is a {@code ServiceReplacer.ENTITY_REPLACEMENT_FAILED} event whose source is
+     * {@code expectedCluster} and whose {@code FailureDescriptor} names {@code expectedCluster} as
+     * the failed component.
+     *
+     * @param expectedCluster the entity expected to have emitted the replacement-failure event
+     */
+    private void assertEventuallyHasEntityReplacementFailedEvent(final Entity expectedCluster) {
+        Asserts.succeedsEventually(new Runnable() {
+            @Override public void run() {
+                // getOnlyElement fails unless exactly one event has been recorded
+                assertEquals(Iterables.getOnlyElement(events).getSensor(), ServiceReplacer.ENTITY_REPLACEMENT_FAILED, "events="+events);
+                assertEquals(Iterables.getOnlyElement(events).getSource(), expectedCluster, "events="+events);
+                assertEquals(((FailureDescriptor)Iterables.getOnlyElement(events).getValue()).getComponent(), expectedCluster, "events="+events);
+            }});
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceRestarterTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceRestarterTest.java b/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceRestarterTest.java
new file mode 100644
index 0000000..c93612b
--- /dev/null
+++ b/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceRestarterTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.ha;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.api.event.SensorEventListener;
+import org.apache.brooklyn.api.management.ManagementContext;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.core.util.config.ConfigBag;
+import org.apache.brooklyn.test.entity.LocalManagementContextForTests;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.apache.brooklyn.test.entity.TestEntity;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.Lifecycle;
+import brooklyn.entity.trait.FailingEntity;
+import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor;
+import brooklyn.test.Asserts;
+import brooklyn.util.exceptions.Exceptions;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+/**
+ * Tests for {@link ServiceRestarter}. They check that:
+ * <ul>
+ *   <li>the policy restarts an entity when the monitored failure sensor fires;
+ *   <li>a failed restart emits {@link ServiceRestarter#ENTITY_RESTART_FAILED} and, unless
+ *       {@link ServiceRestarter#SET_ON_FIRE_ON_FAILURE} is false, marks the entity {@code ON_FIRE};
+ *   <li>a restart in progress does not block the entity's other subscriptions.
+ * </ul>
+ */
+public class ServiceRestarterTest {
+
+    /** Upper bound, in milliseconds, on how long to wait for the restart to start blocking. */
+    private static final int TIMEOUT_MS = 10*1000;
+
+    private ManagementContext managementContext;
+    private TestApplication app;
+    /** A plain {@link TestEntity} child of {@link #app}, created fresh for each test. */
+    private TestEntity e1;
+    private ServiceRestarter policy;
+    /** Listener that appends every event it receives to {@link #events}. */
+    private SensorEventListener<Object> eventListener;
+    /** Events recorded by {@link #eventListener}; a copy-on-write list so it can be read while being appended to. */
+    private List<SensorEvent<?>> events;
+    
+    @BeforeMethod(alwaysRun=true)
+    public void setUp() throws Exception {
+        managementContext = new LocalManagementContextForTests();
+        app = ApplicationBuilder.newManagedApp(TestApplication.class, managementContext);
+        e1 = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+        events = Lists.newCopyOnWriteArrayList();
+        eventListener = new SensorEventListener<Object>() {
+            @Override public void onEvent(SensorEvent<Object> event) {
+                events.add(event);
+            }
+        };
+    }
+    
+    @AfterMethod(alwaysRun=true)
+    public void tearDown() throws Exception {
+        if (managementContext != null) Entities.destroyAll(managementContext);
+    }
+    
+    /** Emitting the monitored failure sensor should result in exactly one {@code restart} call. */
+    @Test
+    public void testRestartsOnFailure() throws Exception {
+        policy = new ServiceRestarter(new ConfigBag().configure(ServiceRestarter.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED));
+        e1.addPolicy(policy);
+        
+        e1.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(e1, "simulate failure"));
+        
+        Asserts.succeedsEventually(new Runnable() {
+            @Override public void run() {
+                assertEquals(e1.getCallHistory(), ImmutableList.of("restart"));
+            }});
+    }
+    
+    /** Emitting a sensor other than the monitored one ({@code ENTITY_RECOVERED}) should cause no calls. */
+    @Test(groups="Integration") // Has a 1 second wait
+    public void testDoesNotRestartsWhenHealthy() throws Exception {
+        policy = new ServiceRestarter(new ConfigBag().configure(ServiceRestarter.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED));
+        e1.addPolicy(policy);
+        
+        e1.emit(HASensors.ENTITY_RECOVERED, new FailureDescriptor(e1, "not a failure"));
+        
+        Asserts.succeedsContinually(new Runnable() {
+            @Override public void run() {
+                assertEquals(e1.getCallHistory(), ImmutableList.of());
+            }});
+    }
+    
+    /**
+     * When the restart itself fails, the policy should emit a single {@code ENTITY_RESTART_FAILED}
+     * event naming the entity, and the entity should end up {@code ON_FIRE}.
+     */
+    @Test
+    public void testEmitsFailureEventWhenRestarterFails() throws Exception {
+        final FailingEntity e2 = app.createAndManageChild(EntitySpec.create(FailingEntity.class)
+                .configure(FailingEntity.FAIL_ON_RESTART, true));
+        app.subscribe(e2, ServiceRestarter.ENTITY_RESTART_FAILED, eventListener);
+
+        policy = new ServiceRestarter(new ConfigBag().configure(ServiceRestarter.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED));
+        e2.addPolicy(policy);
+
+        e2.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(e2, "simulate failure"));
+        
+        Asserts.succeedsEventually(new Runnable() {
+            @Override public void run() {
+                assertEquals(Iterables.getOnlyElement(events).getSensor(), ServiceRestarter.ENTITY_RESTART_FAILED, "events="+events);
+                assertEquals(Iterables.getOnlyElement(events).getSource(), e2, "events="+events);
+                assertEquals(((FailureDescriptor)Iterables.getOnlyElement(events).getValue()).getComponent(), e2, "events="+events);
+            }});
+        
+        assertEquals(e2.getAttribute(Attributes.SERVICE_STATE_ACTUAL), Lifecycle.ON_FIRE);
+    }
+    
+    /** With {@code SET_ON_FIRE_ON_FAILURE} false, a failed restart must not mark the entity {@code ON_FIRE}. */
+    // NOTE(review): this uses succeedsContinually, the same wait for which testDoesNotRestartsWhenHealthy
+    // is placed in the Integration group; confirm whether this test belongs there too.
+    @Test
+    public void testDoesNotSetOnFireOnFailure() throws Exception {
+        final FailingEntity e2 = app.createAndManageChild(EntitySpec.create(FailingEntity.class)
+                .configure(FailingEntity.FAIL_ON_RESTART, true));
+        app.subscribe(e2, ServiceRestarter.ENTITY_RESTART_FAILED, eventListener);
+
+        policy = new ServiceRestarter(new ConfigBag()
+                .configure(ServiceRestarter.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED)
+                .configure(ServiceRestarter.SET_ON_FIRE_ON_FAILURE, false));
+        e2.addPolicy(policy);
+
+        e2.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(e2, "simulate failure"));
+        
+        Asserts.succeedsContinually(new Runnable() {
+            @Override public void run() {
+                assertNotEquals(e2.getAttribute(Attributes.SERVICE_STATE_ACTUAL), Lifecycle.ON_FIRE);
+            }});
+    }
+    
+    // Previously RestarterPolicy called entity.restart inside the event-listener thread.
+    // That caused all other events for that entity's subscriptions to be queued until that
+    // entity's single event handler thread was free again.
+    @Test
+    public void testRestartDoesNotBlockOtherSubscriptions() throws Exception {
+        final CountDownLatch inRestartLatch = new CountDownLatch(1);
+        final CountDownLatch continueRestartLatch = new CountDownLatch(1);
+        
+        // The failure callback signals that the restart is in progress, then blocks until released
+        final FailingEntity e2 = app.createAndManageChild(EntitySpec.create(FailingEntity.class)
+                .configure(FailingEntity.FAIL_ON_RESTART, true)
+                .configure(FailingEntity.EXEC_ON_FAILURE, new Function<Object, Void>() {
+                    @Override public Void apply(Object input) {
+                        inRestartLatch.countDown();
+                        try {
+                            continueRestartLatch.await();
+                        } catch (InterruptedException e) {
+                            throw Exceptions.propagate(e);
+                        }
+                        return null;
+                    }}));
+        
+        e2.addPolicy(PolicySpec.create(ServiceRestarter.class)
+                .configure(ServiceRestarter.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED));
+        e2.subscribe(e2, TestEntity.SEQUENCE, eventListener);
+
+        // Cause failure (reported for e2, the entity emitting it), and wait for entity.restart to be blocking
+        e2.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(e2, "simulate failure"));
+        try {
+            assertTrue(inRestartLatch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
+            
+            // Expect other notifications to continue to get through
+            e2.setAttribute(TestEntity.SEQUENCE, 1);
+            Asserts.succeedsEventually(new Runnable() {
+                @Override public void run() {
+                    assertEquals(Iterables.getOnlyElement(events).getValue(), 1);
+                }});
+        } finally {
+            // Always allow the restart to finish, even if an assertion above failed,
+            // so the blocked restart thread is not left waiting forever
+            continueRestartLatch.countDown();
+        }
+    }
+}