You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2017/10/28 10:02:58 UTC

[4/7] brooklyn-server git commit: PR #867: avoid duplicate bundles in persistence

PR #867: avoid duplicate bundles in persistence

Previously, when the “initial catalog” bundles were persisted at
Brooklyn startup, we’d then read the existing persisted state bundles.
The latter would include duplicates of the “initial catalog”. The
persisted state would therefore get duplicates every time Brooklyn
started.

This fixes it by deleting from persisted state any bundles that were
“duplicates” of existing brooklyn-managed-bundles.


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/5d13ebfb
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/5d13ebfb
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/5d13ebfb

Branch: refs/heads/master
Commit: 5d13ebfb78982f257445d81ec327f225da562e20
Parents: 8519689
Author: Aled Sage <al...@gmail.com>
Authored: Thu Oct 26 23:00:03 2017 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Fri Oct 27 13:50:11 2017 +0100

----------------------------------------------------------------------
 .../brooklyn/api/mgmt/rebind/RebindManager.java |   9 +-
 .../catalog/internal/CatalogInitialization.java |  37 +++++-
 .../mgmt/ha/HighAvailabilityManagerImpl.java    |  13 +-
 .../brooklyn/core/mgmt/ha/OsgiManager.java      |  18 +++
 .../NonDeploymentManagementContext.java         |   5 +
 .../rebind/PeriodicDeltaChangeListener.java     |  19 ++-
 .../core/mgmt/rebind/RebindManagerImpl.java     |   5 +
 .../core/typereg/BasicManagedBundle.java        |   3 +
 .../AbstractBrooklynLauncherRebindTest.java     |  28 +++-
 .../BrooklynLauncherRebindCatalogOsgiTest.java  | 132 +++++++++++++++++++
 10 files changed, 259 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/5d13ebfb/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/RebindManager.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/RebindManager.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/RebindManager.java
index e87a577..ecf56c2 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/RebindManager.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/RebindManager.java
@@ -89,7 +89,14 @@ public interface RebindManager {
     /** Stops the background reading (mirroring) of state. 
      * Interrupts any current activity and waits for it to cease. */
     public void stopReadOnly();
-    
+
+    /**
+     * Resets the effects of previously being read-only, ready to be used again (e.g. when promoting to master).
+     * Expected to be called after {@link #stopReadOnly()} (thus long after {@link #setPersister(BrooklynMementoPersister)}, 
+     * and before {@link #rebind(ClassLoader, RebindExceptionHandler, ManagementNodeState)} followed by {@link #start()}. 
+     */
+    public void reset();
+
     /** Starts the appropriate background processes, {@link #startPersistence()} if {@link ManagementNodeState#MASTER},
      * {@link #startReadOnly()} if {@link ManagementNodeState#HOT_STANDBY} or {@link ManagementNodeState#HOT_BACKUP} */
     public void start();

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/5d13ebfb/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogInitialization.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogInitialization.java b/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogInitialization.java
index 225e3fd..ff9219a 100644
--- a/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogInitialization.java
+++ b/core/src/main/java/org/apache/brooklyn/core/catalog/internal/CatalogInitialization.java
@@ -199,6 +199,20 @@ public class CatalogInitialization implements ManagementContextInjectable {
     }
 
     /**
+     * Clears all record of the brooklyn-managed-bundles (so use with care!).
+     * 
+     * Used when promoting from HOT_STANDBY to MASTER. Previous actions performed as HOT_STANDBY
+     * will have been done in read-only mode. When we rebind in anger as master, we want to do this
+     * without a previous cache of managed bundles.
+     */
+    public void clearBrooklynManagedBundles() {
+        Maybe<OsgiManager> osgiManager = managementContext.getOsgiManager();
+        if (osgiManager.isPresent()) {
+            osgiManager.get().clearManagedBundles();
+        }
+    }
+    
+    /**
      * Adds the given persisted catalog items.
      * 
      * Can be called multiple times, e.g.:
@@ -381,7 +395,7 @@ public class CatalogInitialization implements ManagementContextInjectable {
 
         try {
             // Always installing the bundles from persisted state
-            installBundles(persistedState.getBundles(), exceptionHandler, rebindLogger);
+            installPersistedBundles(persistedState.getBundles(), exceptionHandler, rebindLogger);
             
             BrooklynCatalog catalog = managementContext.getCatalog();
             catalog.addCatalogLegacyItemsOnRebind(persistedState.getLegacyCatalogItems());
@@ -453,8 +467,8 @@ public class CatalogInitialization implements ManagementContextInjectable {
         }
     }
 
-    private void installBundles(Map<VersionedName, InstallableManagedBundle> bundles, RebindExceptionHandler exceptionHandler, RebindLogger rebindLogger) {
-        List<OsgiBundleInstallationResult> installs = MutableList.of();
+    private void installPersistedBundles(Map<VersionedName, InstallableManagedBundle> bundles, RebindExceptionHandler exceptionHandler, RebindLogger rebindLogger) {
+        Map<InstallableManagedBundle, OsgiBundleInstallationResult> installs = MutableMap.of();
 
         // Install the bundles
         for (Map.Entry<VersionedName, InstallableManagedBundle> entry : bundles.entrySet()) {
@@ -462,7 +476,7 @@ public class CatalogInitialization implements ManagementContextInjectable {
             InstallableManagedBundle installableBundle = entry.getValue();
             rebindLogger.debug("RebindManager installing bundle {}", bundleId);
             try (InputStream in = installableBundle.getInputStream()) {
-                installs.add(installBundle(installableBundle.getManagedBundle(), in));
+                installs.put(installableBundle, installBundle(installableBundle.getManagedBundle(), in));
             } catch (Exception e) {
                 exceptionHandler.onCreateFailed(BrooklynObjectType.MANAGED_BUNDLE, bundleId.toString(), installableBundle.getManagedBundle().getSymbolicName(), e);
             }
@@ -470,7 +484,7 @@ public class CatalogInitialization implements ManagementContextInjectable {
         
         // Start the bundles (now that we've installed them all)
         Set<RegisteredType> installedTypes = MutableSet.of();
-        for (OsgiBundleInstallationResult br: installs) {
+        for (OsgiBundleInstallationResult br : installs.values()) {
             try {
                 startBundle(br);
                 Iterables.addAll(installedTypes, managementContext.getTypeRegistry().getMatching(
@@ -484,6 +498,19 @@ public class CatalogInitialization implements ManagementContextInjectable {
         if (!installedTypes.isEmpty()) {
             validateAllTypes(installedTypes, exceptionHandler);
         }
+        
+        for (Map.Entry<InstallableManagedBundle, OsgiBundleInstallationResult> entry : installs.entrySet()) {
+            ManagedBundle bundle = entry.getKey().getManagedBundle();
+            OsgiBundleInstallationResult result = entry.getValue();
+            if (result.getCode() == OsgiBundleInstallationResult.ResultCode.IGNORING_BUNDLE_AREADY_INSTALLED 
+                    && !result.getMetadata().getId().equals(bundle.getId())) {
+                // Bundle was already installed as a "Brooklyn managed bundle" (with different id), 
+                // and will thus be persisted with that id.
+                // For example, can happen if it is in the "initial catalog" and also in persisted state.
+                // Delete this copy from the persisted state as it is a duplicate.
+                managementContext.getRebindManager().getChangeListener().onUnmanaged(bundle);
+            }
+        }
     }
     
     private void validateAllTypes(Set<RegisteredType> installedTypes, RebindExceptionHandler exceptionHandler) {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/5d13ebfb/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java
index cc8b645..dac64a4 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java
@@ -545,6 +545,7 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
             // TODO ideally there'd be an incremental rebind as well as an incremental persist
             managementContext.getRebindManager().stopReadOnly();
             clearManagedItems(ManagementTransitionMode.transitioning(BrooklynObjectManagementMode.LOADED_READ_ONLY, BrooklynObjectManagementMode.UNMANAGED_PERSISTED));
+            managementContext.getRebindManager().reset();
         }
         
         stateListener.onStateChange(getNodeState());
@@ -929,11 +930,19 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
         managementContext.getRebindManager().stopPersistence();
         managementContext.getRebindManager().stopReadOnly();
         clearManagedItems(mode);
+        managementContext.getRebindManager().reset();
         
         // tasks are cleared as part of unmanaging entities above
     }
 
-    /** clears all managed items from the management context; same items destroyed as in the course of a rebind cycle */
+    /** 
+     * Clears all managed items from the management context.
+     * 
+     * The same items are destroyed as in the course of a rebind cycle, except for clearBrooklynManagedBundles.
+     * That last operation could be expensive (causing bundles to be installed again). Therefore we only do it
+     * when we stop being a hotProxy or when we are demoted (e.g. during the periodic rebind as hot_standby
+     * we will not repeatedly clear the brooklyn-managed-bundles).
+     */
     protected void clearManagedItems(ManagementTransitionMode mode) {
         // start with the root applications
         for (Application app: managementContext.getApplications()) {
@@ -957,6 +966,8 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager {
         }
         
         ((BasicBrooklynCatalog)managementContext.getCatalog()).reset(CatalogDto.newEmptyInstance("<reset-by-ha-status-change>"));
+        
+        managementContext.getCatalogInitialization().clearBrooklynManagedBundles();
     }
     
     /** Starts hot standby or hot backup, in foreground

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/5d13ebfb/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/OsgiManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/OsgiManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/OsgiManager.java
index 9f47a44..e7bd75e 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/OsgiManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/OsgiManager.java
@@ -117,6 +117,13 @@ public class OsgiManager {
         private final Map<String, String> managedBundlesUidByUrl = MutableMap.of();
         private final Map<VersionedName,ManagedBundle> wrapperBundles = MutableMap.of();
         
+        synchronized void clear() {
+            managedBundlesByUid.clear();
+            managedBundlesUidByVersionedName.clear();
+            managedBundlesUidByUrl.clear();
+            wrapperBundles.clear();
+        }
+
         synchronized Map<String, ManagedBundle> getManagedBundles() {
             return ImmutableMap.copyOf(managedBundlesByUid);
         }
@@ -337,6 +344,17 @@ public class OsgiManager {
         return Maybe.absent();
     }
     
+    /**
+     * Clears all record of the managed bundles (use with care!).
+     * 
+     * Used when promoting from HOT_STANDBY to MASTER. Previous actions performed as HOT_STANDBY
+     * will have been done in read-only mode. When we rebind in anger as master, we want to do this
+     * without a previous cache of managed bundles.
+     */
+    public void clearManagedBundles() {
+        managedBundlesRecord.clear();
+    }
+
     /** Map of bundles by UID */
     public Map<String, ManagedBundle> getManagedBundles() {
         return managedBundlesRecord.getManagedBundles();

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/5d13ebfb/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
index a1ab3df..1fc705c 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
@@ -579,6 +579,11 @@ public class NonDeploymentManagementContext implements ManagementContextInternal
         }
 
         @Override
+        public void reset() {
+            throw new IllegalStateException("Non-deployment context "+NonDeploymentManagementContext.this+" is not valid for this operation.");
+        }
+
+        @Override
         public void start() {
             throw new IllegalStateException("Non-deployment context "+NonDeploymentManagementContext.this+" is not valid for this operation.");
         }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/5d13ebfb/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java
index a5af458..e4b2d7d 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java
@@ -283,6 +283,21 @@ public class PeriodicDeltaChangeListener implements ChangeListener {
         }
     }
     
+    /**
+     * Resets persistence from STOPPED, back to the initial state of INIT.
+     * 
+     * Used when transitioning from HOT_STANDBY to MASTER. On rebinding as MASTER, we want it to 
+     * behave in the same way as it would from INIT (e.g. populating the deltaCollector, etc).
+     */
+    void reset() {
+        synchronized (startStopMutex) {
+            if (state != ListenerState.STOPPED) {
+                return;
+            }
+            state = ListenerState.INIT;
+        }
+    }
+
     /** Waits for any in-progress writes to be completed then for or any unwritten data to be written. */
     @VisibleForTesting
     public void waitForPendingComplete(Duration timeout, boolean canTrigger) throws InterruptedException, TimeoutException {
@@ -327,7 +342,7 @@ public class PeriodicDeltaChangeListener implements ChangeListener {
      * Indicates whether persistence is active. 
      * Even when not active, changes will still be tracked unless {@link #isStopped()}.
      */
-    private boolean isActive() {
+    boolean isActive() {
         return state == ListenerState.RUNNING && persister != null && !isStopped();
     }
 
@@ -336,7 +351,7 @@ public class PeriodicDeltaChangeListener implements ChangeListener {
      * in which case will not persist or store anything
      * (except for a final internal persistence called while STOPPING.) 
      */
-    private boolean isStopped() {
+    boolean isStopped() {
         return state == ListenerState.STOPPING || state == ListenerState.STOPPED || executionContext.isShutdown();
     }
     

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/5d13ebfb/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
index 0830900..8a096d4 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
@@ -364,6 +364,11 @@ public class RebindManagerImpl implements RebindManager {
             LOG.debug("Stopped read-only rebinding ("+this+"), mgmt "+managementContext.getManagementNodeId());
         }
     }
+
+    @Override
+    public void reset() {
+        if (persistenceRealChangeListener != null && !persistenceRealChangeListener.isActive()) persistenceRealChangeListener.reset();
+    }
     
     @Override
     public void start() {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/5d13ebfb/core/src/main/java/org/apache/brooklyn/core/typereg/BasicManagedBundle.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/typereg/BasicManagedBundle.java b/core/src/main/java/org/apache/brooklyn/core/typereg/BasicManagedBundle.java
index 860bf5a..e7489c9 100644
--- a/core/src/main/java/org/apache/brooklyn/core/typereg/BasicManagedBundle.java
+++ b/core/src/main/java/org/apache/brooklyn/core/typereg/BasicManagedBundle.java
@@ -145,6 +145,9 @@ public class BasicManagedBundle extends AbstractBrooklynObject implements Manage
             // this makes equality with other OsgiBundleWithUrl items symmetric,
             // but for two MB's we look additionally at checksum
             if (!Objects.equal(checksum, ((ManagedBundle)other).getChecksum())) return false;
+            
+            // only equal if have the same ManagedBundle uid; important for persistence.changeListener().unmanage()
+            if (!Objects.equal(getId(), ((ManagedBundle)other).getId())) return false;
         }
         return true;
     }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/5d13ebfb/launcher/src/test/java/org/apache/brooklyn/launcher/AbstractBrooklynLauncherRebindTest.java
----------------------------------------------------------------------
diff --git a/launcher/src/test/java/org/apache/brooklyn/launcher/AbstractBrooklynLauncherRebindTest.java b/launcher/src/test/java/org/apache/brooklyn/launcher/AbstractBrooklynLauncherRebindTest.java
index 400e3a8..474c72b 100644
--- a/launcher/src/test/java/org/apache/brooklyn/launcher/AbstractBrooklynLauncherRebindTest.java
+++ b/launcher/src/test/java/org/apache/brooklyn/launcher/AbstractBrooklynLauncherRebindTest.java
@@ -70,6 +70,7 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
@@ -205,10 +206,15 @@ public abstract class AbstractBrooklynLauncherRebindTest {
         Assert.assertTrue(compareIterablesWithoutOrderMatters(ids, idsFromItems), String.format("Expected %s, found %s", ids, idsFromItems));
     }
 
-    protected void assertManagedBundle(BrooklynLauncher launcher, VersionedName bundleId, Set<VersionedName> expectedCatalogItems) {
+    protected ManagedBundle findManagedBundle(BrooklynLauncher launcher, VersionedName bundleId) {
         ManagementContextInternal mgmt = (ManagementContextInternal)launcher.getManagementContext();
         ManagedBundle bundle = mgmt.getOsgiManager().get().getManagedBundle(bundleId);
         assertNotNull(bundle, bundleId+" not found");
+        return bundle;
+    }
+    
+    protected void assertManagedBundle(BrooklynLauncher launcher, VersionedName bundleId, Set<VersionedName> expectedCatalogItems) {
+        assertNotNull(findManagedBundle(launcher, bundleId), bundleId+" not found");
         
         Set<VersionedName> actualCatalogItems = new LinkedHashSet<>();
         Iterable<RegisteredType> types = launcher.getManagementContext().getTypeRegistry().getAll();
@@ -225,6 +231,26 @@ public abstract class AbstractBrooklynLauncherRebindTest {
         ManagedBundle bundle = mgmt.getOsgiManager().get().getManagedBundle(bundleId);
         assertNull(bundle, bundleId+" should not exist");
     }
+    
+    protected Set<String> getPersistenceListing(BrooklynObjectType type) throws Exception {
+        File persistedSubdir = getPersistanceSubdirectory(type);
+        return ImmutableSet.copyOf(persistedSubdir.list((dir,name) -> !name.endsWith(".jar")));
+    }
+
+    private File getPersistanceSubdirectory(BrooklynObjectType type) {
+        String dir;
+        switch (type) {
+            case ENTITY: dir = "entities"; break;
+            case LOCATION: dir = "locations"; break;
+            case POLICY: dir = "policies"; break;
+            case ENRICHER: dir = "enrichers"; break;
+            case FEED: dir = "feeds"; break;
+            case CATALOG_ITEM: dir = "catalog"; break;
+            case MANAGED_BUNDLE: dir = "bundles"; break;
+            default: throw new UnsupportedOperationException("type="+type);
+        }
+        return new File(persistenceDir, dir);
+    }
 
     private static <T> boolean compareIterablesWithoutOrderMatters(Iterable<T> a, Iterable<T> b) {
         List<T> aList = Lists.newArrayList(a);

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/5d13ebfb/launcher/src/test/java/org/apache/brooklyn/launcher/BrooklynLauncherRebindCatalogOsgiTest.java
----------------------------------------------------------------------
diff --git a/launcher/src/test/java/org/apache/brooklyn/launcher/BrooklynLauncherRebindCatalogOsgiTest.java b/launcher/src/test/java/org/apache/brooklyn/launcher/BrooklynLauncherRebindCatalogOsgiTest.java
index 8985147..e953c8d 100644
--- a/launcher/src/test/java/org/apache/brooklyn/launcher/BrooklynLauncherRebindCatalogOsgiTest.java
+++ b/launcher/src/test/java/org/apache/brooklyn/launcher/BrooklynLauncherRebindCatalogOsgiTest.java
@@ -29,11 +29,17 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.brooklyn.api.mgmt.ManagementContext;
+import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityMode;
+import org.apache.brooklyn.api.mgmt.ha.ManagementNodeState;
+import org.apache.brooklyn.api.objs.BrooklynObjectType;
 import org.apache.brooklyn.core.catalog.internal.BasicBrooklynCatalog;
 import org.apache.brooklyn.core.catalog.internal.CatalogInitialization;
 import org.apache.brooklyn.core.mgmt.ha.OsgiBundleInstallationResult;
 import org.apache.brooklyn.core.mgmt.ha.OsgiManager;
 import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal;
+import org.apache.brooklyn.core.mgmt.rebind.RebindTestUtils;
+import org.apache.brooklyn.test.Asserts;
 import org.apache.brooklyn.test.support.TestResourceUnavailableException;
 import org.apache.brooklyn.util.core.osgi.Osgis;
 import org.apache.brooklyn.util.exceptions.ReferenceWithError;
@@ -427,6 +433,132 @@ public class BrooklynLauncherRebindCatalogOsgiTest extends AbstractBrooklynLaunc
         assertManagedBundle(newLauncher, COM_EXAMPLE_BUNDLE_ID, COM_EXAMPLE_BUNDLE_CATALOG_IDS);
     }
     
+    @Test
+    public void testPersistsSingleCopyOfInitialCatalog() throws Exception {
+        Set<VersionedName> bundleItems = ImmutableSet.of(VersionedName.fromString("one:1.0.0"));
+        String bundleBom = createCatalogYaml(ImmutableList.of(), bundleItems);
+        VersionedName bundleName = new VersionedName("org.example.testRebindGetsInitialOsgiCatalog"+Identifiers.makeRandomId(4), "1.0.0");
+        File bundleFile = newTmpBundle(ImmutableMap.of(BasicBrooklynCatalog.CATALOG_BOM, bundleBom.getBytes(StandardCharsets.UTF_8)), bundleName);
+        File initialBomFile = newTmpFile(createCatalogYaml(ImmutableList.of(bundleFile.toURI()), ImmutableList.of()));
+
+        // First launcher should persist the bundle
+        BrooklynLauncher launcher = newLauncherForTests(initialBomFile.getAbsolutePath());
+        launcher.start();
+        String bundlePersistenceId = findManagedBundle(launcher, bundleName).getId();
+        launcher.terminate();
+        assertEquals(getPersistenceListing(BrooklynObjectType.MANAGED_BUNDLE), ImmutableSet.of(bundlePersistenceId));
+
+        // Second launcher should read from initial catalog and persisted state. Both those bundles have different ids.
+        // Should only end up with one of them in persisted state.
+        // (Current impl is that it will be the "initial catalog" version, discarding the previously persisted bundle).
+        BrooklynLauncher launcher2 = newLauncherForTests(initialBomFile.getAbsolutePath());
+        launcher2.start();
+        String bundlePersistenceId2 = findManagedBundle(launcher2, bundleName).getId();
+        launcher2.terminate();
+        assertEquals(getPersistenceListing(BrooklynObjectType.MANAGED_BUNDLE), ImmutableSet.of(bundlePersistenceId2));
+    }
+
+    /**
+     * It is vital that the brooklyn-managed-bundle ids match those in persisted state. If they do not, 
+     * then deletion of a brooklyn-managed-bundle will not actually delete it from persisted state.
+     * 
+     * Under the covers, the scenario of HOT_STANDBY promoted to MASTER is very different from when 
+     * it starts as master:
+     * <ol>
+     *   <li> We become hot-standby; we call RebindManager.startReadOnly (so PeriodicDeltaChangeListener 
+     *        discards changes).
+     *   <li>We repeatedly call rebindManager.rebind():
+     *     <ol>
+     *       <li>Each time, we populate the catalog from the initial and persisted state
+     *       <li>The OsgiManager.ManagedBundleRecords is populated the first time; subsequent times we see it is
+     *           already a managed bundle so do nothing.
+     *       <li>The first time, it calls to the PeriodicDeltaChangeListener about the managed bundle, but the
+     *           change is discarded (because we are read-only, and PeriodicDeltaChangeListener is 'STOPPED').
+     *     </ol>
+     *   <li>On HighAvailabilityManagerImpl promoting us from HOT_STANDBY to MASTER:
+     *     <ol>
+     *       <li>Calls rebindManager.stopReadOnly; this resets the PeriodicDeltaChangeListener
+     *           (so that it's in the INIT state, ready for recording whatever happens while we're promoting to MASTER)
+     *       <li>Clears our cache of brooklyn-managed-bundles. This is important so that we record
+     *           (and thus update persistence) for the "initial catalog" bundles being managed.
+     *       <li>Calls rebindManager.rebind(); we are in a good state to do this, having cleared out whatever
+     *           was done previously while we were hot-standby.
+     *         <ol>
+     *           <li>The new ManagedBundle instances from the "initial catalog" are recorded in the 
+     *               PeriodicDeltaChangeListener.
+     *           <li>For persisted bundles, if they were duplicates of existing brooklyn-managed-bundles then they
+     *               are recorded as deleted (in PeriodicDeltaChangeListener).
+     *         </ol>
+     *       <li>HighAvailabilityManagerImpl.promoteToMaster() then calls rebindManager.start(), which switches us
+     *           into writable mode. The changes recorded in PeriodicDeltaChangeListener  are applied to the
+     *           persisted state.
+     *     </ol>
+     * </ol>
+     * 
+     * In contrast, when we start as MASTER:
+     * <ol>
+     *   <li>We call rebindManager.setPersister(); the PeriodicDeltaChangeListener is in 'INIT' state 
+     *       so will record any changes. 
+     *   <li>We call rebindManager.rebind(); it populates the catalog from the initial and persisted state; 
+     *     <ol>
+     *       <li>The new ManagedBundle instances from the "initial catalog" are recorded in the PeriodicDeltaChangeListener.
+     *       <li>For persisted bundles, if they were duplicates of existing brooklyn-managed-bundles then they
+     *           are recorded as deleted (in PeriodicDeltaChangeListener).
+     *     </ol>
+     *   <li>We call rebindManager.startPersistence(); this enables write-access to the persistence store,
+     *       and starts the PeriodicDeltaChangeListener (so all recorded changes are applied).
+     * </ol>
+     */
+    @Test
+    public void testPersistsSingleCopyOfInitialCatalogOnHotStandbyPromotion() throws Exception {
+        Set<VersionedName> bundleItems = ImmutableSet.of(VersionedName.fromString("one:1.0.0"));
+        String bundleBom = createCatalogYaml(ImmutableList.of(), bundleItems);
+        VersionedName bundleName = new VersionedName("org.example.testRebindGetsInitialOsgiCatalog"+Identifiers.makeRandomId(4), "1.0.0");
+        File bundleFile = newTmpBundle(ImmutableMap.of(BasicBrooklynCatalog.CATALOG_BOM, bundleBom.getBytes(StandardCharsets.UTF_8)), bundleName);
+        File initialBomFile = newTmpFile(createCatalogYaml(ImmutableList.of(bundleFile.toURI()), ImmutableList.of()));
+
+        // First launcher should persist the bundle
+        BrooklynLauncher launcher = newLauncherForTests(initialBomFile.getAbsolutePath())
+                .highAvailabilityMode(HighAvailabilityMode.MASTER);
+        launcher.start();
+        String bundlePersistenceId = findManagedBundle(launcher, bundleName).getId();
+        RebindTestUtils.waitForPersisted(launcher.getManagementContext());
+        assertEquals(getPersistenceListing(BrooklynObjectType.MANAGED_BUNDLE), ImmutableSet.of(bundlePersistenceId));
+
+        // Second launcher goes into hot-standby, and will thus rebind periodically.
+        // When we terminate the first launcher, it will be promoted to master automatically.
+        BrooklynLauncher launcher2 = newLauncherForTests(initialBomFile.getAbsolutePath())
+                .highAvailabilityMode(HighAvailabilityMode.HOT_STANDBY);
+        launcher2.start();
+        assertHotStandbyEventually(launcher2);
+        
+        launcher.terminate();
+        assertMasterEventually(launcher2);
+        String bundlePersistenceId2 = findManagedBundle(launcher2, bundleName).getId();
+        launcher2.terminate();
+        assertEquals(getPersistenceListing(BrooklynObjectType.MANAGED_BUNDLE), ImmutableSet.of(bundlePersistenceId2));
+    }
+
+    private void assertHotStandbyEventually(BrooklynLauncher launcher) {
+        ManagementContext mgmt = launcher.getManagementContext();
+        Asserts.succeedsEventually(new Runnable() {
+            public void run() {
+                assertTrue(mgmt.isStartupComplete());
+                assertTrue(mgmt.isRunning());
+                assertEquals(mgmt.getHighAvailabilityManager().getNodeState(), ManagementNodeState.HOT_STANDBY);
+            }});
+    }
+    
+    private void assertMasterEventually(BrooklynLauncher launcher) {
+        ManagementContext mgmt = launcher.getManagementContext();
+        Asserts.succeedsEventually(new Runnable() {
+            public void run() {
+                assertTrue(mgmt.isStartupComplete());
+                assertTrue(mgmt.isRunning());
+                assertEquals(mgmt.getHighAvailabilityManager().getNodeState(), ManagementNodeState.MASTER);
+            }});
+    }
+    
     private Bundle installBundle(Framework framework, File bundle) throws Exception {
         try (FileInputStream stream = new FileInputStream(bundle)) {
             return framework.getBundleContext().installBundle(bundle.toURI().toString(), stream);