You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by ha...@apache.org on 2015/08/15 15:33:07 UTC
[05/33] incubator-brooklyn git commit: [BROOKLYN-162] Refactor
package in ./core/management
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerSplitBrainTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerSplitBrainTest.java b/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerSplitBrainTest.java
new file mode 100644
index 0000000..7e34071
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerSplitBrainTest.java
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.ha;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.management.ha.HighAvailabilityMode;
+import org.apache.brooklyn.api.management.ha.ManagementNodeState;
+import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord;
+import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord;
+import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecordPersister;
+import org.apache.brooklyn.core.management.ha.HighAvailabilityManagerImpl;
+import org.apache.brooklyn.core.management.ha.ManagementPlaneSyncRecordPersisterToObjectStore;
+import org.apache.brooklyn.core.management.ha.TestEntityFailingRebind.RebindException;
+import org.apache.brooklyn.core.management.internal.ManagementContextInternal;
+import org.apache.brooklyn.test.entity.LocalManagementContextForTests;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.rebind.PersistenceExceptionHandlerImpl;
+import brooklyn.entity.rebind.persister.BrooklynMementoPersisterToObjectStore;
+import brooklyn.entity.rebind.persister.InMemoryObjectStore;
+import brooklyn.entity.rebind.persister.ListeningObjectStore;
+import brooklyn.entity.rebind.persister.PersistMode;
+import brooklyn.entity.rebind.persister.PersistenceObjectStore;
+import brooklyn.internal.BrooklynFeatureEnablement;
+import brooklyn.test.Asserts;
+import brooklyn.util.collections.MutableList;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
+import com.google.common.collect.ImmutableList;
+
+@Test
+public class HighAvailabilityManagerSplitBrainTest {
+
+ private static final Logger log = LoggerFactory.getLogger(HighAvailabilityManagerSplitBrainTest.class);
+
+ private List<HaMgmtNode> nodes = new MutableList<HighAvailabilityManagerSplitBrainTest.HaMgmtNode>();
+ Map<String,String> sharedBackingStore = MutableMap.of();
+ Map<String,Date> sharedBackingStoreDates = MutableMap.of();
+ private AtomicLong sharedTime; // used to set the ticker's return value
+ private ClassLoader classLoader = getClass().getClassLoader();
+
+ public class HaMgmtNode {
+ // TODO share with HotStandbyTest and WarmStandbyTest and a few others (minor differences but worth it ultimately)
+
+ private ManagementContextInternal mgmt;
+ private String ownNodeId;
+ private String nodeName;
+ private ListeningObjectStore objectStore;
+ private ManagementPlaneSyncRecordPersister persister;
+ private HighAvailabilityManagerImpl ha;
+ private Ticker ticker;
+ private AtomicLong currentTime; // used to set the ticker's return value
+
+ public void setUp() throws Exception {
+ if (sharedTime==null)
+ currentTime = new AtomicLong(System.currentTimeMillis());
+
+ ticker = new Ticker() {
+ // strictly not a ticker because returns millis UTC, but it works fine even so
+ @Override public long read() {
+ if (sharedTime!=null) return sharedTime.get();
+ return currentTime.get();
+ }
+ };
+
+ nodeName = "node "+nodes.size();
+ mgmt = newLocalManagementContext();
+ ownNodeId = mgmt.getManagementNodeId();
+ objectStore = new ListeningObjectStore(newPersistenceObjectStore());
+ objectStore.injectManagementContext(mgmt);
+ objectStore.prepareForSharedUse(PersistMode.CLEAN, HighAvailabilityMode.DISABLED);
+ persister = new ManagementPlaneSyncRecordPersisterToObjectStore(mgmt, objectStore, classLoader);
+ ((ManagementPlaneSyncRecordPersisterToObjectStore)persister).preferRemoteTimestampInMemento();
+ BrooklynMementoPersisterToObjectStore persisterObj = new BrooklynMementoPersisterToObjectStore(objectStore, mgmt.getBrooklynProperties(), classLoader);
+ mgmt.getRebindManager().setPersister(persisterObj, PersistenceExceptionHandlerImpl.builder().build());
+ ha = ((HighAvailabilityManagerImpl)mgmt.getHighAvailabilityManager())
+ .setPollPeriod(Duration.PRACTICALLY_FOREVER)
+ .setHeartbeatTimeout(Duration.THIRTY_SECONDS)
+ .setLocalTicker(ticker)
+ .setRemoteTicker(ticker)
+ .setPersister(persister);
+ log.info("Created "+nodeName+" "+ownNodeId);
+ }
+
+ public void tearDown() throws Exception {
+ if (ha != null) ha.stop();
+ if (mgmt != null) Entities.destroyAll(mgmt);
+ if (objectStore != null) objectStore.deleteCompletely();
+ }
+
+ private long tickerCurrentMillis() {
+ return ticker.read();
+ }
+
+ private long tickerAdvance(Duration duration) {
+ if (sharedTime!=null)
+ throw new IllegalStateException("Using shared ticker; cannot advance private node clock");
+ currentTime.addAndGet(duration.toMilliseconds());
+ return tickerCurrentMillis();
+ }
+
+ @Override
+ public String toString() {
+ return nodeName+" "+ownNodeId;
+ }
+ }
+
+ private Boolean prevThrowOnRebind;
+
+ @BeforeMethod(alwaysRun=true)
+ public void setUp() throws Exception {
+ prevThrowOnRebind = TestEntityFailingRebind.getThrowOnRebind();
+ TestEntityFailingRebind.setThrowOnRebind(true);
+ nodes.clear();
+ sharedBackingStore.clear();
+ }
+
+ @AfterMethod(alwaysRun=true)
+ public void tearDown() throws Exception {
+ try {
+ for (HaMgmtNode n: nodes)
+ n.tearDown();
+ } finally {
+ if (prevThrowOnRebind != null) TestEntityFailingRebind.setThrowOnRebind(prevThrowOnRebind);
+ }
+ }
+
+ public HaMgmtNode newNode() throws Exception {
+ HaMgmtNode node = new HaMgmtNode();
+ node.setUp();
+ nodes.add(node);
+ return node;
+ }
+
+ private void sharedTickerAdvance(Duration duration) {
+ if (sharedTime==null) {
+ for (HaMgmtNode n: nodes)
+ n.tickerAdvance(duration);
+ } else {
+ sharedTime.addAndGet(duration.toMilliseconds());
+ }
+ }
+
+ private long sharedTickerCurrentMillis() {
+ return sharedTime.get();
+ }
+
+ protected void useSharedTime() {
+ if (!nodes.isEmpty())
+ throw new IllegalStateException("shared time must be set up before any nodes created");
+ sharedTime = new AtomicLong(System.currentTimeMillis());
+ }
+
+ protected ManagementContextInternal newLocalManagementContext() {
+ return new LocalManagementContextForTests();
+ }
+
+ protected PersistenceObjectStore newPersistenceObjectStore() {
+ return new InMemoryObjectStore(sharedBackingStore, sharedBackingStoreDates);
+ }
+
+ @Test
+ public void testDoubleRebindFails() throws Exception {
+ useSharedTime();
+ HaMgmtNode n1 = newNode();
+ HaMgmtNode n2 = newNode();
+
+ // first auto should become master
+ n1.ha.start(HighAvailabilityMode.AUTO);
+ n2.ha.start(HighAvailabilityMode.AUTO);
+ assertEquals(n1.ha.getNodeState(), ManagementNodeState.MASTER);
+
+ TestApplication app = ApplicationBuilder.newManagedApp(
+ EntitySpec.create(TestApplication.class).impl(TestEntityFailingRebind.class), n1.mgmt);
+ app.start(ImmutableList.<Location>of());
+
+ n1.mgmt.getRebindManager().forcePersistNow(false, null);
+
+ //don't publish state for a while (i.e. long store delays, failures)
+ sharedTickerAdvance(Duration.ONE_MINUTE);
+
+ try {
+ n2.ha.publishAndCheck(false);
+ fail("n2 rebind failure expected");
+ } catch (Exception e) {
+ assertNestedRebindException(e);
+ }
+
+ // re-check should re-assert successfully, no rebind expected as he was previously master
+ n1.ha.publishAndCheck(false);
+ ManagementPlaneSyncRecord memento;
+ memento = n1.ha.loadManagementPlaneSyncRecord(true);
+ assertEquals(memento.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.MASTER);
+ assertEquals(memento.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.FAILED);
+
+ // hot backup permitted by the TestEntityFailingRebind
+ n1.ha.changeMode(HighAvailabilityMode.HOT_BACKUP);
+ memento = n1.ha.loadManagementPlaneSyncRecord(true);
+ assertEquals(memento.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.HOT_BACKUP);
+ try {
+ n1.ha.changeMode(HighAvailabilityMode.MASTER);
+ fail("n1 rebind failure expected");
+ } catch (Exception e) {
+ assertNestedRebindException(e);
+ }
+
+ memento = n1.ha.loadManagementPlaneSyncRecord(true);
+ assertEquals(memento.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.FAILED);
+ assertEquals(memento.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.FAILED);
+ }
+
+ @Test
+ public void testStandbyRebind() throws Exception {
+ useSharedTime();
+ HaMgmtNode n1 = newNode();
+ HaMgmtNode n2 = newNode();
+
+ // first auto should become master
+ n1.ha.start(HighAvailabilityMode.AUTO);
+ n2.ha.start(HighAvailabilityMode.AUTO);
+
+ TestApplication app = ApplicationBuilder.newManagedApp(
+ EntitySpec.create(TestApplication.class).impl(TestEntityFailingRebind.class), n1.mgmt);
+ app.start(ImmutableList.<Location>of());
+
+ n1.mgmt.getRebindManager().forcePersistNow(false, null);
+
+ //don't publish state for a while (i.e. long store delays, failures)
+ sharedTickerAdvance(Duration.ONE_MINUTE);
+
+ try {
+ n2.ha.publishAndCheck(false);
+ fail("n2 rebind failure expected");
+ } catch (Exception e) {
+ assertNestedRebindException(e);
+ }
+
+ TestEntityFailingRebind.setThrowOnRebind(false);
+ n1.ha.publishAndCheck(false);
+
+ ManagementPlaneSyncRecord memento = n1.ha.loadManagementPlaneSyncRecord(true);
+ assertEquals(memento.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.MASTER);
+ assertEquals(memento.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.FAILED);
+ }
+
+ private void assertNestedRebindException(Throwable t) {
+ Throwable ptr = t;
+ while (ptr != null) {
+ if (ptr instanceof RebindException) {
+ return;
+ }
+ ptr = ptr.getCause();
+ }
+ Exceptions.propagate(t);
+ }
+
+ @Test
+ public void testIfNodeStopsBeingAbleToWrite() throws Exception {
+ useSharedTime();
+ log.info("time at start "+sharedTickerCurrentMillis());
+
+ HaMgmtNode n1 = newNode();
+ HaMgmtNode n2 = newNode();
+
+ // first auto should become master
+ n1.ha.start(HighAvailabilityMode.AUTO);
+ ManagementPlaneSyncRecord memento1 = n1.ha.loadManagementPlaneSyncRecord(true);
+
+ log.info(n1+" HA: "+memento1);
+ assertEquals(memento1.getMasterNodeId(), n1.ownNodeId);
+ Long time0 = sharedTickerCurrentMillis();
+ assertEquals(memento1.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(), time0);
+ assertEquals(memento1.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.MASTER);
+
+ // second - make explicit hot; that's a strictly more complex case than cold standby, so provides pretty good coverage
+ n2.ha.start(HighAvailabilityMode.HOT_STANDBY);
+ ManagementPlaneSyncRecord memento2 = n2.ha.loadManagementPlaneSyncRecord(true);
+
+ log.info(n2+" HA: "+memento2);
+ assertEquals(memento2.getMasterNodeId(), n1.ownNodeId);
+ assertEquals(memento2.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.MASTER);
+ assertEquals(memento2.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.HOT_STANDBY);
+ assertEquals(memento2.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(), time0);
+ assertEquals(memento2.getManagementNodes().get(n2.ownNodeId).getRemoteTimestamp(), time0);
+
+ // and no entities at either
+ assertEquals(n1.mgmt.getApplications().size(), 0);
+ assertEquals(n2.mgmt.getApplications().size(), 0);
+
+ // create
+ TestApplication app = ApplicationBuilder.newManagedApp(EntitySpec.create(TestApplication.class), n1.mgmt);
+ app.start(ImmutableList.<Location>of());
+ app.setAttribute(TestApplication.MY_ATTRIBUTE, "hello");
+
+ assertEquals(n1.mgmt.getApplications().size(), 1);
+ assertEquals(n2.mgmt.getApplications().size(), 0);
+ log.info("persisting "+n1.ownNodeId);
+ n1.mgmt.getRebindManager().forcePersistNow(false, null);
+
+ n1.objectStore.setWritesFailSilently(true);
+ log.info(n1+" writes off");
+ sharedTickerAdvance(Duration.ONE_MINUTE);
+ log.info("time now "+sharedTickerCurrentMillis());
+ Long time1 = sharedTickerCurrentMillis();
+
+ log.info("publish "+n2.ownNodeId);
+ n2.ha.publishAndCheck(false);
+ ManagementPlaneSyncRecord memento2b = n2.ha.loadManagementPlaneSyncRecord(true);
+ log.info(n2+" HA now: "+memento2b);
+
+ // n2 infers n1 as failed
+ assertEquals(memento2b.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.FAILED);
+ assertEquals(memento2b.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.MASTER);
+ assertEquals(memento2b.getMasterNodeId(), n2.ownNodeId);
+ assertEquals(memento2b.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(), time0);
+ assertEquals(memento2b.getManagementNodes().get(n2.ownNodeId).getRemoteTimestamp(), time1);
+
+ assertEquals(n1.mgmt.getApplications().size(), 1);
+ assertEquals(n2.mgmt.getApplications().size(), 1);
+ assertEquals(n1.mgmt.getApplications().iterator().next().getAttribute(TestApplication.MY_ATTRIBUTE), "hello");
+
+ n1.objectStore.setWritesFailSilently(false);
+ log.info(n1+" writes on");
+
+ sharedTickerAdvance(Duration.ONE_SECOND);
+ log.info("time now "+sharedTickerCurrentMillis());
+ Long time2 = sharedTickerCurrentMillis();
+
+ log.info("publish "+n1.ownNodeId);
+ n1.ha.publishAndCheck(false);
+ ManagementPlaneSyncRecord memento1b = n1.ha.loadManagementPlaneSyncRecord(true);
+ log.info(n1+" HA now: "+memento1b);
+
+ ManagementNodeState expectedStateAfterDemotion = BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_DEFAULT_STANDBY_IS_HOT_PROPERTY) ?
+ ManagementNodeState.HOT_STANDBY : ManagementNodeState.STANDBY;
+
+ // n1 comes back and demotes himself
+ assertEquals(memento1b.getManagementNodes().get(n1.ownNodeId).getStatus(), expectedStateAfterDemotion);
+ assertEquals(memento1b.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.MASTER);
+ assertEquals(memento1b.getMasterNodeId(), n2.ownNodeId);
+ assertEquals(memento1b.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(), time2);
+ assertEquals(memento1b.getManagementNodes().get(n2.ownNodeId).getRemoteTimestamp(), time1);
+
+ // n2 now sees itself as master, with n1 in standby again
+ ManagementPlaneSyncRecord memento2c = n2.ha.loadManagementPlaneSyncRecord(true);
+ log.info(n2+" HA now: "+memento2c);
+ assertEquals(memento2c.getManagementNodes().get(n1.ownNodeId).getStatus(), expectedStateAfterDemotion);
+ assertEquals(memento2c.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.MASTER);
+ assertEquals(memento2c.getMasterNodeId(), n2.ownNodeId);
+ assertEquals(memento2c.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(), time2);
+ assertEquals(memento2c.getManagementNodes().get(n2.ownNodeId).getRemoteTimestamp(), time2);
+
+ // right number of entities at n2; n1 may or may not depending whether hot standby is default
+ assertEquals(n2.mgmt.getApplications().size(), 1);
+ assertEquals(n1.mgmt.getApplications().size(), BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_DEFAULT_STANDBY_IS_HOT_PROPERTY) ? 1 : 0);
+ }
+
+ @Test(invocationCount=50, groups="Integration")
+ public void testIfNodeStopsBeingAbleToWriteManyTimes() throws Exception {
+ testIfNodeStopsBeingAbleToWrite();
+ }
+
+ @Test
+ public void testSimultaneousStartup() throws Exception {
+ doTestConcurrentStartup(5, null);
+ }
+
+ @Test
+ public void testNearSimultaneousStartup() throws Exception {
+ doTestConcurrentStartup(20, Duration.millis(20));
+ }
+
+ @Test(invocationCount=50, groups="Integration")
+ public void testNearSimultaneousStartupManyTimes() throws Exception {
+ doTestConcurrentStartup(20, Duration.millis(20));
+ }
+
+ protected void doTestConcurrentStartup(int size, final Duration staggerStart) throws Exception {
+ useSharedTime();
+
+ List<Thread> spawned = MutableList.of();
+ for (int i=0; i<size; i++) {
+ final HaMgmtNode n = newNode();
+ Thread t = new Thread() { public void run() {
+ if (staggerStart!=null) Time.sleep(staggerStart.multiply(Math.random()));
+ n.ha.start(HighAvailabilityMode.AUTO);
+ n.ha.setPollPeriod(Duration.millis(20));
+ } };
+ spawned.add(t);
+ t.start();
+ }
+
+ try {
+ final Stopwatch timer = Stopwatch.createStarted();
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ ManagementPlaneSyncRecord memento = nodes.get(0).ha.loadManagementPlaneSyncRecord(true);
+ List<ManagementNodeState> counts = MutableList.of(), savedCounts = MutableList.of();
+ for (HaMgmtNode n: nodes) {
+ counts.add(n.ha.getNodeState());
+ ManagementNodeSyncRecord m = memento.getManagementNodes().get(n.ownNodeId);
+ if (m!=null) {
+ savedCounts.add(m.getStatus());
+ }
+ }
+ log.info("while starting "+nodes.size()+" nodes: "
+ +Collections.frequency(counts, ManagementNodeState.MASTER)+" M + "
+ +Collections.frequency(counts, ManagementNodeState.HOT_STANDBY)+" hot + "
+ +Collections.frequency(counts, ManagementNodeState.STANDBY)+" warm + "
+ +Collections.frequency(counts, ManagementNodeState.INITIALIZING)+" init; "
+ + memento.getManagementNodes().size()+" saved, "
+ +Collections.frequency(savedCounts, ManagementNodeState.MASTER)+" M + "
+ +Collections.frequency(savedCounts, ManagementNodeState.HOT_STANDBY)+" hot + "
+ +Collections.frequency(savedCounts, ManagementNodeState.STANDBY)+" warm + "
+ +Collections.frequency(savedCounts, ManagementNodeState.INITIALIZING)+" init");
+
+ if (timer.isRunning() && Duration.of(timer).compareTo(Duration.TEN_SECONDS)>0) {
+ log.warn("we seem to have a problem stabilizing"); //handy place to set a suspend-VM breakpoint!
+ timer.stop();
+ }
+ assertEquals(Collections.frequency(counts, ManagementNodeState.MASTER), 1);
+ assertEquals(Collections.frequency(counts, ManagementNodeState.HOT_STANDBY)+Collections.frequency(counts, ManagementNodeState.STANDBY), nodes.size()-1);
+ assertEquals(Collections.frequency(savedCounts, ManagementNodeState.MASTER), 1);
+ assertEquals(Collections.frequency(savedCounts, ManagementNodeState.HOT_STANDBY)+Collections.frequency(savedCounts, ManagementNodeState.STANDBY), nodes.size()-1);
+ }});
+ } catch (Throwable t) {
+ log.warn("Failed to stabilize (rethrowing): "+t, t);
+ throw Exceptions.propagate(t);
+ }
+
+ for (Thread t: spawned)
+ t.join(Duration.THIRTY_SECONDS.toMilliseconds());
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerTestFixture.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerTestFixture.java b/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerTestFixture.java
new file mode 100644
index 0000000..5a7f79a
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerTestFixture.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.ha;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.brooklyn.api.management.ha.HighAvailabilityMode;
+import org.apache.brooklyn.api.management.ha.ManagementNodeState;
+import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord;
+import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord;
+import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecordPersister;
+import org.apache.brooklyn.core.management.ha.HighAvailabilityManagerImpl;
+import org.apache.brooklyn.core.management.ha.ManagementPlaneSyncRecordDeltaImpl;
+import org.apache.brooklyn.core.management.ha.ManagementPlaneSyncRecordPersisterToObjectStore;
+import org.apache.brooklyn.core.management.ha.HighAvailabilityManagerImpl.PromotionListener;
+import org.apache.brooklyn.core.management.internal.ManagementContextInternal;
+import org.apache.brooklyn.test.entity.LocalManagementContextForTests;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.BrooklynVersion;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.rebind.PersistenceExceptionHandlerImpl;
+import brooklyn.entity.rebind.persister.BrooklynMementoPersisterToObjectStore;
+import brooklyn.entity.rebind.persister.PersistMode;
+import brooklyn.entity.rebind.persister.PersistenceObjectStore;
+import brooklyn.entity.rebind.plane.dto.BasicManagementNodeSyncRecord;
+import brooklyn.entity.rebind.plane.dto.BasicManagementNodeSyncRecord.Builder;
+import brooklyn.test.Asserts;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Ticker;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+
+@Test
+public abstract class HighAvailabilityManagerTestFixture {
+
+ @SuppressWarnings("unused")
+ private static final Logger log = LoggerFactory.getLogger(HighAvailabilityManagerTestFixture.class);
+
+ private ManagementPlaneSyncRecordPersister persister;
+ protected ManagementContextInternal managementContext;
+ private String ownNodeId;
+ private HighAvailabilityManagerImpl manager;
+ private Ticker ticker;
+ private AtomicLong currentTime; // used to set the ticker's return value
+ private RecordingPromotionListener promotionListener;
+ private ClassLoader classLoader = getClass().getClassLoader();
+ private PersistenceObjectStore objectStore;
+
+ @BeforeMethod(alwaysRun=true)
+ public void setUp() throws Exception {
+ currentTime = new AtomicLong(1000000000L);
+ ticker = new Ticker() {
+ // strictly not a ticker because returns millis UTC, but it works fine even so
+ @Override public long read() {
+ return currentTime.get();
+ }
+ };
+ promotionListener = new RecordingPromotionListener();
+ managementContext = newLocalManagementContext();
+ ownNodeId = managementContext.getManagementNodeId();
+ objectStore = newPersistenceObjectStore();
+ objectStore.injectManagementContext(managementContext);
+ objectStore.prepareForSharedUse(PersistMode.CLEAN, HighAvailabilityMode.DISABLED);
+ persister = new ManagementPlaneSyncRecordPersisterToObjectStore(managementContext, objectStore, classLoader);
+ ((ManagementPlaneSyncRecordPersisterToObjectStore)persister).preferRemoteTimestampInMemento();
+ BrooklynMementoPersisterToObjectStore persisterObj = new BrooklynMementoPersisterToObjectStore(
+ objectStore,
+ managementContext.getBrooklynProperties(),
+ classLoader);
+ managementContext.getRebindManager().setPersister(persisterObj, PersistenceExceptionHandlerImpl.builder().build());
+ manager = ((HighAvailabilityManagerImpl)managementContext.getHighAvailabilityManager())
+ .setPollPeriod(getPollPeriod())
+ .setHeartbeatTimeout(Duration.THIRTY_SECONDS)
+ .setPromotionListener(promotionListener)
+ .setLocalTicker(ticker)
+ .setRemoteTicker(getRemoteTicker())
+ .setPersister(persister);
+ persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder()
+ .node(newManagerMemento(ownNodeId, ManagementNodeState.HOT_STANDBY))
+ .build());
+
+ }
+
+ protected ManagementContextInternal newLocalManagementContext() {
+ return LocalManagementContextForTests.newInstance();
+ }
+
+ protected abstract PersistenceObjectStore newPersistenceObjectStore();
+
+ @AfterMethod(alwaysRun=true)
+ public void tearDown() throws Exception {
+ if (manager != null) manager.stop();
+ if (managementContext != null) Entities.destroyAll(managementContext);
+ if (objectStore != null) objectStore.deleteCompletely();
+ }
+
+ // The web-console could still be polling (e.g. if have just restarted brooklyn), before the persister is set.
+ // Must not throw NPE, but instead return something sensible (e.g. an empty state record).
+ @Test
+ public void testGetManagementPlaneSyncStateDoesNotThrowNpeBeforePersisterSet() throws Exception {
+ HighAvailabilityManagerImpl manager2 = new HighAvailabilityManagerImpl(managementContext)
+ .setPollPeriod(Duration.millis(10))
+ .setHeartbeatTimeout(Duration.THIRTY_SECONDS)
+ .setPromotionListener(promotionListener)
+ .setLocalTicker(ticker)
+ .setRemoteTicker(ticker);
+ try {
+ ManagementPlaneSyncRecord state = manager2.loadManagementPlaneSyncRecord(true);
+ assertNotNull(state);
+ } finally {
+ manager2.stop();
+ }
+
+ }
+ // Can get a log.error about our management node's heartbeat being out of date. Caused by
+ // poller first writing a heartbeat record, and then the clock being incremented. But the
+ // next poll fixes it.
+ public void testPromotes() throws Exception {
+ persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder()
+ .node(newManagerMemento(ownNodeId, ManagementNodeState.HOT_STANDBY))
+ .node(newManagerMemento("node1", ManagementNodeState.MASTER))
+ .setMaster("node1")
+ .build());
+
+ manager.start(HighAvailabilityMode.AUTO);
+
+ // Simulate passage of time; ticker used by this HA-manager so it will "correctly" publish
+ // its own heartbeat with the new time; but node1's record is now out-of-date.
+ tickerAdvance(Duration.seconds(31));
+
+ // Expect to be notified of our promotion, as the only other node
+ promotionListener.assertCalledEventually();
+ }
+
+ @Test(groups="Integration") // because one second wait in succeedsContinually
+ public void testDoesNotPromoteIfMasterTimeoutNotExpired() throws Exception {
+ persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder()
+ .node(newManagerMemento(ownNodeId, ManagementNodeState.HOT_STANDBY))
+ .node(newManagerMemento("node1", ManagementNodeState.MASTER))
+ .setMaster("node1")
+ .build());
+
+ manager.start(HighAvailabilityMode.AUTO);
+
+ tickerAdvance(Duration.seconds(25));
+
+ // Expect not to be notified, as 25s < 30s timeout
+ // (it's normally a fake clock so won't hit 30, even waiting 1s below - but in "IntegrationTest" subclasses it is real!)
+ Asserts.succeedsContinually(new Runnable() {
+ @Override public void run() {
+ assertTrue(promotionListener.callTimestamps.isEmpty(), "calls="+promotionListener.callTimestamps);
+ }});
+ }
+
+ public void testGetManagementPlaneStatus() throws Exception {
+ // with the name zzzzz the mgr created here should never be promoted by the alphabetical strategy!
+
+ tickerAdvance(Duration.FIVE_SECONDS);
+ persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder()
+ .node(newManagerMemento(ownNodeId, ManagementNodeState.STANDBY))
+ .node(newManagerMemento("zzzzzzz_node1", ManagementNodeState.STANDBY))
+ .build());
+ persister.loadSyncRecord();
+ long zzzTime = tickerCurrentMillis();
+ tickerAdvance(Duration.FIVE_SECONDS);
+
+ manager.start(HighAvailabilityMode.AUTO);
+ ManagementPlaneSyncRecord memento = manager.loadManagementPlaneSyncRecord(true);
+
+ // Note can assert timestamp because not "real" time; it's using our own Ticker
+ assertEquals(memento.getMasterNodeId(), ownNodeId);
+ assertEquals(memento.getManagementNodes().keySet(), ImmutableSet.of(ownNodeId, "zzzzzzz_node1"));
+ assertEquals(memento.getManagementNodes().get(ownNodeId).getNodeId(), ownNodeId);
+ assertEquals(memento.getManagementNodes().get(ownNodeId).getStatus(), ManagementNodeState.MASTER);
+ assertEquals(memento.getManagementNodes().get(ownNodeId).getLocalTimestamp(), tickerCurrentMillis());
+ assertEquals(memento.getManagementNodes().get("zzzzzzz_node1").getNodeId(), "zzzzzzz_node1");
+ assertEquals(memento.getManagementNodes().get("zzzzzzz_node1").getStatus(), ManagementNodeState.STANDBY);
+ assertEquals(memento.getManagementNodes().get("zzzzzzz_node1").getLocalTimestamp(), zzzTime);
+ }
+
+ @Test(groups="Integration", invocationCount=50) //because we have had non-deterministic failures
+ public void testGetManagementPlaneStatusManyTimes() throws Exception {
+ testGetManagementPlaneStatus();
+ }
+
+ @Test
+ public void testGetManagementPlaneSyncStateInfersTimedOutNodeAsFailed() throws Exception {
+ persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder()
+ .node(newManagerMemento(ownNodeId, ManagementNodeState.HOT_STANDBY))
+ .node(newManagerMemento("node1", ManagementNodeState.MASTER))
+ .setMaster("node1")
+ .build());
+
+ manager.start(HighAvailabilityMode.HOT_STANDBY);
+
+ ManagementPlaneSyncRecord state = manager.loadManagementPlaneSyncRecord(true);
+ assertEquals(state.getManagementNodes().get("node1").getStatus(), ManagementNodeState.MASTER);
+ assertEquals(state.getManagementNodes().get(ownNodeId).getStatus(), ManagementNodeState.HOT_STANDBY);
+
+ // Simulate passage of time; ticker used by this HA-manager so it will "correctly" publish
+ // its own heartbeat with the new time; but node1's record is now out-of-date.
+ tickerAdvance(Duration.seconds(31));
+
+ ManagementPlaneSyncRecord state2 = manager.loadManagementPlaneSyncRecord(true);
+ assertEquals(state2.getManagementNodes().get("node1").getStatus(), ManagementNodeState.FAILED);
+ assertNotEquals(state.getManagementNodes().get(ownNodeId).getStatus(), ManagementNodeState.FAILED);
+ }
+
+ protected Duration getPollPeriod() {
+ return Duration.millis(10);
+ }
+
+ protected long tickerCurrentMillis() {
+ return ticker.read();
+ }
+
+ protected long tickerAdvance(Duration duration) {
+ currentTime.addAndGet(duration.toMilliseconds());
+ return tickerCurrentMillis();
+ }
+
+ protected Ticker getRemoteTicker() {
+ return ticker;
+ }
+
+ protected ManagementNodeSyncRecord newManagerMemento(String nodeId, ManagementNodeState status) {
+ Builder rb = BasicManagementNodeSyncRecord.builder();
+ rb.brooklynVersion(BrooklynVersion.get()).nodeId(nodeId).status(status);
+ rb.localTimestamp(tickerCurrentMillis());
+ if (getRemoteTicker()!=null)
+ rb.remoteTimestamp(getRemoteTicker().read());
+ return rb.build();
+ }
+
+ public static class RecordingPromotionListener implements PromotionListener {
+ public final List<Long> callTimestamps = Lists.newCopyOnWriteArrayList();
+
+ @Override
+ public void promotingToMaster() {
+ callTimestamps.add(System.currentTimeMillis());
+ }
+
+ public void assertNotCalled() {
+ assertTrue(callTimestamps.isEmpty(), "calls="+callTimestamps);
+ }
+
+ public void assertCalled() {
+ assertFalse(callTimestamps.isEmpty(), "calls="+callTimestamps);
+ }
+
+ public void assertCalledEventually() {
+ Asserts.succeedsEventually(new Runnable() {
+ @Override public void run() {
+ assertCalled();
+ }});
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/org/apache/brooklyn/core/management/ha/HotStandbyTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/management/ha/HotStandbyTest.java b/core/src/test/java/org/apache/brooklyn/core/management/ha/HotStandbyTest.java
new file mode 100644
index 0000000..da4f998
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/management/ha/HotStandbyTest.java
@@ -0,0 +1,667 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.ha;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import java.util.ArrayDeque;
+import java.util.Date;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.apache.brooklyn.api.entity.Application;
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.Feed;
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.location.LocationSpec;
+import org.apache.brooklyn.api.management.ha.HighAvailabilityMode;
+import org.apache.brooklyn.api.management.ha.ManagementNodeState;
+import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecordPersister;
+import org.apache.brooklyn.core.management.ha.HighAvailabilityManagerImpl;
+import org.apache.brooklyn.core.management.ha.ManagementPlaneSyncRecordPersisterToObjectStore;
+import org.apache.brooklyn.core.management.internal.AbstractManagementContext;
+import org.apache.brooklyn.core.management.internal.ManagementContextInternal;
+import org.apache.brooklyn.test.EntityTestUtils;
+import org.apache.brooklyn.test.entity.LocalManagementContextForTests;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.apache.brooklyn.test.entity.TestEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.rebind.PersistenceExceptionHandlerImpl;
+import brooklyn.entity.rebind.RebindFeedTest.MyEntityWithFunctionFeedImpl;
+import brooklyn.entity.rebind.RebindFeedTest.MyEntityWithNewFeedsEachTimeImpl;
+import brooklyn.entity.rebind.RebindManagerImpl;
+import brooklyn.entity.rebind.RebindTestFixture;
+import brooklyn.entity.rebind.persister.BrooklynMementoPersisterToObjectStore;
+import brooklyn.entity.rebind.persister.InMemoryObjectStore;
+import brooklyn.entity.rebind.persister.ListeningObjectStore;
+import brooklyn.entity.rebind.persister.PersistMode;
+import brooklyn.entity.rebind.persister.PersistenceObjectStore;
+
+import org.apache.brooklyn.location.basic.LocalhostMachineProvisioningLocation.LocalhostMachine;
+
+import brooklyn.util.collections.MutableList;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.javalang.JavaClassNames;
+import brooklyn.util.repeat.Repeater;
+import brooklyn.util.text.ByteSizeStrings;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.collect.Iterables;
+
+public class HotStandbyTest {
+
+ private static final Logger log = LoggerFactory.getLogger(HotStandbyTest.class);
+
+ private List<HaMgmtNode> nodes = new MutableList<HotStandbyTest.HaMgmtNode>();
+ Map<String,String> sharedBackingStore = MutableMap.of();
+ Map<String,Date> sharedBackingStoreDates = MutableMap.of();
+ private ClassLoader classLoader = getClass().getClassLoader();
+
+ public class HaMgmtNode {
+ // TODO share with WarmStandbyTest and SplitBrainTest and a few others (minor differences but worth it ultimately)
+
+ private ManagementContextInternal mgmt;
+ private String ownNodeId;
+ private String nodeName;
+ private ListeningObjectStore objectStore;
+ private ManagementPlaneSyncRecordPersister persister;
+ private HighAvailabilityManagerImpl ha;
+ private Duration persistOrRebindPeriod = Duration.ONE_SECOND;
+
+ public void setUp() throws Exception {
+ nodeName = "node "+nodes.size();
+ mgmt = newLocalManagementContext();
+ ownNodeId = mgmt.getManagementNodeId();
+ objectStore = new ListeningObjectStore(newPersistenceObjectStore());
+ objectStore.injectManagementContext(mgmt);
+ objectStore.prepareForSharedUse(PersistMode.CLEAN, HighAvailabilityMode.DISABLED);
+ persister = new ManagementPlaneSyncRecordPersisterToObjectStore(mgmt, objectStore, classLoader);
+ ((ManagementPlaneSyncRecordPersisterToObjectStore)persister).preferRemoteTimestampInMemento();
+ BrooklynMementoPersisterToObjectStore persisterObj = new BrooklynMementoPersisterToObjectStore(objectStore, mgmt.getBrooklynProperties(), classLoader);
+ ((RebindManagerImpl)mgmt.getRebindManager()).setPeriodicPersistPeriod(persistOrRebindPeriod);
+ mgmt.getRebindManager().setPersister(persisterObj, PersistenceExceptionHandlerImpl.builder().build());
+ ha = ((HighAvailabilityManagerImpl)mgmt.getHighAvailabilityManager())
+ .setPollPeriod(Duration.PRACTICALLY_FOREVER)
+ .setHeartbeatTimeout(Duration.THIRTY_SECONDS)
+ .setPersister(persister);
+ log.info("Created "+nodeName+" "+ownNodeId);
+ }
+
+ public void tearDownThisOnly() throws Exception {
+ if (ha != null) ha.stop();
+ if (mgmt!=null) mgmt.getRebindManager().stop();
+ if (mgmt != null) Entities.destroyAll(mgmt);
+ }
+
+ public void tearDownAll() throws Exception {
+ tearDownThisOnly();
+ // can't delete the object store until all being torn down
+ if (objectStore != null) objectStore.deleteCompletely();
+ }
+
+ @Override
+ public String toString() {
+ return nodeName+" "+ownNodeId;
+ }
+
+ public RebindManagerImpl rebinder() {
+ return (RebindManagerImpl)mgmt.getRebindManager();
+ }
+ }
+
+ @BeforeMethod(alwaysRun=true)
+ public void setUp() throws Exception {
+ nodes.clear();
+ sharedBackingStore.clear();
+ }
+
+ public HaMgmtNode newNode(Duration persistOrRebindPeriod) throws Exception {
+ HaMgmtNode node = new HaMgmtNode();
+ node.persistOrRebindPeriod = persistOrRebindPeriod;
+ node.setUp();
+ nodes.add(node);
+ return node;
+ }
+
+ @AfterMethod(alwaysRun=true)
+ public void tearDown() throws Exception {
+ for (HaMgmtNode n: nodes)
+ n.tearDownAll();
+ }
+
+ protected ManagementContextInternal newLocalManagementContext() {
+ return new LocalManagementContextForTests();
+ }
+
+ protected PersistenceObjectStore newPersistenceObjectStore() {
+ return new InMemoryObjectStore(sharedBackingStore, sharedBackingStoreDates);
+ }
+
+ private HaMgmtNode createMaster(Duration persistOrRebindPeriod) throws Exception {
+ HaMgmtNode n1 = newNode(persistOrRebindPeriod);
+ n1.ha.start(HighAvailabilityMode.AUTO);
+ assertEquals(n1.ha.getNodeState(), ManagementNodeState.MASTER);
+ return n1;
+ }
+
+ private HaMgmtNode createHotStandby(Duration rebindPeriod) throws Exception {
+ HaMgmtNode n2 = newNode(rebindPeriod);
+ n2.ha.start(HighAvailabilityMode.HOT_STANDBY);
+ assertEquals(n2.ha.getNodeState(), ManagementNodeState.HOT_STANDBY);
+ return n2;
+ }
+
+ private TestApplication createFirstAppAndPersist(HaMgmtNode n1) throws Exception {
+ TestApplication app = TestApplication.Factory.newManagedInstanceForTests(n1.mgmt);
+ // for testing without enrichers, if desired:
+// TestApplication app = ApplicationBuilder.newManagedApp(EntitySpec.create(TestApplication.class).impl(TestApplicationNoEnrichersImpl.class), n1.mgmt);
+ app.setDisplayName("First App");
+ app.start(MutableList.<Location>of());
+ app.config().set(TestEntity.CONF_NAME, "first-app");
+ app.setAttribute(TestEntity.SEQUENCE, 3);
+
+ forcePersistNow(n1);
+ return app;
+ }
+
+ protected void forcePersistNow(HaMgmtNode n1) {
+ n1.mgmt.getRebindManager().forcePersistNow(false, null);
+ }
+
+ private Application expectRebindSequenceNumber(HaMgmtNode master, HaMgmtNode hotStandby, Application app, int expectedSensorSequenceValue, boolean immediate) {
+ Application appRO = hotStandby.mgmt.lookup(app.getId(), Application.class);
+
+ if (immediate) {
+ forcePersistNow(master);
+ forceRebindNow(hotStandby);
+ EntityTestUtils.assertAttributeEquals(appRO, TestEntity.SEQUENCE, expectedSensorSequenceValue);
+ } else {
+ EntityTestUtils.assertAttributeEqualsEventually(appRO, TestEntity.SEQUENCE, expectedSensorSequenceValue);
+ }
+
+ log.info("got sequence number "+expectedSensorSequenceValue+" from "+appRO);
+
+ // make sure the instance (proxy) is unchanged
+ Application appRO2 = hotStandby.mgmt.lookup(app.getId(), Application.class);
+ Assert.assertTrue(appRO2==appRO);
+
+ return appRO;
+ }
+
+ private void forceRebindNow(HaMgmtNode hotStandby) {
+ hotStandby.mgmt.getRebindManager().rebind(null, null, ManagementNodeState.HOT_STANDBY);
+ }
+
+ @Test
+ public void testHotStandbySeesInitialCustomNameConfigAndSensorValueButDoesntAllowChange() throws Exception {
+ HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
+ TestApplication app = createFirstAppAndPersist(n1);
+ HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER);
+
+ assertEquals(n2.mgmt.getApplications().size(), 1);
+ Application appRO = n2.mgmt.lookup(app.getId(), Application.class);
+ Assert.assertNotNull(appRO);
+ Assert.assertTrue(appRO instanceof TestApplication);
+ assertEquals(appRO.getDisplayName(), "First App");
+ assertEquals(appRO.getConfig(TestEntity.CONF_NAME), "first-app");
+ assertEquals(appRO.getAttribute(TestEntity.SEQUENCE), (Integer)3);
+
+ try {
+ ((TestApplication)appRO).setAttribute(TestEntity.SEQUENCE, 4);
+ Assert.fail("Should not have allowed sensor to be set");
+ } catch (Exception e) {
+ Assert.assertTrue(e.toString().toLowerCase().contains("read-only"), "Error message did not contain expected text: "+e);
+ }
+ assertEquals(appRO.getAttribute(TestEntity.SEQUENCE), (Integer)3);
+ }
+
+ @Test
+ public void testHotStandbySeesChangesToNameConfigAndSensorValue() throws Exception {
+ HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
+ TestApplication app = createFirstAppAndPersist(n1);
+ HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER);
+
+ assertEquals(n2.mgmt.getApplications().size(), 1);
+ Application appRO = n2.mgmt.lookup(app.getId(), Application.class);
+ Assert.assertNotNull(appRO);
+ assertEquals(appRO.getChildren().size(), 0);
+
+ // test changes
+
+ app.setDisplayName("First App Renamed");
+ app.config().set(TestEntity.CONF_NAME, "first-app-renamed");
+ app.setAttribute(TestEntity.SEQUENCE, 4);
+
+ appRO = expectRebindSequenceNumber(n1, n2, app, 4, true);
+ assertEquals(n2.mgmt.getEntityManager().getEntities().size(), 1);
+ assertEquals(appRO.getDisplayName(), "First App Renamed");
+ assertEquals(appRO.getConfig(TestEntity.CONF_NAME), "first-app-renamed");
+
+ // and change again for good measure!
+
+ app.setDisplayName("First App");
+ app.config().set(TestEntity.CONF_NAME, "first-app-restored");
+ app.setAttribute(TestEntity.SEQUENCE, 5);
+
+ appRO = expectRebindSequenceNumber(n1, n2, app, 5, true);
+ assertEquals(n2.mgmt.getEntityManager().getEntities().size(), 1);
+ assertEquals(appRO.getDisplayName(), "First App");
+ assertEquals(appRO.getConfig(TestEntity.CONF_NAME), "first-app-restored");
+ }
+
+
+ public void testHotStandbySeesStructuralChangesIncludingRemoval() throws Exception {
+ doTestHotStandbySeesStructuralChangesIncludingRemoval(true);
+ }
+
+ @Test(groups="Integration") // due to time (it waits for background persistence)
+ public void testHotStandbyUnforcedSeesStructuralChangesIncludingRemoval() throws Exception {
+ doTestHotStandbySeesStructuralChangesIncludingRemoval(false);
+ }
+
+ public void doTestHotStandbySeesStructuralChangesIncludingRemoval(boolean immediate) throws Exception {
+ HaMgmtNode n1 = createMaster(immediate ? Duration.PRACTICALLY_FOREVER : Duration.millis(200));
+ TestApplication app = createFirstAppAndPersist(n1);
+ HaMgmtNode n2 = createHotStandby(immediate ? Duration.PRACTICALLY_FOREVER : Duration.millis(200));
+
+ assertEquals(n2.mgmt.getApplications().size(), 1);
+ Application appRO = n2.mgmt.lookup(app.getId(), Application.class);
+ Assert.assertNotNull(appRO);
+ assertEquals(appRO.getChildren().size(), 0);
+
+ // test additions - new child, new app
+
+ TestEntity child = app.addChild(EntitySpec.create(TestEntity.class).configure(TestEntity.CONF_NAME, "first-child"));
+ Entities.manage(child);
+ TestApplication app2 = TestApplication.Factory.newManagedInstanceForTests(n1.mgmt);
+ app2.config().set(TestEntity.CONF_NAME, "second-app");
+
+ app.setAttribute(TestEntity.SEQUENCE, 4);
+ appRO = expectRebindSequenceNumber(n1, n2, app, 4, immediate);
+
+ assertEquals(appRO.getChildren().size(), 1);
+ Entity childRO = Iterables.getOnlyElement(appRO.getChildren());
+ assertEquals(childRO.getId(), child.getId());
+ assertEquals(childRO.getConfig(TestEntity.CONF_NAME), "first-child");
+
+ assertEquals(n2.mgmt.getApplications().size(), 2);
+ Application app2RO = n2.mgmt.lookup(app2.getId(), Application.class);
+ Assert.assertNotNull(app2RO);
+ assertEquals(app2RO.getConfig(TestEntity.CONF_NAME), "second-app");
+
+ assertEquals(n2.mgmt.getEntityManager().getEntities().size(), 3);
+
+ // now test removals
+
+ Entities.unmanage(child);
+ Entities.unmanage(app2);
+
+ app.setAttribute(TestEntity.SEQUENCE, 5);
+ appRO = expectRebindSequenceNumber(n1, n2, app, 5, immediate);
+
+ EntityTestUtils.assertAttributeEqualsEventually(appRO, TestEntity.SEQUENCE, 5);
+ assertEquals(n2.mgmt.getEntityManager().getEntities().size(), 1);
+ assertEquals(appRO.getChildren().size(), 0);
+ assertEquals(n2.mgmt.getApplications().size(), 1);
+ Assert.assertNull(n2.mgmt.lookup(app2.getId(), Application.class));
+ Assert.assertNull(n2.mgmt.lookup(child.getId(), Application.class));
+ }
+
+ @Test(groups="Integration", invocationCount=50)
+ public void testHotStandbySeesStructuralChangesIncludingRemovalManyTimes() throws Exception {
+ doTestHotStandbySeesStructuralChangesIncludingRemoval(true);
+ }
+
+ Deque<Long> usedMemory = new ArrayDeque<Long>();
+ protected long noteUsedMemory(String message) {
+ Time.sleep(Duration.millis(200));
+ for (HaMgmtNode n: nodes) {
+ ((AbstractManagementContext)n.mgmt).getGarbageCollector().gcIteration();
+ }
+ System.gc(); System.gc();
+ Time.sleep(Duration.millis(50));
+ System.gc(); System.gc();
+ long mem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+ usedMemory.addLast(mem);
+ log.info("Memory used - "+message+": "+ByteSizeStrings.java().apply(mem));
+ return mem;
+ }
+ public void assertUsedMemoryLessThan(String event, long max) {
+ noteUsedMemory(event);
+ long nowUsed = usedMemory.peekLast();
+ if (nowUsed > max) {
+ // aggressively try to force GC
+ Time.sleep(Duration.ONE_SECOND);
+ usedMemory.removeLast();
+ noteUsedMemory(event+" (extra GC)");
+ nowUsed = usedMemory.peekLast();
+ if (nowUsed > max) {
+ Assert.fail("Too much memory used - "+ByteSizeStrings.java().apply(nowUsed)+" > max "+ByteSizeStrings.java().apply(max));
+ }
+ }
+ }
+ public void assertUsedMemoryMaxDelta(String event, long deltaMegabytes) {
+ assertUsedMemoryLessThan(event, usedMemory.peekLast() + deltaMegabytes*1024*1024);
+ }
+
+ @Test(groups="Integration")
+ public void testHotStandbyDoesNotLeakLotsOfRebinds() throws Exception {
+ log.info("Starting test "+JavaClassNames.niceClassAndMethod());
+ final int DELTA = 2;
+
+ HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
+ TestApplication app = createFirstAppAndPersist(n1);
+ long initialUsed = noteUsedMemory("Created app");
+
+ HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER);
+ assertUsedMemoryMaxDelta("Standby created", DELTA);
+
+ forcePersistNow(n1);
+ forceRebindNow(n2);
+ assertUsedMemoryMaxDelta("Persisted and rebinded once", DELTA);
+
+ for (int i=0; i<10; i++) {
+ forcePersistNow(n1);
+ forceRebindNow(n2);
+ }
+ assertUsedMemoryMaxDelta("Persisted and rebinded 10x", DELTA);
+
+ for (int i=0; i<1000; i++) {
+ if ((i+1)%100==0) {
+ noteUsedMemory("iteration "+(i+1));
+ usedMemory.removeLast();
+ }
+ forcePersistNow(n1);
+ forceRebindNow(n2);
+ }
+ assertUsedMemoryMaxDelta("Persisted and rebinded 1000x", DELTA);
+
+ Entities.unmanage(app);
+ forcePersistNow(n1);
+ forceRebindNow(n2);
+
+ assertUsedMemoryLessThan("And now all unmanaged", initialUsed + DELTA*1000*1000);
+ }
+
+ static class BigObject {
+ public BigObject(int sizeBytes) { array = new byte[sizeBytes]; }
+ byte[] array;
+ }
+
+ @Test(groups="Integration")
+ public void testHotStandbyDoesNotLeakBigObjects() throws Exception {
+ log.info("Starting test "+JavaClassNames.niceClassAndMethod());
+ final int SIZE = 5;
+ final int SIZE_UP_BOUND = SIZE+2;
+ final int SIZE_DOWN_BOUND = SIZE-1;
+ final int GRACE = 2;
+ // the XML persistence uses a lot of space, we approx at between 2x and 3c
+ final int SIZE_IN_XML = 3*SIZE;
+ final int SIZE_IN_XML_DOWN = 2*SIZE;
+
+ HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
+ TestApplication app = createFirstAppAndPersist(n1);
+ noteUsedMemory("Finished seeding");
+ Long initialUsed = usedMemory.peekLast();
+ app.config().set(TestEntity.CONF_OBJECT, new BigObject(SIZE*1000*1000));
+ assertUsedMemoryMaxDelta("Set a big config object", SIZE_UP_BOUND);
+ forcePersistNow(n1);
+ assertUsedMemoryMaxDelta("Persisted a big config object", SIZE_IN_XML);
+
+ HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER);
+ forceRebindNow(n2);
+ assertUsedMemoryMaxDelta("Rebinded", SIZE_UP_BOUND);
+
+ for (int i=0; i<10; i++)
+ forceRebindNow(n2);
+ assertUsedMemoryMaxDelta("Several more rebinds", GRACE);
+ for (int i=0; i<10; i++) {
+ forcePersistNow(n1);
+ forceRebindNow(n2);
+ }
+ assertUsedMemoryMaxDelta("And more rebinds and more persists", GRACE);
+
+ app.config().set(TestEntity.CONF_OBJECT, "big is now small");
+ assertUsedMemoryMaxDelta("Big made small at primary", -SIZE_DOWN_BOUND);
+ forcePersistNow(n1);
+ assertUsedMemoryMaxDelta("And persisted", -SIZE_IN_XML_DOWN);
+
+ forceRebindNow(n2);
+ assertUsedMemoryMaxDelta("And at secondary", -SIZE_DOWN_BOUND);
+
+ Entities.unmanage(app);
+ forcePersistNow(n1);
+ forceRebindNow(n2);
+
+ assertUsedMemoryLessThan("And now all unmanaged", initialUsed+GRACE*1000*1000);
+ }
+
+ @Test(groups="Integration") // because it's slow
+ // Sept 2014 - there is a small leak, of 200 bytes per child created and destroyed;
+ // but this goes away when the app is destroyed; it may be a benign record
+ public void testHotStandbyDoesNotLeakLotsOfRebindsCreatingAndDestroyingAChildEntity() throws Exception {
+ log.info("Starting test "+JavaClassNames.niceClassAndMethod());
+ final int DELTA = 2;
+
+ HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
+ TestApplication app = createFirstAppAndPersist(n1);
+ long initialUsed = noteUsedMemory("Created app");
+
+ HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER);
+ assertUsedMemoryMaxDelta("Standby created", DELTA);
+
+ TestEntity lastChild = app.addChild(EntitySpec.create(TestEntity.class).configure(TestEntity.CONF_NAME, "first-child"));
+ Entities.manage(lastChild);
+ forcePersistNow(n1);
+ forceRebindNow(n2);
+ assertUsedMemoryMaxDelta("Child created and rebinded once", DELTA);
+
+ for (int i=0; i<1000; i++) {
+ if (i==9 || (i+1)%100==0) {
+ noteUsedMemory("iteration "+(i+1));
+ usedMemory.removeLast();
+ }
+ TestEntity newChild = app.addChild(EntitySpec.create(TestEntity.class).configure(TestEntity.CONF_NAME, "first-child"));
+ Entities.manage(newChild);
+ Entities.unmanage(lastChild);
+ lastChild = newChild;
+
+ forcePersistNow(n1);
+ forceRebindNow(n2);
+ }
+ assertUsedMemoryMaxDelta("Persisted and rebinded 1000x", DELTA);
+
+ Entities.unmanage(app);
+ forcePersistNow(n1);
+ forceRebindNow(n2);
+
+ assertUsedMemoryLessThan("And now all unmanaged", initialUsed + DELTA*1000*1000);
+ }
+
+ protected void assertHotStandby(HaMgmtNode n1) {
+ assertEquals(n1.ha.getNodeState(), ManagementNodeState.HOT_STANDBY);
+ Assert.assertTrue(n1.rebinder().isReadOnlyRunning());
+ Assert.assertFalse(n1.rebinder().isPersistenceRunning());
+ }
+
+ protected void assertMaster(HaMgmtNode n1) {
+ assertEquals(n1.ha.getNodeState(), ManagementNodeState.MASTER);
+ Assert.assertFalse(n1.rebinder().isReadOnlyRunning());
+ Assert.assertTrue(n1.rebinder().isPersistenceRunning());
+ }
+
+ @Test
+ public void testChangeMode() throws Exception {
+ HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
+ TestApplication app = createFirstAppAndPersist(n1);
+ HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER);
+
+ TestEntity child = app.addChild(EntitySpec.create(TestEntity.class).configure(TestEntity.CONF_NAME, "first-child"));
+ Entities.manage(child);
+ TestApplication app2 = TestApplication.Factory.newManagedInstanceForTests(n1.mgmt);
+ app2.config().set(TestEntity.CONF_NAME, "second-app");
+
+ forcePersistNow(n1);
+ n2.ha.setPriority(1);
+ n1.ha.changeMode(HighAvailabilityMode.HOT_STANDBY);
+
+ // both now hot standby
+ assertHotStandby(n1);
+ assertHotStandby(n2);
+
+ assertEquals(n1.mgmt.getApplications().size(), 2);
+ Application app2RO = n1.mgmt.lookup(app2.getId(), Application.class);
+ Assert.assertNotNull(app2RO);
+ assertEquals(app2RO.getConfig(TestEntity.CONF_NAME), "second-app");
+ try {
+ ((TestApplication)app2RO).setAttribute(TestEntity.SEQUENCE, 4);
+ Assert.fail("Should not have allowed sensor to be set");
+ } catch (Exception e) {
+ Assert.assertTrue(e.toString().toLowerCase().contains("read-only"), "Error message did not contain expected text: "+e);
+ }
+
+ n1.ha.changeMode(HighAvailabilityMode.AUTO);
+ n2.ha.changeMode(HighAvailabilityMode.HOT_STANDBY, true, false);
+ // both still hot standby (n1 will defer to n2 as it has higher priority)
+ assertHotStandby(n1);
+ assertHotStandby(n2);
+
+ // with priority 1, n2 will now be elected
+ n2.ha.changeMode(HighAvailabilityMode.AUTO);
+ assertHotStandby(n1);
+ assertMaster(n2);
+
+ assertEquals(n2.mgmt.getApplications().size(), 2);
+ Application app2B = n2.mgmt.lookup(app2.getId(), Application.class);
+ Assert.assertNotNull(app2B);
+ assertEquals(app2B.getConfig(TestEntity.CONF_NAME), "second-app");
+ ((TestApplication)app2B).setAttribute(TestEntity.SEQUENCE, 4);
+
+ forcePersistNow(n2);
+ forceRebindNow(n1);
+ Application app2BRO = n1.mgmt.lookup(app2.getId(), Application.class);
+ Assert.assertNotNull(app2BRO);
+ EntityTestUtils.assertAttributeEquals(app2BRO, TestEntity.SEQUENCE, 4);
+ }
+
+ @Test(groups="Integration", invocationCount=20)
+ public void testChangeModeManyTimes() throws Exception {
+ testChangeMode();
+ }
+
+ @Test
+ public void testChangeModeToDisabledAndBack() throws Exception {
+ HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
+ n1.mgmt.getLocationManager().createLocation(LocationSpec.create(LocalhostMachine.class));
+ @SuppressWarnings("unused")
+ TestApplication app = createFirstAppAndPersist(n1);
+
+ HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER);
+
+ // disabled n1 allows n2 to become master when next we tell it to check
+ n1.ha.changeMode(HighAvailabilityMode.DISABLED);
+ n2.ha.changeMode(HighAvailabilityMode.AUTO);
+ assertMaster(n2);
+ assertEquals(n1.ha.getNodeState(), ManagementNodeState.FAILED);
+ Assert.assertTrue(n1.mgmt.getApplications().isEmpty(), "n1 should have had no apps; instead had: "+n1.mgmt.getApplications());
+ Assert.assertTrue(n1.mgmt.getEntityManager().getEntities().isEmpty(), "n1 should have had no entities; instead had: "+n1.mgmt.getEntityManager().getEntities());
+ Assert.assertTrue(n1.mgmt.getLocationManager().getLocations().isEmpty(), "n1 should have had no locations; instead had: "+n1.mgmt.getLocationManager().getLocations());
+
+ // we can now change n1 back to hot_standby
+ n1.ha.changeMode(HighAvailabilityMode.HOT_STANDBY);
+ assertHotStandby(n1);
+ // and it sees apps
+ Assert.assertFalse(n1.mgmt.getApplications().isEmpty(), "n1 should have had apps now");
+ Assert.assertFalse(n1.mgmt.getLocationManager().getLocations().isEmpty(), "n1 should have had locations now");
+ // and if n2 is disabled, n1 promotes
+ n2.ha.changeMode(HighAvailabilityMode.DISABLED);
+ n1.ha.changeMode(HighAvailabilityMode.AUTO);
+ assertMaster(n1);
+ assertEquals(n2.ha.getNodeState(), ManagementNodeState.FAILED);
+ }
+
+ @Test
+ public void testHotStandbyDoesNotStartFeeds() throws Exception {
+ HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
+ TestApplication app = createFirstAppAndPersist(n1);
+ TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class).impl(MyEntityWithFunctionFeedImpl.class));
+ forcePersistNow(n1);
+ Assert.assertTrue(entity.feeds().getFeeds().size() > 0, "Feeds: "+entity.feeds().getFeeds());
+ for (Feed feed : entity.feeds().getFeeds()) {
+ assertTrue(feed.isRunning(), "Feed expected running, but it is non-running");
+ }
+
+ HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER);
+ TestEntity entityRO = (TestEntity) n2.mgmt.lookup(entity.getId(), Entity.class);
+ Assert.assertTrue(entityRO.feeds().getFeeds().size() > 0, "Feeds: "+entity.feeds().getFeeds());
+ for (Feed feedRO : entityRO.feeds().getFeeds()) {
+ assertFalse(feedRO.isRunning(), "Feed expected non-active, but it is running");
+ }
+ }
+
+ @Test(groups="Integration")
+ public void testHotStandbyDoesNotStartFeedsRebindingManyTimes() throws Exception {
+ testHotStandbyDoesNotStartFeeds();
+ final HaMgmtNode hsb = createHotStandby(Duration.millis(10));
+ Repeater.create("until 10 rebinds").every(Duration.millis(100)).until(
+ new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ return ((RebindManagerImpl)hsb.mgmt.getRebindManager()).getReadOnlyRebindCount() >= 10;
+ }
+ }).runRequiringTrue();
+ // make sure not too many tasks (allowing 5 for rebind etc; currently just 2)
+ RebindTestFixture.waitForTaskCountToBecome(hsb.mgmt, 5);
+ }
+
+ @Test(groups="Integration")
+ public void testHotStandbyDoesNotStartFeedsRebindingManyTimesWithAnotherFeedGenerator() throws Exception {
+ HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
+ TestApplication app = createFirstAppAndPersist(n1);
+ TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class).impl(MyEntityWithNewFeedsEachTimeImpl.class));
+ forcePersistNow(n1);
+ Assert.assertTrue(entity.feeds().getFeeds().size() == 4, "Feeds: "+entity.feeds().getFeeds());
+
+ final HaMgmtNode hsb = createHotStandby(Duration.millis(10));
+ Repeater.create("until 10 rebinds").every(Duration.millis(100)).until(
+ new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ return ((RebindManagerImpl)hsb.mgmt.getRebindManager()).getReadOnlyRebindCount() >= 10;
+ }
+ }).runRequiringTrue();
+ // make sure not too many tasks (allowing 5 for rebind etc; currently just 2)
+ RebindTestFixture.waitForTaskCountToBecome(hsb.mgmt, 5);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/org/apache/brooklyn/core/management/ha/ImmutableManagementPlaneSyncRecord.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/management/ha/ImmutableManagementPlaneSyncRecord.java b/core/src/test/java/org/apache/brooklyn/core/management/ha/ImmutableManagementPlaneSyncRecord.java
new file mode 100644
index 0000000..2f08057
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/management/ha/ImmutableManagementPlaneSyncRecord.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.ha;
+
+import java.util.Map;
+
+import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord;
+import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableMap;
+
+public class ImmutableManagementPlaneSyncRecord implements ManagementPlaneSyncRecord {
+ private final String masterNodeId;
+ private final Map<String, ManagementNodeSyncRecord> managementNodes;
+
+ ImmutableManagementPlaneSyncRecord(String masterNodeId, Map<String, ManagementNodeSyncRecord> nodes) {
+ this.masterNodeId = masterNodeId;
+ this.managementNodes = ImmutableMap.copyOf(nodes);
+ }
+
+ @Override
+ public String getMasterNodeId() {
+ return masterNodeId;
+ }
+
+ @Override
+ public Map<String, ManagementNodeSyncRecord> getManagementNodes() {
+ return managementNodes;
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this).add("master", masterNodeId).add("nodes", managementNodes.keySet()).toString();
+ }
+
+ @Override
+ public String toVerboseString() {
+ return Objects.toStringHelper(this).add("master", masterNodeId).add("nodes", managementNodes).toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/org/apache/brooklyn/core/management/ha/ManagementPlaneSyncRecordPersisterInMemory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/management/ha/ManagementPlaneSyncRecordPersisterInMemory.java b/core/src/test/java/org/apache/brooklyn/core/management/ha/ManagementPlaneSyncRecordPersisterInMemory.java
new file mode 100644
index 0000000..5791c88
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/management/ha/ManagementPlaneSyncRecordPersisterInMemory.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.ha;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord;
+import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord;
+import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecordPersister;
+import org.apache.brooklyn.core.management.ha.ManagementPlaneSyncRecordPersisterToObjectStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.rebind.persister.InMemoryObjectStore;
+import brooklyn.util.time.Duration;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/** @deprecated since 0.7.0 use {@link ManagementPlaneSyncRecordPersisterToObjectStore}
+ * with {@link InMemoryObjectStore}
+ * <code>
+ * new ManagementPlaneSyncRecordPersisterToObjectStore(new InMemoryObjectStore(), classLoader)
+ * </code> */
+@Deprecated
+public class ManagementPlaneSyncRecordPersisterInMemory implements ManagementPlaneSyncRecordPersister {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ManagementPlaneSyncRecordPersisterInMemory.class);
+
+ private final MutableManagementPlaneSyncRecord memento = new MutableManagementPlaneSyncRecord();
+
+ private volatile boolean running = true;
+
+ @Override
+ public synchronized void stop() {
+ running = false;
+ }
+
+ @Override
+ public ManagementPlaneSyncRecord loadSyncRecord() throws IOException {
+ if (!running) {
+ throw new IllegalStateException("Persister not running; cannot load memento");
+ }
+
+ return memento.snapshot();
+ }
+
+ @VisibleForTesting
+ @Override
+ public synchronized void waitForWritesCompleted(Duration timeout) throws InterruptedException, TimeoutException {
+ // The synchronized is sufficient - guarantee that no concurrent calls
+ return;
+ }
+
+ @Override
+ public synchronized void delta(Delta delta) {
+ if (!running) {
+ if (LOG.isDebugEnabled()) LOG.debug("Persister not running; ignoring checkpointed delta of manager-memento");
+ return;
+ }
+
+ for (ManagementNodeSyncRecord m : delta.getNodes()) {
+ memento.addNode(m);
+ }
+ for (String id : delta.getRemovedNodeIds()) {
+ memento.deleteNode(id);
+ }
+ switch (delta.getMasterChange()) {
+ case NO_CHANGE:
+ break; // no-op
+ case SET_MASTER:
+ memento.setMasterNodeId(checkNotNull(delta.getNewMasterOrNull()));
+ break;
+ case CLEAR_MASTER:
+ memento.setMasterNodeId(null);
+ break; // no-op
+ default:
+ throw new IllegalStateException("Unknown state for master-change: "+delta.getMasterChange());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/org/apache/brooklyn/core/management/ha/MasterChooserTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/management/ha/MasterChooserTest.java b/core/src/test/java/org/apache/brooklyn/core/management/ha/MasterChooserTest.java
new file mode 100644
index 0000000..8958c4b
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/management/ha/MasterChooserTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.ha;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+import java.util.List;
+
+import org.apache.brooklyn.api.management.ha.ManagementNodeState;
+import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord;
+import org.apache.brooklyn.core.management.ha.BasicMasterChooser.AlphabeticMasterChooser;
+import org.apache.brooklyn.core.management.ha.BasicMasterChooser.ScoredRecord;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.BrooklynVersion;
+import brooklyn.entity.basic.EntityFunctions;
+import brooklyn.entity.rebind.plane.dto.BasicManagementNodeSyncRecord;
+import brooklyn.util.time.Duration;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+public class MasterChooserTest {
+
+ private MutableManagementPlaneSyncRecord memento;
+ private AlphabeticMasterChooser chooser;
+ private long now;
+
+ @BeforeMethod(alwaysRun=true)
+ public void setUp() throws Exception {
+ memento = new MutableManagementPlaneSyncRecord();
+ chooser = new AlphabeticMasterChooser();
+ now = System.currentTimeMillis();
+ }
+
+ @Test
+ public void testChoosesFirstAlphanumeric() throws Exception {
+ memento.addNode(newManagerMemento("node1", ManagementNodeState.STANDBY, now));
+ memento.addNode(newManagerMemento("node2", ManagementNodeState.STANDBY, now));
+ memento.addNode(newManagerMemento("node3", ManagementNodeState.STANDBY, now));
+ Duration heartbeatTimeout = Duration.THIRTY_SECONDS;
+ String ownNodeId = "node2";
+ assertEquals(chooser.choose(memento, heartbeatTimeout, ownNodeId).getNodeId(), "node1");
+ }
+
+ @Test
+ public void testReturnsNullIfNoValid() throws Exception {
+ memento.addNode(newManagerMemento("node1", ManagementNodeState.STANDBY, now - 31*1000));
+ Duration heartbeatTimeout = Duration.THIRTY_SECONDS;
+ assertNull(chooser.choose(memento, heartbeatTimeout, "node2"));
+ }
+
+ @Test
+ public void testFiltersOutByHeartbeat() throws Exception {
+ memento.addNode(newManagerMemento("node1", ManagementNodeState.STANDBY, now - 31*1000));
+ memento.addNode(newManagerMemento("node2", ManagementNodeState.STANDBY, now - 20*1000));
+ memento.addNode(newManagerMemento("node3", ManagementNodeState.STANDBY, now));
+ Duration heartbeatTimeout = Duration.THIRTY_SECONDS;
+ assertEquals(getIds(chooser.sort(chooser.filterHealthy(memento, heartbeatTimeout, now))), ImmutableList.of("node2", "node3"));
+ }
+
+ protected static List<String> getIds(List<ScoredRecord<?>> filterHealthy) {
+ return ImmutableList.copyOf(Iterables.transform(filterHealthy, EntityFunctions.id()));
+ }
+
+ @Test
+ public void testFiltersOutByStatusNotPreferringMaster() throws Exception {
+ assertEquals(doTestFiltersOutByStatus(false, false), ImmutableList.of("node4", "node5"));
+ }
+ @Test
+ public void testFiltersOutByStatusPreferringMaster() throws Exception {
+ assertEquals(doTestFiltersOutByStatus(true, false), ImmutableList.of("node5", "node4"));
+ }
+
+ @Test
+ public void testFiltersOutByStatusNotPreferringHot() throws Exception {
+ assertEquals(doTestFiltersOutByStatus(false, true), ImmutableList.of("node4", "node5", "node6"));
+ }
+ @Test
+ public void testFiltersOutByStatusPreferringHot() throws Exception {
+ assertEquals(doTestFiltersOutByStatus(true, true), ImmutableList.of("node5", "node6", "node4"));
+ }
+
+ protected List<String> doTestFiltersOutByStatus(boolean preferHot, boolean includeHot) throws Exception {
+ chooser = new AlphabeticMasterChooser(preferHot);
+ memento.addNode(newManagerMemento("node1", ManagementNodeState.FAILED, now));
+ memento.addNode(newManagerMemento("node2", ManagementNodeState.TERMINATED, now));
+ memento.addNode(newManagerMemento("node3", null, now));
+ memento.addNode(newManagerMemento("node4", ManagementNodeState.STANDBY, now));
+ memento.addNode(newManagerMemento("node5", ManagementNodeState.MASTER, now));
+ if (includeHot)
+ memento.addNode(newManagerMemento("node6", ManagementNodeState.HOT_STANDBY, now));
+ return getIds(chooser.sort(chooser.filterHealthy(memento, Duration.THIRTY_SECONDS, now)));
+ }
+
+ @Test
+ public void testExplicityPriority() throws Exception {
+ chooser = new AlphabeticMasterChooser();
+ memento.addNode(newManagerMemento("node1", ManagementNodeState.STANDBY, now, BrooklynVersion.get(), 2L));
+ memento.addNode(newManagerMemento("node2", ManagementNodeState.STANDBY, now, BrooklynVersion.get(), -1L));
+ memento.addNode(newManagerMemento("node3", ManagementNodeState.STANDBY, now, BrooklynVersion.get(), null));
+ List<String> order = getIds(chooser.sort(chooser.filterHealthy(memento, Duration.THIRTY_SECONDS, now)));
+ assertEquals(order, ImmutableList.of("node1", "node3", "node2"));
+ }
+
+ @Test
+ public void testVersionsMaybeNull() throws Exception {
+ chooser = new AlphabeticMasterChooser();
+ memento.addNode(newManagerMemento("node1", ManagementNodeState.STANDBY, now, "v10", null));
+ memento.addNode(newManagerMemento("node2", ManagementNodeState.STANDBY, now, "v3-snapshot", null));
+ memento.addNode(newManagerMemento("node3", ManagementNodeState.STANDBY, now, "v3-snapshot", -1L));
+ memento.addNode(newManagerMemento("node4", ManagementNodeState.STANDBY, now, "v3-snapshot", null));
+ memento.addNode(newManagerMemento("node5", ManagementNodeState.STANDBY, now, "v3-stable", null));
+ memento.addNode(newManagerMemento("node6", ManagementNodeState.STANDBY, now, "v1", null));
+ memento.addNode(newManagerMemento("node7", ManagementNodeState.STANDBY, now, null, null));
+ List<String> order = getIds(chooser.sort(chooser.filterHealthy(memento, Duration.THIRTY_SECONDS, now)));
+ assertEquals(order, ImmutableList.of("node1", "node5", "node6", "node2", "node4", "node7", "node3"));
+ }
+
+ private ManagementNodeSyncRecord newManagerMemento(String nodeId, ManagementNodeState status, long timestamp) {
+ return newManagerMemento(nodeId, status, timestamp, BrooklynVersion.get(), null);
+ }
+ private ManagementNodeSyncRecord newManagerMemento(String nodeId, ManagementNodeState status, long timestamp,
+ String version, Long priority) {
+ return BasicManagementNodeSyncRecord.builder().brooklynVersion(version).nodeId(nodeId).status(status).localTimestamp(timestamp).remoteTimestamp(timestamp).
+ priority(priority).build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/org/apache/brooklyn/core/management/ha/MutableManagementPlaneSyncRecord.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/management/ha/MutableManagementPlaneSyncRecord.java b/core/src/test/java/org/apache/brooklyn/core/management/ha/MutableManagementPlaneSyncRecord.java
new file mode 100644
index 0000000..d9c8943
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/management/ha/MutableManagementPlaneSyncRecord.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.ha;
+
+import java.util.Map;
+
+import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord;
+import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord;
+
+import com.google.common.collect.Maps;
+
+public class MutableManagementPlaneSyncRecord implements ManagementPlaneSyncRecord {
+ private String masterNodeId;
+ private Map<String, ManagementNodeSyncRecord> managementNodes = Maps.newConcurrentMap();
+
+ @Override
+ public String getMasterNodeId() {
+ return masterNodeId;
+ }
+
+ @Override
+ public Map<String, ManagementNodeSyncRecord> getManagementNodes() {
+ return managementNodes;
+ }
+
+ @Override
+ public String toVerboseString() {
+ return toString();
+ }
+
+ public ImmutableManagementPlaneSyncRecord snapshot() {
+ return new ImmutableManagementPlaneSyncRecord(masterNodeId, managementNodes);
+ }
+
+ public void setMasterNodeId(String masterNodeId) {
+ this.masterNodeId = masterNodeId;
+ }
+
+ public void addNode(ManagementNodeSyncRecord memento) {
+ managementNodes.put(memento.getNodeId(), memento);
+ }
+
+ public void deleteNode(String nodeId) {
+ managementNodes.remove(nodeId);
+ }
+}
\ No newline at end of file