You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by ha...@apache.org on 2015/08/15 15:33:07 UTC

[05/33] incubator-brooklyn git commit: [BROOKLYN-162] Refactor package in ./core/management

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerSplitBrainTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerSplitBrainTest.java b/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerSplitBrainTest.java
new file mode 100644
index 0000000..7e34071
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerSplitBrainTest.java
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.ha;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.management.ha.HighAvailabilityMode;
+import org.apache.brooklyn.api.management.ha.ManagementNodeState;
+import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord;
+import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord;
+import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecordPersister;
+import org.apache.brooklyn.core.management.ha.HighAvailabilityManagerImpl;
+import org.apache.brooklyn.core.management.ha.ManagementPlaneSyncRecordPersisterToObjectStore;
+import org.apache.brooklyn.core.management.ha.TestEntityFailingRebind.RebindException;
+import org.apache.brooklyn.core.management.internal.ManagementContextInternal;
+import org.apache.brooklyn.test.entity.LocalManagementContextForTests;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.rebind.PersistenceExceptionHandlerImpl;
+import brooklyn.entity.rebind.persister.BrooklynMementoPersisterToObjectStore;
+import brooklyn.entity.rebind.persister.InMemoryObjectStore;
+import brooklyn.entity.rebind.persister.ListeningObjectStore;
+import brooklyn.entity.rebind.persister.PersistMode;
+import brooklyn.entity.rebind.persister.PersistenceObjectStore;
+import brooklyn.internal.BrooklynFeatureEnablement;
+import brooklyn.test.Asserts;
+import brooklyn.util.collections.MutableList;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
+import com.google.common.collect.ImmutableList;
+
+@Test
+public class HighAvailabilityManagerSplitBrainTest {
+
+    private static final Logger log = LoggerFactory.getLogger(HighAvailabilityManagerSplitBrainTest.class);
+    
+    private List<HaMgmtNode> nodes = new MutableList<HighAvailabilityManagerSplitBrainTest.HaMgmtNode>();
+    Map<String,String> sharedBackingStore = MutableMap.of();
+    Map<String,Date> sharedBackingStoreDates = MutableMap.of();
+    private AtomicLong sharedTime; // used to set the ticker's return value
+    private ClassLoader classLoader = getClass().getClassLoader();
+    
+    public class HaMgmtNode {
+        // TODO share with HotStandbyTest and WarmStandbyTest and a few others (minor differences but worth it ultimately)
+
+        private ManagementContextInternal mgmt;
+        private String ownNodeId;
+        private String nodeName;
+        private ListeningObjectStore objectStore;
+        private ManagementPlaneSyncRecordPersister persister;
+        private HighAvailabilityManagerImpl ha;
+        private Ticker ticker;
+        private AtomicLong currentTime; // used to set the ticker's return value
+
+        public void setUp() throws Exception {
+            if (sharedTime==null)
+                currentTime = new AtomicLong(System.currentTimeMillis());
+            
+            ticker = new Ticker() {
+                // strictly not a ticker because returns millis UTC, but it works fine even so
+                @Override public long read() {
+                    if (sharedTime!=null) return sharedTime.get();
+                    return currentTime.get();
+                }
+            };
+            
+            nodeName = "node "+nodes.size();
+            mgmt = newLocalManagementContext();
+            ownNodeId = mgmt.getManagementNodeId();
+            objectStore = new ListeningObjectStore(newPersistenceObjectStore());
+            objectStore.injectManagementContext(mgmt);
+            objectStore.prepareForSharedUse(PersistMode.CLEAN, HighAvailabilityMode.DISABLED);
+            persister = new ManagementPlaneSyncRecordPersisterToObjectStore(mgmt, objectStore, classLoader);
+            ((ManagementPlaneSyncRecordPersisterToObjectStore)persister).preferRemoteTimestampInMemento();
+            BrooklynMementoPersisterToObjectStore persisterObj = new BrooklynMementoPersisterToObjectStore(objectStore, mgmt.getBrooklynProperties(), classLoader);
+            mgmt.getRebindManager().setPersister(persisterObj, PersistenceExceptionHandlerImpl.builder().build());
+            ha = ((HighAvailabilityManagerImpl)mgmt.getHighAvailabilityManager())
+                .setPollPeriod(Duration.PRACTICALLY_FOREVER)
+                .setHeartbeatTimeout(Duration.THIRTY_SECONDS)
+                .setLocalTicker(ticker)
+                .setRemoteTicker(ticker)
+                .setPersister(persister);
+            log.info("Created "+nodeName+" "+ownNodeId);
+        }
+        
+        public void tearDown() throws Exception {
+            if (ha != null) ha.stop();
+            if (mgmt != null) Entities.destroyAll(mgmt);
+            if (objectStore != null) objectStore.deleteCompletely();
+        }
+        
+        private long tickerCurrentMillis() {
+            return ticker.read();
+        }
+        
+        private long tickerAdvance(Duration duration) {
+            if (sharedTime!=null)
+                throw new IllegalStateException("Using shared ticker; cannot advance private node clock");
+            currentTime.addAndGet(duration.toMilliseconds());
+            return tickerCurrentMillis();
+        }
+        
+        @Override
+        public String toString() {
+            return nodeName+" "+ownNodeId;
+        }
+    }
+    
+    private Boolean prevThrowOnRebind;
+    
+    @BeforeMethod(alwaysRun=true)
+    public void setUp() throws Exception {
+        prevThrowOnRebind = TestEntityFailingRebind.getThrowOnRebind();
+        TestEntityFailingRebind.setThrowOnRebind(true);
+        nodes.clear();
+        sharedBackingStore.clear();
+    }
+    
+    @AfterMethod(alwaysRun=true)
+    public void tearDown() throws Exception {
+        try {
+            for (HaMgmtNode n: nodes)
+                n.tearDown();
+        } finally {
+            if (prevThrowOnRebind != null) TestEntityFailingRebind.setThrowOnRebind(prevThrowOnRebind);
+        }
+    }
+
+    public HaMgmtNode newNode() throws Exception {
+        HaMgmtNode node = new HaMgmtNode();
+        node.setUp();
+        nodes.add(node);
+        return node;
+    }
+
+    private void sharedTickerAdvance(Duration duration) {
+        if (sharedTime==null) {
+            for (HaMgmtNode n: nodes)
+                n.tickerAdvance(duration);
+        } else {
+            sharedTime.addAndGet(duration.toMilliseconds());
+        }
+    }
+    
+    private long sharedTickerCurrentMillis() {
+        return sharedTime.get();
+    }
+    
+    protected void useSharedTime() {
+        if (!nodes.isEmpty())
+            throw new IllegalStateException("shared time must be set up before any nodes created");
+        sharedTime = new AtomicLong(System.currentTimeMillis());
+    }
+
+    protected ManagementContextInternal newLocalManagementContext() {
+        return new LocalManagementContextForTests();
+    }
+
+    protected PersistenceObjectStore newPersistenceObjectStore() {
+        return new InMemoryObjectStore(sharedBackingStore, sharedBackingStoreDates);
+    }
+    
+    @Test
+    public void testDoubleRebindFails() throws Exception {
+        useSharedTime();
+        HaMgmtNode n1 = newNode();
+        HaMgmtNode n2 = newNode();
+
+        // first auto should become master
+        n1.ha.start(HighAvailabilityMode.AUTO);
+        n2.ha.start(HighAvailabilityMode.AUTO);
+        assertEquals(n1.ha.getNodeState(), ManagementNodeState.MASTER);
+
+        TestApplication app = ApplicationBuilder.newManagedApp(
+                EntitySpec.create(TestApplication.class).impl(TestEntityFailingRebind.class), n1.mgmt);
+        app.start(ImmutableList.<Location>of());
+
+        n1.mgmt.getRebindManager().forcePersistNow(false, null);
+
+        //don't publish state for a while (i.e. long store delays, failures)
+        sharedTickerAdvance(Duration.ONE_MINUTE);
+
+        try {
+            n2.ha.publishAndCheck(false);
+            fail("n2 rebind failure expected");
+        } catch (Exception e) {
+            assertNestedRebindException(e);
+        }
+
+        // re-check should re-assert successfully, no rebind expected as he was previously master
+        n1.ha.publishAndCheck(false);
+        ManagementPlaneSyncRecord memento;
+        memento = n1.ha.loadManagementPlaneSyncRecord(true);
+        assertEquals(memento.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.MASTER);
+        assertEquals(memento.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.FAILED);
+
+        // hot backup permitted by the TestEntityFailingRebind
+        n1.ha.changeMode(HighAvailabilityMode.HOT_BACKUP);
+        memento = n1.ha.loadManagementPlaneSyncRecord(true);
+        assertEquals(memento.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.HOT_BACKUP);
+        try {
+            n1.ha.changeMode(HighAvailabilityMode.MASTER);
+            fail("n1 rebind failure expected");
+        } catch (Exception e) {
+            assertNestedRebindException(e);
+        }
+
+        memento = n1.ha.loadManagementPlaneSyncRecord(true);
+        assertEquals(memento.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.FAILED);
+        assertEquals(memento.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.FAILED);
+    }
+
+    @Test
+    public void testStandbyRebind() throws Exception {
+        useSharedTime();
+        HaMgmtNode n1 = newNode();
+        HaMgmtNode n2 = newNode();
+
+        // first auto should become master
+        n1.ha.start(HighAvailabilityMode.AUTO);
+        n2.ha.start(HighAvailabilityMode.AUTO);
+
+        TestApplication app = ApplicationBuilder.newManagedApp(
+                EntitySpec.create(TestApplication.class).impl(TestEntityFailingRebind.class), n1.mgmt);
+        app.start(ImmutableList.<Location>of());
+
+        n1.mgmt.getRebindManager().forcePersistNow(false, null);
+
+        //don't publish state for a while (i.e. long store delays, failures)
+        sharedTickerAdvance(Duration.ONE_MINUTE);
+
+        try {
+            n2.ha.publishAndCheck(false);
+            fail("n2 rebind failure expected");
+        } catch (Exception e) {
+            assertNestedRebindException(e);
+        }
+
+        TestEntityFailingRebind.setThrowOnRebind(false);
+        n1.ha.publishAndCheck(false);
+
+        ManagementPlaneSyncRecord memento = n1.ha.loadManagementPlaneSyncRecord(true);
+        assertEquals(memento.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.MASTER);
+        assertEquals(memento.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.FAILED);
+    }
+    
+    private void assertNestedRebindException(Throwable t) {
+        Throwable ptr = t;
+        while (ptr != null) {
+            if (ptr instanceof RebindException) {
+                return;
+            }
+            ptr = ptr.getCause();
+        }
+        Exceptions.propagate(t);
+    }
+    
+    @Test
+    public void testIfNodeStopsBeingAbleToWrite() throws Exception {
+        useSharedTime();
+        log.info("time at start "+sharedTickerCurrentMillis());
+        
+        HaMgmtNode n1 = newNode();
+        HaMgmtNode n2 = newNode();
+        
+        // first auto should become master
+        n1.ha.start(HighAvailabilityMode.AUTO);
+        ManagementPlaneSyncRecord memento1 = n1.ha.loadManagementPlaneSyncRecord(true);
+        
+        log.info(n1+" HA: "+memento1);
+        assertEquals(memento1.getMasterNodeId(), n1.ownNodeId);
+        Long time0 = sharedTickerCurrentMillis();
+        assertEquals(memento1.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(), time0);
+        assertEquals(memento1.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.MASTER);
+
+        // second - make explicit hot; that's a strictly more complex case than cold standby, so provides pretty good coverage
+        n2.ha.start(HighAvailabilityMode.HOT_STANDBY);
+        ManagementPlaneSyncRecord memento2 = n2.ha.loadManagementPlaneSyncRecord(true);
+        
+        log.info(n2+" HA: "+memento2);
+        assertEquals(memento2.getMasterNodeId(), n1.ownNodeId);
+        assertEquals(memento2.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.MASTER);
+        assertEquals(memento2.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.HOT_STANDBY);
+        assertEquals(memento2.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(), time0);
+        assertEquals(memento2.getManagementNodes().get(n2.ownNodeId).getRemoteTimestamp(), time0);
+        
+        // and no entities at either
+        assertEquals(n1.mgmt.getApplications().size(), 0);
+        assertEquals(n2.mgmt.getApplications().size(), 0);
+
+        // create
+        TestApplication app = ApplicationBuilder.newManagedApp(EntitySpec.create(TestApplication.class), n1.mgmt);
+        app.start(ImmutableList.<Location>of());
+        app.setAttribute(TestApplication.MY_ATTRIBUTE, "hello");
+        
+        assertEquals(n1.mgmt.getApplications().size(), 1);
+        assertEquals(n2.mgmt.getApplications().size(), 0);
+        log.info("persisting "+n1.ownNodeId);
+        n1.mgmt.getRebindManager().forcePersistNow(false, null);
+        
+        n1.objectStore.setWritesFailSilently(true);
+        log.info(n1+" writes off");
+        sharedTickerAdvance(Duration.ONE_MINUTE);
+        log.info("time now "+sharedTickerCurrentMillis());
+        Long time1 = sharedTickerCurrentMillis();
+        
+        log.info("publish "+n2.ownNodeId);
+        n2.ha.publishAndCheck(false);
+        ManagementPlaneSyncRecord memento2b = n2.ha.loadManagementPlaneSyncRecord(true);
+        log.info(n2+" HA now: "+memento2b);
+        
+        // n2 infers n1 as failed 
+        assertEquals(memento2b.getManagementNodes().get(n1.ownNodeId).getStatus(), ManagementNodeState.FAILED);
+        assertEquals(memento2b.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.MASTER);
+        assertEquals(memento2b.getMasterNodeId(), n2.ownNodeId);
+        assertEquals(memento2b.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(), time0);
+        assertEquals(memento2b.getManagementNodes().get(n2.ownNodeId).getRemoteTimestamp(), time1);
+        
+        assertEquals(n1.mgmt.getApplications().size(), 1);
+        assertEquals(n2.mgmt.getApplications().size(), 1);
+        assertEquals(n1.mgmt.getApplications().iterator().next().getAttribute(TestApplication.MY_ATTRIBUTE), "hello");
+        
+        n1.objectStore.setWritesFailSilently(false);
+        log.info(n1+" writes on");
+        
+        sharedTickerAdvance(Duration.ONE_SECOND);
+        log.info("time now "+sharedTickerCurrentMillis());
+        Long time2 = sharedTickerCurrentMillis();
+        
+        log.info("publish "+n1.ownNodeId);
+        n1.ha.publishAndCheck(false);
+        ManagementPlaneSyncRecord memento1b = n1.ha.loadManagementPlaneSyncRecord(true);
+        log.info(n1+" HA now: "+memento1b);
+        
+        ManagementNodeState expectedStateAfterDemotion = BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_DEFAULT_STANDBY_IS_HOT_PROPERTY) ?
+            ManagementNodeState.HOT_STANDBY : ManagementNodeState.STANDBY;
+        
+        // n1 comes back and demotes himself 
+        assertEquals(memento1b.getManagementNodes().get(n1.ownNodeId).getStatus(), expectedStateAfterDemotion);
+        assertEquals(memento1b.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.MASTER);
+        assertEquals(memento1b.getMasterNodeId(), n2.ownNodeId);
+        assertEquals(memento1b.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(), time2);
+        assertEquals(memento1b.getManagementNodes().get(n2.ownNodeId).getRemoteTimestamp(), time1);
+        
+        // n2 now sees itself as master, with n1 in standby again
+        ManagementPlaneSyncRecord memento2c = n2.ha.loadManagementPlaneSyncRecord(true);
+        log.info(n2+" HA now: "+memento2c);
+        assertEquals(memento2c.getManagementNodes().get(n1.ownNodeId).getStatus(), expectedStateAfterDemotion);
+        assertEquals(memento2c.getManagementNodes().get(n2.ownNodeId).getStatus(), ManagementNodeState.MASTER);
+        assertEquals(memento2c.getMasterNodeId(), n2.ownNodeId);
+        assertEquals(memento2c.getManagementNodes().get(n1.ownNodeId).getRemoteTimestamp(), time2);
+        assertEquals(memento2c.getManagementNodes().get(n2.ownNodeId).getRemoteTimestamp(), time2);
+
+        // right number of entities at n2; n1 may or may not depending whether hot standby is default
+        assertEquals(n2.mgmt.getApplications().size(), 1);
+        assertEquals(n1.mgmt.getApplications().size(), BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_DEFAULT_STANDBY_IS_HOT_PROPERTY) ? 1 : 0);
+    }
+    
+    @Test(invocationCount=50, groups="Integration")
+    public void testIfNodeStopsBeingAbleToWriteManyTimes() throws Exception {
+        testIfNodeStopsBeingAbleToWrite();
+    }
+    
+    @Test
+    public void testSimultaneousStartup() throws Exception {
+        doTestConcurrentStartup(5, null);
+    }
+
+    @Test
+    public void testNearSimultaneousStartup() throws Exception {
+        doTestConcurrentStartup(20, Duration.millis(20));
+    }
+
+    @Test(invocationCount=50, groups="Integration")
+    public void testNearSimultaneousStartupManyTimes() throws Exception {
+        doTestConcurrentStartup(20, Duration.millis(20));
+    }
+
+    protected void doTestConcurrentStartup(int size, final Duration staggerStart) throws Exception {
+        useSharedTime();
+        
+        List<Thread> spawned = MutableList.of();
+        for (int i=0; i<size; i++) {
+            final HaMgmtNode n = newNode();
+            Thread t = new Thread() { public void run() {
+                if (staggerStart!=null) Time.sleep(staggerStart.multiply(Math.random()));
+                n.ha.start(HighAvailabilityMode.AUTO);
+                n.ha.setPollPeriod(Duration.millis(20));
+            } };
+            spawned.add(t);
+            t.start();
+        }
+
+        try {
+            final Stopwatch timer = Stopwatch.createStarted();
+            Asserts.succeedsEventually(new Runnable() {
+                @Override public void run() {
+                    ManagementPlaneSyncRecord memento = nodes.get(0).ha.loadManagementPlaneSyncRecord(true);
+                    List<ManagementNodeState> counts = MutableList.of(), savedCounts = MutableList.of();
+                    for (HaMgmtNode n: nodes) {
+                        counts.add(n.ha.getNodeState());
+                        ManagementNodeSyncRecord m = memento.getManagementNodes().get(n.ownNodeId);
+                        if (m!=null) {
+                            savedCounts.add(m.getStatus());
+                        }
+                    }
+                    log.info("while starting "+nodes.size()+" nodes: "
+                        +Collections.frequency(counts, ManagementNodeState.MASTER)+" M + "
+                        +Collections.frequency(counts, ManagementNodeState.HOT_STANDBY)+" hot + "
+                        +Collections.frequency(counts, ManagementNodeState.STANDBY)+" warm + "
+                        +Collections.frequency(counts, ManagementNodeState.INITIALIZING)+" init; "
+                        + memento.getManagementNodes().size()+" saved, "
+                        +Collections.frequency(savedCounts, ManagementNodeState.MASTER)+" M + "
+                        +Collections.frequency(savedCounts, ManagementNodeState.HOT_STANDBY)+" hot + "
+                        +Collections.frequency(savedCounts, ManagementNodeState.STANDBY)+" warm + "
+                        +Collections.frequency(savedCounts, ManagementNodeState.INITIALIZING)+" init");
+
+                    if (timer.isRunning() && Duration.of(timer).compareTo(Duration.TEN_SECONDS)>0) {
+                        log.warn("we seem to have a problem stabilizing");  //handy place to set a suspend-VM breakpoint!
+                        timer.stop();
+                    }
+                    assertEquals(Collections.frequency(counts, ManagementNodeState.MASTER), 1);
+                    assertEquals(Collections.frequency(counts, ManagementNodeState.HOT_STANDBY)+Collections.frequency(counts, ManagementNodeState.STANDBY), nodes.size()-1);
+                    assertEquals(Collections.frequency(savedCounts, ManagementNodeState.MASTER), 1);
+                    assertEquals(Collections.frequency(savedCounts, ManagementNodeState.HOT_STANDBY)+Collections.frequency(savedCounts, ManagementNodeState.STANDBY), nodes.size()-1);
+                }});
+        } catch (Throwable t) {
+            log.warn("Failed to stabilize (rethrowing): "+t, t);
+            throw Exceptions.propagate(t);
+        }
+        
+        for (Thread t: spawned)
+            t.join(Duration.THIRTY_SECONDS.toMilliseconds());
+    }
+    
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerTestFixture.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerTestFixture.java b/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerTestFixture.java
new file mode 100644
index 0000000..5a7f79a
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/management/ha/HighAvailabilityManagerTestFixture.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.ha;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.brooklyn.api.management.ha.HighAvailabilityMode;
+import org.apache.brooklyn.api.management.ha.ManagementNodeState;
+import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord;
+import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord;
+import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecordPersister;
+import org.apache.brooklyn.core.management.ha.HighAvailabilityManagerImpl;
+import org.apache.brooklyn.core.management.ha.ManagementPlaneSyncRecordDeltaImpl;
+import org.apache.brooklyn.core.management.ha.ManagementPlaneSyncRecordPersisterToObjectStore;
+import org.apache.brooklyn.core.management.ha.HighAvailabilityManagerImpl.PromotionListener;
+import org.apache.brooklyn.core.management.internal.ManagementContextInternal;
+import org.apache.brooklyn.test.entity.LocalManagementContextForTests;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.BrooklynVersion;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.rebind.PersistenceExceptionHandlerImpl;
+import brooklyn.entity.rebind.persister.BrooklynMementoPersisterToObjectStore;
+import brooklyn.entity.rebind.persister.PersistMode;
+import brooklyn.entity.rebind.persister.PersistenceObjectStore;
+import brooklyn.entity.rebind.plane.dto.BasicManagementNodeSyncRecord;
+import brooklyn.entity.rebind.plane.dto.BasicManagementNodeSyncRecord.Builder;
+import brooklyn.test.Asserts;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Ticker;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+
+@Test
+public abstract class HighAvailabilityManagerTestFixture {
+
+    @SuppressWarnings("unused")
+    private static final Logger log = LoggerFactory.getLogger(HighAvailabilityManagerTestFixture.class);
+    
+    // HA sync-record persister over objectStore; tests also write to it directly to simulate other nodes' records
+    private ManagementPlaneSyncRecordPersister persister;
+    // management context under test, created per test method by newLocalManagementContext()
+    protected ManagementContextInternal managementContext;
+    // management node id of managementContext, i.e. "this" node in the HA records
+    private String ownNodeId;
+    // HA manager of managementContext, configured in setUp()
+    private HighAvailabilityManagerImpl manager;
+    // fake local clock reading currentTime (millis), used instead of wall-clock time
+    private Ticker ticker;
+    private AtomicLong currentTime; // used to set the ticker's return value
+    // records promotion callbacks so tests can assert whether/when this node was promoted
+    private RecordingPromotionListener promotionListener;
+    private ClassLoader classLoader = getClass().getClassLoader();
+    // store supplied by the concrete subclass via newPersistenceObjectStore(); deleted in tearDown()
+    private PersistenceObjectStore objectStore;
+    
+    /**
+     * Builds a fresh management context wired to the subclass's object store, with an HA manager driven
+     * by a fake clock. It also seeds the sync record with this node as a hot standby. The manager is not started.
+     */
+    @BeforeMethod(alwaysRun=true)
+    public void setUp() throws Exception {
+        // fake clock: fixed starting instant, moved only when a test advances currentTime
+        currentTime = new AtomicLong(1000000000L);
+        ticker = new Ticker() {
+            // strictly not a ticker because returns millis UTC, but it works fine even so
+            @Override public long read() {
+                return currentTime.get();
+            }
+        };
+        promotionListener = new RecordingPromotionListener();
+
+        managementContext = newLocalManagementContext();
+        ownNodeId = managementContext.getManagementNodeId();
+
+        objectStore = newPersistenceObjectStore();
+        objectStore.injectManagementContext(managementContext);
+        objectStore.prepareForSharedUse(PersistMode.CLEAN, HighAvailabilityMode.DISABLED);
+
+        ManagementPlaneSyncRecordPersisterToObjectStore planePersister =
+                new ManagementPlaneSyncRecordPersisterToObjectStore(managementContext, objectStore, classLoader);
+        planePersister.preferRemoteTimestampInMemento();
+        persister = planePersister;
+
+        BrooklynMementoPersisterToObjectStore mementoPersister =
+                new BrooklynMementoPersisterToObjectStore(objectStore, managementContext.getBrooklynProperties(), classLoader);
+        managementContext.getRebindManager().setPersister(mementoPersister, PersistenceExceptionHandlerImpl.builder().build());
+
+        HighAvailabilityManagerImpl haManager = (HighAvailabilityManagerImpl) managementContext.getHighAvailabilityManager();
+        manager = haManager
+                .setPollPeriod(getPollPeriod())
+                .setHeartbeatTimeout(Duration.THIRTY_SECONDS)
+                .setPromotionListener(promotionListener)
+                .setLocalTicker(ticker)
+                .setRemoteTicker(getRemoteTicker())
+                .setPersister(persister);
+
+        // seed the shared record: this node starts out as a hot standby
+        persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder()
+                .node(newManagerMemento(ownNodeId, ManagementNodeState.HOT_STANDBY))
+                .build());
+    }
+
+    /** Creates the management context under test; subclasses may override to supply a different one. */
+    protected ManagementContextInternal newLocalManagementContext() {
+        ManagementContextInternal context = LocalManagementContextForTests.newInstance();
+        return context;
+    }
+
+    /**
+     * Supplies the object store used in setUp() by both the HA sync-record persister and the rebind
+     * persister; it is deleted completely in tearDown(). Implemented by concrete subclasses.
+     */
+    protected abstract PersistenceObjectStore newPersistenceObjectStore();
+    
+    /**
+     * Stops the HA manager, destroys the management context and deletes the object store.
+     * Each step runs even if an earlier one throws, so a failure in one does not leak the others.
+     */
+    @AfterMethod(alwaysRun=true)
+    public void tearDown() throws Exception {
+        try {
+            if (manager != null) manager.stop();
+        } finally {
+            try {
+                if (managementContext != null) Entities.destroyAll(managementContext);
+            } finally {
+                if (objectStore != null) objectStore.deleteCompletely();
+            }
+        }
+    }
+    
+    // The web-console could still be polling (e.g. if brooklyn has just been restarted) before the persister is set.
+    // Loading the sync record must not throw an NPE; it should return something sensible (e.g. an empty state record).
+    @Test
+    public void testGetManagementPlaneSyncStateDoesNotThrowNpeBeforePersisterSet() throws Exception {
+        // a second HA manager on which setPersister(...) is deliberately never called
+        HighAvailabilityManagerImpl managerWithoutPersister = new HighAvailabilityManagerImpl(managementContext)
+                .setPollPeriod(Duration.millis(10))
+                .setHeartbeatTimeout(Duration.THIRTY_SECONDS)
+                .setPromotionListener(promotionListener)
+                .setLocalTicker(ticker)
+                .setRemoteTicker(ticker);
+        try {
+            assertNotNull(managerWithoutPersister.loadManagementPlaneSyncRecord(true));
+        } finally {
+            managerWithoutPersister.stop();
+        }
+    }
+    // Can get a log.error about our management node's heartbeat being out of date. Caused by
+    // poller first writing a heartbeat record, and then the clock being incremented. But the
+    // next poll fixes it.
+    /**
+     * With node1 recorded as master and this node as hot-standby, starting in AUTO mode and then
+     * advancing the clock 31s leaves node1's heartbeat out of date, so this node should eventually
+     * be promoted (observed via the promotion listener).
+     */
+    @Test // annotated like its sibling tests so it runs regardless of any class-level @Test
+    public void testPromotes() throws Exception {
+        persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder()
+                .node(newManagerMemento(ownNodeId, ManagementNodeState.HOT_STANDBY))
+                .node(newManagerMemento("node1", ManagementNodeState.MASTER))
+                .setMaster("node1")
+                .build());
+        
+        manager.start(HighAvailabilityMode.AUTO);
+        
+        // Simulate passage of time; ticker used by this HA-manager so it will "correctly" publish
+        // its own heartbeat with the new time; but node1's record is now out-of-date.
+        tickerAdvance(Duration.seconds(31));
+        
+        // Expect to be notified of our promotion, as the only other node
+        promotionListener.assertCalledEventually();
+    }
+
+    /**
+     * With node1 as master, advancing the clock by only 25s (less than the 30s heartbeat timeout)
+     * must not cause this AUTO-mode node to be promoted.
+     */
+    @Test(groups="Integration") // because one second wait in succeedsContinually
+    public void testDoesNotPromoteIfMasterTimeoutNotExpired() throws Exception {
+        persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder()
+                .node(newManagerMemento(ownNodeId, ManagementNodeState.HOT_STANDBY))
+                .node(newManagerMemento("node1", ManagementNodeState.MASTER))
+                .setMaster("node1")
+                .build());
+        
+        manager.start(HighAvailabilityMode.AUTO);
+        
+        tickerAdvance(Duration.seconds(25));
+        
+        // Expect not to be notified, as 25s < 30s timeout
+        // (it's normally a fake clock so won't hit 30, even waiting 1s below - but in "IntegrationTest" subclasses it is real!)
+        Asserts.succeedsContinually(new Runnable() {
+            @Override public void run() {
+                promotionListener.assertNotCalled();
+            }});
+    }
+
+    /**
+     * With this node and "zzzzzzz_node1" both recorded as STANDBY and no master set, starting in AUTO
+     * mode makes this node master; the loaded sync record then lists both nodes with their states
+     * and the local timestamps at which they were recorded.
+     */
+    @Test // annotated like its sibling tests so it runs regardless of any class-level @Test
+    public void testGetManagementPlaneStatus() throws Exception {
+        // with the name zzzzz the mgr created here should never be promoted by the alphabetical strategy!
+        
+        tickerAdvance(Duration.FIVE_SECONDS);
+        persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder()
+                .node(newManagerMemento(ownNodeId, ManagementNodeState.STANDBY))
+                .node(newManagerMemento("zzzzzzz_node1", ManagementNodeState.STANDBY))
+                .build());
+        persister.loadSyncRecord();
+        long zzzTime = tickerCurrentMillis();
+        tickerAdvance(Duration.FIVE_SECONDS);
+        
+        manager.start(HighAvailabilityMode.AUTO);
+        ManagementPlaneSyncRecord memento = manager.loadManagementPlaneSyncRecord(true);
+        
+        // Note can assert timestamp because not "real" time; it's using our own Ticker
+        assertEquals(memento.getMasterNodeId(), ownNodeId);
+        assertEquals(memento.getManagementNodes().keySet(), ImmutableSet.of(ownNodeId, "zzzzzzz_node1"));
+        assertEquals(memento.getManagementNodes().get(ownNodeId).getNodeId(), ownNodeId);
+        assertEquals(memento.getManagementNodes().get(ownNodeId).getStatus(), ManagementNodeState.MASTER);
+        assertEquals(memento.getManagementNodes().get(ownNodeId).getLocalTimestamp(), tickerCurrentMillis());
+        assertEquals(memento.getManagementNodes().get("zzzzzzz_node1").getNodeId(), "zzzzzzz_node1");
+        assertEquals(memento.getManagementNodes().get("zzzzzzz_node1").getStatus(), ManagementNodeState.STANDBY);
+        assertEquals(memento.getManagementNodes().get("zzzzzzz_node1").getLocalTimestamp(), zzzTime);
+    }
+
+    // Re-runs testGetManagementPlaneStatus 50 times to surface intermittent failures.
+    @Test(groups="Integration", invocationCount=50) //because we have had non-deterministic failures 
+    public void testGetManagementPlaneStatusManyTimes() throws Exception {
+        testGetManagementPlaneStatus();
+    }
+    
+    /**
+     * Once node1's (master) record is out of date, loading the sync record reports node1 as FAILED,
+     * while this node, which keeps publishing heartbeats with the advanced ticker, is not FAILED.
+     */
+    @Test
+    public void testGetManagementPlaneSyncStateInfersTimedOutNodeAsFailed() throws Exception {
+        persister.delta(ManagementPlaneSyncRecordDeltaImpl.builder()
+                .node(newManagerMemento(ownNodeId, ManagementNodeState.HOT_STANDBY))
+                .node(newManagerMemento("node1", ManagementNodeState.MASTER))
+                .setMaster("node1")
+                .build());
+        
+        manager.start(HighAvailabilityMode.HOT_STANDBY);
+        
+        ManagementPlaneSyncRecord state = manager.loadManagementPlaneSyncRecord(true);
+        assertEquals(state.getManagementNodes().get("node1").getStatus(), ManagementNodeState.MASTER);
+        assertEquals(state.getManagementNodes().get(ownNodeId).getStatus(), ManagementNodeState.HOT_STANDBY);
+        
+        // Simulate passage of time; ticker used by this HA-manager so it will "correctly" publish
+        // its own heartbeat with the new time; but node1's record is now out-of-date.
+        tickerAdvance(Duration.seconds(31));
+        
+        ManagementPlaneSyncRecord state2 = manager.loadManagementPlaneSyncRecord(true);
+        assertEquals(state2.getManagementNodes().get("node1").getStatus(), ManagementNodeState.FAILED);
+        // Check the post-timeout record: asserting on the earlier 'state' would be vacuous, since
+        // that node was already asserted to be HOT_STANDBY above.
+        assertNotEquals(state2.getManagementNodes().get(ownNodeId).getStatus(), ManagementNodeState.FAILED);
+    }
+
+    // Poll period for the HA manager under test: 10ms. Protected, so subclasses can override it.
+    // NOTE(review): presumably consumed where the manager is built in setUp (not visible here).
+    protected Duration getPollPeriod() {
+        return Duration.millis(10);
+    }
+    
+    // Current reading of the test's ticker, in milliseconds (the ticker is also handed to the
+    // HA manager as its local/remote ticker).
+    // NOTE(review): presumably backed by currentTime, which tickerAdvance increments — confirm.
+    protected long tickerCurrentMillis() {
+        return ticker.read();
+    }
+    
+    /**
+     * Moves the test clock forward by adding {@code duration} to {@code currentTime}.
+     * @return the ticker's reading after the advance
+     */
+    protected long tickerAdvance(Duration duration) {
+        long deltaMillis = duration.toMilliseconds();
+        currentTime.addAndGet(deltaMillis);
+        return tickerCurrentMillis();
+    }
+
+    // Ticker used for the remote timestamp in records built by newManagerMemento; this fixture
+    // reuses the local test ticker. Subclasses may override (a null return omits remote timestamps).
+    protected Ticker getRemoteTicker() {
+        return ticker;
+    }
+    
+    /**
+     * Builds a sync record for {@code nodeId} in state {@code status}, stamped with the current
+     * Brooklyn version and local ticker time, plus the remote ticker time when a remote ticker exists.
+     */
+    protected ManagementNodeSyncRecord newManagerMemento(String nodeId, ManagementNodeState status) {
+        Builder builder = BasicManagementNodeSyncRecord.builder();
+        builder.brooklynVersion(BrooklynVersion.get());
+        builder.nodeId(nodeId);
+        builder.status(status);
+        builder.localTimestamp(tickerCurrentMillis());
+        if (getRemoteTicker() != null) {
+            builder.remoteTimestamp(getRemoteTicker().read());
+        }
+        return builder.build();
+    }
+    
+    public static class RecordingPromotionListener implements PromotionListener {
+        public final List<Long> callTimestamps = Lists.newCopyOnWriteArrayList();
+        
+        @Override
+        public void promotingToMaster() {
+            callTimestamps.add(System.currentTimeMillis());
+        }
+        
+        public void assertNotCalled() {
+            assertTrue(callTimestamps.isEmpty(), "calls="+callTimestamps);
+        }
+
+        public void assertCalled() {
+            assertFalse(callTimestamps.isEmpty(), "calls="+callTimestamps);
+        }
+
+        public void assertCalledEventually() {
+            Asserts.succeedsEventually(new Runnable() {
+                @Override public void run() {
+                    assertCalled();
+                }});
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/org/apache/brooklyn/core/management/ha/HotStandbyTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/management/ha/HotStandbyTest.java b/core/src/test/java/org/apache/brooklyn/core/management/ha/HotStandbyTest.java
new file mode 100644
index 0000000..da4f998
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/management/ha/HotStandbyTest.java
@@ -0,0 +1,667 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.ha;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import java.util.ArrayDeque;
+import java.util.Date;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.apache.brooklyn.api.entity.Application;
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.Feed;
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.location.LocationSpec;
+import org.apache.brooklyn.api.management.ha.HighAvailabilityMode;
+import org.apache.brooklyn.api.management.ha.ManagementNodeState;
+import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecordPersister;
+import org.apache.brooklyn.core.management.ha.HighAvailabilityManagerImpl;
+import org.apache.brooklyn.core.management.ha.ManagementPlaneSyncRecordPersisterToObjectStore;
+import org.apache.brooklyn.core.management.internal.AbstractManagementContext;
+import org.apache.brooklyn.core.management.internal.ManagementContextInternal;
+import org.apache.brooklyn.test.EntityTestUtils;
+import org.apache.brooklyn.test.entity.LocalManagementContextForTests;
+import org.apache.brooklyn.test.entity.TestApplication;
+import org.apache.brooklyn.test.entity.TestEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.rebind.PersistenceExceptionHandlerImpl;
+import brooklyn.entity.rebind.RebindFeedTest.MyEntityWithFunctionFeedImpl;
+import brooklyn.entity.rebind.RebindFeedTest.MyEntityWithNewFeedsEachTimeImpl;
+import brooklyn.entity.rebind.RebindManagerImpl;
+import brooklyn.entity.rebind.RebindTestFixture;
+import brooklyn.entity.rebind.persister.BrooklynMementoPersisterToObjectStore;
+import brooklyn.entity.rebind.persister.InMemoryObjectStore;
+import brooklyn.entity.rebind.persister.ListeningObjectStore;
+import brooklyn.entity.rebind.persister.PersistMode;
+import brooklyn.entity.rebind.persister.PersistenceObjectStore;
+
+import org.apache.brooklyn.location.basic.LocalhostMachineProvisioningLocation.LocalhostMachine;
+
+import brooklyn.util.collections.MutableList;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.javalang.JavaClassNames;
+import brooklyn.util.repeat.Repeater;
+import brooklyn.util.text.ByteSizeStrings;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.collect.Iterables;
+
+public class HotStandbyTest {
+
+    private static final Logger log = LoggerFactory.getLogger(HotStandbyTest.class);
+    
+    // every node created via newNode(...) in the current test; all are torn down in tearDown()
+    private List<HaMgmtNode> nodes = new MutableList<HotStandbyTest.HaMgmtNode>();
+    // backing maps handed to every node's InMemoryObjectStore (newPersistenceObjectStore), so all
+    // nodes in a test read and write the same persisted state
+    Map<String,String> sharedBackingStore = MutableMap.of();
+    Map<String,Date> sharedBackingStoreDates = MutableMap.of();
+    private ClassLoader classLoader = getClass().getClassLoader();
+    
+    /**
+     * One simulated Brooklyn management node: its own management context, HA manager and persisters,
+     * over an object store built on the test's shared backing maps.
+     */
+    public class HaMgmtNode {
+        // TODO share with WarmStandbyTest and SplitBrainTest and a few others (minor differences but worth it ultimately)
+
+        private ManagementContextInternal mgmt;
+        private String ownNodeId;
+        private String nodeName;
+        private ListeningObjectStore objectStore;
+        private ManagementPlaneSyncRecordPersister persister;
+        private HighAvailabilityManagerImpl ha;
+        private Duration persistOrRebindPeriod = Duration.ONE_SECOND;
+
+        /**
+         * Creates this node's management context, object store, sync-record persister and rebind
+         * persister, and configures (but does not start) its HA manager.
+         */
+        public void setUp() throws Exception {
+            nodeName = "node "+nodes.size();
+            mgmt = newLocalManagementContext();
+            ownNodeId = mgmt.getManagementNodeId();
+            objectStore = new ListeningObjectStore(newPersistenceObjectStore());
+            objectStore.injectManagementContext(mgmt);
+            objectStore.prepareForSharedUse(PersistMode.CLEAN, HighAvailabilityMode.DISABLED);
+            persister = new ManagementPlaneSyncRecordPersisterToObjectStore(mgmt, objectStore, classLoader);
+            ((ManagementPlaneSyncRecordPersisterToObjectStore)persister).preferRemoteTimestampInMemento();
+            BrooklynMementoPersisterToObjectStore persisterObj = new BrooklynMementoPersisterToObjectStore(objectStore, mgmt.getBrooklynProperties(), classLoader);
+            ((RebindManagerImpl)mgmt.getRebindManager()).setPeriodicPersistPeriod(persistOrRebindPeriod);
+            mgmt.getRebindManager().setPersister(persisterObj, PersistenceExceptionHandlerImpl.builder().build());
+            ha = ((HighAvailabilityManagerImpl)mgmt.getHighAvailabilityManager())
+                .setPollPeriod(Duration.PRACTICALLY_FOREVER)
+                .setHeartbeatTimeout(Duration.THIRTY_SECONDS)
+                .setPersister(persister);
+            log.info("Created "+nodeName+" "+ownNodeId);
+        }
+        
+        /**
+         * Stops this node's HA manager and rebind manager and destroys its entities, leaving the
+         * object store in place. Each step runs even if an earlier one throws, so one failure does not
+         * leak the management context; an exception is still propagated.
+         */
+        public void tearDownThisOnly() throws Exception {
+            try {
+                if (ha != null) ha.stop();
+            } finally {
+                if (mgmt != null) {
+                    try {
+                        mgmt.getRebindManager().stop();
+                    } finally {
+                        Entities.destroyAll(mgmt);
+                    }
+                }
+            }
+        }
+        
+        /** Tears down this node, then deletes its object store. */
+        public void tearDownAll() throws Exception {
+            tearDownThisOnly();
+            // can't delete the object store until everything above has been torn down
+            if (objectStore != null) objectStore.deleteCompletely();
+        }
+        
+        @Override
+        public String toString() {
+            return nodeName+" "+ownNodeId;
+        }
+
+        /** This node's rebind manager, as the concrete implementation. */
+        public RebindManagerImpl rebinder() {
+            return (RebindManagerImpl)mgmt.getRebindManager();
+        }
+    }
+    
+    /**
+     * Resets all per-test state: the node list, both shared backing maps (persisted data and its
+     * dates, which always travel together into each InMemoryObjectStore) and earlier memory readings.
+     */
+    @BeforeMethod(alwaysRun=true)
+    public void setUp() throws Exception {
+        nodes.clear();
+        sharedBackingStore.clear();
+        sharedBackingStoreDates.clear();
+        usedMemory.clear();
+    }
+    
+    /**
+     * Creates and sets up a node with the given periodic persist period, and registers it for
+     * tear-down. The node is named after the number of nodes that existed before it.
+     */
+    public HaMgmtNode newNode(Duration persistOrRebindPeriod) throws Exception {
+        HaMgmtNode created = new HaMgmtNode();
+        created.persistOrRebindPeriod = persistOrRebindPeriod;
+        created.setUp();
+        nodes.add(created);
+        return created;
+    }
+
+    @AfterMethod(alwaysRun=true)
+    public void tearDown() throws Exception {
+        for (HaMgmtNode n: nodes)
+            n.tearDownAll();
+    }
+
+    // Management context for each new node; protected so subclasses can substitute another implementation.
+    protected ManagementContextInternal newLocalManagementContext() {
+        return new LocalManagementContextForTests();
+    }
+
+    // Object store for each new node. Every store made here wraps the same sharedBackingStore /
+    // sharedBackingStoreDates maps, which is how nodes in one test see each other's persisted state.
+    protected PersistenceObjectStore newPersistenceObjectStore() {
+        return new InMemoryObjectStore(sharedBackingStore, sharedBackingStoreDates);
+    }
+
+    /** Creates a node, starts its HA manager in AUTO mode and asserts that it became MASTER. */
+    private HaMgmtNode createMaster(Duration persistOrRebindPeriod) throws Exception {
+        HaMgmtNode master = newNode(persistOrRebindPeriod);
+        master.ha.start(HighAvailabilityMode.AUTO);
+        assertEquals(master.ha.getNodeState(), ManagementNodeState.MASTER);
+        return master;
+    }
+    
+    /** Creates a node, starts its HA manager in HOT_STANDBY mode and asserts it is HOT_STANDBY. */
+    private HaMgmtNode createHotStandby(Duration rebindPeriod) throws Exception {
+        HaMgmtNode standby = newNode(rebindPeriod);
+        standby.ha.start(HighAvailabilityMode.HOT_STANDBY);
+        assertEquals(standby.ha.getNodeState(), ManagementNodeState.HOT_STANDBY);
+        return standby;
+    }
+
+    // On node n1 (expected to be master), creates and starts "First App" with CONF_NAME="first-app"
+    // and SEQUENCE=3, then forces a persist so that a standby can pick it up.
+    private TestApplication createFirstAppAndPersist(HaMgmtNode n1) throws Exception {
+        TestApplication app = TestApplication.Factory.newManagedInstanceForTests(n1.mgmt);
+        // for testing without enrichers, if desired:
+//        TestApplication app = ApplicationBuilder.newManagedApp(EntitySpec.create(TestApplication.class).impl(TestApplicationNoEnrichersImpl.class), n1.mgmt);
+        app.setDisplayName("First App");
+        app.start(MutableList.<Location>of());
+        app.config().set(TestEntity.CONF_NAME, "first-app");
+        app.setAttribute(TestEntity.SEQUENCE, 3);
+        
+        forcePersistNow(n1);
+        return app;
+    }
+
+    // Asks the node's rebind manager to persist immediately, instead of waiting for its periodic persist.
+    protected void forcePersistNow(HaMgmtNode n1) {
+        // NOTE(review): the meaning of (false, null) is defined by RebindManager.forcePersistNow — not visible here
+        n1.mgmt.getRebindManager().forcePersistNow(false, null);
+    }
+    
+    /**
+     * Checks that the hot standby's read-only copy of {@code app} reaches the expected
+     * {@link TestEntity#SEQUENCE} value, and that the standby kept the same proxy instance for it.
+     *
+     * @param immediate if true, force a persist on {@code master} and a rebind on {@code hotStandby}
+     *        and assert once; if false, rely on the periodic persist/rebind and assert eventually
+     * @return the hot standby's read-only view of the app
+     */
+    private Application expectRebindSequenceNumber(HaMgmtNode master, HaMgmtNode hotStandby, Application app, int expectedSensorSequenceValue, boolean immediate) {
+        Application appRO = hotStandby.mgmt.lookup(app.getId(), Application.class);
+        // fail with a clear message here rather than inside the attribute assertions below
+        Assert.assertNotNull(appRO, "hot standby "+hotStandby+" has no app "+app.getId());
+
+        if (immediate) {
+            forcePersistNow(master);
+            forceRebindNow(hotStandby);
+            EntityTestUtils.assertAttributeEquals(appRO, TestEntity.SEQUENCE, expectedSensorSequenceValue);
+        } else {
+            EntityTestUtils.assertAttributeEqualsEventually(appRO, TestEntity.SEQUENCE, expectedSensorSequenceValue);
+        }
+        
+        log.info("got sequence number "+expectedSensorSequenceValue+" from "+appRO);
+        
+        // make sure the instance (proxy) is unchanged
+        Application appRO2 = hotStandby.mgmt.lookup(app.getId(), Application.class);
+        Assert.assertSame(appRO2, appRO, "hot standby proxy for app "+app.getId()+" changed across rebind");
+        
+        return appRO;
+    }
+
+    // Asks the standby's rebind manager to rebind now, as HOT_STANDBY, instead of waiting for its periodic rebind.
+    private void forceRebindNow(HaMgmtNode hotStandby) {
+        // NOTE(review): the two null arguments presumably mean "use defaults" — confirm against RebindManager.rebind
+        hotStandby.mgmt.getRebindManager().rebind(null, null, ManagementNodeState.HOT_STANDBY);
+    }
+    
+    // A hot standby started after the master persisted an app sees that app's display name, config
+    // and sensor value, and rejects (with a "read-only" error) an attempt to set a sensor.
+    @Test
+    public void testHotStandbySeesInitialCustomNameConfigAndSensorValueButDoesntAllowChange() throws Exception {
+        HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
+        TestApplication app = createFirstAppAndPersist(n1);
+        HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER);
+
+        assertEquals(n2.mgmt.getApplications().size(), 1);
+        Application appRO = n2.mgmt.lookup(app.getId(), Application.class);
+        Assert.assertNotNull(appRO);
+        Assert.assertTrue(appRO instanceof TestApplication);
+        assertEquals(appRO.getDisplayName(), "First App");
+        assertEquals(appRO.getConfig(TestEntity.CONF_NAME), "first-app");
+        assertEquals(appRO.getAttribute(TestEntity.SEQUENCE), (Integer)3);
+
+        try {
+            ((TestApplication)appRO).setAttribute(TestEntity.SEQUENCE, 4);
+            Assert.fail("Should not have allowed sensor to be set");
+        } catch (Exception e) {
+            // NOTE(review): relies on Assert.fail throwing an Error (not an Exception), so it escapes this catch
+            Assert.assertTrue(e.toString().toLowerCase().contains("read-only"), "Error message did not contain expected text: "+e);
+        }
+        // the rejected write must have left the value unchanged
+        assertEquals(appRO.getAttribute(TestEntity.SEQUENCE), (Integer)3);
+    }
+
+    // Name, config and sensor changes made on the master reach the hot standby after a forced
+    // persist + rebind (twice), without any extra entities appearing on the standby.
+    @Test
+    public void testHotStandbySeesChangesToNameConfigAndSensorValue() throws Exception {
+        HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
+        TestApplication app = createFirstAppAndPersist(n1);
+        HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER);
+
+        assertEquals(n2.mgmt.getApplications().size(), 1);
+        Application appRO = n2.mgmt.lookup(app.getId(), Application.class);
+        Assert.assertNotNull(appRO);
+        assertEquals(appRO.getChildren().size(), 0);
+        
+        // test changes
+
+        app.setDisplayName("First App Renamed");
+        app.config().set(TestEntity.CONF_NAME, "first-app-renamed");
+        app.setAttribute(TestEntity.SEQUENCE, 4);
+
+        // immediate=true: force persist on n1 and rebind on n2, then check
+        appRO = expectRebindSequenceNumber(n1, n2, app, 4, true);
+        assertEquals(n2.mgmt.getEntityManager().getEntities().size(), 1);
+        assertEquals(appRO.getDisplayName(), "First App Renamed");
+        assertEquals(appRO.getConfig(TestEntity.CONF_NAME), "first-app-renamed");
+        
+        // and change again for good measure!
+
+        app.setDisplayName("First App");
+        app.config().set(TestEntity.CONF_NAME, "first-app-restored");
+        app.setAttribute(TestEntity.SEQUENCE, 5);
+        
+        appRO = expectRebindSequenceNumber(n1, n2, app, 5, true);
+        assertEquals(n2.mgmt.getEntityManager().getEntities().size(), 1);
+        assertEquals(appRO.getDisplayName(), "First App");
+        assertEquals(appRO.getConfig(TestEntity.CONF_NAME), "first-app-restored");
+    }
+
+
+    /**
+     * Structural changes (added/removed child and app) on the master reach the hot standby, with
+     * persist and rebind forced immediately. Needs its own @Test: this class has no class-level
+     * annotation, so an unannotated public method is not run by TestNG.
+     */
+    @Test
+    public void testHotStandbySeesStructuralChangesIncludingRemoval() throws Exception {
+        doTestHotStandbySeesStructuralChangesIncludingRemoval(true);
+    }
+    
+    // Same structural-change checks, but relying on the 200ms periodic persist/rebind instead of forcing them.
+    @Test(groups="Integration") // due to time (it waits for background persistence)
+    public void testHotStandbyUnforcedSeesStructuralChangesIncludingRemoval() throws Exception {
+        doTestHotStandbySeesStructuralChangesIncludingRemoval(false);
+    }
+    
+    /**
+     * Adds a child and a second app on the master and checks the hot standby sees them, then removes
+     * both and checks the standby drops them.
+     *
+     * @param immediate if true, persist/rebind are forced and periods are effectively disabled;
+     *        if false, both nodes use a 200ms periodic persist/rebind and checks wait for it
+     */
+    public void doTestHotStandbySeesStructuralChangesIncludingRemoval(boolean immediate) throws Exception {
+        HaMgmtNode n1 = createMaster(immediate ? Duration.PRACTICALLY_FOREVER : Duration.millis(200));
+        TestApplication app = createFirstAppAndPersist(n1);
+        HaMgmtNode n2 = createHotStandby(immediate ? Duration.PRACTICALLY_FOREVER : Duration.millis(200));
+
+        assertEquals(n2.mgmt.getApplications().size(), 1);
+        Application appRO = n2.mgmt.lookup(app.getId(), Application.class);
+        Assert.assertNotNull(appRO);
+        assertEquals(appRO.getChildren().size(), 0);
+        
+        // test additions - new child, new app
+        
+        TestEntity child = app.addChild(EntitySpec.create(TestEntity.class).configure(TestEntity.CONF_NAME, "first-child"));
+        Entities.manage(child);
+        TestApplication app2 = TestApplication.Factory.newManagedInstanceForTests(n1.mgmt);
+        app2.config().set(TestEntity.CONF_NAME, "second-app");
+        
+        app.setAttribute(TestEntity.SEQUENCE, 4);
+        appRO = expectRebindSequenceNumber(n1, n2, app, 4, immediate);
+        
+        assertEquals(appRO.getChildren().size(), 1);
+        Entity childRO = Iterables.getOnlyElement(appRO.getChildren());
+        assertEquals(childRO.getId(), child.getId());
+        assertEquals(childRO.getConfig(TestEntity.CONF_NAME), "first-child");
+        
+        assertEquals(n2.mgmt.getApplications().size(), 2);
+        Application app2RO = n2.mgmt.lookup(app2.getId(), Application.class);
+        Assert.assertNotNull(app2RO);
+        assertEquals(app2RO.getConfig(TestEntity.CONF_NAME), "second-app");
+        
+        assertEquals(n2.mgmt.getEntityManager().getEntities().size(), 3);
+        
+        // now test removals
+        
+        Entities.unmanage(child);
+        Entities.unmanage(app2);
+        
+        app.setAttribute(TestEntity.SEQUENCE, 5);
+        appRO = expectRebindSequenceNumber(n1, n2, app, 5, immediate);
+        
+        EntityTestUtils.assertAttributeEqualsEventually(appRO, TestEntity.SEQUENCE, 5);
+        assertEquals(n2.mgmt.getEntityManager().getEntities().size(), 1);
+        assertEquals(appRO.getChildren().size(), 0);
+        assertEquals(n2.mgmt.getApplications().size(), 1);
+        Assert.assertNull(n2.mgmt.lookup(app2.getId(), Application.class));
+        // the child is a TestEntity, not an Application, so look it up as an Entity; looking it up
+        // as an Application would not match the child in any case, making the check vacuous
+        Assert.assertNull(n2.mgmt.lookup(child.getId(), Entity.class));
+    }
+
+    // Repeats the forced (immediate) structural-change check 50 times to surface intermittent failures.
+    @Test(groups="Integration", invocationCount=50)
+    public void testHotStandbySeesStructuralChangesIncludingRemovalManyTimes() throws Exception {
+        doTestHotStandbySeesStructuralChangesIncludingRemoval(true);
+    }
+
+    Deque<Long> usedMemory = new ArrayDeque<Long>();
+    /**
+     * Pauses, runs a GC iteration on each node's management context and requests JVM GC, then
+     * appends the heap currently in use to {@link #usedMemory} and logs it.
+     * @return the bytes of heap in use
+     */
+    protected long noteUsedMemory(String message) {
+        Time.sleep(Duration.millis(200));
+        for (HaMgmtNode node: nodes) {
+            AbstractManagementContext context = (AbstractManagementContext) node.mgmt;
+            context.getGarbageCollector().gcIteration();
+        }
+        System.gc();
+        System.gc();
+        Time.sleep(Duration.millis(50));
+        System.gc();
+        System.gc();
+        Runtime runtime = Runtime.getRuntime();
+        long inUse = runtime.totalMemory() - runtime.freeMemory();
+        usedMemory.addLast(inUse);
+        log.info("Memory used - "+message+": "+ByteSizeStrings.java().apply(inUse));
+        return inUse;
+    }
+    /**
+     * Takes a memory reading and fails if it exceeds {@code max} bytes. Before failing it waits,
+     * discards that reading and takes one more (with extra GC), failing only if that one also exceeds.
+     */
+    public void assertUsedMemoryLessThan(String event, long max) {
+        noteUsedMemory(event);
+        long used = usedMemory.peekLast();
+        if (used <= max) {
+            return;
+        }
+        // aggressively try to force GC, replacing the reading just taken
+        Time.sleep(Duration.ONE_SECOND);
+        usedMemory.removeLast();
+        noteUsedMemory(event+" (extra GC)");
+        used = usedMemory.peekLast();
+        if (used > max) {
+            Assert.fail("Too much memory used - "+ByteSizeStrings.java().apply(used)+" > max "+ByteSizeStrings.java().apply(max));
+        }
+    }
+    public void assertUsedMemoryMaxDelta(String event, long deltaMegabytes) {
+        assertUsedMemoryLessThan(event, usedMemory.peekLast() + deltaMegabytes*1024*1024);
+    }
+
+    // Repeated persist (master) + rebind (hot standby) cycles, 1x, 10x and then 1000x, must not grow used
+    // memory by more than DELTA MB at each checkpoint; after unmanaging the app, usage must be within
+    // DELTA*1000*1000 bytes of the reading taken just after the app was created.
+    @Test(groups="Integration")
+    public void testHotStandbyDoesNotLeakLotsOfRebinds() throws Exception {
+        log.info("Starting test "+JavaClassNames.niceClassAndMethod());
+        final int DELTA = 2;
+        
+        HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
+        TestApplication app = createFirstAppAndPersist(n1);
+        long initialUsed = noteUsedMemory("Created app");
+        
+        HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER);
+        assertUsedMemoryMaxDelta("Standby created", DELTA);
+        
+        forcePersistNow(n1);
+        forceRebindNow(n2);
+        assertUsedMemoryMaxDelta("Persisted and rebinded once", DELTA);
+        
+        for (int i=0; i<10; i++) {
+            forcePersistNow(n1);
+            forceRebindNow(n2);
+        }
+        assertUsedMemoryMaxDelta("Persisted and rebinded 10x", DELTA);
+        
+        for (int i=0; i<1000; i++) {
+            if ((i+1)%100==0) {
+                // log progress every 100 iterations, but drop the reading so the baseline for the
+                // next assertUsedMemoryMaxDelta stays the "10x" checkpoint
+                noteUsedMemory("iteration "+(i+1));
+                usedMemory.removeLast();
+            }
+            forcePersistNow(n1);
+            forceRebindNow(n2);
+        }
+        assertUsedMemoryMaxDelta("Persisted and rebinded 1000x", DELTA);
+        
+        Entities.unmanage(app);
+        forcePersistNow(n1);
+        forceRebindNow(n2);
+        
+        assertUsedMemoryLessThan("And now all unmanaged", initialUsed + DELTA*1000*1000);
+    }
+
+    static class BigObject {
+        public BigObject(int sizeBytes) { array = new byte[sizeBytes]; }
+        byte[] array;
+    }
+    
+    // Sets a ~SIZE MB config object on the master and checks memory rises by about that much on
+    // set/persist/rebind, stays flat over repeated rebinds/persists, and falls again once the value is
+    // replaced by a small one and, finally, once the app is unmanaged.
+    @Test(groups="Integration")
+    public void testHotStandbyDoesNotLeakBigObjects() throws Exception {
+        log.info("Starting test "+JavaClassNames.niceClassAndMethod());
+        final int SIZE = 5; // megabytes held by the BigObject below (SIZE*1000*1000 bytes)
+        final int SIZE_UP_BOUND = SIZE+2;
+        final int SIZE_DOWN_BOUND = SIZE-1;
+        final int GRACE = 2;
+        // the XML persistence uses a lot of space, we approx at between 2x and 3x
+        final int SIZE_IN_XML = 3*SIZE;
+        final int SIZE_IN_XML_DOWN = 2*SIZE;
+        
+        HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
+        TestApplication app = createFirstAppAndPersist(n1);        
+        noteUsedMemory("Finished seeding");
+        Long initialUsed = usedMemory.peekLast();
+        app.config().set(TestEntity.CONF_OBJECT, new BigObject(SIZE*1000*1000));
+        assertUsedMemoryMaxDelta("Set a big config object", SIZE_UP_BOUND);
+        forcePersistNow(n1);
+        assertUsedMemoryMaxDelta("Persisted a big config object", SIZE_IN_XML);
+        
+        HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER);
+        forceRebindNow(n2);
+        assertUsedMemoryMaxDelta("Rebinded", SIZE_UP_BOUND);
+        
+        for (int i=0; i<10; i++)
+            forceRebindNow(n2);
+        assertUsedMemoryMaxDelta("Several more rebinds", GRACE);
+        for (int i=0; i<10; i++) {
+            forcePersistNow(n1);
+            forceRebindNow(n2);
+        }
+        assertUsedMemoryMaxDelta("And more rebinds and more persists", GRACE);
+        
+        // negative deltas below: usage must have DROPPED by at least that many MB since the previous reading
+        app.config().set(TestEntity.CONF_OBJECT, "big is now small");
+        assertUsedMemoryMaxDelta("Big made small at primary", -SIZE_DOWN_BOUND);
+        forcePersistNow(n1);
+        assertUsedMemoryMaxDelta("And persisted", -SIZE_IN_XML_DOWN);
+        
+        forceRebindNow(n2);
+        assertUsedMemoryMaxDelta("And at secondary", -SIZE_DOWN_BOUND);
+        
+        Entities.unmanage(app);
+        forcePersistNow(n1);
+        forceRebindNow(n2);
+        
+        assertUsedMemoryLessThan("And now all unmanaged", initialUsed+GRACE*1000*1000);
+    }
+
+    @Test(groups="Integration") // because it's slow
+    // Sept 2014 - there is a small leak, of 200 bytes per child created and destroyed;
+    // but this goes away when the app is destroyed; it may be a benign record
+    //
+    // 1000 cycles of (create a new child, unmanage the previous one, persist on master, rebind on
+    // standby) must not grow used memory by more than DELTA MB; after unmanaging the app, usage must be
+    // within DELTA*1000*1000 bytes of the reading taken just after the app was created.
+    public void testHotStandbyDoesNotLeakLotsOfRebindsCreatingAndDestroyingAChildEntity() throws Exception {
+        log.info("Starting test "+JavaClassNames.niceClassAndMethod());
+        final int DELTA = 2;
+        
+        HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
+        TestApplication app = createFirstAppAndPersist(n1);
+        long initialUsed = noteUsedMemory("Created app");
+        
+        HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER);
+        assertUsedMemoryMaxDelta("Standby created", DELTA);
+        
+        TestEntity lastChild = app.addChild(EntitySpec.create(TestEntity.class).configure(TestEntity.CONF_NAME, "first-child"));
+        Entities.manage(lastChild);
+        forcePersistNow(n1);
+        forceRebindNow(n2);
+        assertUsedMemoryMaxDelta("Child created and rebinded once", DELTA);
+        
+        for (int i=0; i<1000; i++) {
+            if (i==9 || (i+1)%100==0) {
+                // log progress only; drop the reading so the baseline stays the pre-loop checkpoint
+                noteUsedMemory("iteration "+(i+1));
+                usedMemory.removeLast();
+            }
+            TestEntity newChild = app.addChild(EntitySpec.create(TestEntity.class).configure(TestEntity.CONF_NAME, "first-child"));
+            Entities.manage(newChild);
+            Entities.unmanage(lastChild);
+            lastChild = newChild;
+            
+            forcePersistNow(n1);
+            forceRebindNow(n2);
+        }
+        assertUsedMemoryMaxDelta("Persisted and rebinded 1000x", DELTA);
+        
+        Entities.unmanage(app);
+        forcePersistNow(n1);
+        forceRebindNow(n2);
+        
+        assertUsedMemoryLessThan("And now all unmanaged", initialUsed + DELTA*1000*1000);
+    }
+    
+    /** Asserts the node is HOT_STANDBY, with read-only rebind running and persistence not running. */
+    protected void assertHotStandby(HaMgmtNode n1) {
+        assertEquals(n1.ha.getNodeState(), ManagementNodeState.HOT_STANDBY);
+        RebindManagerImpl rebinder = n1.rebinder();
+        assertTrue(rebinder.isReadOnlyRunning());
+        assertFalse(rebinder.isPersistenceRunning());
+    }
+
+    /** Asserts the node is MASTER, with persistence running and read-only rebind not running. */
+    protected void assertMaster(HaMgmtNode n1) {
+        assertEquals(n1.ha.getNodeState(), ManagementNodeState.MASTER);
+        RebindManagerImpl rebinder = n1.rebinder();
+        assertFalse(rebinder.isReadOnlyRunning());
+        assertTrue(rebinder.isPersistenceRunning());
+    }
+
+    // Walks two nodes through mode changes: the master demotes itself to HOT_STANDBY (both standby,
+    // with n1 now read-only); then, with n2 at priority 1, n2 is switched to AUTO and becomes master.
+    // A sensor set on the new master is then visible on n1 after persist/rebind.
+    @Test
+    public void testChangeMode() throws Exception {
+        HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
+        TestApplication app = createFirstAppAndPersist(n1);
+        HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER);
+
+        TestEntity child = app.addChild(EntitySpec.create(TestEntity.class).configure(TestEntity.CONF_NAME, "first-child"));
+        Entities.manage(child);
+        TestApplication app2 = TestApplication.Factory.newManagedInstanceForTests(n1.mgmt);
+        app2.config().set(TestEntity.CONF_NAME, "second-app");
+
+        forcePersistNow(n1);
+        n2.ha.setPriority(1);
+        n1.ha.changeMode(HighAvailabilityMode.HOT_STANDBY);
+        
+        // both now hot standby
+        assertHotStandby(n1);
+        assertHotStandby(n2);
+        
+        assertEquals(n1.mgmt.getApplications().size(), 2);
+        Application app2RO = n1.mgmt.lookup(app2.getId(), Application.class);
+        Assert.assertNotNull(app2RO);
+        assertEquals(app2RO.getConfig(TestEntity.CONF_NAME), "second-app");
+        try {
+            ((TestApplication)app2RO).setAttribute(TestEntity.SEQUENCE, 4);
+            Assert.fail("Should not have allowed sensor to be set");
+        } catch (Exception e) {
+            Assert.assertTrue(e.toString().toLowerCase().contains("read-only"), "Error message did not contain expected text: "+e);
+        }
+
+        n1.ha.changeMode(HighAvailabilityMode.AUTO);
+        // NOTE(review): meaning of the (true, false) flags is defined by HighAvailabilityManager.changeMode — not visible here
+        n2.ha.changeMode(HighAvailabilityMode.HOT_STANDBY, true, false);
+        // both still hot standby (n1 will defer to n2 as it has higher priority)
+        assertHotStandby(n1);
+        assertHotStandby(n2);
+        
+        // with priority 1, n2 will now be elected
+        n2.ha.changeMode(HighAvailabilityMode.AUTO);
+        assertHotStandby(n1);
+        assertMaster(n2);
+        
+        assertEquals(n2.mgmt.getApplications().size(), 2);
+        Application app2B = n2.mgmt.lookup(app2.getId(), Application.class);
+        Assert.assertNotNull(app2B);
+        assertEquals(app2B.getConfig(TestEntity.CONF_NAME), "second-app");
+        ((TestApplication)app2B).setAttribute(TestEntity.SEQUENCE, 4);
+        
+        forcePersistNow(n2);
+        forceRebindNow(n1);
+        Application app2BRO = n1.mgmt.lookup(app2.getId(), Application.class);
+        Assert.assertNotNull(app2BRO);
+        EntityTestUtils.assertAttributeEquals(app2BRO, TestEntity.SEQUENCE, 4);
+    }
+
+    /**
+     * Re-runs {@link #testChangeMode()}.
+     * The annotation makes TestNG invoke this 20 times, presumably to surface intermittent failures.
+     */
+    @Test(groups="Integration", invocationCount=20)
+    public void testChangeModeManyTimes() throws Exception {
+        this.testChangeMode();
+    }
+
+    /**
+     * Checks that disabling HA on a node lets the other node take over.
+     * The disabled node reports FAILED and holds no apps, entities or locations.
+     * Switching it back to hot standby restores its read-only view, and it can later be promoted
+     * when the other node is disabled in turn.
+     */
+    @Test
+    public void testChangeModeToDisabledAndBack() throws Exception {
+        HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
+        // create a location before the first persist so location state is also expected on the standby later
+        n1.mgmt.getLocationManager().createLocation(LocationSpec.create(LocalhostMachine.class));
+        @SuppressWarnings("unused")
+        TestApplication app = createFirstAppAndPersist(n1);
+        
+        HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER);
+        
+        // disabled n1 allows n2 to become master when next we tell it to check
+        n1.ha.changeMode(HighAvailabilityMode.DISABLED);
+        n2.ha.changeMode(HighAvailabilityMode.AUTO);
+        assertMaster(n2);
+        // the disabled node reports FAILED and has dropped all apps, entities and locations
+        assertEquals(n1.ha.getNodeState(), ManagementNodeState.FAILED);
+        Assert.assertTrue(n1.mgmt.getApplications().isEmpty(), "n1 should have had no apps; instead had: "+n1.mgmt.getApplications());
+        Assert.assertTrue(n1.mgmt.getEntityManager().getEntities().isEmpty(), "n1 should have had no entities; instead had: "+n1.mgmt.getEntityManager().getEntities());
+        Assert.assertTrue(n1.mgmt.getLocationManager().getLocations().isEmpty(), "n1 should have had no locations; instead had: "+n1.mgmt.getLocationManager().getLocations());
+        
+        // we can now change n1 back to hot_standby
+        n1.ha.changeMode(HighAvailabilityMode.HOT_STANDBY);
+        assertHotStandby(n1);
+        // and it sees apps
+        Assert.assertFalse(n1.mgmt.getApplications().isEmpty(), "n1 should have had apps now");
+        Assert.assertFalse(n1.mgmt.getLocationManager().getLocations().isEmpty(), "n1 should have had locations now");
+        // and if n2 is disabled, n1 promotes
+        n2.ha.changeMode(HighAvailabilityMode.DISABLED);
+        n1.ha.changeMode(HighAvailabilityMode.AUTO);
+        assertMaster(n1);
+        assertEquals(n2.ha.getNodeState(), ManagementNodeState.FAILED);
+    }
+    
+    /**
+     * Feeds on an entity run on the master.
+     * The read-only copy of that entity on a hot standby must have the same feeds, but none of them running.
+     */
+    @Test
+    public void testHotStandbyDoesNotStartFeeds() throws Exception {
+        HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
+        TestApplication app = createFirstAppAndPersist(n1);
+        TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class).impl(MyEntityWithFunctionFeedImpl.class));
+        forcePersistNow(n1);
+        Assert.assertTrue(entity.feeds().getFeeds().size() > 0, "Feeds: "+entity.feeds().getFeeds());
+        for (Feed feed : entity.feeds().getFeeds()) {
+            assertTrue(feed.isRunning(), "Feed expected running, but it is non-running");
+        }
+
+        HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER);
+        TestEntity entityRO = (TestEntity) n2.mgmt.lookup(entity.getId(), Entity.class);
+        // fail with a clear message rather than an NPE if the standby has not loaded the entity
+        Assert.assertNotNull(entityRO, "Entity "+entity.getId()+" not found on hot standby");
+        // report the read-only entity's feeds (not the master's) when this check fails
+        Assert.assertTrue(entityRO.feeds().getFeeds().size() > 0, "Feeds: "+entityRO.feeds().getFeeds());
+        for (Feed feedRO : entityRO.feeds().getFeeds()) {
+            assertFalse(feedRO.isRunning(), "Feed expected non-active, but it is running");
+        }
+    }
+    
+    @Test(groups="Integration")
+    public void testHotStandbyDoesNotStartFeedsRebindingManyTimes() throws Exception {
+        testHotStandbyDoesNotStartFeeds();
+        final HaMgmtNode hsb = createHotStandby(Duration.millis(10));
+        Repeater.create("until 10 rebinds").every(Duration.millis(100)).until(
+            new Callable<Boolean>() {
+                @Override
+                public Boolean call() throws Exception {
+                    return ((RebindManagerImpl)hsb.mgmt.getRebindManager()).getReadOnlyRebindCount() >= 10;
+                }
+            }).runRequiringTrue();
+        // make sure not too many tasks (allowing 5 for rebind etc; currently just 2)
+        RebindTestFixture.waitForTaskCountToBecome(hsb.mgmt, 5);
+    }
+
+    /**
+     * Same as the repeated-rebind check, but with an entity whose implementation generates feeds
+     * (4 are expected on the master).
+     * Verifies that the hot standby still reaches 10 read-only rebinds and that its task count settles.
+     */
+    @Test(groups="Integration")
+    public void testHotStandbyDoesNotStartFeedsRebindingManyTimesWithAnotherFeedGenerator() throws Exception {
+        HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
+        TestApplication app = createFirstAppAndPersist(n1);
+        TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class).impl(MyEntityWithNewFeedsEachTimeImpl.class));
+        forcePersistNow(n1);
+        // assertEquals (rather than assertTrue(size == 4)) reports the actual count on failure
+        Assert.assertEquals(entity.feeds().getFeeds().size(), 4, "Feeds: "+entity.feeds().getFeeds());
+        
+        final HaMgmtNode hsb = createHotStandby(Duration.millis(10));
+        Repeater.create("until 10 rebinds").every(Duration.millis(100)).until(
+            new Callable<Boolean>() {
+                @Override
+                public Boolean call() throws Exception {
+                    return ((RebindManagerImpl)hsb.mgmt.getRebindManager()).getReadOnlyRebindCount() >= 10;
+                }
+            }).runRequiringTrue();
+        // make sure not too many tasks (allowing 5 for rebind etc; currently just 2)
+        RebindTestFixture.waitForTaskCountToBecome(hsb.mgmt, 5);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/org/apache/brooklyn/core/management/ha/ImmutableManagementPlaneSyncRecord.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/management/ha/ImmutableManagementPlaneSyncRecord.java b/core/src/test/java/org/apache/brooklyn/core/management/ha/ImmutableManagementPlaneSyncRecord.java
new file mode 100644
index 0000000..2f08057
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/management/ha/ImmutableManagementPlaneSyncRecord.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.ha;
+
+import java.util.Map;
+
+import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord;
+import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableMap;
+
+public class ImmutableManagementPlaneSyncRecord implements ManagementPlaneSyncRecord {
+    private final String masterNodeId;
+    private final Map<String, ManagementNodeSyncRecord> managementNodes;
+
+    ImmutableManagementPlaneSyncRecord(String masterNodeId, Map<String, ManagementNodeSyncRecord> nodes) {
+        this.masterNodeId = masterNodeId;
+        this.managementNodes = ImmutableMap.copyOf(nodes);
+    }
+    
+    @Override
+    public String getMasterNodeId() {
+        return masterNodeId;
+    }
+
+    @Override
+    public Map<String, ManagementNodeSyncRecord> getManagementNodes() {
+        return managementNodes;
+    }
+
+    @Override
+    public String toString() {
+        return Objects.toStringHelper(this).add("master", masterNodeId).add("nodes", managementNodes.keySet()).toString();
+    }
+    
+    @Override
+    public String toVerboseString() {
+        return Objects.toStringHelper(this).add("master", masterNodeId).add("nodes", managementNodes).toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/org/apache/brooklyn/core/management/ha/ManagementPlaneSyncRecordPersisterInMemory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/management/ha/ManagementPlaneSyncRecordPersisterInMemory.java b/core/src/test/java/org/apache/brooklyn/core/management/ha/ManagementPlaneSyncRecordPersisterInMemory.java
new file mode 100644
index 0000000..5791c88
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/management/ha/ManagementPlaneSyncRecordPersisterInMemory.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.ha;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord;
+import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord;
+import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecordPersister;
+import org.apache.brooklyn.core.management.ha.ManagementPlaneSyncRecordPersisterToObjectStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.rebind.persister.InMemoryObjectStore;
+import brooklyn.util.time.Duration;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Keeps the management-plane sync record purely in memory; nothing is written to storage.
+ *
+ * @deprecated since 0.7.0 use {@link ManagementPlaneSyncRecordPersisterToObjectStore}
+ * with {@link InMemoryObjectStore}
+ * <code>
+ * new ManagementPlaneSyncRecordPersisterToObjectStore(new InMemoryObjectStore(), classLoader)
+ * </code> */
+@Deprecated
+public class ManagementPlaneSyncRecordPersisterInMemory implements ManagementPlaneSyncRecordPersister {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ManagementPlaneSyncRecordPersisterInMemory.class);
+
+    /** The live record; all reads and writes of it happen while holding {@code this}. */
+    private final MutableManagementPlaneSyncRecord memento = new MutableManagementPlaneSyncRecord();
+    
+    /** Cleared by {@link #stop()}; after that, loads fail and deltas are ignored. */
+    private volatile boolean running = true;
+    
+    @Override
+    public synchronized void stop() {
+        running = false;
+    }
+
+    /**
+     * Returns an immutable snapshot of the current record.
+     * Synchronized with {@link #delta(Delta)}, so the snapshot never sees a partially applied delta
+     * (for example, newly added nodes without the matching master change).
+     *
+     * @throws IllegalStateException if {@link #stop()} has been called
+     */
+    @Override
+    public synchronized ManagementPlaneSyncRecord loadSyncRecord() throws IOException {
+        if (!running) {
+            throw new IllegalStateException("Persister not running; cannot load memento");
+        }
+        
+        return memento.snapshot();
+    }
+    
+    /**
+     * Returns immediately.
+     * {@link #delta(Delta)} applies changes synchronously while holding this monitor, so once this
+     * synchronized method has acquired it there is no write still in progress.
+     */
+    @VisibleForTesting
+    @Override
+    public synchronized void waitForWritesCompleted(Duration timeout) throws InterruptedException, TimeoutException {
+        return;
+    }
+
+    /**
+     * Applies a delta to the in-memory record: adds or replaces the given nodes, removes the listed
+     * node ids, then applies the master change. Ignored (with a debug log) once stopped.
+     *
+     * @throws IllegalStateException if the delta's master-change kind is not recognised
+     */
+    @Override
+    public synchronized void delta(Delta delta) {
+        if (!running) {
+            if (LOG.isDebugEnabled()) LOG.debug("Persister not running; ignoring checkpointed delta of manager-memento");
+            return;
+        }
+        
+        for (ManagementNodeSyncRecord m : delta.getNodes()) {
+            memento.addNode(m);
+        }
+        for (String id : delta.getRemovedNodeIds()) {
+            memento.deleteNode(id);
+        }
+        switch (delta.getMasterChange()) {
+        case NO_CHANGE:
+            break; // no-op
+        case SET_MASTER:
+            memento.setMasterNodeId(checkNotNull(delta.getNewMasterOrNull()));
+            break;
+        case CLEAR_MASTER:
+            memento.setMasterNodeId(null);
+            break;
+        default:
+            throw new IllegalStateException("Unknown state for master-change: "+delta.getMasterChange());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/org/apache/brooklyn/core/management/ha/MasterChooserTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/management/ha/MasterChooserTest.java b/core/src/test/java/org/apache/brooklyn/core/management/ha/MasterChooserTest.java
new file mode 100644
index 0000000..8958c4b
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/management/ha/MasterChooserTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.ha;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+import java.util.List;
+
+import org.apache.brooklyn.api.management.ha.ManagementNodeState;
+import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord;
+import org.apache.brooklyn.core.management.ha.BasicMasterChooser.AlphabeticMasterChooser;
+import org.apache.brooklyn.core.management.ha.BasicMasterChooser.ScoredRecord;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.BrooklynVersion;
+import brooklyn.entity.basic.EntityFunctions;
+import brooklyn.entity.rebind.plane.dto.BasicManagementNodeSyncRecord;
+import brooklyn.util.time.Duration;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+/**
+ * Tests for {@link AlphabeticMasterChooser}.
+ * Covers heartbeat-based health filtering, ordering by node state, explicit priority and version,
+ * and the alphabetical tie-break.
+ */
+public class MasterChooserTest {
+
+    /** Heartbeat timeout used by every scenario; records older than this are filtered out as unhealthy. */
+    private static final Duration HEARTBEAT_TIMEOUT = Duration.THIRTY_SECONDS;
+
+    /** An age just past {@link #HEARTBEAT_TIMEOUT}, in milliseconds. */
+    private static final long STALE_AGE_MS = 31*1000;
+
+    private MutableManagementPlaneSyncRecord memento;
+    private AlphabeticMasterChooser chooser;
+    private long now;
+
+    @BeforeMethod(alwaysRun=true)
+    public void setUp() throws Exception {
+        now = System.currentTimeMillis();
+        chooser = new AlphabeticMasterChooser();
+        memento = new MutableManagementPlaneSyncRecord();
+    }
+
+    @Test
+    public void testChoosesFirstAlphanumeric() throws Exception {
+        for (String nodeId : ImmutableList.of("node1", "node2", "node3")) {
+            memento.addNode(newManagerMemento(nodeId, ManagementNodeState.STANDBY, now));
+        }
+        // the asking node is node2, yet the alphabetically first healthy node is chosen
+        assertEquals(chooser.choose(memento, HEARTBEAT_TIMEOUT, "node2").getNodeId(), "node1");
+    }
+
+    @Test
+    public void testReturnsNullIfNoValid() throws Exception {
+        memento.addNode(newManagerMemento("node1", ManagementNodeState.STANDBY, now - STALE_AGE_MS));
+        assertNull(chooser.choose(memento, HEARTBEAT_TIMEOUT, "node2"));
+    }
+
+    @Test
+    public void testFiltersOutByHeartbeat() throws Exception {
+        memento.addNode(newManagerMemento("node1", ManagementNodeState.STANDBY, now - STALE_AGE_MS));
+        memento.addNode(newManagerMemento("node2", ManagementNodeState.STANDBY, now - 20*1000));
+        memento.addNode(newManagerMemento("node3", ManagementNodeState.STANDBY, now));
+        assertEquals(sortedHealthyIds(), ImmutableList.of("node2", "node3"));
+    }
+
+    /** Extracts the node ids from scored records, preserving their order. */
+    protected static List<String> getIds(List<ScoredRecord<?>> scored) {
+        return ImmutableList.copyOf(Iterables.transform(scored, EntityFunctions.id()));
+    }
+
+    /** Filters {@link #memento} for healthy nodes as of {@link #now}, sorts them with {@link #chooser}, and returns their ids. */
+    private List<String> sortedHealthyIds() {
+        return getIds(chooser.sort(chooser.filterHealthy(memento, HEARTBEAT_TIMEOUT, now)));
+    }
+
+    @Test
+    public void testFiltersOutByStatusNotPreferringMaster() throws Exception {
+        assertEquals(doTestFiltersOutByStatus(false, false), ImmutableList.of("node4", "node5"));
+    }
+
+    @Test
+    public void testFiltersOutByStatusPreferringMaster() throws Exception {
+        assertEquals(doTestFiltersOutByStatus(true, false), ImmutableList.of("node5", "node4"));
+    }
+
+    @Test
+    public void testFiltersOutByStatusNotPreferringHot() throws Exception {
+        assertEquals(doTestFiltersOutByStatus(false, true), ImmutableList.of("node4", "node5", "node6"));
+    }
+
+    @Test
+    public void testFiltersOutByStatusPreferringHot() throws Exception {
+        assertEquals(doTestFiltersOutByStatus(true, true), ImmutableList.of("node5", "node6", "node4"));
+    }
+
+    /**
+     * Registers nodes in every state (FAILED, TERMINATED, null, STANDBY, MASTER, and optionally
+     * HOT_STANDBY), all with fresh heartbeats.
+     *
+     * @param preferHot passed to the {@link AlphabeticMasterChooser} constructor
+     * @param includeHot whether to add the HOT_STANDBY node "node6"
+     * @return ids of the nodes kept by health filtering, in sorted order
+     */
+    protected List<String> doTestFiltersOutByStatus(boolean preferHot, boolean includeHot) throws Exception {
+        chooser = new AlphabeticMasterChooser(preferHot);
+        memento.addNode(newManagerMemento("node1", ManagementNodeState.FAILED, now));
+        memento.addNode(newManagerMemento("node2", ManagementNodeState.TERMINATED, now));
+        memento.addNode(newManagerMemento("node3", null, now));
+        memento.addNode(newManagerMemento("node4", ManagementNodeState.STANDBY, now));
+        memento.addNode(newManagerMemento("node5", ManagementNodeState.MASTER, now));
+        if (includeHot) {
+            memento.addNode(newManagerMemento("node6", ManagementNodeState.HOT_STANDBY, now));
+        }
+        return sortedHealthyIds();
+    }
+
+    @Test
+    public void testExplicityPriority() throws Exception {
+        chooser = new AlphabeticMasterChooser();
+        String currentVersion = BrooklynVersion.get();
+        memento.addNode(newManagerMemento("node1", ManagementNodeState.STANDBY, now, currentVersion, 2L));
+        memento.addNode(newManagerMemento("node2", ManagementNodeState.STANDBY, now, currentVersion, -1L));
+        memento.addNode(newManagerMemento("node3", ManagementNodeState.STANDBY, now, currentVersion, null));
+        assertEquals(sortedHealthyIds(), ImmutableList.of("node1", "node3", "node2"));
+    }
+
+    @Test
+    public void testVersionsMaybeNull() throws Exception {
+        chooser = new AlphabeticMasterChooser();
+        memento.addNode(newManagerMemento("node1", ManagementNodeState.STANDBY, now, "v10", null));
+        memento.addNode(newManagerMemento("node2", ManagementNodeState.STANDBY, now, "v3-snapshot", null));
+        memento.addNode(newManagerMemento("node3", ManagementNodeState.STANDBY, now, "v3-snapshot", -1L));
+        memento.addNode(newManagerMemento("node4", ManagementNodeState.STANDBY, now, "v3-snapshot", null));
+        memento.addNode(newManagerMemento("node5", ManagementNodeState.STANDBY, now, "v3-stable", null));
+        memento.addNode(newManagerMemento("node6", ManagementNodeState.STANDBY, now, "v1", null));
+        memento.addNode(newManagerMemento("node7", ManagementNodeState.STANDBY, now, null, null));
+        assertEquals(sortedHealthyIds(), ImmutableList.of("node1", "node5", "node6", "node2", "node4", "node7", "node3"));
+    }
+
+    /** Builds a record for the running Brooklyn version, with no explicit priority. */
+    private ManagementNodeSyncRecord newManagerMemento(String nodeId, ManagementNodeState status, long timestamp) {
+        return newManagerMemento(nodeId, status, timestamp, BrooklynVersion.get(), null);
+    }
+
+    /** Builds a record whose local and remote timestamps are both {@code timestamp}. */
+    private ManagementNodeSyncRecord newManagerMemento(String nodeId, ManagementNodeState status, long timestamp,
+            String version, Long priority) {
+        return BasicManagementNodeSyncRecord.builder()
+                .brooklynVersion(version)
+                .nodeId(nodeId)
+                .status(status)
+                .localTimestamp(timestamp)
+                .remoteTimestamp(timestamp)
+                .priority(priority)
+                .build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/test/java/org/apache/brooklyn/core/management/ha/MutableManagementPlaneSyncRecord.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/management/ha/MutableManagementPlaneSyncRecord.java b/core/src/test/java/org/apache/brooklyn/core/management/ha/MutableManagementPlaneSyncRecord.java
new file mode 100644
index 0000000..d9c8943
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/core/management/ha/MutableManagementPlaneSyncRecord.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.ha;
+
+import java.util.Map;
+
+import org.apache.brooklyn.api.management.ha.ManagementNodeSyncRecord;
+import org.apache.brooklyn.api.management.ha.ManagementPlaneSyncRecord;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Mutable management-plane record: a master node id plus node records keyed by node id.
+ * Use {@link #snapshot()} to obtain an immutable copy.
+ */
+public class MutableManagementPlaneSyncRecord implements ManagementPlaneSyncRecord {
+    /** Id of the master node; null when no master is set. */
+    private String masterNodeId;
+    /** Node records keyed by node id; the map instance itself is never replaced. */
+    private final Map<String, ManagementNodeSyncRecord> managementNodes = Maps.newConcurrentMap();
+
+    @Override
+    public String getMasterNodeId() {
+        return masterNodeId;
+    }
+
+    /** Returns the live (mutable) map of node records, not a copy. */
+    @Override
+    public Map<String, ManagementNodeSyncRecord> getManagementNodes() {
+        return managementNodes;
+    }
+
+    /** Short form listing the master id and the node ids only. */
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + "{master=" + masterNodeId + ", nodes=" + managementNodes.keySet() + "}";
+    }
+
+    /**
+     * Long form including every node's full record.
+     * Previously this delegated to the inherited {@code Object.toString()}, which revealed no content.
+     */
+    @Override
+    public String toVerboseString() {
+        return getClass().getSimpleName() + "{master=" + masterNodeId + ", nodes=" + managementNodes + "}";
+    }
+
+    /** Returns an immutable copy of the current master id and node records. */
+    public ImmutableManagementPlaneSyncRecord snapshot() {
+        return new ImmutableManagementPlaneSyncRecord(masterNodeId, managementNodes);
+    }
+    
+    /** Sets the master node id; pass null to clear it. */
+    public void setMasterNodeId(String masterNodeId) {
+        this.masterNodeId = masterNodeId;
+    }
+    
+    /** Adds the record, replacing any existing record with the same node id. */
+    public void addNode(ManagementNodeSyncRecord memento) {
+        managementNodes.put(memento.getNodeId(), memento);
+    }
+    
+    /** Removes the record for the given node id, if present. */
+    public void deleteNode(String nodeId) {
+        managementNodes.remove(nodeId);
+    }
+}
\ No newline at end of file