You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2014/07/04 16:02:39 UTC

[1/4] git commit: Adds performance tests for BlobStore-backed persistence

Repository: incubator-brooklyn
Updated Branches:
  refs/heads/master 1a9732e78 -> 73dcb1d8d


Adds performance tests for BlobStore-backed persistence


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/9350857a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/9350857a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/9350857a

Branch: refs/heads/master
Commit: 9350857a9f6ef8ff211831f25da55f7009d5c183
Parents: 1a9732e
Author: Aled Sage <al...@gmail.com>
Authored: Tue Jul 1 20:28:04 2014 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Fri Jul 4 14:23:50 2014 +0100

----------------------------------------------------------------------
 .../entity/rebind/RebindTestFixture.java        |   6 +-
 .../brooklyn/entity/rebind/RebindTestUtils.java |  22 +++-
 .../qa/performance/AbstractPerformanceTest.java |   3 +
 .../FilePersistencePerformanceTest.java         |   4 +-
 .../BlobStorePersistencePerformanceTest.java    | 103 +++++++++++++++++++
 ...tyToBlobStorePersistencePerformanceTest.java |  47 +++++++++
 6 files changed, 179 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/9350857a/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java b/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
index 3b1992d..4c8e8f7 100644
--- a/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
+++ b/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
@@ -38,11 +38,15 @@ public abstract class RebindTestFixture<T extends StartableApplication> {
     @BeforeMethod(alwaysRun=true)
     public void setUp() throws Exception {
         mementoDir = Os.newTempDir(getClass());
-        origManagementContext = RebindTestUtils.newPersistingManagementContext(mementoDir, classLoader, getPersistPeriodMillis());
+        origManagementContext = createOrigManagementContext();
         origApp = createApp();
         
         LOG.info("Test "+getClass()+" persisting to "+mementoDir);
     }
+
+    protected LocalManagementContext createOrigManagementContext() {
+        return RebindTestUtils.newPersistingManagementContext(mementoDir, classLoader, getPersistPeriodMillis());
+    }
     
     protected int getPersistPeriodMillis() {
         return 1;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/9350857a/core/src/test/java/brooklyn/entity/rebind/RebindTestUtils.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/rebind/RebindTestUtils.java b/core/src/test/java/brooklyn/entity/rebind/RebindTestUtils.java
index e1be542..4b4e54c 100644
--- a/core/src/test/java/brooklyn/entity/rebind/RebindTestUtils.java
+++ b/core/src/test/java/brooklyn/entity/rebind/RebindTestUtils.java
@@ -119,7 +119,8 @@ public class RebindTestUtils {
         BrooklynProperties properties;
         PersistenceObjectStore objectStore;
         Duration persistPeriod = Duration.millis(100);
-
+        boolean forLive;
+        
         ManagementContextBuilder(File mementoDir, ClassLoader classLoader) {
             this(classLoader, new FileBasedObjectStore(mementoDir));
         }
@@ -146,12 +147,25 @@ public class RebindTestUtils {
             return this;
         }
 
+        public ManagementContextBuilder forLive(boolean val) {
+            this.forLive = val;
+            return this;
+        }
+
         public LocalManagementContext buildUnstarted() {
             LocalManagementContext unstarted;
-            if (properties != null) {
-                unstarted = new LocalManagementContextForTests(properties);
+            if (forLive) {
+                if (properties != null) {
+                    unstarted = new LocalManagementContext(properties);
+                } else {
+                    unstarted = new LocalManagementContext();
+                }
             } else {
-                unstarted = new LocalManagementContextForTests();
+                if (properties != null) {
+                    unstarted = new LocalManagementContextForTests(properties);
+                } else {
+                    unstarted = new LocalManagementContextForTests();
+                }
             }
             
             objectStore.injectManagementContext(unstarted);

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/9350857a/core/src/test/java/brooklyn/qa/performance/AbstractPerformanceTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/qa/performance/AbstractPerformanceTest.java b/core/src/test/java/brooklyn/qa/performance/AbstractPerformanceTest.java
index e585b7c..9e78084 100644
--- a/core/src/test/java/brooklyn/qa/performance/AbstractPerformanceTest.java
+++ b/core/src/test/java/brooklyn/qa/performance/AbstractPerformanceTest.java
@@ -12,6 +12,7 @@ import org.testng.annotations.BeforeMethod;
 import brooklyn.entity.basic.ApplicationBuilder;
 import brooklyn.entity.basic.Entities;
 import brooklyn.location.basic.SimulatedLocation;
+import brooklyn.management.ManagementContext;
 import brooklyn.test.entity.TestApplication;
 import brooklyn.util.internal.DoubleSystemProperty;
 
@@ -48,12 +49,14 @@ public class AbstractPerformanceTest {
     
     protected TestApplication app;
     protected SimulatedLocation loc;
+    protected ManagementContext mgmt;
     
     @BeforeMethod(alwaysRun=true)
     public void setUp() throws Exception {
         for (int i = 0; i < 5; i++) System.gc();
         loc = new SimulatedLocation();
         app = ApplicationBuilder.newManagedApp(TestApplication.class);
+        mgmt = app.getManagementContext();
     }
     
     @AfterMethod(alwaysRun=true)

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/9350857a/core/src/test/java/brooklyn/qa/performance/FilePersistencePerformanceTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/qa/performance/FilePersistencePerformanceTest.java b/core/src/test/java/brooklyn/qa/performance/FilePersistencePerformanceTest.java
index 65efdb1..58f1de0 100644
--- a/core/src/test/java/brooklyn/qa/performance/FilePersistencePerformanceTest.java
+++ b/core/src/test/java/brooklyn/qa/performance/FilePersistencePerformanceTest.java
@@ -5,6 +5,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -36,13 +37,14 @@ public class FilePersistencePerformanceTest extends AbstractPerformanceTest {
         app.start(ImmutableList.of(loc));
     }
 
+    @AfterMethod(alwaysRun=true)
     @Override
     public void tearDown() throws Exception {
         super.tearDown();
         if (file != null) file.delete();
     }
     
-     protected int numIterations() {
+    protected int numIterations() {
         return 100;
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/9350857a/locations/jclouds/src/test/java/brooklyn/entity/rebind/persister/jclouds/BlobStorePersistencePerformanceTest.java
----------------------------------------------------------------------
diff --git a/locations/jclouds/src/test/java/brooklyn/entity/rebind/persister/jclouds/BlobStorePersistencePerformanceTest.java b/locations/jclouds/src/test/java/brooklyn/entity/rebind/persister/jclouds/BlobStorePersistencePerformanceTest.java
new file mode 100644
index 0000000..8f7b479
--- /dev/null
+++ b/locations/jclouds/src/test/java/brooklyn/entity/rebind/persister/jclouds/BlobStorePersistencePerformanceTest.java
@@ -0,0 +1,103 @@
+package brooklyn.entity.rebind.persister.jclouds;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.rebind.persister.PersistMode;
+import brooklyn.entity.rebind.persister.PersistenceObjectStore.StoreObjectAccessor;
+import brooklyn.management.ha.HighAvailabilityMode;
+import brooklyn.qa.performance.AbstractPerformanceTest;
+import brooklyn.util.text.Identifiers;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+public class BlobStorePersistencePerformanceTest extends AbstractPerformanceTest {
+
+    public static final String LOCATION_SPEC = "named:brooklyn-jclouds-objstore-test-1";
+    
+    JcloudsBlobStoreBasedObjectStore objectStore;
+    StoreObjectAccessor blobstoreAccessor;
+    
+    @BeforeMethod(alwaysRun=true)
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+
+        objectStore = new JcloudsBlobStoreBasedObjectStore(LOCATION_SPEC, "BlobStorePersistencePerformanceTest");
+        objectStore.injectManagementContext(mgmt);
+        objectStore.prepareForSharedUse(PersistMode.AUTO, HighAvailabilityMode.AUTO);
+        blobstoreAccessor = objectStore.newAccessor(Identifiers.makeRandomId(8));
+        
+        app.start(ImmutableList.of(loc));
+    }
+
+    @AfterMethod(alwaysRun=true)
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        if (blobstoreAccessor != null) blobstoreAccessor.delete();
+        if (objectStore != null) {
+            objectStore.deleteCompletely();
+            objectStore.close();
+        }
+    }
+    
+    protected int numIterations() {
+        return 100;
+    }
+    
+     @Test(groups={"Live", "Acceptance"})
+     public void testStoreObjectPuts() throws Exception {
+         int numIterations = numIterations();
+         double minRatePerSec = 10 * PERFORMANCE_EXPECTATION;
+         final AtomicInteger i = new AtomicInteger();
+         
+         measureAndAssert("StoreObjectAccessor.put", numIterations, minRatePerSec, new Runnable() {
+             public void run() {
+                 blobstoreAccessor.put(""+i.incrementAndGet());
+             }});
+     }
+ 
+     @Test(groups={"Live", "Acceptance"})
+     public void testStoreObjectGet() throws Exception {
+         // Times get() on the blob-store accessor created in setUp; note this test does not put() anything to it first.
+         int numIterations = numIterations();
+         double minRatePerSec = 10 * PERFORMANCE_EXPECTATION;
+
+         measureAndAssert("StoreObjectAccessor.get", numIterations, minRatePerSec, new Runnable() {
+             public void run() {
+                 blobstoreAccessor.get();
+             }});
+     }
+ 
+     @Test(groups={"Live", "Acceptance"})
+     public void testStoreObjectDelete() throws Exception {
+         int numIterations = numIterations();
+         double minRatePerSec = 10 * PERFORMANCE_EXPECTATION;
+
+         // Will do 10% warm up runs first, so create one accessor per expected delete() call (numIterations * 1.1 + 1)
+         final List<StoreObjectAccessor> blobstoreAccessors = Lists.newArrayList();
+         for (int i = 0; i < (numIterations * 1.1 + 1); i++) {
+             blobstoreAccessors.add(objectStore.newAccessor("storeObjectDelete-"+i));
+         }
+         
+         final AtomicInteger i = new AtomicInteger();
+
+         try {
+             measureAndAssert("StoreObjectAccessor.delete", numIterations, minRatePerSec, new Runnable() {
+                 public void run() {
+                     StoreObjectAccessor accessor = blobstoreAccessors.get(i.getAndIncrement());
+                     accessor.delete();
+                 }});
+         } finally {
+             for (StoreObjectAccessor accessor : blobstoreAccessors) {
+                 accessor.delete();
+             }
+         }
+     }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/9350857a/locations/jclouds/src/test/java/brooklyn/entity/rebind/persister/jclouds/EntityToBlobStorePersistencePerformanceTest.java
----------------------------------------------------------------------
diff --git a/locations/jclouds/src/test/java/brooklyn/entity/rebind/persister/jclouds/EntityToBlobStorePersistencePerformanceTest.java b/locations/jclouds/src/test/java/brooklyn/entity/rebind/persister/jclouds/EntityToBlobStorePersistencePerformanceTest.java
new file mode 100644
index 0000000..374e130
--- /dev/null
+++ b/locations/jclouds/src/test/java/brooklyn/entity/rebind/persister/jclouds/EntityToBlobStorePersistencePerformanceTest.java
@@ -0,0 +1,47 @@
+package brooklyn.entity.rebind.persister.jclouds;
+
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.rebind.RebindTestUtils;
+import brooklyn.management.internal.LocalManagementContext;
+import brooklyn.qa.performance.EntityPersistencePerformanceTest;
+
+public class EntityToBlobStorePersistencePerformanceTest extends EntityPersistencePerformanceTest {
+
+    private static final String LOCATION_SPEC = BlobStorePersistencePerformanceTest.LOCATION_SPEC;
+    
+    private JcloudsBlobStoreBasedObjectStore objectStore;
+
+    @Override
+    protected LocalManagementContext createOrigManagementContext() {
+        objectStore = new JcloudsBlobStoreBasedObjectStore(LOCATION_SPEC, "EntityToBlobStorePersistencePerformanceTest");
+        
+        return RebindTestUtils.managementContextBuilder(classLoader, objectStore)
+                .forLive(true)
+                .persistPeriodMillis(getPersistPeriodMillis())
+                .buildStarted();
+    }
+
+    @AfterMethod(alwaysRun=true)
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        if (objectStore != null) {
+            objectStore.deleteCompletely();
+            objectStore.close();
+        }
+    }
+
+    @Test(groups="Live")
+    @Override
+    public void testManyEntities() throws Exception {
+        super.testManyEntities();
+    }
+    
+    @Test(groups="Live")
+    @Override
+    public void testRapidChanges() throws Exception {
+        super.testRapidChanges();
+    }
+}


[2/4] git commit: Use thread pool for Object Store persistence

Posted by he...@apache.org.
Use thread pool for Object Store persistence

- passes BrooklynProperties to BrooklynMementoPersisterToObjectStore
  so can read config like max thread pool size.


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/d04761e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/d04761e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/d04761e3

Branch: refs/heads/master
Commit: d04761e350e86f828200d82cbe4901e4a1db9e9c
Parents: 9350857
Author: Aled Sage <al...@gmail.com>
Authored: Wed Jul 2 15:16:29 2014 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Fri Jul 4 14:37:09 2014 +0100

----------------------------------------------------------------------
 .../rebind/PeriodicDeltaChangeListener.java     |   5 +
 .../entity/rebind/PersisterDeltaImpl.java       |  47 +-
 .../rebind/RebindExceptionHandlerImpl.java      |  23 +-
 .../entity/rebind/RebindManagerImpl.java        |   2 +-
 .../entity/rebind/dto/BrooklynMementoImpl.java  |  12 +-
 .../rebind/dto/BrooklynMementoManifestImpl.java |  16 +-
 .../BrooklynMementoPersisterToObjectStore.java  | 502 +++++++++++++------
 .../entity/rebind/RebindEntityTest.java         |   7 +
 .../entity/rebind/RebindTestFixture.java        |   6 +-
 .../brooklyn/entity/rebind/RebindTestUtils.java |  11 +-
 .../HighAvailabilityManagerSplitBrainTest.java  |   2 +-
 .../ha/HighAvailabilityManagerTestFixture.java  |   5 +-
 .../brooklyn/launcher/BrooklynLauncher.java     |   4 +-
 .../brooklyn/util/exceptions/Exceptions.java    |   2 +
 14 files changed, 458 insertions(+), 186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/main/java/brooklyn/entity/rebind/PeriodicDeltaChangeListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/PeriodicDeltaChangeListener.java b/core/src/main/java/brooklyn/entity/rebind/PeriodicDeltaChangeListener.java
index 9230d45..91ac9db 100644
--- a/core/src/main/java/brooklyn/entity/rebind/PeriodicDeltaChangeListener.java
+++ b/core/src/main/java/brooklyn/entity/rebind/PeriodicDeltaChangeListener.java
@@ -21,6 +21,7 @@ import brooklyn.mementos.BrooklynMementoPersister;
 import brooklyn.policy.Enricher;
 import brooklyn.policy.Policy;
 import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.exceptions.RuntimeInterruptedException;
 import brooklyn.util.task.BasicTask;
 import brooklyn.util.task.ScheduledTask;
 import brooklyn.util.time.Duration;
@@ -106,6 +107,10 @@ public class PeriodicDeltaChangeListener implements ChangeListener {
                         try {
                             persistNow();
                             return null;
+                        } catch (RuntimeInterruptedException e) {
+                            LOG.debug("Interrupted persisting change-delta (rethrowing)", e);
+                            Thread.currentThread().interrupt();
+                            return null;
                         } catch (Exception e) {
                             // Don't rethrow: the behaviour of executionManager is different from a scheduledExecutorService,
                             // if we throw an exception, then our task will never get executed again

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/main/java/brooklyn/entity/rebind/PersisterDeltaImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/PersisterDeltaImpl.java b/core/src/main/java/brooklyn/entity/rebind/PersisterDeltaImpl.java
index 0950f81..1fdafec 100644
--- a/core/src/main/java/brooklyn/entity/rebind/PersisterDeltaImpl.java
+++ b/core/src/main/java/brooklyn/entity/rebind/PersisterDeltaImpl.java
@@ -10,7 +10,52 @@ import brooklyn.mementos.PolicyMemento;
 
 import com.google.common.collect.Sets;
 
-class PersisterDeltaImpl implements Delta {
+public class PersisterDeltaImpl implements Delta {
+    
+    public static Builder builder() {
+        return new Builder();
+    }
+    
+    public static class Builder {
+        private final PersisterDeltaImpl delta = new PersisterDeltaImpl();
+
+        public Builder locations(Collection<? extends LocationMemento> vals) {
+            delta.locations.addAll(vals);
+            return this;
+        }
+        public Builder entities(Collection<? extends EntityMemento> vals) {
+            delta.entities.addAll(vals);
+            return this;
+        }
+        public Builder policies(Collection<? extends PolicyMemento> vals) {
+            delta.policies.addAll(vals);
+            return this;
+        }
+        public Builder enrichers(Collection<? extends EnricherMemento> vals) {
+            delta.enrichers.addAll(vals);
+            return this;
+        }
+        public Builder removedLocationIds(Collection<String> vals) {
+            delta.removedLocationIds.addAll(vals);
+            return this;
+        }
+        public Builder removedEntityIds(Collection<String> vals) {
+            delta.removedEntityIds.addAll(vals);
+            return this;
+        }
+        public Builder removedPolicyIds(Collection<String> vals) {
+            delta.removedPolicyIds.addAll(vals);
+            return this;
+        }
+        public Builder removedEnricherIds(Collection<String> vals) {
+            delta.removedEnricherIds.addAll(vals);
+            return this;
+        }
+        public Delta build() {
+            return delta;
+        }
+    }
+    
     Collection<LocationMemento> locations = Sets.newLinkedHashSet();
     Collection<EntityMemento> entities = Sets.newLinkedHashSet();
     Collection<PolicyMemento> policies = Sets.newLinkedHashSet();

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/main/java/brooklyn/entity/rebind/RebindExceptionHandlerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/RebindExceptionHandlerImpl.java b/core/src/main/java/brooklyn/entity/rebind/RebindExceptionHandlerImpl.java
index 937d58d..eb4f8a3 100644
--- a/core/src/main/java/brooklyn/entity/rebind/RebindExceptionHandlerImpl.java
+++ b/core/src/main/java/brooklyn/entity/rebind/RebindExceptionHandlerImpl.java
@@ -2,6 +2,7 @@ package brooklyn.entity.rebind;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -29,17 +30,17 @@ public class RebindExceptionHandlerImpl implements RebindExceptionHandler {
     protected final RebindFailureMode addPolicyFailureMode;
     protected final RebindFailureMode loadPolicyFailureMode;
 
-    protected final Set<String> missingEntities = Sets.newLinkedHashSet();
-    protected final Set<String> missingLocations = Sets.newLinkedHashSet();
-    protected final Set<String> missingPolicies = Sets.newLinkedHashSet();
-    protected final Set<String> missingEnrichers = Sets.newLinkedHashSet();
-    protected final Set<String> creationFailedEntities = Sets.newLinkedHashSet();
-    protected final Set<String> creationFailedLocations = Sets.newLinkedHashSet();
-    protected final Set<String> creationFailedPolicies = Sets.newLinkedHashSet();
-    protected final Set<String> creationFailedEnrichers = Sets.newLinkedHashSet();
-    protected final Set<Exception> addPolicyFailures = Sets.newLinkedHashSet();
-    protected final Set<Exception> loadPolicyFailures = Sets.newLinkedHashSet();
-    protected final List<Exception> exceptions = Lists.newArrayList();
+    protected final Set<String> missingEntities = Sets.newConcurrentHashSet();
+    protected final Set<String> missingLocations = Sets.newConcurrentHashSet();
+    protected final Set<String> missingPolicies = Sets.newConcurrentHashSet();
+    protected final Set<String> missingEnrichers = Sets.newConcurrentHashSet();
+    protected final Set<String> creationFailedEntities = Sets.newConcurrentHashSet();
+    protected final Set<String> creationFailedLocations = Sets.newConcurrentHashSet();
+    protected final Set<String> creationFailedPolicies = Sets.newConcurrentHashSet();
+    protected final Set<String> creationFailedEnrichers = Sets.newConcurrentHashSet();
+    protected final Set<Exception> addPolicyFailures = Sets.newConcurrentHashSet();
+    protected final Set<Exception> loadPolicyFailures = Sets.newConcurrentHashSet();
+    protected final List<Exception> exceptions = Collections.synchronizedList(Lists.<Exception>newArrayList());
     
     public static Builder builder() {
         return new Builder();

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java b/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
index 5775dc4..1f2ae81 100644
--- a/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
+++ b/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
@@ -536,7 +536,7 @@ public class RebindManagerImpl implements RebindManager {
             T nodeinchain = node;
             while (nodeinchain != null) {
                 tempchain.add(0, nodeinchain);
-                nodeinchain = nodes.get(nodeinchain.getParent());
+                nodeinchain = (nodeinchain.getParent() == null) ? null : nodes.get(nodeinchain.getParent());
             }
             for (T n : tempchain) {
                 result.put(n.getId(), n);

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoImpl.java b/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoImpl.java
index b75a112..a04a75e 100644
--- a/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoImpl.java
+++ b/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoImpl.java
@@ -27,12 +27,12 @@ public class BrooklynMementoImpl implements BrooklynMemento, Serializable {
     
     public static class Builder {
         protected String brooklynVersion = BrooklynVersion.get();
-        protected final List<String> applicationIds = Lists.newArrayList();
-        protected final List<String> topLevelLocationIds = Lists.newArrayList();
-        protected final Map<String, EntityMemento> entities = Maps.newLinkedHashMap();
-        protected final Map<String, LocationMemento> locations = Maps.newLinkedHashMap();
-        protected final Map<String, PolicyMemento> policies = Maps.newLinkedHashMap();
-        protected final Map<String, EnricherMemento> enrichers = Maps.newLinkedHashMap();
+        protected final List<String> applicationIds = Collections.synchronizedList(Lists.<String>newArrayList());
+        protected final List<String> topLevelLocationIds = Collections.synchronizedList(Lists.<String>newArrayList());
+        protected final Map<String, EntityMemento> entities = Maps.newConcurrentMap();
+        protected final Map<String, LocationMemento> locations = Maps.newConcurrentMap();
+        protected final Map<String, PolicyMemento> policies = Maps.newConcurrentMap();
+        protected final Map<String, EnricherMemento> enrichers = Maps.newConcurrentMap();
         
         public Builder brooklynVersion(String val) {
             brooklynVersion = val; return this;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoManifestImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoManifestImpl.java b/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoManifestImpl.java
index e517f15..1ec617e 100644
--- a/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoManifestImpl.java
+++ b/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoManifestImpl.java
@@ -18,10 +18,10 @@ public class BrooklynMementoManifestImpl implements BrooklynMementoManifest, Ser
     
     public static class Builder {
         protected String brooklynVersion;
-        protected final Map<String, String> entityIdToType = Maps.newLinkedHashMap();
-        protected final Map<String, String> locationIdToType = Maps.newLinkedHashMap();
-        protected final Map<String, String> policyIdToType = Maps.newLinkedHashMap();
-        protected final Map<String, String> enricherIdToType = Maps.newLinkedHashMap();
+        protected final Map<String, String> entityIdToType = Maps.newConcurrentMap();
+        protected final Map<String, String> locationIdToType = Maps.newConcurrentMap();
+        protected final Map<String, String> policyIdToType = Maps.newConcurrentMap();
+        protected final Map<String, String> enricherIdToType = Maps.newConcurrentMap();
         
         public Builder brooklynVersion(String val) {
             brooklynVersion = val; return this;
@@ -55,10 +55,10 @@ public class BrooklynMementoManifestImpl implements BrooklynMementoManifest, Ser
         }
     }
 
-    private Map<String, String> entityIdToType;
-    private Map<String, String> locationIdToType;
-    private Map<String, String> policyIdToType;
-    private Map<String, String> enricherIdToType;
+    private final Map<String, String> entityIdToType;
+    private final Map<String, String> locationIdToType;
+    private final Map<String, String> policyIdToType;
+    private final Map<String, String> enricherIdToType;
     
     private BrooklynMementoManifestImpl(Builder builder) {
         entityIdToType = builder.entityIdToType;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
index 8746cc0..8fcb3b3 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
@@ -6,13 +6,23 @@ import java.io.IOException;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import brooklyn.config.BrooklynProperties;
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
 import brooklyn.entity.rebind.PersistenceExceptionHandler;
+import brooklyn.entity.rebind.PersisterDeltaImpl;
 import brooklyn.entity.rebind.RebindExceptionHandler;
 import brooklyn.entity.rebind.dto.BrooklynMementoImpl;
 import brooklyn.entity.rebind.dto.BrooklynMementoManifestImpl;
@@ -26,6 +36,7 @@ import brooklyn.mementos.EntityMemento;
 import brooklyn.mementos.LocationMemento;
 import brooklyn.mementos.Memento;
 import brooklyn.mementos.PolicyMemento;
+import brooklyn.util.exceptions.CompoundRuntimeException;
 import brooklyn.util.exceptions.Exceptions;
 import brooklyn.util.time.Duration;
 import brooklyn.util.time.Time;
@@ -33,29 +44,57 @@ import brooklyn.util.xstream.XmlUtil;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 
 /** Implementation of the {@link BrooklynMementoPersister} backed by a pluggable
  * {@link PersistenceObjectStore} such as a file system or a jclouds object store */
 public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPersister {
 
+    // TODO Crazy amount of duplication between handling entity, location, policy + enricher;
+    // Need to remove that duplication.
+
+    // TODO Should stop() take a timeout, and shutdown the executor gracefully?
+    
     private static final Logger LOG = LoggerFactory.getLogger(BrooklynMementoPersisterToObjectStore.class);
 
-    private static final int MAX_SERIALIZATION_ATTEMPTS = 5;
+    public static final ConfigKey<Integer> PERSISTER_MAX_THREAD_POOL_SIZE = ConfigKeys.newIntegerConfigKey(
+            "persister.threadpool.maxSize",
+            "Maximum number of concurrent operations for persistence (reads/writes/deletes of *different* objects)", 
+            10);
+
+    public static final ConfigKey<Integer> PERSISTER_MAX_SERIALIZATION_ATTEMPTS = ConfigKeys.newIntegerConfigKey(
+            "persister.maxSerializationAttempts",
+            "Maximum number of attempts to serialize a memento (e.g. if first attempts fail because of concurrent modifications of an entity)", 
+            5);
 
     private final PersistenceObjectStore objectStore;
     private final MementoSerializer<Object> serializer;
 
     private final Map<String, StoreObjectAccessorWithLock> writers = new LinkedHashMap<String, PersistenceObjectStore.StoreObjectAccessorWithLock>();
 
+    private final ListeningExecutorService executor;
+
     private volatile boolean running = true;
 
+    /**
+     * Lock used on writes (checkpoint + delta) so that {@link #waitForWritesCompleted(Duration)} can block
+     * for any concurrent call to complete.
+     */
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-    public BrooklynMementoPersisterToObjectStore(PersistenceObjectStore objectStore, ClassLoader classLoader) {
+    public BrooklynMementoPersisterToObjectStore(PersistenceObjectStore objectStore, BrooklynProperties brooklynProperties, ClassLoader classLoader) {
         this.objectStore = checkNotNull(objectStore, "objectStore");
-        MementoSerializer<Object> rawSerializer = new XmlMementoSerializer<Object>(classLoader);
-        this.serializer = new RetryingMementoSerializer<Object>(rawSerializer, MAX_SERIALIZATION_ATTEMPTS);
         
-        // TODO it's 95% the same code for each of these, throughout, so refactor to avoid repetition
+        int maxSerializationAttempts = brooklynProperties.getConfig(PERSISTER_MAX_SERIALIZATION_ATTEMPTS);
+        int maxThreadPoolSize = brooklynProperties.getConfig(PERSISTER_MAX_THREAD_POOL_SIZE);
+                
+        MementoSerializer<Object> rawSerializer = new XmlMementoSerializer<Object>(classLoader);
+        this.serializer = new RetryingMementoSerializer<Object>(rawSerializer, maxSerializationAttempts);
+
         objectStore.createSubPath("entities");
         objectStore.createSubPath("locations");
         objectStore.createSubPath("policies");
@@ -63,6 +102,11 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
 
         // FIXME does it belong here or to ManagementPlaneSyncRecordPersisterToObjectStore ?
         objectStore.createSubPath("plane");
+        
+        executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(maxThreadPoolSize, new ThreadFactory() {
+            @Override public Thread newThread(Runnable r) {
+                return new Thread(r, "brooklyn-persister");
+            }}));
     }
 
     public PersistenceObjectStore getObjectStore() {
@@ -72,6 +116,9 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
     @Override
     public void stop() {
         running = false;
+        if (executor != null) {
+            executor.shutdownNow();
+        }
     }
     
     protected StoreObjectAccessorWithLock getWriter(String path) {
@@ -87,7 +134,7 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
     }
 
     @Override
-    public BrooklynMementoManifest loadMementoManifest(RebindExceptionHandler exceptionHandler) throws IOException {
+    public BrooklynMementoManifest loadMementoManifest(final RebindExceptionHandler exceptionHandler) throws IOException {
         if (!running) {
             throw new IllegalStateException("Persister not running; cannot load memento manifest from " + objectStore.getSummaryName());
         }
@@ -110,67 +157,119 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
         Stopwatch stopwatch = Stopwatch.createStarted();
 
         LOG.debug("Scanning persisted state: {} entities, {} locations, {} policies, {} enrichers, from {}", new Object[]{
-            entitySubPathList.size(), locationSubPathList.size(), policySubPathList.size(), enricherSubPathList.size(),
+            entitySubPathList/*.size()*/, locationSubPathList.size(), policySubPathList.size(), enricherSubPathList.size(),
             objectStore.getSummaryName() });
 
-        BrooklynMementoManifestImpl.Builder builder = BrooklynMementoManifestImpl.builder();
+        final BrooklynMementoManifestImpl.Builder builder = BrooklynMementoManifestImpl.builder();
 
-        for (String subPath : entitySubPathList) {
-            try {
-                StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
-                String contents = objectAccessor.get();
-                String id = (String) XmlUtil.xpath(contents, "/entity/id");
-                String type = (String) XmlUtil.xpath(contents, "/entity/type");
-                builder.entity(id, type);
-            } catch (Exception e) {
-                Exceptions.propagateIfFatal(e);
-                exceptionHandler.onLoadEntityMementoFailed("Memento "+subPath, e);
-            }
+        List<ListenableFuture<?>> futures = Lists.newArrayList();
+        
+        for (final String subPath : entitySubPathList) {
+            futures.add(executor.submit(new Runnable() {
+                public void run() {
+                    try {
+                        String contents = read(subPath);
+                        String id = (String) XmlUtil.xpath(contents, "/entity/id");
+                        String type = (String) XmlUtil.xpath(contents, "/entity/type");
+                        builder.entity(id, type);
+                        LOG.debug("Loaded manifest for entity "+subPath+"; id "+id+"; type "+type); // FIXME
+                    } catch (Exception e) {
+                        LOG.debug("Problem loading manifest for entity "+subPath); // FIXME
+                        Exceptions.propagateIfFatal(e);
+                        exceptionHandler.onLoadEntityMementoFailed("Memento "+subPath, e);
+                    }
+                }}));
         }
-        for (String subPath : locationSubPathList) {
-            try {
-                StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
-                String contents = objectAccessor.get();
-                String id = (String) XmlUtil.xpath(contents, "/location/id");
-                String type = (String) XmlUtil.xpath(contents, "/location/type");
-                builder.location(id, type);
-            } catch (Exception e) {
-                Exceptions.propagateIfFatal(e);
-                exceptionHandler.onLoadLocationMementoFailed("Memento "+subPath, e);
-            }
+        for (final String subPath : locationSubPathList) {
+            futures.add(executor.submit(new Runnable() {
+                public void run() {
+                    try {
+                        String contents = read(subPath);
+                        String id = (String) XmlUtil.xpath(contents, "/location/id");
+                        String type = (String) XmlUtil.xpath(contents, "/location/type");
+                        builder.location(id, type);
+                    } catch (Exception e) {
+                        Exceptions.propagateIfFatal(e);
+                        exceptionHandler.onLoadLocationMementoFailed("Memento "+subPath, e);
+                    }
+                }}));
         }
-        for (String subPath : policySubPathList) {
-            try {
-                StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
-                String contents = objectAccessor.get();
-                String id = (String) XmlUtil.xpath(contents, "/policy/id");
-                String type = (String) XmlUtil.xpath(contents, "/policy/type");
-                builder.policy(id, type);
-            } catch (Exception e) {
-                Exceptions.propagateIfFatal(e);
-                exceptionHandler.onLoadPolicyMementoFailed("Memento "+subPath, e);
-            }
+        for (final String subPath : policySubPathList) {
+            futures.add(executor.submit(new Runnable() {
+                public void run() {
+                    try {
+                        String contents = read(subPath);
+                        String id = (String) XmlUtil.xpath(contents, "/policy/id");
+                        String type = (String) XmlUtil.xpath(contents, "/policy/type");
+                        builder.policy(id, type);
+                    } catch (Exception e) {
+                        Exceptions.propagateIfFatal(e);
+                        exceptionHandler.onLoadPolicyMementoFailed("Memento "+subPath, e);
+                    }
+                }}));
         }
-        for (String subPath : enricherSubPathList) {
-            try {
-                StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
-                String contents = objectAccessor.get();
-                String id = (String) XmlUtil.xpath(contents, "/enricher/id");
-                String type = (String) XmlUtil.xpath(contents, "/enricher/type");
-                builder.enricher(id, type);
-            } catch (Exception e) {
-                Exceptions.propagateIfFatal(e);
-                exceptionHandler.onLoadEnricherMementoFailed("Memento "+subPath, e);
+        for (final String subPath : enricherSubPathList) {
+            futures.add(executor.submit(new Runnable() {
+                public void run() {
+                    try {
+                        String contents = read(subPath);
+                        String id = (String) XmlUtil.xpath(contents, "/enricher/id");
+                        String type = (String) XmlUtil.xpath(contents, "/enricher/type");
+                        builder.enricher(id, type);
+                    } catch (Exception e) {
+                        Exceptions.propagateIfFatal(e);
+                        exceptionHandler.onLoadEnricherMementoFailed("Memento "+subPath, e);
+                    }
+                }}));
+        }
+
+        try {
+            // Wait for all, failing fast if any exceptions.
+            Futures.allAsList(futures).get();
+        } catch (Exception e) {
+            Exceptions.propagateIfFatal(e);
+            
+            List<Exception> exceptions = Lists.newArrayList();
+            
+            for (ListenableFuture<?> future : futures) {
+                if (future.isDone()) {
+                    try {
+                        future.get();
+                    } catch (InterruptedException e2) {
+                        throw Exceptions.propagate(e2);
+                    } catch (ExecutionException e2) {
+                        LOG.warn("Problem loading memento manifest", e2);
+                        exceptions.add(e2);
+                    }
+                    future.cancel(true);
+                }
             }
+            if (exceptions.isEmpty()) {
+                throw Exceptions.propagate(e);
+            } else {
+                // Normally there should be at least one failure; otherwise Futures.allAsList(futures).get() would not have failed.
+                throw new CompoundRuntimeException("Problem loading mementos", exceptions);
+            }
+        }
+
+        BrooklynMementoManifest result = builder.build();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Loaded memento manifest; took {}; {} entities, {} locations, {} policies, {} enrichers, from {}", new Object[]{
+                     Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS)), result.getEntityIdToType().size(), 
+                     result.getLocationIdToType().size(), result.getPolicyIdToType().size(), result.getEnricherIdToType().size(),
+                     objectStore.getSummaryName() });
+        }
+
+        if (result.getEntityIdToType().size() != entitySubPathList.size()) {
+            LOG.error("Lost an entity?!");
         }
         
-        if (LOG.isDebugEnabled())
-            LOG.debug("Loaded memento manifest; took {}", Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS)));
-        return builder.build();
+        return result;
     }
 
     @Override
-    public BrooklynMemento loadMemento(LookupContext lookupContext, RebindExceptionHandler exceptionHandler) throws IOException {
+    public BrooklynMemento loadMemento(LookupContext lookupContext, final RebindExceptionHandler exceptionHandler) throws IOException {
         if (!running) {
             throw new IllegalStateException("Persister not running; cannot load memento from " + objectStore.getSummaryName());
         }
@@ -195,71 +294,121 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
             entitySubPathList.size(), locationSubPathList.size(), policySubPathList.size(), enricherSubPathList.size(),
             objectStore.getSummaryName() });
 
-        BrooklynMementoImpl.Builder builder = BrooklynMementoImpl.builder();
+        final BrooklynMementoImpl.Builder builder = BrooklynMementoImpl.builder();
         serializer.setLookupContext(lookupContext);
+        
+        List<ListenableFuture<?>> futures = Lists.newArrayList();
+        
         try {
-            for (String subPath : entitySubPathList) {
-                try {
-                    StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
-                    EntityMemento memento = (EntityMemento) serializer.fromString(objectAccessor.get());
-                    if (memento == null) {
-                        LOG.warn("No entity-memento deserialized from " + subPath + "; ignoring and continuing");
-                    } else {
-                        builder.entity(memento);
-                        if (memento.isTopLevelApp()) {
-                            builder.applicationId(memento.getId());
+            for (final String subPath : entitySubPathList) {
+                futures.add(executor.submit(new Runnable() {
+                    public void run() {
+                        try {
+                            EntityMemento memento = (EntityMemento) serializer.fromString(read(subPath));
+                            if (memento == null) {
+                                LOG.warn("No entity-memento deserialized from " + subPath + "; ignoring and continuing");
+                            } else {
+                                builder.entity(memento);
+                                if (memento.isTopLevelApp()) {
+                                    builder.applicationId(memento.getId());
+                                }
+                            }
+                        } catch (Exception e) {
+                            exceptionHandler.onLoadEntityMementoFailed("Memento "+subPath, e);
                         }
-                    }
-                } catch (Exception e) {
-                    exceptionHandler.onLoadEntityMementoFailed("Memento "+subPath, e);
-                }
+                    }}));
             }
-            for (String subPath : locationSubPathList) {
-                try {
-                    StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
-                    LocationMemento memento = (LocationMemento) serializer.fromString(objectAccessor.get());
-                    if (memento == null) {
-                        LOG.warn("No location-memento deserialized from " + subPath + "; ignoring and continuing");
-                    } else {
-                        builder.location(memento);
-                    }
-                } catch (Exception e) {
-                    exceptionHandler.onLoadLocationMementoFailed("Memento "+subPath, e);
-                }
+            for (final String subPath : locationSubPathList) {
+                futures.add(executor.submit(new Runnable() {
+                    public void run() {
+                        try {
+                            LocationMemento memento = (LocationMemento) serializer.fromString(read(subPath));
+                            if (memento == null) {
+                                LOG.warn("No location-memento deserialized from " + subPath + "; ignoring and continuing");
+                            } else {
+                                builder.location(memento);
+                            }
+                        } catch (Exception e) {
+                            exceptionHandler.onLoadLocationMementoFailed("Memento "+subPath, e);
+                        }
+                    }}));
             }
-            for (String subPath : policySubPathList) {
-                try {
-                    StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
-                    PolicyMemento memento = (PolicyMemento) serializer.fromString(objectAccessor.get());
-                    if (memento == null) {
-                        LOG.warn("No policy-memento deserialized from " + subPath + "; ignoring and continuing");
-                    } else {
-                        builder.policy(memento);
-                    }
-                } catch (Exception e) {
-                    exceptionHandler.onLoadPolicyMementoFailed("Memento "+subPath, e);
-                }
+            for (final String subPath : policySubPathList) {
+                futures.add(executor.submit(new Runnable() {
+                    public void run() {
+                        try {
+                            StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
+                            PolicyMemento memento = (PolicyMemento) serializer.fromString(objectAccessor.get());
+                            if (memento == null) {
+                                LOG.warn("No policy-memento deserialized from " + subPath + "; ignoring and continuing");
+                            } else {
+                                builder.policy(memento);
+                            }
+                        } catch (Exception e) {
+                            exceptionHandler.onLoadPolicyMementoFailed("Memento "+subPath, e);
+                        }
+                    }}));
             }
-            for (String subPath : enricherSubPathList) {
-                try {
-                    StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
-                    EnricherMemento memento = (EnricherMemento) serializer.fromString(objectAccessor.get());
-                    if (memento == null) {
-                        LOG.warn("No enricher-memento deserialized from " + subPath + "; ignoring and continuing");
-                    } else {
-                        builder.enricher(memento);
+            for (final String subPath : enricherSubPathList) {
+                futures.add(executor.submit(new Runnable() {
+                    public void run() {
+                        try {
+                            StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
+                            EnricherMemento memento = (EnricherMemento) serializer.fromString(objectAccessor.get());
+                            if (memento == null) {
+                                LOG.warn("No enricher-memento deserialized from " + subPath + "; ignoring and continuing");
+                            } else {
+                                builder.enricher(memento);
+                            }
+                        } catch (Exception e) {
+                            exceptionHandler.onLoadEnricherMementoFailed("Memento "+subPath, e);
+                        }
+                    }}));
+            }
+            
+            try {
+                // Wait for all, failing fast if any exceptions.
+                Futures.allAsList(futures).get();
+            } catch (Exception e) {
+                Exceptions.propagateIfFatal(e);
+                
+                List<Exception> exceptions = Lists.newArrayList();
+                
+                for (ListenableFuture<?> future : futures) {
+                    if (future.isDone()) {
+                        try {
+                            future.get();
+                        } catch (InterruptedException e2) {
+                            throw Exceptions.propagate(e2);
+                        } catch (ExecutionException e2) {
+                            LOG.warn("Problem loading memento", e2);
+                            exceptions.add(e2);
+                        }
+                        future.cancel(true);
                     }
-                } catch (Exception e) {
-                    exceptionHandler.onLoadEnricherMementoFailed("Memento "+subPath, e);
+                }
+                if (exceptions.isEmpty()) {
+                    throw Exceptions.propagate(e);
+                } else {
+                    // Normally there should be at least one failure; otherwise Futures.allAsList(futures).get() would not have failed.
+                    throw new CompoundRuntimeException("Problem loading mementos", exceptions);
                 }
             }
-            
+
         } finally {
             serializer.unsetLookupContext();
         }
 
-        if (LOG.isDebugEnabled()) LOG.debug("Loaded memento; took {}", Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS)));
-        return builder.build();
+        BrooklynMemento result = builder.build();
+        
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Loaded memento; took {}; {} entities, {} locations, {} policies, {} enrichers, from {}", new Object[]{
+                      Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS)), result.getEntityIds().size(), 
+                      result.getLocationIds().size(), result.getPolicyIds().size(), result.getEnricherIds().size(),
+                      objectStore.getSummaryName() });
+        }
+        
+        return result;
     }
     
     @Override
@@ -268,67 +417,84 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
             if (LOG.isDebugEnabled()) LOG.debug("Ignoring checkpointing entire memento, because not running");
             return;
         }
-        objectStore.prepareForMasterUse();
         
-        Stopwatch stopwatch = Stopwatch.createStarted();
-
-        for (EntityMemento entity : newMemento.getEntityMementos().values()) {
-            persist("entities", entity, exceptionHandler);
-        }
-        for (LocationMemento location : newMemento.getLocationMementos().values()) {
-            persist("locations", location, exceptionHandler);
-        }
-        for (PolicyMemento policy : newMemento.getPolicyMementos().values()) {
-            persist("policies", policy, exceptionHandler);
-        }
-        for (EnricherMemento enricher : newMemento.getEnricherMementos().values()) {
-            persist("enrichers", enricher, exceptionHandler);
-        }
+        Delta delta = PersisterDeltaImpl.builder()
+                .entities(newMemento.getEntityMementos().values())
+                .locations(newMemento.getLocationMementos().values())
+                .policies(newMemento.getPolicyMementos().values())
+                .enrichers(newMemento.getEnricherMementos().values())
+                .build();
+        Stopwatch stopwatch = deltaImpl(delta, exceptionHandler);
         
         if (LOG.isDebugEnabled()) LOG.debug("Checkpointed entire memento in {}", Time.makeTimeStringRounded(stopwatch));
     }
-    
+
     @Override
     public void delta(Delta delta, PersistenceExceptionHandler exceptionHandler) {
         if (!running) {
             if (LOG.isDebugEnabled()) LOG.debug("Ignoring checkpointed delta of memento, because not running");
             return;
         }
-        objectStore.prepareForMasterUse();
-        
-        Stopwatch stopwatch = Stopwatch.createStarted();
-        
-        for (EntityMemento entity : delta.entities()) {
-            persist("entities", entity, exceptionHandler);
-        }
-        for (LocationMemento location : delta.locations()) {
-            persist("locations", location, exceptionHandler);
-        }
-        for (PolicyMemento policy : delta.policies()) {
-            persist("policies", policy, exceptionHandler);
-        }
-        for (EnricherMemento enricher : delta.enrichers()) {
-            persist("enrichers", enricher, exceptionHandler);
-        }
-        
-        for (String id : delta.removedEntityIds()) {
-            delete("entities", id, exceptionHandler);
-        }
-        for (String id : delta.removedLocationIds()) {
-            delete("locations", id, exceptionHandler);
-        }
-        for (String id : delta.removedPolicyIds()) {
-            delete("policies", id, exceptionHandler);
-        }
-        for (String id : delta.removedEnricherIds()) {
-            delete("enrichers", id, exceptionHandler);
-        }
+        Stopwatch stopwatch = deltaImpl(delta, exceptionHandler);
         
         if (LOG.isDebugEnabled()) LOG.debug("Checkpointed delta of memento in {}; updated {} entities, {} locations and {} policies; " +
                 "removing {} entities, {} locations and {} policies", 
                 new Object[] {Time.makeTimeStringRounded(stopwatch), delta.entities(), delta.locations(), delta.policies(),
                 delta.removedEntityIds(), delta.removedLocationIds(), delta.removedPolicyIds()});
     }
+    
+    private Stopwatch deltaImpl(Delta delta, PersistenceExceptionHandler exceptionHandler) {
+        try {
+            lock.writeLock().lockInterruptibly();
+        } catch (InterruptedException e) {
+            throw Exceptions.propagate(e);
+        }
+        try {
+            objectStore.prepareForMasterUse();
+            
+            Stopwatch stopwatch = Stopwatch.createStarted();
+            List<ListenableFuture<?>> futures = Lists.newArrayList();
+            
+            for (EntityMemento entity : delta.entities()) {
+                futures.add(asyncPersist("entities", entity, exceptionHandler));
+            }
+            for (LocationMemento location : delta.locations()) {
+                futures.add(asyncPersist("locations", location, exceptionHandler));
+            }
+            for (PolicyMemento policy : delta.policies()) {
+                futures.add(asyncPersist("policies", policy, exceptionHandler));
+            }
+            for (EnricherMemento enricher : delta.enrichers()) {
+                futures.add(asyncPersist("enrichers", enricher, exceptionHandler));
+            }
+            
+            for (String id : delta.removedEntityIds()) {
+                futures.add(asyncDelete("entities", id, exceptionHandler));
+            }
+            for (String id : delta.removedLocationIds()) {
+                futures.add(asyncDelete("locations", id, exceptionHandler));
+            }
+            for (String id : delta.removedPolicyIds()) {
+                futures.add(asyncDelete("policies", id, exceptionHandler));
+            }
+            for (String id : delta.removedEnricherIds()) {
+                futures.add(asyncDelete("enrichers", id, exceptionHandler));
+            }
+            
+            try {
+                // Wait for all the tasks to complete or fail, rather than aborting on the first failure.
+                // But then propagate failure if any fail. (hence the two calls).
+                Futures.successfulAsList(futures).get();
+                Futures.allAsList(futures).get();
+            } catch (Exception e) {
+                throw Exceptions.propagate(e);
+            }
+            
+            return stopwatch;
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
 
     @Override
     @VisibleForTesting
@@ -336,11 +502,20 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
         waitForWritesCompleted(Duration.of(timeout, unit));
     }
     
+    @Override
     public void waitForWritesCompleted(Duration timeout) throws InterruptedException, TimeoutException {
+        lock.writeLock().lockInterruptibly();
+        lock.writeLock().unlock();
+        
         for (StoreObjectAccessorWithLock writer : writers.values())
             writer.waitForCurrentWrites(timeout);
     }
 
+    private String read(String subPath) {
+        StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
+        return objectAccessor.get();
+    }
+
     private void persist(String subPath, Memento memento, PersistenceExceptionHandler exceptionHandler) {
         try {
             getWriter(getPath(subPath, memento.getId())).put(serializer.toString(memento));
@@ -348,7 +523,7 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
             exceptionHandler.onPersistMementoFailed(memento, e);
         }
     }
-
+    
     private void delete(String subPath, String id, PersistenceExceptionHandler exceptionHandler) {
         try {
             StoreObjectAccessorWithLock w = getWriter(getPath(subPath, id));
@@ -361,6 +536,27 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
         }
     }
 
+    private ListenableFuture<String> asyncRead(final String subPath) {
+        return executor.submit(new Callable<String>() {
+            public String call() {
+                return read(subPath);
+            }});
+    }
+
+    private ListenableFuture<?> asyncPersist(final String subPath, final Memento memento, final PersistenceExceptionHandler exceptionHandler) {
+        return executor.submit(new Runnable() {
+            public void run() {
+                persist(subPath, memento, exceptionHandler);
+            }});
+    }
+
+    private ListenableFuture<?> asyncDelete(final String subPath, final String id, final PersistenceExceptionHandler exceptionHandler) {
+        return executor.submit(new Runnable() {
+            public void run() {
+                delete(subPath, id, exceptionHandler);
+            }});
+    }
+    
     private String getPath(String subPath, String id) {
         return subPath+"/"+id;
     }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/test/java/brooklyn/entity/rebind/RebindEntityTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/rebind/RebindEntityTest.java b/core/src/test/java/brooklyn/entity/rebind/RebindEntityTest.java
index 290cc3b..ea9f9f6 100644
--- a/core/src/test/java/brooklyn/entity/rebind/RebindEntityTest.java
+++ b/core/src/test/java/brooklyn/entity/rebind/RebindEntityTest.java
@@ -175,6 +175,13 @@ public class RebindEntityTest extends RebindTestFixtureWithApp {
         assertEquals(newE.getDisplayName(), "mydisplayname");
     }
     
+    // Saw this fail during development (fixed now); but want at least one of these tests to be run 
+    // many times for stress testing purposes
+    @Test(invocationCount=100, groups="Integration")
+    public void testRestoresEntityIdAndDisplayNameManyTimes() throws Exception {
+        testRestoresEntityIdAndDisplayName();
+    }
+    
     @Test
     public void testCanCustomizeRebind() throws Exception {
         MyEntity2 origE = origApp.createAndManageChild(EntitySpec.create(MyEntity2.class).configure("myfield", "myval"));

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java b/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
index 4c8e8f7..2fd873e 100644
--- a/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
+++ b/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
@@ -16,6 +16,7 @@ import brooklyn.internal.BrooklynFeatureEnablement;
 import brooklyn.management.ManagementContext;
 import brooklyn.management.ha.HighAvailabilityMode;
 import brooklyn.management.internal.LocalManagementContext;
+import brooklyn.management.internal.ManagementContextInternal;
 import brooklyn.mementos.BrooklynMementoManifest;
 import brooklyn.test.entity.LocalManagementContextForTests;
 import brooklyn.util.os.Os;
@@ -111,7 +112,10 @@ public abstract class RebindTestFixture<T extends StartableApplication> {
         FileBasedObjectStore objectStore = new FileBasedObjectStore(mementoDir);
         objectStore.injectManagementContext(newManagementContext);
         objectStore.prepareForSharedUse(PersistMode.AUTO, HighAvailabilityMode.DISABLED);
-        BrooklynMementoPersisterToObjectStore persister = new BrooklynMementoPersisterToObjectStore(objectStore, classLoader);
+        BrooklynMementoPersisterToObjectStore persister = new BrooklynMementoPersisterToObjectStore(
+                objectStore,
+                ((ManagementContextInternal)newManagementContext).getBrooklynProperties(),
+                classLoader);
         RebindExceptionHandler exceptionHandler = new RecordingRebindExceptionHandler(RebindManager.RebindFailureMode.FAIL_AT_END, RebindManager.RebindFailureMode.FAIL_AT_END);
         BrooklynMementoManifest mementoManifest = persister.loadMementoManifest(exceptionHandler);
         persister.stop();

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/test/java/brooklyn/entity/rebind/RebindTestUtils.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/rebind/RebindTestUtils.java b/core/src/test/java/brooklyn/entity/rebind/RebindTestUtils.java
index 4b4e54c..d402bb9 100644
--- a/core/src/test/java/brooklyn/entity/rebind/RebindTestUtils.java
+++ b/core/src/test/java/brooklyn/entity/rebind/RebindTestUtils.java
@@ -27,6 +27,7 @@ import brooklyn.location.Location;
 import brooklyn.management.ManagementContext;
 import brooklyn.management.ha.HighAvailabilityMode;
 import brooklyn.management.internal.LocalManagementContext;
+import brooklyn.management.internal.ManagementContextInternal;
 import brooklyn.mementos.BrooklynMemento;
 import brooklyn.test.entity.LocalManagementContextForTests;
 import brooklyn.util.javalang.Serializers;
@@ -170,7 +171,10 @@ public class RebindTestUtils {
             
             objectStore.injectManagementContext(unstarted);
             objectStore.prepareForSharedUse(PersistMode.AUTO, HighAvailabilityMode.DISABLED);
-            BrooklynMementoPersisterToObjectStore newPersister = new BrooklynMementoPersisterToObjectStore(objectStore, classLoader);
+            BrooklynMementoPersisterToObjectStore newPersister = new BrooklynMementoPersisterToObjectStore(
+                    objectStore, 
+                    unstarted.getBrooklynProperties(), 
+                    classLoader);
             ((RebindManagerImpl) unstarted.getRebindManager()).setPeriodicPersistPeriod(persistPeriod);
             unstarted.getRebindManager().setPersister(newPersister, PersistenceExceptionHandlerImpl.builder().build());
             return unstarted;
@@ -235,7 +239,10 @@ public class RebindTestUtils {
     public static Collection<Application> rebindAll(ManagementContext newManagementContext, File mementoDir, ClassLoader classLoader, RebindExceptionHandler exceptionHandler, PersistenceObjectStore objectStore) throws Exception {
         LOG.info("Rebinding app, using directory "+mementoDir);
 
-        BrooklynMementoPersisterToObjectStore newPersister = new BrooklynMementoPersisterToObjectStore(objectStore, classLoader);
+        BrooklynMementoPersisterToObjectStore newPersister = new BrooklynMementoPersisterToObjectStore(
+                objectStore,
+                ((ManagementContextInternal)newManagementContext).getBrooklynProperties(),
+                classLoader);
         newManagementContext.getRebindManager().setPersister(newPersister, PersistenceExceptionHandlerImpl.builder().build());
         List<Application> newApps;
         if (exceptionHandler == null) {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerSplitBrainTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerSplitBrainTest.java b/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerSplitBrainTest.java
index e33ac4f..0183985 100644
--- a/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerSplitBrainTest.java
+++ b/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerSplitBrainTest.java
@@ -80,7 +80,7 @@ public class HighAvailabilityManagerSplitBrainTest {
             objectStore.prepareForSharedUse(PersistMode.CLEAN, HighAvailabilityMode.DISABLED);
             persister = new ManagementPlaneSyncRecordPersisterToObjectStore(mgmt, objectStore, classLoader);
             ((ManagementPlaneSyncRecordPersisterToObjectStore)persister).allowRemoteTimestampInMemento();
-            BrooklynMementoPersisterToObjectStore persisterObj = new BrooklynMementoPersisterToObjectStore(objectStore, classLoader);
+            BrooklynMementoPersisterToObjectStore persisterObj = new BrooklynMementoPersisterToObjectStore(objectStore, mgmt.getBrooklynProperties(), classLoader);
             mgmt.getRebindManager().setPersister(persisterObj, PersistenceExceptionHandlerImpl.builder().build());
             ha = new HighAvailabilityManagerImpl(mgmt)
                 .setPollPeriod(Duration.PRACTICALLY_FOREVER)

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java b/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java
index b572a80..6e89c37 100644
--- a/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java
+++ b/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java
@@ -68,7 +68,10 @@ public abstract class HighAvailabilityManagerTestFixture {
         objectStore.prepareForSharedUse(PersistMode.CLEAN, HighAvailabilityMode.DISABLED);
         persister = new ManagementPlaneSyncRecordPersisterToObjectStore(managementContext, objectStore, classLoader);
         ((ManagementPlaneSyncRecordPersisterToObjectStore)persister).allowRemoteTimestampInMemento();
-        BrooklynMementoPersisterToObjectStore persisterObj = new BrooklynMementoPersisterToObjectStore(objectStore, classLoader);
+        BrooklynMementoPersisterToObjectStore persisterObj = new BrooklynMementoPersisterToObjectStore(
+                objectStore, 
+                managementContext.getBrooklynProperties(), 
+                classLoader);
         managementContext.getRebindManager().setPersister(persisterObj, PersistenceExceptionHandlerImpl.builder().build());
         manager = new HighAvailabilityManagerImpl(managementContext)
                 .setPollPeriod(getPollPeriod())

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/usage/launcher/src/main/java/brooklyn/launcher/BrooklynLauncher.java
----------------------------------------------------------------------
diff --git a/usage/launcher/src/main/java/brooklyn/launcher/BrooklynLauncher.java b/usage/launcher/src/main/java/brooklyn/launcher/BrooklynLauncher.java
index 64ef089..638c029 100644
--- a/usage/launcher/src/main/java/brooklyn/launcher/BrooklynLauncher.java
+++ b/usage/launcher/src/main/java/brooklyn/launcher/BrooklynLauncher.java
@@ -541,7 +541,9 @@ public class BrooklynLauncher {
 
             RebindManager rebindManager = managementContext.getRebindManager();
 
-            BrooklynMementoPersisterToObjectStore persister = new BrooklynMementoPersisterToObjectStore(objectStore,
+            BrooklynMementoPersisterToObjectStore persister = new BrooklynMementoPersisterToObjectStore(
+                    objectStore,
+                    ((ManagementContextInternal)managementContext).getBrooklynProperties(),
                     managementContext.getCatalog().getRootClassLoader());
             PersistenceExceptionHandler persistenceExceptionHandler = PersistenceExceptionHandlerImpl.builder().build();
             ((RebindManagerImpl) rebindManager).setPeriodicPersistPeriod(persistPeriod);

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/utils/common/src/main/java/brooklyn/util/exceptions/Exceptions.java
----------------------------------------------------------------------
diff --git a/utils/common/src/main/java/brooklyn/util/exceptions/Exceptions.java b/utils/common/src/main/java/brooklyn/util/exceptions/Exceptions.java
index 5f0fc8a..42fddb2 100644
--- a/utils/common/src/main/java/brooklyn/util/exceptions/Exceptions.java
+++ b/utils/common/src/main/java/brooklyn/util/exceptions/Exceptions.java
@@ -80,6 +80,8 @@ public class Exceptions {
     public static void propagateIfFatal(Throwable throwable) {
         if (throwable instanceof InterruptedException)
             throw new RuntimeInterruptedException((InterruptedException) throwable);
+        if (throwable instanceof RuntimeInterruptedException)
+            throw (RuntimeInterruptedException) throwable;
         if (throwable instanceof Error)
             throw (Error) throwable;
     }


[4/4] git commit: This closes #41

Posted by he...@apache.org.
This closes #41


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/73dcb1d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/73dcb1d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/73dcb1d8

Branch: refs/heads/master
Commit: 73dcb1d8d45647d365c7144b8f886b9b45500aa2
Parents: 1a9732e 8191d93
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Fri Jul 4 14:56:50 2014 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Fri Jul 4 14:56:50 2014 +0100

----------------------------------------------------------------------
 .../mementos/BrooklynMementoPersister.java      |   2 +-
 .../rebind/PeriodicDeltaChangeListener.java     |   5 +
 .../entity/rebind/PersisterDeltaImpl.java       |  47 +-
 .../rebind/RebindExceptionHandlerImpl.java      |  23 +-
 .../entity/rebind/RebindManagerImpl.java        |   4 +-
 .../entity/rebind/dto/BrooklynMementoImpl.java  |  12 +-
 .../rebind/dto/BrooklynMementoManifestImpl.java |  16 +-
 .../AbstractBrooklynMementoPersister.java       |   2 +-
 .../BrooklynMementoPersisterToMultiFile.java    |  16 +-
 .../BrooklynMementoPersisterToObjectStore.java  | 529 +++++++++++++------
 .../persister/StoreObjectAccessorLocking.java   |   9 +-
 .../entity/rebind/RebindEntityTest.java         |   7 +
 .../entity/rebind/RebindTestFixture.java        |  14 +-
 .../brooklyn/entity/rebind/RebindTestUtils.java |  33 +-
 .../BrooklynMementoPersisterTestFixture.java    |   2 +-
 .../HighAvailabilityManagerSplitBrainTest.java  |   2 +-
 .../ha/HighAvailabilityManagerTestFixture.java  |   5 +-
 .../qa/performance/AbstractPerformanceTest.java |   3 +
 .../FilePersistencePerformanceTest.java         |   4 +-
 .../BlobStorePersistencePerformanceTest.java    | 103 ++++
 ...tyToBlobStorePersistencePerformanceTest.java |  47 ++
 .../brooklyn/launcher/BrooklynLauncher.java     |   4 +-
 .../brooklyn/util/exceptions/Exceptions.java    |   2 +
 23 files changed, 680 insertions(+), 211 deletions(-)
----------------------------------------------------------------------



[3/4] git commit: Incorporate comments for PR #41: thread pool for ObjectStore

Posted by he...@apache.org.
Incorporate comments for PR #41: thread pool for ObjectStore


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/8191d934
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/8191d934
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/8191d934

Branch: refs/heads/master
Commit: 8191d934c873681ed28950debd17dd1edeae2417
Parents: d04761e
Author: Aled Sage <al...@gmail.com>
Authored: Fri Jul 4 14:23:42 2014 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Fri Jul 4 14:38:12 2014 +0100

----------------------------------------------------------------------
 .../mementos/BrooklynMementoPersister.java      |  2 +-
 .../entity/rebind/RebindManagerImpl.java        |  2 +-
 .../AbstractBrooklynMementoPersister.java       |  2 +-
 .../BrooklynMementoPersisterToMultiFile.java    | 16 +++--
 .../BrooklynMementoPersisterToObjectStore.java  | 61 ++++++++++++--------
 .../persister/StoreObjectAccessorLocking.java   |  9 ++-
 .../entity/rebind/RebindTestFixture.java        |  2 +-
 .../BrooklynMementoPersisterTestFixture.java    |  2 +-
 8 files changed, 60 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java b/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java
index 016c10f..579751d 100644
--- a/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java
+++ b/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java
@@ -42,7 +42,7 @@ public interface BrooklynMementoPersister {
 
     void delta(Delta delta, PersistenceExceptionHandler exceptionHandler);
 
-    void stop();
+    void stop(boolean graceful);
 
     @VisibleForTesting
     void waitForWritesCompleted(Duration timeout) throws InterruptedException, TimeoutException;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java b/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
index 1f2ae81..af25906 100644
--- a/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
+++ b/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
@@ -192,7 +192,7 @@ public class RebindManagerImpl implements RebindManager {
     public void stop() {
         running = false;
         if (realChangeListener != null) realChangeListener.stop();
-        if (persister != null) persister.stop();
+        if (persister != null) persister.stop(true);
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java b/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java
index 04e69e7..a8b5f52 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java
@@ -47,7 +47,7 @@ public abstract class AbstractBrooklynMementoPersister implements BrooklynMement
     }
     
     @Override
-    public void stop() {
+    public void stop(boolean graceful) {
         // no-op
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java
index c2bbdd0..06791bc 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java
@@ -102,13 +102,17 @@ public class BrooklynMementoPersisterToMultiFile implements BrooklynMementoPersi
     }
     
     @Override
-    public void stop() {
+    public void stop(boolean graceful) {
         running = false;
-        executor.shutdown();
-        try {
-            executor.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-            throw Exceptions.propagate(e);
+        if (graceful) {
+            executor.shutdown();
+            try {
+                executor.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                throw Exceptions.propagate(e);
+            }
+        } else {
+            executor.shutdownNow();
         }
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
index 8fcb3b3..d036648 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
@@ -6,7 +6,6 @@ import java.io.IOException;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -21,6 +20,7 @@ import org.slf4j.LoggerFactory;
 import brooklyn.config.BrooklynProperties;
 import brooklyn.config.ConfigKey;
 import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.rebind.PeriodicDeltaChangeListener;
 import brooklyn.entity.rebind.PersistenceExceptionHandler;
 import brooklyn.entity.rebind.PersisterDeltaImpl;
 import brooklyn.entity.rebind.RebindExceptionHandler;
@@ -84,7 +84,7 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
      * Lock used on writes (checkpoint + delta) so that {@link #waitForWritesCompleted(Duration)} can block
      * for any concurrent call to complete.
      */
-    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+    private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
 
     public BrooklynMementoPersisterToObjectStore(PersistenceObjectStore objectStore, BrooklynProperties brooklynProperties, ClassLoader classLoader) {
         this.objectStore = checkNotNull(objectStore, "objectStore");
@@ -109,18 +109,29 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
             }}));
     }
 
-    public PersistenceObjectStore getObjectStore() {
-        return objectStore;
-    }
-
     @Override
-    public void stop() {
+    public void stop(boolean graceful) {
         running = false;
         if (executor != null) {
-            executor.shutdownNow();
+            if (graceful) {
+                // a very long timeout to ensure we don't lose state. 
+                // If persisting thousands of entities over slow network to Object Store, could take minutes.
+                executor.shutdown();
+                try {
+                    executor.awaitTermination(1, TimeUnit.HOURS);
+                } catch (InterruptedException e) {
+                    throw Exceptions.propagate(e);
+                }
+            } else {
+                executor.shutdownNow();
+            }
         }
     }
     
+    public PersistenceObjectStore getObjectStore() {
+        return objectStore;
+    }
+
     protected StoreObjectAccessorWithLock getWriter(String path) {
         String id = path.substring(path.lastIndexOf('/')+1);
         synchronized (writers) {
@@ -157,7 +168,7 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
         Stopwatch stopwatch = Stopwatch.createStarted();
 
         LOG.debug("Scanning persisted state: {} entities, {} locations, {} policies, {} enrichers, from {}", new Object[]{
-            entitySubPathList/*.size()*/, locationSubPathList.size(), policySubPathList.size(), enricherSubPathList.size(),
+            entitySubPathList.size(), locationSubPathList.size(), policySubPathList.size(), enricherSubPathList.size(),
             objectStore.getSummaryName() });
 
         final BrooklynMementoManifestImpl.Builder builder = BrooklynMementoManifestImpl.builder();
@@ -172,9 +183,7 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
                         String id = (String) XmlUtil.xpath(contents, "/entity/id");
                         String type = (String) XmlUtil.xpath(contents, "/entity/type");
                         builder.entity(id, type);
-                        LOG.debug("Loaded manifest for entity "+subPath+"; id "+id+"; type "+type); // FIXME
                     } catch (Exception e) {
-                        LOG.debug("Problem loading manifest for entity "+subPath); // FIXME
                         Exceptions.propagateIfFatal(e);
                         exceptionHandler.onLoadEntityMementoFailed("Memento "+subPath, e);
                     }
@@ -443,6 +452,13 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
                 delta.removedEntityIds(), delta.removedLocationIds(), delta.removedPolicyIds()});
     }
     
+    /**
+     * Concurrent calls will queue-up (the lock is "fair", which means an "approximately arrival-order policy").
+     * Current usage is with the {@link PeriodicDeltaChangeListener} so we expect only one call at a time.
+     * 
+     * TODO Longer term, if we care more about concurrent calls we could merge the queued deltas so that we
+     * don't do unnecessary repeated writes of an entity.
+     */
     private Stopwatch deltaImpl(Delta delta, PersistenceExceptionHandler exceptionHandler) {
         try {
             lock.writeLock().lockInterruptibly();
@@ -504,11 +520,17 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
     
     @Override
     public void waitForWritesCompleted(Duration timeout) throws InterruptedException, TimeoutException {
-        lock.writeLock().lockInterruptibly();
-        lock.writeLock().unlock();
-        
-        for (StoreObjectAccessorWithLock writer : writers.values())
-            writer.waitForCurrentWrites(timeout);
+        boolean locked = lock.readLock().tryLock(timeout.toMillisecondsRoundingUp(), TimeUnit.MILLISECONDS);
+        if (locked) {
+            lock.readLock().unlock();
+            
+            // Belt-and-braces: the lock above should be enough to ensure no outstanding writes, because
+            // each writer is now synchronous.
+            for (StoreObjectAccessorWithLock writer : writers.values())
+                writer.waitForCurrentWrites(timeout);
+        } else {
+            throw new TimeoutException("Timeout waiting for writes to "+objectStore);
+        }
     }
 
     private String read(String subPath) {
@@ -536,13 +558,6 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
         }
     }
 
-    private ListenableFuture<String> asyncRead(final String subPath) {
-        return executor.submit(new Callable<String>() {
-            public String call() {
-                return read(subPath);
-            }});
-    }
-
     private ListenableFuture<?> asyncPersist(final String subPath, final Memento memento, final PersistenceExceptionHandler exceptionHandler) {
         return executor.submit(new Runnable() {
             public void run() {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/core/src/main/java/brooklyn/entity/rebind/persister/StoreObjectAccessorLocking.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/StoreObjectAccessorLocking.java b/core/src/main/java/brooklyn/entity/rebind/persister/StoreObjectAccessorLocking.java
index 1ec2afd..c43df3b 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/StoreObjectAccessorLocking.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/StoreObjectAccessorLocking.java
@@ -4,6 +4,7 @@ import java.util.Comparator;
 import java.util.Date;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -152,8 +153,12 @@ public class StoreObjectAccessorLocking implements PersistenceObjectStore.StoreO
     @Override
     public void waitForCurrentWrites(Duration timeout) throws InterruptedException, TimeoutException {
         try {
-            lock.readLock().lockInterruptibly();
-            lock.readLock().unlock();
+            boolean locked = lock.readLock().tryLock(timeout.toMillisecondsRoundingUp(), TimeUnit.MILLISECONDS);
+            if (locked) {
+                lock.readLock().unlock();
+            } else {
+                throw new TimeoutException("Timeout waiting for writes of "+delegate+" after "+timeout);
+            }
         } catch (InterruptedException e) {
             throw Exceptions.propagate(e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java b/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
index 2fd873e..9e233d6 100644
--- a/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
+++ b/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
@@ -118,7 +118,7 @@ public abstract class RebindTestFixture<T extends StartableApplication> {
                 classLoader);
         RebindExceptionHandler exceptionHandler = new RecordingRebindExceptionHandler(RebindManager.RebindFailureMode.FAIL_AT_END, RebindManager.RebindFailureMode.FAIL_AT_END);
         BrooklynMementoManifest mementoManifest = persister.loadMementoManifest(exceptionHandler);
-        persister.stop();
+        persister.stop(false);
         return mementoManifest;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/core/src/test/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterTestFixture.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterTestFixture.java b/core/src/test/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterTestFixture.java
index 1946f75..73e1443 100644
--- a/core/src/test/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterTestFixture.java
+++ b/core/src/test/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterTestFixture.java
@@ -76,7 +76,7 @@ public abstract class BrooklynMementoPersisterTestFixture {
     public void tearDown() throws Exception {
         if (localManagementContext != null) Entities.destroyAll(localManagementContext);
         if (app != null) Entities.destroyAll(app.getManagementContext());
-        if (persister != null) persister.stop();
+        if (persister != null) persister.stop(false);
         if (objectStore!=null) objectStore.deleteCompletely();
         persister = null;
     }