You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/18 13:06:05 UTC
[07/24] incubator-brooklyn git commit: [BROOKLYN-162] Renaming
package policy
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/ha/ConnectionFailureDetectorTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/ha/ConnectionFailureDetectorTest.java b/policy/src/test/java/org/apache/brooklyn/policy/ha/ConnectionFailureDetectorTest.java
new file mode 100644
index 0000000..e2d6998
--- /dev/null
+++ b/policy/src/test/java/org/apache/brooklyn/policy/ha/ConnectionFailureDetectorTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.ha;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.brooklyn.api.event.Sensor;
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.api.event.SensorEventListener;
+import org.apache.brooklyn.api.management.ManagementContext;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.test.entity.LocalManagementContextForTests;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.Entities;
+import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor;
+import brooklyn.test.Asserts;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.net.HostAndPort;
+
+public class ConnectionFailureDetectorTest {
+
+ private static final int TIMEOUT_MS = 30*1000;
+ private static final int OVERHEAD = 250;
+ private static final int POLL_PERIOD = 100;
+
+ private ManagementContext managementContext;
+ private TestApplication app;
+
+ private List<SensorEvent<FailureDescriptor>> events;
+
+ private ServerSocket serverSocket;
+ private HostAndPort serverSocketAddress;
+
+ @BeforeMethod(alwaysRun=true)
+ public void setUp() throws Exception {
+ events = new CopyOnWriteArrayList<SensorEvent<FailureDescriptor>>();
+
+ managementContext = new LocalManagementContextForTests();
+ app = ApplicationBuilder.newManagedApp(TestApplication.class, managementContext);
+
+ app.getManagementContext().getSubscriptionManager().subscribe(
+ app,
+ HASensors.CONNECTION_FAILED,
+ new SensorEventListener<FailureDescriptor>() {
+ @Override public void onEvent(SensorEvent<FailureDescriptor> event) {
+ events.add(event);
+ }
+ });
+ app.getManagementContext().getSubscriptionManager().subscribe(
+ app,
+ HASensors.CONNECTION_RECOVERED,
+ new SensorEventListener<FailureDescriptor>() {
+ @Override public void onEvent(SensorEvent<FailureDescriptor> event) {
+ events.add(event);
+ }
+ });
+
+ serverSocketAddress = startServerSocket();
+ }
+
+ @AfterMethod(alwaysRun=true)
+ public void tearDown() throws Exception {
+ stopServerSocket();
+ if (managementContext != null) Entities.destroyAll(managementContext);
+ }
+
+ private HostAndPort startServerSocket() throws Exception {
+ if (serverSocketAddress != null) {
+ serverSocket = new ServerSocket(serverSocketAddress.getPort());
+ } else {
+ for (int i = 40000; i < 40100; i++) {
+ try {
+ serverSocket = new ServerSocket(i);
+ } catch (IOException e) {
+ // try next port
+ }
+ }
+ assertNotNull(serverSocket, "Failed to create server socket; no ports free in range!");
+ serverSocketAddress = HostAndPort.fromParts(serverSocket.getInetAddress().getHostAddress(), serverSocket.getLocalPort());
+ }
+ return serverSocketAddress;
+ }
+
+ private void stopServerSocket() throws Exception {
+ if (serverSocket != null) serverSocket.close();
+ }
+
+ @Test(groups="Integration") // Has a 1 second wait
+ public void testNotNotifiedOfFailuresForHealthy() throws Exception {
+ // Create members before and after the policy is registered, to test both scenarios
+
+ app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class)
+ .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress));
+
+ assertNoEventsContinually();
+ }
+
+ @Test
+ public void testNotifiedOfFailure() throws Exception {
+ app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class)
+ .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress));
+
+ stopServerSocket();
+
+ assertHasEventEventually(HASensors.CONNECTION_FAILED, Predicates.<Object>equalTo(app), null);
+ assertEquals(events.size(), 1, "events="+events);
+ }
+
+ @Test
+ public void testNotifiedOfRecovery() throws Exception {
+ app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class)
+ .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress));
+
+ stopServerSocket();
+ assertHasEventEventually(HASensors.CONNECTION_FAILED, Predicates.<Object>equalTo(app), null);
+
+ // make the connection recover
+ startServerSocket();
+ assertHasEventEventually(HASensors.CONNECTION_RECOVERED, Predicates.<Object>equalTo(app), null);
+ assertEquals(events.size(), 2, "events="+events);
+ }
+
+ @Test
+ public void testReportsFailureWhenAlreadyDownOnRegisteringPolicy() throws Exception {
+ stopServerSocket();
+
+ app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class)
+ .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress));
+
+ assertHasEventEventually(HASensors.CONNECTION_FAILED, Predicates.<Object>equalTo(app), null);
+ }
+
+ @Test(groups="Integration") // Because slow
+ public void testNotNotifiedOfTemporaryFailuresDuringStabilisationDelay() throws Exception {
+ app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class)
+ .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress)
+ .configure(ConnectionFailureDetector.CONNECTION_FAILED_STABILIZATION_DELAY, Duration.ONE_MINUTE));
+
+ stopServerSocket();
+ Thread.sleep(100);
+ startServerSocket();
+
+ assertNoEventsContinually();
+ }
+
+ @Test(groups="Integration") // Because slow
+ public void testNotifiedOfFailureAfterStabilisationDelay() throws Exception {
+ final int stabilisationDelay = 1000;
+
+ app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class)
+ .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress)
+ .configure(ConnectionFailureDetector.CONNECTION_FAILED_STABILIZATION_DELAY, Duration.of(stabilisationDelay)));
+
+ stopServerSocket();
+
+ assertNoEventsContinually(Duration.of(stabilisationDelay - OVERHEAD));
+ assertHasEventEventually(HASensors.CONNECTION_FAILED, Predicates.<Object>equalTo(app), null);
+ }
+
+ @Test(groups="Integration") // Because slow
+ public void testFailuresThenUpDownResetsStabilisationCount() throws Exception {
+ final long stabilisationDelay = 1000;
+
+ app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class)
+ .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress)
+ .configure(ConnectionFailureDetector.CONNECTION_FAILED_STABILIZATION_DELAY, Duration.of(stabilisationDelay)));
+
+ stopServerSocket();
+ assertNoEventsContinually(Duration.of(stabilisationDelay - OVERHEAD));
+
+ startServerSocket();
+ Thread.sleep(POLL_PERIOD+OVERHEAD);
+ stopServerSocket();
+ assertNoEventsContinually(Duration.of(stabilisationDelay - OVERHEAD));
+
+ assertHasEventEventually(HASensors.CONNECTION_FAILED, Predicates.<Object>equalTo(app), null);
+ }
+
+ @Test(groups="Integration") // Because slow
+ public void testNotNotifiedOfTemporaryRecoveryDuringStabilisationDelay() throws Exception {
+ final long stabilisationDelay = 1000;
+
+ app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class)
+ .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress)
+ .configure(ConnectionFailureDetector.CONNECTION_RECOVERED_STABILIZATION_DELAY, Duration.of(stabilisationDelay)));
+
+ stopServerSocket();
+ assertHasEventEventually(HASensors.CONNECTION_FAILED, Predicates.<Object>equalTo(app), null);
+ events.clear();
+
+ startServerSocket();
+ Thread.sleep(POLL_PERIOD+OVERHEAD);
+ stopServerSocket();
+
+ assertNoEventsContinually(Duration.of(stabilisationDelay + OVERHEAD));
+ }
+
+ @Test(groups="Integration") // Because slow
+ public void testNotifiedOfRecoveryAfterStabilisationDelay() throws Exception {
+ final int stabilisationDelay = 1000;
+
+ app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class)
+ .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress)
+ .configure(ConnectionFailureDetector.CONNECTION_RECOVERED_STABILIZATION_DELAY, Duration.of(stabilisationDelay)));
+
+ stopServerSocket();
+ assertHasEventEventually(HASensors.CONNECTION_FAILED, Predicates.<Object>equalTo(app), null);
+ events.clear();
+
+ startServerSocket();
+ assertNoEventsContinually(Duration.of(stabilisationDelay - OVERHEAD));
+ assertHasEventEventually(HASensors.CONNECTION_RECOVERED, Predicates.<Object>equalTo(app), null);
+ }
+
+ @Test(groups="Integration") // Because slow
+ public void testRecoversThenDownUpResetsStabilisationCount() throws Exception {
+ final long stabilisationDelay = 1000;
+
+ app.addPolicy(PolicySpec.create(ConnectionFailureDetector.class)
+ .configure(ConnectionFailureDetector.ENDPOINT, serverSocketAddress)
+ .configure(ConnectionFailureDetector.CONNECTION_RECOVERED_STABILIZATION_DELAY, Duration.of(stabilisationDelay)));
+
+ stopServerSocket();
+ assertHasEventEventually(HASensors.CONNECTION_FAILED, Predicates.<Object>equalTo(app), null);
+ events.clear();
+
+ startServerSocket();
+ assertNoEventsContinually(Duration.of(stabilisationDelay - OVERHEAD));
+
+ stopServerSocket();
+ Thread.sleep(POLL_PERIOD+OVERHEAD);
+ startServerSocket();
+ assertNoEventsContinually(Duration.of(stabilisationDelay - OVERHEAD));
+
+ assertHasEventEventually(HASensors.CONNECTION_RECOVERED, Predicates.<Object>equalTo(app), null);
+ }
+
+ private void assertHasEvent(Sensor<?> sensor, Predicate<Object> componentPredicate, Predicate<? super CharSequence> descriptionPredicate) {
+ for (SensorEvent<FailureDescriptor> event : events) {
+ if (event.getSensor().equals(sensor) &&
+ (componentPredicate == null || componentPredicate.apply(event.getValue().getComponent())) &&
+ (descriptionPredicate == null || descriptionPredicate.apply(event.getValue().getDescription()))) {
+ return;
+ }
+ }
+ fail("No matching "+sensor+" event found; events="+events);
+ }
+
+ private void assertHasEventEventually(final Sensor<?> sensor, final Predicate<Object> componentPredicate, final Predicate<? super CharSequence> descriptionPredicate) {
+ Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() {
+ @Override public void run() {
+ assertHasEvent(sensor, componentPredicate, descriptionPredicate);
+ }});
+ }
+
+ private void assertNoEventsContinually(Duration duration) {
+ Asserts.succeedsContinually(ImmutableMap.of("timeout", duration), new Runnable() {
+ @Override public void run() {
+ assertTrue(events.isEmpty(), "events="+events);
+ }});
+ }
+
+ private void assertNoEventsContinually() {
+ Asserts.succeedsContinually(new Runnable() {
+ @Override public void run() {
+ assertTrue(events.isEmpty(), "events="+events);
+ }});
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/ha/HaPolicyRebindTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/ha/HaPolicyRebindTest.java b/policy/src/test/java/org/apache/brooklyn/policy/ha/HaPolicyRebindTest.java
new file mode 100644
index 0000000..3782ccf
--- /dev/null
+++ b/policy/src/test/java/org/apache/brooklyn/policy/ha/HaPolicyRebindTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.ha;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.fail;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.event.Sensor;
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.api.event.SensorEventListener;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.location.LocationSpec;
+import org.apache.brooklyn.api.policy.EnricherSpec;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.apache.brooklyn.test.entity.TestEntity;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.Lifecycle;
+import brooklyn.entity.basic.ServiceStateLogic;
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.rebind.RebindTestFixtureWithApp;
+
+import org.apache.brooklyn.location.basic.SimulatedLocation;
+
+import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor;
+import brooklyn.test.Asserts;
+import brooklyn.util.collections.MutableMap;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class HaPolicyRebindTest extends RebindTestFixtureWithApp {
+
+ private TestEntity origEntity;
+ private SensorEventListener<FailureDescriptor> eventListener;
+ private List<SensorEvent<FailureDescriptor>> events;
+
+ @Override
+ @BeforeMethod(alwaysRun=true)
+ public void setUp() throws Exception {
+ super.setUp();
+ origEntity = origApp.createAndManageChild(EntitySpec.create(TestEntity.class));
+ events = Lists.newCopyOnWriteArrayList();
+ eventListener = new SensorEventListener<FailureDescriptor>() {
+ @Override public void onEvent(SensorEvent<FailureDescriptor> event) {
+ events.add(event);
+ }
+ };
+ }
+
+ @Test
+ public void testServiceRestarterWorksAfterRebind() throws Exception {
+ origEntity.addPolicy(PolicySpec.create(ServiceRestarter.class)
+ .configure(ServiceRestarter.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED));
+
+ TestApplication newApp = rebind();
+ final TestEntity newEntity = (TestEntity) Iterables.find(newApp.getChildren(), Predicates.instanceOf(TestEntity.class));
+
+ newEntity.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(origEntity, "simulate failure"));
+
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ assertEquals(newEntity.getCallHistory(), ImmutableList.of("restart"));
+ }});
+ }
+
+ @Test
+ public void testServiceReplacerWorksAfterRebind() throws Exception {
+ Location origLoc = origManagementContext.getLocationManager().createLocation(LocationSpec.create(SimulatedLocation.class));
+ DynamicCluster origCluster = origApp.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+ .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(TestEntity.class))
+ .configure(DynamicCluster.INITIAL_SIZE, 3));
+ origApp.start(ImmutableList.<Location>of(origLoc));
+
+ origCluster.addPolicy(PolicySpec.create(ServiceReplacer.class)
+ .configure(ServiceReplacer.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED));
+
+ // rebind
+ TestApplication newApp = rebind();
+ final DynamicCluster newCluster = (DynamicCluster) Iterables.find(newApp.getChildren(), Predicates.instanceOf(DynamicCluster.class));
+
+ // stimulate the policy
+ final Set<Entity> initialMembers = ImmutableSet.copyOf(newCluster.getMembers());
+ final TestEntity e1 = (TestEntity) Iterables.get(initialMembers, 1);
+
+ newApp.getManagementContext().getSubscriptionManager().subscribe(e1, HASensors.ENTITY_FAILED, eventListener);
+ newApp.getManagementContext().getSubscriptionManager().subscribe(e1, HASensors.ENTITY_RECOVERED, eventListener);
+
+ e1.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(e1, "simulate failure"));
+
+ // Expect e1 to be replaced
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ Set<Entity> newMembers = Sets.difference(ImmutableSet.copyOf(newCluster.getMembers()), initialMembers);
+ Set<Entity> removedMembers = Sets.difference(initialMembers, ImmutableSet.copyOf(newCluster.getMembers()));
+ assertEquals(removedMembers, ImmutableSet.of(e1));
+ assertEquals(newMembers.size(), 1);
+ assertEquals(((TestEntity)Iterables.getOnlyElement(newMembers)).getCallHistory(), ImmutableList.of("start"));
+
+ // TODO e1 not reporting "start" after rebind because callHistory is a field rather than an attribute, so was not persisted
+ Asserts.assertEqualsIgnoringOrder(e1.getCallHistory(), ImmutableList.of("stop"));
+ assertFalse(Entities.isManaged(e1));
+ }});
+ }
+
+ @Test
+ public void testServiceFailureDetectorWorksAfterRebind() throws Exception {
+ origEntity.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));
+
+ // rebind
+ TestApplication newApp = rebind();
+ final TestEntity newEntity = (TestEntity) Iterables.find(newApp.getChildren(), Predicates.instanceOf(TestEntity.class));
+
+ newApp.getManagementContext().getSubscriptionManager().subscribe(newEntity, HASensors.ENTITY_FAILED, eventListener);
+
+ newEntity.setAttribute(TestEntity.SERVICE_UP, true);
+ ServiceStateLogic.setExpectedState(newEntity, Lifecycle.RUNNING);
+
+ // trigger the failure
+ newEntity.setAttribute(TestEntity.SERVICE_UP, false);
+
+ assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(newEntity), null);
+ assertEquals(events.size(), 1, "events="+events);
+ }
+
+ private void assertHasEventEventually(final Sensor<?> sensor, final Predicate<Object> componentPredicate, final Predicate<? super CharSequence> descriptionPredicate) {
+ Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() {
+ @Override public void run() {
+ assertHasEvent(sensor, componentPredicate, descriptionPredicate);
+ }});
+ }
+
+ private void assertHasEvent(Sensor<?> sensor, Predicate<Object> componentPredicate, Predicate<? super CharSequence> descriptionPredicate) {
+ for (SensorEvent<FailureDescriptor> event : events) {
+ if (event.getSensor().equals(sensor) &&
+ (componentPredicate == null || componentPredicate.apply(event.getValue().getComponent())) &&
+ (descriptionPredicate == null || descriptionPredicate.apply(event.getValue().getDescription()))) {
+ return;
+ }
+ }
+ fail("No matching "+sensor+" event found; events="+events);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceFailureDetectorStabilizationTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceFailureDetectorStabilizationTest.java b/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceFailureDetectorStabilizationTest.java
new file mode 100644
index 0000000..ca84592
--- /dev/null
+++ b/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceFailureDetectorStabilizationTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.ha;
+
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.event.Sensor;
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.api.event.SensorEventListener;
+import org.apache.brooklyn.api.management.ManagementContext;
+import org.apache.brooklyn.api.policy.EnricherSpec;
+import org.apache.brooklyn.test.entity.LocalManagementContextForTests;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.apache.brooklyn.test.entity.TestEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.Lifecycle;
+import brooklyn.entity.basic.ServiceStateLogic;
+import brooklyn.entity.basic.ServiceStateLogicTest;
+import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor;
+import brooklyn.test.Asserts;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableMap;
+
+/** also see more primitive tests in {@link ServiceStateLogicTest} */
+public class ServiceFailureDetectorStabilizationTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ServiceFailureDetectorStabilizationTest.class);
+
+ private static final int TIMEOUT_MS = 10*1000;
+ private static final int OVERHEAD = 250;
+
+ private ManagementContext managementContext;
+ private TestApplication app;
+ private TestEntity e1;
+
+ private List<SensorEvent<FailureDescriptor>> events;
+
+ @BeforeMethod(alwaysRun=true)
+ public void setUp() throws Exception {
+ events = new CopyOnWriteArrayList<SensorEvent<FailureDescriptor>>();
+
+ managementContext = new LocalManagementContextForTests();
+ app = ApplicationBuilder.newManagedApp(TestApplication.class, managementContext);
+ e1 = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+ e1.setAttribute(TestEntity.SERVICE_UP, true);
+ ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+
+ app.getManagementContext().getSubscriptionManager().subscribe(
+ e1,
+ HASensors.ENTITY_FAILED,
+ new SensorEventListener<FailureDescriptor>() {
+ @Override public void onEvent(SensorEvent<FailureDescriptor> event) {
+ events.add(event);
+ }
+ });
+ app.getManagementContext().getSubscriptionManager().subscribe(
+ e1,
+ HASensors.ENTITY_RECOVERED,
+ new SensorEventListener<FailureDescriptor>() {
+ @Override public void onEvent(SensorEvent<FailureDescriptor> event) {
+ events.add(event);
+ }
+ });
+ }
+
+ @AfterMethod(alwaysRun=true)
+ public void tearDown() throws Exception {
+ if (managementContext != null) Entities.destroyAll(managementContext);
+ }
+
+ @Test(groups="Integration") // Because slow
+ public void testNotNotifiedOfTemporaryFailuresDuringStabilisationDelay() throws Exception {
+ e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+ .configure(ServiceFailureDetector.ENTITY_FAILED_STABILIZATION_DELAY, Duration.ONE_MINUTE));
+
+ e1.setAttribute(TestEntity.SERVICE_UP, false);
+ Thread.sleep(100);
+ e1.setAttribute(TestEntity.SERVICE_UP, true);
+
+ assertNoEventsContinually();
+ }
+
+ @Test(groups="Integration") // Because slow
+ public void testNotifiedOfFailureAfterStabilisationDelay() throws Exception {
+ final int stabilisationDelay = 1000;
+
+ e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+ .configure(ServiceFailureDetector.ENTITY_FAILED_STABILIZATION_DELAY, Duration.of(stabilisationDelay)));
+
+ e1.setAttribute(TestEntity.SERVICE_UP, false);
+
+ assertNoEventsContinually(Duration.of(stabilisationDelay - OVERHEAD));
+ assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+ }
+
+ @Test(groups="Integration") // Because slow
+ public void testFailuresThenUpDownResetsStabilisationCount() throws Exception {
+ LOG.debug("Running testFailuresThenUpDownResetsStabilisationCount");
+ final long stabilisationDelay = 1000;
+
+ e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+ .configure(ServiceFailureDetector.ENTITY_FAILED_STABILIZATION_DELAY, Duration.of(stabilisationDelay)));
+
+ e1.setAttribute(TestEntity.SERVICE_UP, false);
+ assertNoEventsContinually(Duration.of(stabilisationDelay - OVERHEAD));
+
+ e1.setAttribute(TestEntity.SERVICE_UP, true);
+ Thread.sleep(OVERHEAD);
+ e1.setAttribute(TestEntity.SERVICE_UP, false);
+ assertNoEventsContinually(Duration.of(stabilisationDelay - OVERHEAD));
+
+ assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+ }
+
+ @Test(groups="Integration") // Because slow
+ public void testNotNotifiedOfTemporaryRecoveryDuringStabilisationDelay() throws Exception {
+ final long stabilisationDelay = 1000;
+
+ e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+ .configure(ServiceFailureDetector.ENTITY_RECOVERED_STABILIZATION_DELAY, Duration.of(stabilisationDelay)));
+
+ e1.setAttribute(TestEntity.SERVICE_UP, false);
+ assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+ events.clear();
+
+ e1.setAttribute(TestEntity.SERVICE_UP, true);
+ Thread.sleep(100);
+ e1.setAttribute(TestEntity.SERVICE_UP, false);
+
+ assertNoEventsContinually(Duration.of(stabilisationDelay + OVERHEAD));
+ }
+
+ @Test(groups="Integration") // Because slow
+ public void testNotifiedOfRecoveryAfterStabilisationDelay() throws Exception {
+ final int stabilisationDelay = 1000;
+
+ e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+ .configure(ServiceFailureDetector.ENTITY_RECOVERED_STABILIZATION_DELAY, Duration.of(stabilisationDelay)));
+
+ e1.setAttribute(TestEntity.SERVICE_UP, false);
+ assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+ events.clear();
+
+ e1.setAttribute(TestEntity.SERVICE_UP, true);
+ assertNoEventsContinually(Duration.of(stabilisationDelay - OVERHEAD));
+ assertHasEventEventually(HASensors.ENTITY_RECOVERED, Predicates.<Object>equalTo(e1), null);
+ }
+
+ @Test(groups="Integration") // Because slow
+ public void testRecoversThenDownUpResetsStabilisationCount() throws Exception {
+ final long stabilisationDelay = 1000;
+
+ e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+ .configure(ServiceFailureDetector.ENTITY_RECOVERED_STABILIZATION_DELAY, Duration.of(stabilisationDelay)));
+
+ e1.setAttribute(TestEntity.SERVICE_UP, false);
+ assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+ events.clear();
+
+ e1.setAttribute(TestEntity.SERVICE_UP, true);
+ assertNoEventsContinually(Duration.of(stabilisationDelay - OVERHEAD));
+
+ e1.setAttribute(TestEntity.SERVICE_UP, false);
+ Thread.sleep(OVERHEAD);
+ e1.setAttribute(TestEntity.SERVICE_UP, true);
+ assertNoEventsContinually(Duration.of(stabilisationDelay - OVERHEAD));
+
+ assertHasEventEventually(HASensors.ENTITY_RECOVERED, Predicates.<Object>equalTo(e1), null);
+ }
+
+ private void assertHasEvent(Sensor<?> sensor, Predicate<Object> componentPredicate, Predicate<? super CharSequence> descriptionPredicate) {
+ for (SensorEvent<FailureDescriptor> event : events) {
+ if (event.getSensor().equals(sensor) &&
+ (componentPredicate == null || componentPredicate.apply(event.getValue().getComponent())) &&
+ (descriptionPredicate == null || descriptionPredicate.apply(event.getValue().getDescription()))) {
+ return;
+ }
+ }
+ fail("No matching "+sensor+" event found; events="+events);
+ }
+
+ private void assertHasEventEventually(final Sensor<?> sensor, final Predicate<Object> componentPredicate, final Predicate<? super CharSequence> descriptionPredicate) {
+ Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() {
+ @Override public void run() {
+ assertHasEvent(sensor, componentPredicate, descriptionPredicate);
+ }});
+ }
+
+ private void assertNoEventsContinually(Duration duration) {
+ Asserts.succeedsContinually(ImmutableMap.of("timeout", duration), new Runnable() {
+ @Override public void run() {
+ assertTrue(events.isEmpty(), "events="+events);
+ }});
+ }
+
+ private void assertNoEventsContinually() {
+ Asserts.succeedsContinually(new Runnable() {
+ @Override public void run() {
+ assertTrue(events.isEmpty(), "events="+events);
+ }});
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceFailureDetectorTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceFailureDetectorTest.java b/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceFailureDetectorTest.java
new file mode 100644
index 0000000..2ad2d79
--- /dev/null
+++ b/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceFailureDetectorTest.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.ha;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.event.Sensor;
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.api.event.SensorEventListener;
+import org.apache.brooklyn.api.management.ManagementContext;
+import org.apache.brooklyn.api.policy.EnricherSpec;
+import org.apache.brooklyn.test.EntityTestUtils;
+import org.apache.brooklyn.test.entity.LocalManagementContextForTests;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.apache.brooklyn.test.entity.TestEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.Lifecycle;
+import brooklyn.entity.basic.ServiceStateLogic;
+import brooklyn.entity.basic.ServiceStateLogic.ServiceProblemsLogic;
+import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor;
+import brooklyn.test.Asserts;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableMap;
+
+public class ServiceFailureDetectorTest {
+ private static final Logger log = LoggerFactory.getLogger(ServiceFailureDetectorTest.class);
+
+ private static final int TIMEOUT_MS = 10*1000;
+
+ private ManagementContext managementContext;
+ private TestApplication app;
+ private TestEntity e1;
+
+ private List<SensorEvent<FailureDescriptor>> events;
+ private SensorEventListener<FailureDescriptor> eventListener;
+
+ @BeforeMethod(alwaysRun=true)
+ public void setUp() throws Exception {
+ events = new CopyOnWriteArrayList<SensorEvent<FailureDescriptor>>();
+ eventListener = new SensorEventListener<FailureDescriptor>() {
+ @Override public void onEvent(SensorEvent<FailureDescriptor> event) {
+ events.add(event);
+ }
+ };
+
+ managementContext = new LocalManagementContextForTests();
+ app = ApplicationBuilder.newManagedApp(TestApplication.class, managementContext);
+ e1 = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+ e1.addEnricher(ServiceStateLogic.newEnricherForServiceStateFromProblemsAndUp());
+
+ app.getManagementContext().getSubscriptionManager().subscribe(e1, HASensors.ENTITY_FAILED, eventListener);
+ app.getManagementContext().getSubscriptionManager().subscribe(e1, HASensors.ENTITY_RECOVERED, eventListener);
+ }
+
+ @AfterMethod(alwaysRun=true)
+ public void tearDown() throws Exception {
+ if (managementContext != null) Entities.destroyAll(managementContext);
+ }
+
+ @Test(groups="Integration") // Has a 1 second wait
+ public void testNotNotifiedOfFailuresForHealthy() throws Exception {
+ // Create members before and after the policy is registered, to test both scenarios
+ e1.setAttribute(TestEntity.SERVICE_UP, true);
+ ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+
+ e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));
+
+ assertNoEventsContinually();
+ assertEquals(e1.getAttribute(TestEntity.SERVICE_STATE_ACTUAL), Lifecycle.RUNNING);
+ }
+
+ @Test
+ public void testNotifiedOfFailure() throws Exception {
+ e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));
+
+ e1.setAttribute(TestEntity.SERVICE_UP, true);
+ ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+
+ assertEquals(events.size(), 0, "events="+events);
+
+ e1.setAttribute(TestEntity.SERVICE_UP, false);
+
+ assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+ assertEquals(events.size(), 1, "events="+events);
+ EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
+ }
+
+ @Test
+ public void testNotifiedOfFailureOnProblem() throws Exception {
+ e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));
+
+ e1.setAttribute(TestEntity.SERVICE_UP, true);
+ ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+
+ assertEquals(events.size(), 0, "events="+events);
+
+ ServiceProblemsLogic.updateProblemsIndicator(e1, "test", "foo");
+
+ assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+ assertEquals(events.size(), 1, "events="+events);
+ EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
+ }
+
+ @Test
+ public void testNotifiedOfFailureOnStateOnFire() throws Exception {
+ e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));
+ e1.setAttribute(TestEntity.SERVICE_UP, true);
+ ServiceStateLogic.setExpectedState(e1, Lifecycle.ON_FIRE);
+
+ assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+ assertEquals(events.size(), 1, "events="+events);
+ EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
+ }
+
+ @Test
+ public void testNotifiedOfRecovery() throws Exception {
+ e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));
+
+ e1.setAttribute(TestEntity.SERVICE_UP, true);
+ ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+ // Make the entity fail
+ e1.setAttribute(TestEntity.SERVICE_UP, false);
+
+ assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+ EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
+
+ // And make the entity recover
+ e1.setAttribute(TestEntity.SERVICE_UP, true);
+ assertHasEventEventually(HASensors.ENTITY_RECOVERED, Predicates.<Object>equalTo(e1), null);
+ assertEquals(events.size(), 2, "events="+events);
+ EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+ }
+
+ @Test
+ public void testNotifiedOfRecoveryFromProblems() throws Exception {
+ e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));
+
+ e1.setAttribute(TestEntity.SERVICE_UP, true);
+ ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+ // Make the entity fail
+ ServiceProblemsLogic.updateProblemsIndicator(e1, "test", "foo");
+
+ assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+ EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
+
+ // And make the entity recover
+ ServiceProblemsLogic.clearProblemsIndicator(e1, "test");
+ assertHasEventEventually(HASensors.ENTITY_RECOVERED, Predicates.<Object>equalTo(e1), null);
+ assertEquals(events.size(), 2, "events="+events);
+ EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+ }
+
+
+ @Test(groups="Integration") // Has a 1 second wait
+ public void testEmitsEntityFailureOnlyIfPreviouslyUp() throws Exception {
+ e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));
+
+ // Make the entity fail
+ e1.setAttribute(TestEntity.SERVICE_UP, false);
+ ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+
+ EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
+ assertNoEventsContinually();
+ }
+
+ @Test
+ public void testDisablingPreviouslyUpRequirementForEntityFailed() throws Exception {
+ e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+ .configure(ServiceFailureDetector.ENTITY_FAILED_ONLY_IF_PREVIOUSLY_UP, false));
+
+ e1.setAttribute(TestEntity.SERVICE_UP, false);
+ ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+
+ EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
+ assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+ }
+
+ @Test
+ public void testDisablingOnFire() throws Exception {
+ e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+ .configure(ServiceFailureDetector.SERVICE_ON_FIRE_STABILIZATION_DELAY, Duration.PRACTICALLY_FOREVER));
+
+ // Make the entity fail
+ e1.setAttribute(TestEntity.SERVICE_UP, true);
+ ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+ EntityTestUtils.assertAttributeEqualsEventually(e1, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+ e1.setAttribute(TestEntity.SERVICE_UP, false);
+
+ assertEquals(e1.getAttribute(TestEntity.SERVICE_STATE_ACTUAL), Lifecycle.RUNNING);
+ }
+
+ @Test(groups="Integration") // Has a 1 second wait
+ public void testOnFireAfterDelay() throws Exception {
+ e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+ .configure(ServiceFailureDetector.SERVICE_ON_FIRE_STABILIZATION_DELAY, Duration.ONE_SECOND));
+
+ // Make the entity fail
+ e1.setAttribute(TestEntity.SERVICE_UP, true);
+ ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+ EntityTestUtils.assertAttributeEqualsEventually(e1, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+
+ e1.setAttribute(TestEntity.SERVICE_UP, false);
+
+ assertEquals(e1.getAttribute(TestEntity.SERVICE_STATE_ACTUAL), Lifecycle.RUNNING);
+ Time.sleep(Duration.millis(100));
+ assertEquals(e1.getAttribute(TestEntity.SERVICE_STATE_ACTUAL), Lifecycle.RUNNING);
+ EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
+ }
+
+ @Test(groups="Integration") // Has a 1 second wait
+ public void testOnFailureDelayFromProblemAndRecover() throws Exception {
+ e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+ .configure(ServiceFailureDetector.SERVICE_ON_FIRE_STABILIZATION_DELAY, Duration.ONE_SECOND)
+ .configure(ServiceFailureDetector.ENTITY_RECOVERED_STABILIZATION_DELAY, Duration.ONE_SECOND));
+
+ // Set the entity to healthy
+ e1.setAttribute(TestEntity.SERVICE_UP, true);
+ ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+ EntityTestUtils.assertAttributeEqualsEventually(e1, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+
+ // Make the entity fail; won't set on-fire for 1s but will publish FAILED immediately.
+ ServiceStateLogic.ServiceProblemsLogic.updateProblemsIndicator(e1, "test", "foo");
+ EntityTestUtils.assertAttributeEqualsContinually(ImmutableMap.of("timeout", 100), e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+ assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+ assertEquals(e1.getAttribute(TestEntity.SERVICE_STATE_ACTUAL), Lifecycle.RUNNING);
+
+ EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
+
+ // Now recover: will publish RUNNING immediately, but has 1s stabilisation for RECOVERED
+ ServiceStateLogic.ServiceProblemsLogic.clearProblemsIndicator(e1, "test");
+ EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+
+ assertEquals(events.size(), 1, "events="+events);
+
+ assertHasEventEventually(HASensors.ENTITY_RECOVERED, Predicates.<Object>equalTo(e1), null);
+ assertEquals(events.size(), 2, "events="+events);
+ }
+
+ @Test(groups="Integration") // Has a 1 second wait
+ public void testAttendsToServiceState() throws Exception {
+ e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));
+
+ e1.setAttribute(TestEntity.SERVICE_UP, true);
+ // not counted as failed because not expected to be running
+ e1.setAttribute(TestEntity.SERVICE_UP, false);
+
+ assertNoEventsContinually();
+ }
+
+ @Test(groups="Integration") // Has a 1 second wait
+ public void testOnlyReportsFailureIfRunning() throws Exception {
+ e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class));
+
+ // Make the entity fail
+ ServiceStateLogic.setExpectedState(e1, Lifecycle.STARTING);
+ e1.setAttribute(TestEntity.SERVICE_UP, true);
+ e1.setAttribute(TestEntity.SERVICE_UP, false);
+
+ assertNoEventsContinually();
+ }
+
+ @Test
+ public void testReportsFailureWhenAlreadyDownOnRegisteringPolicy() throws Exception {
+ ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+ e1.setAttribute(TestEntity.SERVICE_UP, false);
+
+ e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+ .configure(ServiceFailureDetector.ENTITY_FAILED_ONLY_IF_PREVIOUSLY_UP, false));
+
+ assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+ }
+
+ @Test
+ public void testReportsFailureWhenAlreadyOnFireOnRegisteringPolicy() throws Exception {
+ ServiceStateLogic.setExpectedState(e1, Lifecycle.ON_FIRE);
+
+ e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+ .configure(ServiceFailureDetector.ENTITY_FAILED_ONLY_IF_PREVIOUSLY_UP, false));
+
+ assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+ }
+
+ @Test(groups="Integration") // Has a 1.5 second wait
+ public void testRepublishedFailure() throws Exception {
+ Duration republishPeriod = Duration.millis(100);
+
+ e1.addEnricher(EnricherSpec.create(ServiceFailureDetector.class)
+ .configure(ServiceFailureDetector.ENTITY_FAILED_REPUBLISH_TIME, republishPeriod));
+
+ // Set the entity to healthy
+ e1.setAttribute(TestEntity.SERVICE_UP, true);
+ ServiceStateLogic.setExpectedState(e1, Lifecycle.RUNNING);
+ EntityTestUtils.assertAttributeEqualsEventually(e1, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+
+ // Make the entity fail;
+ ServiceStateLogic.ServiceProblemsLogic.updateProblemsIndicator(e1, "test", "foo");
+ EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
+ assertHasEventEventually(HASensors.ENTITY_FAILED, Predicates.<Object>equalTo(e1), null);
+
+ //wait for at least 10 republish events (~1 sec)
+ assertEventsSizeEventually(10);
+
+ // Now recover
+ ServiceStateLogic.ServiceProblemsLogic.clearProblemsIndicator(e1, "test");
+ EntityTestUtils.assertAttributeEqualsEventually(e1, TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+ assertHasEventEventually(HASensors.ENTITY_RECOVERED, Predicates.<Object>equalTo(e1), null);
+
+ //once recovered check no more failed events emitted periodically
+ assertEventsSizeContiniually(events.size());
+
+ SensorEvent<FailureDescriptor> prevEvent = null;
+ for (SensorEvent<FailureDescriptor> event : events) {
+ if (prevEvent != null) {
+ long repeatOffset = event.getTimestamp() - prevEvent.getTimestamp();
+ long deviation = Math.abs(repeatOffset - republishPeriod.toMilliseconds());
+ if (deviation > republishPeriod.toMilliseconds()/10 &&
+ //warn only if recovered is too far away from the last failure
+ (!event.getSensor().equals(HASensors.ENTITY_RECOVERED) ||
+ repeatOffset > republishPeriod.toMilliseconds())) {
+ log.error("The time between failure republish (" + repeatOffset + "ms) deviates too much from the expected " + republishPeriod + ". prevEvent=" + prevEvent + ", event=" + event);
+ }
+ }
+ prevEvent = event;
+ }
+
+ //make sure no republish takes place after recovered
+ assertEquals(prevEvent.getSensor(), HASensors.ENTITY_RECOVERED);
+ }
+
+ private void assertEventsSizeContiniually(final int size) {
+ Asserts.succeedsContinually(MutableMap.of("timeout", 500), new Runnable() {
+ @Override
+ public void run() {
+ assertTrue(events.size() == size, "assertEventsSizeContiniually expects " + size + " events but found " + events.size() + ": " + events);
+ }
+ });
+ }
+
+ private void assertEventsSizeEventually(final int size) {
+ Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() {
+ @Override
+ public void run() {
+ assertTrue(events.size() >= size, "assertEventsSizeContiniually expects at least " + size + " events but found " + events.size() + ": " + events);
+ }
+ });
+ }
+
+ private void assertHasEvent(Sensor<?> sensor, Predicate<Object> componentPredicate, Predicate<? super CharSequence> descriptionPredicate) {
+ for (SensorEvent<FailureDescriptor> event : events) {
+ if (event.getSensor().equals(sensor) &&
+ (componentPredicate == null || componentPredicate.apply(event.getValue().getComponent())) &&
+ (descriptionPredicate == null || descriptionPredicate.apply(event.getValue().getDescription()))) {
+ return;
+ }
+ }
+ fail("No matching "+sensor+" event found; events="+events);
+ }
+
+ private void assertHasEventEventually(final Sensor<?> sensor, final Predicate<Object> componentPredicate, final Predicate<? super CharSequence> descriptionPredicate) {
+ Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() {
+ @Override public void run() {
+ assertHasEvent(sensor, componentPredicate, descriptionPredicate);
+ }});
+ }
+
+ private void assertNoEventsContinually() {
+ Asserts.succeedsContinually(new Runnable() {
+ @Override public void run() {
+ assertTrue(events.isEmpty(), "events="+events);
+ }});
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceReplacerTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceReplacerTest.java b/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceReplacerTest.java
new file mode 100644
index 0000000..f92603a
--- /dev/null
+++ b/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceReplacerTest.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.ha;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.api.event.SensorEventListener;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.location.LocationSpec;
+import org.apache.brooklyn.api.management.ManagementContext;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.core.util.config.ConfigBag;
+import org.apache.brooklyn.test.EntityTestUtils;
+import org.apache.brooklyn.test.entity.LocalManagementContextForTests;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.apache.brooklyn.test.entity.TestEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.EntityInternal;
+import brooklyn.entity.basic.Lifecycle;
+import brooklyn.entity.basic.QuorumCheck;
+import brooklyn.entity.basic.ServiceStateLogic.ComputeServiceIndicatorsFromChildrenAndMembers;
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.trait.FailingEntity;
+
+import org.apache.brooklyn.location.basic.SimulatedLocation;
+
+import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor;
+import brooklyn.test.Asserts;
+import brooklyn.util.javalang.JavaClassNames;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class ServiceReplacerTest {
+
+ private static final Logger log = LoggerFactory.getLogger(ServiceReplacerTest.class);
+
+ private ManagementContext managementContext;
+ private TestApplication app;
+ private SimulatedLocation loc;
+ private SensorEventListener<Object> eventListener;
+ private List<SensorEvent<?>> events;
+
+ @BeforeMethod(alwaysRun=true)
+ public void setUp() throws Exception {
+ managementContext = new LocalManagementContextForTests();
+ app = ApplicationBuilder.newManagedApp(TestApplication.class, managementContext);
+ loc = managementContext.getLocationManager().createLocation(LocationSpec.create(SimulatedLocation.class));
+ events = Lists.newCopyOnWriteArrayList();
+ eventListener = new SensorEventListener<Object>() {
+ @Override public void onEvent(SensorEvent<Object> event) {
+ events.add(event);
+ }
+ };
+ }
+
+ @AfterMethod(alwaysRun=true)
+ public void tearDown() throws Exception {
+ if (managementContext != null) Entities.destroyAll(managementContext);
+ }
+
+ @Test
+ public void testReplacesFailedMember() throws Exception {
+ final DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+ .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(TestEntity.class))
+ .configure(DynamicCluster.INITIAL_SIZE, 3));
+ app.start(ImmutableList.<Location>of(loc));
+
+ ServiceReplacer policy = new ServiceReplacer(new ConfigBag().configure(ServiceReplacer.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED));
+ cluster.addPolicy(policy);
+
+ final Set<Entity> initialMembers = ImmutableSet.copyOf(cluster.getMembers());
+ final TestEntity e1 = (TestEntity) Iterables.get(initialMembers, 1);
+
+ e1.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(e1, "simulate failure"));
+
+ // Expect e1 to be replaced
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ Set<Entity> newMembers = Sets.difference(ImmutableSet.copyOf(cluster.getMembers()), initialMembers);
+ Set<Entity> removedMembers = Sets.difference(initialMembers, ImmutableSet.copyOf(cluster.getMembers()));
+ assertEquals(removedMembers, ImmutableSet.of(e1));
+ assertEquals(newMembers.size(), 1);
+ assertEquals(((TestEntity)Iterables.getOnlyElement(newMembers)).getCallHistory(), ImmutableList.of("start"));
+ assertEquals(e1.getCallHistory(), ImmutableList.of("start", "stop"));
+ assertFalse(Entities.isManaged(e1));
+ }});
+ }
+
+ @Test(invocationCount=100)
+ public void testSetsOnFireWhenFailToReplaceMemberManyTimes() throws Exception {
+ testSetsOnFireWhenFailToReplaceMember();
+ }
+
+ // fails the startup of the replacement entity (but not the original).
+ @Test
+ public void testSetsOnFireWhenFailToReplaceMember() throws Exception {
+ app.subscribe(null, ServiceReplacer.ENTITY_REPLACEMENT_FAILED, eventListener);
+
+ final DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+ .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(FailingEntity.class)
+ .configure(FailingEntity.FAIL_ON_START_CONDITION, predicateOnlyTrueForCallAtOrAfter(2)))
+ .configure(DynamicCluster.INITIAL_SIZE, 1)
+ .configure(DynamicCluster.QUARANTINE_FAILED_ENTITIES, true)
+ .configure(ComputeServiceIndicatorsFromChildrenAndMembers.UP_QUORUM_CHECK, QuorumCheck.QuorumChecks.alwaysTrue())
+ .configure(ComputeServiceIndicatorsFromChildrenAndMembers.RUNNING_QUORUM_CHECK, QuorumCheck.QuorumChecks.alwaysTrue()));
+ app.start(ImmutableList.<Location>of(loc));
+
+ // should not be on fire
+ Assert.assertNotEquals(cluster.getAttribute(Attributes.SERVICE_STATE_ACTUAL), Lifecycle.ON_FIRE);
+ // and should eventually be running
+ EntityTestUtils.assertAttributeEqualsEventually(cluster, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+
+ log.info("started "+app+" for "+JavaClassNames.niceClassAndMethod());
+
+ ServiceReplacer policy = new ServiceReplacer(new ConfigBag().configure(ServiceReplacer.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED));
+ cluster.addPolicy(policy);
+
+ final Set<Entity> initialMembers = ImmutableSet.copyOf(cluster.getMembers());
+ final TestEntity e1 = (TestEntity) Iterables.get(initialMembers, 0);
+
+ e1.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(e1, "simulate failure"));
+
+ // Expect cluster to go on-fire when fails to start replacement
+ // Note that we've set up-quorum and running-quorum to be "alwaysTrue" so that we don't get a transient onFire
+ // when the failed node fails to start (but before it has been removed from the group to be put in quarantine).
+ EntityTestUtils.assertAttributeEqualsEventually(cluster, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
+
+ // Expect to have the second failed entity still kicking around as proof (in quarantine)
+ // The cluster should NOT go on fire until after the 2nd failure
+ Iterable<Entity> members = Iterables.filter(managementContext.getEntityManager().getEntities(), Predicates.instanceOf(FailingEntity.class));
+ assertEquals(Iterables.size(members), 2);
+
+ // e2 failed to start, so it won't have called stop on e1
+ TestEntity e2 = (TestEntity) Iterables.getOnlyElement(Sets.difference(ImmutableSet.copyOf(members), initialMembers));
+ assertEquals(e1.getCallHistory(), ImmutableList.of("start"), "e1.history="+e1.getCallHistory());
+ assertEquals(e2.getCallHistory(), ImmutableList.of("start"), "e2.history="+e2.getCallHistory());
+
+ // And will have received notification event about it
+ assertEventuallyHasEntityReplacementFailedEvent(cluster);
+ }
+
+ @Test(groups="Integration") // has a 1 second wait
+ public void testDoesNotOnFireWhenFailToReplaceMember() throws Exception {
+ app.subscribe(null, ServiceReplacer.ENTITY_REPLACEMENT_FAILED, eventListener);
+
+ final DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+ .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(FailingEntity.class)
+ .configure(FailingEntity.FAIL_ON_START_CONDITION, predicateOnlyTrueForCallAtOrAfter(2)))
+ .configure(DynamicCluster.INITIAL_SIZE, 1)
+ .configure(DynamicCluster.QUARANTINE_FAILED_ENTITIES, true));
+ app.start(ImmutableList.<Location>of(loc));
+
+ ServiceReplacer policy = new ServiceReplacer(new ConfigBag()
+ .configure(ServiceReplacer.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED)
+ .configure(ServiceReplacer.SET_ON_FIRE_ON_FAILURE, false));
+ cluster.addPolicy(policy);
+
+ final Set<Entity> initialMembers = ImmutableSet.copyOf(cluster.getMembers());
+ final TestEntity e1 = (TestEntity) Iterables.get(initialMembers, 0);
+
+ e1.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(e1, "simulate failure"));
+
+ // Configured to not mark cluster as on fire
+ Asserts.succeedsContinually(new Runnable() {
+ @Override public void run() {
+ assertNotEquals(cluster.getAttribute(Attributes.SERVICE_STATE_ACTUAL), Lifecycle.ON_FIRE);
+ }});
+
+ // And will have received notification event about it
+ assertEventuallyHasEntityReplacementFailedEvent(cluster);
+ }
+
+ @Test(groups="Integration") // 1s wait
+ public void testStopFailureOfOldEntityDoesNotSetClusterOnFire() throws Exception {
+ app.subscribe(null, ServiceReplacer.ENTITY_REPLACEMENT_FAILED, eventListener);
+
+ final DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+ .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(FailingEntity.class)
+ .configure(FailingEntity.FAIL_ON_STOP_CONDITION, predicateOnlyTrueForCallAt(1)))
+ .configure(DynamicCluster.INITIAL_SIZE, 2));
+ app.start(ImmutableList.<Location>of(loc));
+
+ cluster.addPolicy(PolicySpec.create(ServiceReplacer.class)
+ .configure(ServiceReplacer.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED));
+
+ final Set<Entity> initialMembers = ImmutableSet.copyOf(cluster.getMembers());
+ final TestEntity e1 = (TestEntity) Iterables.get(initialMembers, 0);
+
+ e1.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(e1, "simulate failure"));
+
+ // Expect e1 to be replaced
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ Set<Entity> newMembers = Sets.difference(ImmutableSet.copyOf(cluster.getMembers()), initialMembers);
+ Set<Entity> removedMembers = Sets.difference(initialMembers, ImmutableSet.copyOf(cluster.getMembers()));
+ assertEquals(removedMembers, ImmutableSet.of(e1));
+ assertEquals(newMembers.size(), 1);
+ assertEquals(((TestEntity)Iterables.getOnlyElement(newMembers)).getCallHistory(), ImmutableList.of("start"));
+ assertEquals(e1.getCallHistory(), ImmutableList.of("start", "stop"));
+ assertFalse(Entities.isManaged(e1));
+ }});
+
+ // Failure to stop the failed member should not cause "on-fire" of cluster
+ Asserts.succeedsContinually(new Runnable() {
+ @Override public void run() {
+ assertNotEquals(cluster.getAttribute(Attributes.SERVICE_STATE_ACTUAL), Lifecycle.ON_FIRE);
+ }});
+ }
+
+ /**
+ * If we keep on getting failure reports, never managing to replace the failed node, then don't keep trying
+ * (i.e. avoid infinite loop).
+ *
+ * TODO This code + configuration needs some work; it's not testing quite the scenarios that I
+ * was thinking of!
+ * I saw problem where a node failed, and the replacements failed, and we ended up trying thousands of times.
+ * (describing this scenario is made more complex by me having temporarily disabled the cluster from
+ * removing failed members, for debugging purposes!)
+ * Imagine these two scenarios:
+ * <ol>
+ * <li>Entity fails during call to start().
+ * Here, the cluster removes it as a member (either unmanages it or puts it in quarantine)
+ * So the ENTITY_FAILED is ignored because the entity is not a member at that point.
+ * <li>Entity returns from start(), but quickly goes to service-down.
+ * Here we'll keep trying to replace that entity. Depending how long that takes, we'll either
+ * enter a horrible infinite loop, or we'll just provision a huge number of VMs over a long
+ * time period.
+ * Unfortunately this scenario is not catered for in the code yet.
+ * </ol>
+ */
+ @Test(groups="Integration") // because takes 1.2 seconds
+ public void testAbandonsReplacementAfterNumFailures() throws Exception {
+ app.subscribe(null, ServiceReplacer.ENTITY_REPLACEMENT_FAILED, eventListener);
+
+ final DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+ .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(FailingEntity.class)
+ .configure(FailingEntity.FAIL_ON_START_CONDITION, predicateOnlyTrueForCallAtOrAfter(11)))
+ .configure(DynamicCluster.INITIAL_SIZE, 10)
+ .configure(DynamicCluster.QUARANTINE_FAILED_ENTITIES, true));
+ app.start(ImmutableList.<Location>of(loc));
+
+ ServiceReplacer policy = new ServiceReplacer(new ConfigBag()
+ .configure(ServiceReplacer.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED)
+ .configure(ServiceReplacer.FAIL_ON_NUM_RECURRING_FAILURES, 3));
+ cluster.addPolicy(policy);
+
+ final Set<Entity> initialMembers = ImmutableSet.copyOf(cluster.getMembers());
+ for (int i = 0; i < 5; i++) {
+ final int counter = i+1;
+ EntityInternal entity = (EntityInternal) Iterables.get(initialMembers, i);
+ entity.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(entity, "simulate failure"));
+ if (i <= 3) {
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ Set<FailingEntity> all = ImmutableSet.copyOf(Iterables.filter(managementContext.getEntityManager().getEntities(), FailingEntity.class));
+ Set<FailingEntity> replacements = Sets.difference(all, initialMembers);
+ Set<?> replacementMembers = Sets.intersection(ImmutableSet.of(cluster.getMembers()), replacements);
+ assertTrue(replacementMembers.isEmpty());
+ assertEquals(replacements.size(), counter);
+ }});
+ } else {
+ Asserts.succeedsContinually(new Runnable() {
+ @Override public void run() {
+ Set<FailingEntity> all = ImmutableSet.copyOf(Iterables.filter(managementContext.getEntityManager().getEntities(), FailingEntity.class));
+ Set<FailingEntity> replacements = Sets.difference(all, initialMembers);
+ assertEquals(replacements.size(), 4);
+ }});
+ }
+ }
+ }
+
+
+ private Predicate<Object> predicateOnlyTrueForCallAt(final int callNumber) {
+ return predicateOnlyTrueForCallRange(callNumber, callNumber);
+ }
+
+ private Predicate<Object> predicateOnlyTrueForCallAtOrAfter(final int callLowerNumber) {
+ return predicateOnlyTrueForCallRange(callLowerNumber, Integer.MAX_VALUE);
+ }
+
+ private Predicate<Object> predicateOnlyTrueForCallRange(final int callLowerNumber, final int callUpperNumber) {
+ return new Predicate<Object>() {
+ private final AtomicInteger counter = new AtomicInteger(0);
+ @Override public boolean apply(Object input) {
+ int num = counter.incrementAndGet();
+ return num >= callLowerNumber && num <= callUpperNumber;
+ }
+ };
+ }
+
+ private void assertEventuallyHasEntityReplacementFailedEvent(final Entity expectedCluster) {
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ assertEquals(Iterables.getOnlyElement(events).getSensor(), ServiceReplacer.ENTITY_REPLACEMENT_FAILED, "events="+events);
+ assertEquals(Iterables.getOnlyElement(events).getSource(), expectedCluster, "events="+events);
+ assertEquals(((FailureDescriptor)Iterables.getOnlyElement(events).getValue()).getComponent(), expectedCluster, "events="+events);
+ }});
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceRestarterTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceRestarterTest.java b/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceRestarterTest.java
new file mode 100644
index 0000000..c93612b
--- /dev/null
+++ b/policy/src/test/java/org/apache/brooklyn/policy/ha/ServiceRestarterTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.ha;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.api.event.SensorEventListener;
+import org.apache.brooklyn.api.management.ManagementContext;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.core.util.config.ConfigBag;
+import org.apache.brooklyn.test.entity.LocalManagementContextForTests;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.apache.brooklyn.test.entity.TestEntity;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.Lifecycle;
+import brooklyn.entity.trait.FailingEntity;
+import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor;
+import brooklyn.test.Asserts;
+import brooklyn.util.exceptions.Exceptions;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+public class ServiceRestarterTest {
+
+ private static final int TIMEOUT_MS = 10*1000;
+
+ private ManagementContext managementContext;
+ private TestApplication app;
+ private TestEntity e1;
+ private ServiceRestarter policy;
+ private SensorEventListener<Object> eventListener;
+ private List<SensorEvent<?>> events;
+
+ @BeforeMethod(alwaysRun=true)
+ public void setUp() throws Exception {
+ managementContext = new LocalManagementContextForTests();
+ app = ApplicationBuilder.newManagedApp(TestApplication.class, managementContext);
+ e1 = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+ events = Lists.newCopyOnWriteArrayList();
+ eventListener = new SensorEventListener<Object>() {
+ @Override public void onEvent(SensorEvent<Object> event) {
+ events.add(event);
+ }
+ };
+ }
+
+ @AfterMethod(alwaysRun=true)
+ public void tearDown() throws Exception {
+ if (managementContext != null) Entities.destroyAll(managementContext);
+ }
+
+ @Test
+ public void testRestartsOnFailure() throws Exception {
+ policy = new ServiceRestarter(new ConfigBag().configure(ServiceRestarter.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED));
+ e1.addPolicy(policy);
+
+ e1.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(e1, "simulate failure"));
+
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ assertEquals(e1.getCallHistory(), ImmutableList.of("restart"));
+ }});
+ }
+
+ @Test(groups="Integration") // Has a 1 second wait
+ public void testDoesNotRestartsWhenHealthy() throws Exception {
+ policy = new ServiceRestarter(new ConfigBag().configure(ServiceRestarter.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED));
+ e1.addPolicy(policy);
+
+ e1.emit(HASensors.ENTITY_RECOVERED, new FailureDescriptor(e1, "not a failure"));
+
+ Asserts.succeedsContinually(new Runnable() {
+ @Override public void run() {
+ assertEquals(e1.getCallHistory(), ImmutableList.of());
+ }});
+ }
+
+ @Test
+ public void testEmitsFailureEventWhenRestarterFails() throws Exception {
+ final FailingEntity e2 = app.createAndManageChild(EntitySpec.create(FailingEntity.class)
+ .configure(FailingEntity.FAIL_ON_RESTART, true));
+ app.subscribe(e2, ServiceRestarter.ENTITY_RESTART_FAILED, eventListener);
+
+ policy = new ServiceRestarter(new ConfigBag().configure(ServiceRestarter.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED));
+ e2.addPolicy(policy);
+
+ e2.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(e2, "simulate failure"));
+
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ assertEquals(Iterables.getOnlyElement(events).getSensor(), ServiceRestarter.ENTITY_RESTART_FAILED, "events="+events);
+ assertEquals(Iterables.getOnlyElement(events).getSource(), e2, "events="+events);
+ assertEquals(((FailureDescriptor)Iterables.getOnlyElement(events).getValue()).getComponent(), e2, "events="+events);
+ }});
+
+ assertEquals(e2.getAttribute(Attributes.SERVICE_STATE_ACTUAL), Lifecycle.ON_FIRE);
+ }
+
+ @Test
+ public void testDoesNotSetOnFireOnFailure() throws Exception {
+ final FailingEntity e2 = app.createAndManageChild(EntitySpec.create(FailingEntity.class)
+ .configure(FailingEntity.FAIL_ON_RESTART, true));
+ app.subscribe(e2, ServiceRestarter.ENTITY_RESTART_FAILED, eventListener);
+
+ policy = new ServiceRestarter(new ConfigBag()
+ .configure(ServiceRestarter.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED)
+ .configure(ServiceRestarter.SET_ON_FIRE_ON_FAILURE, false));
+ e2.addPolicy(policy);
+
+ e2.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(e2, "simulate failure"));
+
+ Asserts.succeedsContinually(new Runnable() {
+ @Override public void run() {
+ assertNotEquals(e2.getAttribute(Attributes.SERVICE_STATE_ACTUAL), Lifecycle.ON_FIRE);
+ }});
+ }
+
+ // Previously RestarterPolicy called entity.restart inside the event-listener thread.
+ // That caused all other events for that entity's subscriptions to be queued until that
+ // entity's single event handler thread was free again.
+ @Test
+ public void testRestartDoesNotBlockOtherSubscriptions() throws Exception {
+ final CountDownLatch inRestartLatch = new CountDownLatch(1);
+ final CountDownLatch continueRestartLatch = new CountDownLatch(1);
+
+ final FailingEntity e2 = app.createAndManageChild(EntitySpec.create(FailingEntity.class)
+ .configure(FailingEntity.FAIL_ON_RESTART, true)
+ .configure(FailingEntity.EXEC_ON_FAILURE, new Function<Object, Void>() {
+ @Override public Void apply(Object input) {
+ inRestartLatch.countDown();
+ try {
+ continueRestartLatch.await();
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ }
+ return null;
+ }}));
+
+ e2.addPolicy(PolicySpec.create(ServiceRestarter.class)
+ .configure(ServiceRestarter.FAILURE_SENSOR_TO_MONITOR, HASensors.ENTITY_FAILED));
+ e2.subscribe(e2, TestEntity.SEQUENCE, eventListener);
+
+ // Cause failure, and wait for entity.restart to be blocking
+ e2.emit(HASensors.ENTITY_FAILED, new FailureDescriptor(e1, "simulate failure"));
+ assertTrue(inRestartLatch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
+
+ // Expect other notifications to continue to get through
+ e2.setAttribute(TestEntity.SEQUENCE, 1);
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ assertEquals(Iterables.getOnlyElement(events).getValue(), 1);
+ }});
+
+ // Allow restart to finish
+ continueRestartLatch.countDown();
+ }
+}