You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2014/07/04 16:02:39 UTC
[1/4] git commit: Adds performance tests for BlobStore-backed
persistence
Repository: incubator-brooklyn
Updated Branches:
refs/heads/master 1a9732e78 -> 73dcb1d8d
Adds performance tests for BlobStore-backed persistence
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/9350857a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/9350857a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/9350857a
Branch: refs/heads/master
Commit: 9350857a9f6ef8ff211831f25da55f7009d5c183
Parents: 1a9732e
Author: Aled Sage <al...@gmail.com>
Authored: Tue Jul 1 20:28:04 2014 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Fri Jul 4 14:23:50 2014 +0100
----------------------------------------------------------------------
.../entity/rebind/RebindTestFixture.java | 6 +-
.../brooklyn/entity/rebind/RebindTestUtils.java | 22 +++-
.../qa/performance/AbstractPerformanceTest.java | 3 +
.../FilePersistencePerformanceTest.java | 4 +-
.../BlobStorePersistencePerformanceTest.java | 103 +++++++++++++++++++
...tyToBlobStorePersistencePerformanceTest.java | 47 +++++++++
6 files changed, 179 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/9350857a/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java b/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
index 3b1992d..4c8e8f7 100644
--- a/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
+++ b/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
@@ -38,11 +38,15 @@ public abstract class RebindTestFixture<T extends StartableApplication> {
@BeforeMethod(alwaysRun=true)
public void setUp() throws Exception {
mementoDir = Os.newTempDir(getClass());
- origManagementContext = RebindTestUtils.newPersistingManagementContext(mementoDir, classLoader, getPersistPeriodMillis());
+ origManagementContext = createOrigManagementContext();
origApp = createApp();
LOG.info("Test "+getClass()+" persisting to "+mementoDir);
}
+
+ protected LocalManagementContext createOrigManagementContext() {
+ return RebindTestUtils.newPersistingManagementContext(mementoDir, classLoader, getPersistPeriodMillis());
+ }
protected int getPersistPeriodMillis() {
return 1;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/9350857a/core/src/test/java/brooklyn/entity/rebind/RebindTestUtils.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/rebind/RebindTestUtils.java b/core/src/test/java/brooklyn/entity/rebind/RebindTestUtils.java
index e1be542..4b4e54c 100644
--- a/core/src/test/java/brooklyn/entity/rebind/RebindTestUtils.java
+++ b/core/src/test/java/brooklyn/entity/rebind/RebindTestUtils.java
@@ -119,7 +119,8 @@ public class RebindTestUtils {
BrooklynProperties properties;
PersistenceObjectStore objectStore;
Duration persistPeriod = Duration.millis(100);
-
+ boolean forLive;
+
ManagementContextBuilder(File mementoDir, ClassLoader classLoader) {
this(classLoader, new FileBasedObjectStore(mementoDir));
}
@@ -146,12 +147,25 @@ public class RebindTestUtils {
return this;
}
+        /**
+         * Chooses which management-context class {@link #buildUnstarted()} instantiates.
+         *
+         * @param val {@code true} to build a plain {@code LocalManagementContext};
+         *            {@code false} to build a {@code LocalManagementContextForTests}
+         * @return this builder, for chaining
+         */
+        public ManagementContextBuilder forLive(boolean val) {
+            forLive = val;
+            return this;
+        }
+
public LocalManagementContext buildUnstarted() {
LocalManagementContext unstarted;
- if (properties != null) {
- unstarted = new LocalManagementContextForTests(properties);
+ if (forLive) {
+ if (properties != null) {
+ unstarted = new LocalManagementContext(properties);
+ } else {
+ unstarted = new LocalManagementContext();
+ }
} else {
- unstarted = new LocalManagementContextForTests();
+ if (properties != null) {
+ unstarted = new LocalManagementContextForTests(properties);
+ } else {
+ unstarted = new LocalManagementContextForTests();
+ }
}
objectStore.injectManagementContext(unstarted);
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/9350857a/core/src/test/java/brooklyn/qa/performance/AbstractPerformanceTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/qa/performance/AbstractPerformanceTest.java b/core/src/test/java/brooklyn/qa/performance/AbstractPerformanceTest.java
index e585b7c..9e78084 100644
--- a/core/src/test/java/brooklyn/qa/performance/AbstractPerformanceTest.java
+++ b/core/src/test/java/brooklyn/qa/performance/AbstractPerformanceTest.java
@@ -12,6 +12,7 @@ import org.testng.annotations.BeforeMethod;
import brooklyn.entity.basic.ApplicationBuilder;
import brooklyn.entity.basic.Entities;
import brooklyn.location.basic.SimulatedLocation;
+import brooklyn.management.ManagementContext;
import brooklyn.test.entity.TestApplication;
import brooklyn.util.internal.DoubleSystemProperty;
@@ -48,12 +49,14 @@ public class AbstractPerformanceTest {
protected TestApplication app;
protected SimulatedLocation loc;
+ protected ManagementContext mgmt;
@BeforeMethod(alwaysRun=true)
public void setUp() throws Exception {
for (int i = 0; i < 5; i++) System.gc();
loc = new SimulatedLocation();
app = ApplicationBuilder.newManagedApp(TestApplication.class);
+ mgmt = app.getManagementContext();
}
@AfterMethod(alwaysRun=true)
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/9350857a/core/src/test/java/brooklyn/qa/performance/FilePersistencePerformanceTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/qa/performance/FilePersistencePerformanceTest.java b/core/src/test/java/brooklyn/qa/performance/FilePersistencePerformanceTest.java
index 65efdb1..58f1de0 100644
--- a/core/src/test/java/brooklyn/qa/performance/FilePersistencePerformanceTest.java
+++ b/core/src/test/java/brooklyn/qa/performance/FilePersistencePerformanceTest.java
@@ -5,6 +5,7 @@ import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -36,13 +37,14 @@ public class FilePersistencePerformanceTest extends AbstractPerformanceTest {
app.start(ImmutableList.of(loc));
}
+ @AfterMethod(alwaysRun=true)
@Override
public void tearDown() throws Exception {
super.tearDown();
if (file != null) file.delete();
}
- protected int numIterations() {
+ protected int numIterations() {
return 100;
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/9350857a/locations/jclouds/src/test/java/brooklyn/entity/rebind/persister/jclouds/BlobStorePersistencePerformanceTest.java
----------------------------------------------------------------------
diff --git a/locations/jclouds/src/test/java/brooklyn/entity/rebind/persister/jclouds/BlobStorePersistencePerformanceTest.java b/locations/jclouds/src/test/java/brooklyn/entity/rebind/persister/jclouds/BlobStorePersistencePerformanceTest.java
new file mode 100644
index 0000000..8f7b479
--- /dev/null
+++ b/locations/jclouds/src/test/java/brooklyn/entity/rebind/persister/jclouds/BlobStorePersistencePerformanceTest.java
@@ -0,0 +1,103 @@
+package brooklyn.entity.rebind.persister.jclouds;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.rebind.persister.PersistMode;
+import brooklyn.entity.rebind.persister.PersistenceObjectStore.StoreObjectAccessor;
+import brooklyn.management.ha.HighAvailabilityMode;
+import brooklyn.qa.performance.AbstractPerformanceTest;
+import brooklyn.util.text.Identifiers;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+public class BlobStorePersistencePerformanceTest extends AbstractPerformanceTest {
+
+ public static final String LOCATION_SPEC = "named:brooklyn-jclouds-objstore-test-1";
+
+ JcloudsBlobStoreBasedObjectStore objectStore;
+ StoreObjectAccessor blobstoreAccessor;
+
+ @BeforeMethod(alwaysRun=true)
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+
+ objectStore = new JcloudsBlobStoreBasedObjectStore(LOCATION_SPEC, "BlobStorePersistencePerformanceTest");
+ objectStore.injectManagementContext(mgmt);
+ objectStore.prepareForSharedUse(PersistMode.AUTO, HighAvailabilityMode.AUTO);
+ blobstoreAccessor = objectStore.newAccessor(Identifiers.makeRandomId(8));
+
+ app.start(ImmutableList.of(loc));
+ }
+
+ @AfterMethod(alwaysRun=true)
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ if (blobstoreAccessor != null) blobstoreAccessor.delete();
+ if (objectStore != null) {
+ objectStore.deleteCompletely();
+ objectStore.close();
+ }
+ }
+
+ protected int numIterations() {
+ return 100;
+ }
+
+ @Test(groups={"Live", "Acceptance"})
+ public void testStoreObjectPuts() throws Exception {
+ int numIterations = numIterations();
+ double minRatePerSec = 10 * PERFORMANCE_EXPECTATION;
+ final AtomicInteger i = new AtomicInteger();
+
+ measureAndAssert("StoreObjectAccessor.put", numIterations, minRatePerSec, new Runnable() {
+ public void run() {
+ blobstoreAccessor.put(""+i.incrementAndGet());
+ }});
+ }
+
+ @Test(groups={"Live", "Acceptance"})
+ public void testStoreObjectGet() throws Exception {
+ // The file system will have done a lot of caching here - we are unlikely to touch the disk more than once.
+ int numIterations = numIterations();
+ double minRatePerSec = 10 * PERFORMANCE_EXPECTATION;
+
+ measureAndAssert("FileBasedStoreObjectAccessor.get", numIterations, minRatePerSec, new Runnable() {
+ public void run() {
+ blobstoreAccessor.get();
+ }});
+ }
+
+ @Test(groups={"Live", "Acceptance"})
+ public void testStoreObjectDelete() throws Exception {
+ int numIterations = numIterations();
+ double minRatePerSec = 10 * PERFORMANCE_EXPECTATION;
+
+ // Will do 10% warm up runs first
+ final List<StoreObjectAccessor> blobstoreAccessors = Lists.newArrayList();
+ for (int i = 0; i < (numIterations * 1.1 + 1); i++) {
+ blobstoreAccessors.add(objectStore.newAccessor("storeObjectDelete-"+i));
+ }
+
+ final AtomicInteger i = new AtomicInteger();
+
+ try {
+ measureAndAssert("FileBasedStoreObjectAccessor.delete", numIterations, minRatePerSec, new Runnable() {
+ public void run() {
+ StoreObjectAccessor blobstoreAccessor = blobstoreAccessors.get(i.getAndIncrement());
+ blobstoreAccessor.delete();
+ }});
+ } finally {
+ for (StoreObjectAccessor blobstoreAccessor : blobstoreAccessors) {
+ blobstoreAccessor.delete();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/9350857a/locations/jclouds/src/test/java/brooklyn/entity/rebind/persister/jclouds/EntityToBlobStorePersistencePerformanceTest.java
----------------------------------------------------------------------
diff --git a/locations/jclouds/src/test/java/brooklyn/entity/rebind/persister/jclouds/EntityToBlobStorePersistencePerformanceTest.java b/locations/jclouds/src/test/java/brooklyn/entity/rebind/persister/jclouds/EntityToBlobStorePersistencePerformanceTest.java
new file mode 100644
index 0000000..374e130
--- /dev/null
+++ b/locations/jclouds/src/test/java/brooklyn/entity/rebind/persister/jclouds/EntityToBlobStorePersistencePerformanceTest.java
@@ -0,0 +1,47 @@
+package brooklyn.entity.rebind.persister.jclouds;
+
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.rebind.RebindTestUtils;
+import brooklyn.management.internal.LocalManagementContext;
+import brooklyn.qa.performance.EntityPersistencePerformanceTest;
+
+/**
+ * Runs the inherited entity-persistence performance tests against a
+ * {@link JcloudsBlobStoreBasedObjectStore} instead of the parent's default store.
+ * <p>
+ * The store is created in {@link #createOrigManagementContext()} and removed in {@link #tearDown()}.
+ */
+public class EntityToBlobStorePersistencePerformanceTest extends EntityPersistencePerformanceTest {
+
+    private static final String LOCATION_SPEC = BlobStorePersistencePerformanceTest.LOCATION_SPEC;
+
+    private JcloudsBlobStoreBasedObjectStore objectStore;
+
+    /**
+     * Creates the blob-store-backed object store and returns a started, "for live" management
+     * context that persists to it with the period from {@code getPersistPeriodMillis()}.
+     */
+    @Override
+    protected LocalManagementContext createOrigManagementContext() {
+        objectStore = new JcloudsBlobStoreBasedObjectStore(LOCATION_SPEC, "EntityToBlobStorePersistencePerformanceTest");
+
+        return RebindTestUtils.managementContextBuilder(classLoader, objectStore)
+                .forLive(true)
+                .persistPeriodMillis(getPersistPeriodMillis())
+                .buildStarted();
+    }
+
+    /**
+     * Runs the parent tear-down, then deletes and closes the object store.
+     * <p>
+     * try/finally ensures the store is still deleted if the parent tear-down throws, and still
+     * closed if deletion throws.
+     */
+    @AfterMethod(alwaysRun=true)
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            if (objectStore != null) {
+                try {
+                    objectStore.deleteCompletely();
+                } finally {
+                    objectStore.close();
+                }
+            }
+        }
+    }
+
+    /** Delegates to the inherited test; overridden only to place it in the "Live" group. */
+    @Test(groups="Live")
+    @Override
+    public void testManyEntities() throws Exception {
+        super.testManyEntities();
+    }
+
+    /** Delegates to the inherited test; overridden only to place it in the "Live" group. */
+    @Test(groups="Live")
+    @Override
+    public void testRapidChanges() throws Exception {
+        super.testRapidChanges();
+    }
+}
[2/4] git commit: Use thread pool for Object Store persistence
Posted by he...@apache.org.
Use thread pool for Object Store persistence
- passes BrooklynProperties to BrooklynMementoPersisterToObjectStore
so can read config like max thread pool size.
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/d04761e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/d04761e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/d04761e3
Branch: refs/heads/master
Commit: d04761e350e86f828200d82cbe4901e4a1db9e9c
Parents: 9350857
Author: Aled Sage <al...@gmail.com>
Authored: Wed Jul 2 15:16:29 2014 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Fri Jul 4 14:37:09 2014 +0100
----------------------------------------------------------------------
.../rebind/PeriodicDeltaChangeListener.java | 5 +
.../entity/rebind/PersisterDeltaImpl.java | 47 +-
.../rebind/RebindExceptionHandlerImpl.java | 23 +-
.../entity/rebind/RebindManagerImpl.java | 2 +-
.../entity/rebind/dto/BrooklynMementoImpl.java | 12 +-
.../rebind/dto/BrooklynMementoManifestImpl.java | 16 +-
.../BrooklynMementoPersisterToObjectStore.java | 502 +++++++++++++------
.../entity/rebind/RebindEntityTest.java | 7 +
.../entity/rebind/RebindTestFixture.java | 6 +-
.../brooklyn/entity/rebind/RebindTestUtils.java | 11 +-
.../HighAvailabilityManagerSplitBrainTest.java | 2 +-
.../ha/HighAvailabilityManagerTestFixture.java | 5 +-
.../brooklyn/launcher/BrooklynLauncher.java | 4 +-
.../brooklyn/util/exceptions/Exceptions.java | 2 +
14 files changed, 458 insertions(+), 186 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/main/java/brooklyn/entity/rebind/PeriodicDeltaChangeListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/PeriodicDeltaChangeListener.java b/core/src/main/java/brooklyn/entity/rebind/PeriodicDeltaChangeListener.java
index 9230d45..91ac9db 100644
--- a/core/src/main/java/brooklyn/entity/rebind/PeriodicDeltaChangeListener.java
+++ b/core/src/main/java/brooklyn/entity/rebind/PeriodicDeltaChangeListener.java
@@ -21,6 +21,7 @@ import brooklyn.mementos.BrooklynMementoPersister;
import brooklyn.policy.Enricher;
import brooklyn.policy.Policy;
import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.exceptions.RuntimeInterruptedException;
import brooklyn.util.task.BasicTask;
import brooklyn.util.task.ScheduledTask;
import brooklyn.util.time.Duration;
@@ -106,6 +107,10 @@ public class PeriodicDeltaChangeListener implements ChangeListener {
try {
persistNow();
return null;
+ } catch (RuntimeInterruptedException e) {
+ LOG.debug("Interrupted persisting change-delta (rethrowing)", e);
+ Thread.currentThread().interrupt();
+ return null;
} catch (Exception e) {
// Don't rethrow: the behaviour of executionManager is different from a scheduledExecutorService,
// if we throw an exception, then our task will never get executed again
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/main/java/brooklyn/entity/rebind/PersisterDeltaImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/PersisterDeltaImpl.java b/core/src/main/java/brooklyn/entity/rebind/PersisterDeltaImpl.java
index 0950f81..1fdafec 100644
--- a/core/src/main/java/brooklyn/entity/rebind/PersisterDeltaImpl.java
+++ b/core/src/main/java/brooklyn/entity/rebind/PersisterDeltaImpl.java
@@ -10,7 +10,52 @@ import brooklyn.mementos.PolicyMemento;
import com.google.common.collect.Sets;
-class PersisterDeltaImpl implements Delta {
+public class PersisterDeltaImpl implements Delta {
+
+    /** Returns a new, empty {@link Builder}. */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Fluent builder for a {@link PersisterDeltaImpl}.
+     * <p>
+     * Each method appends the supplied values to the matching collection of a single underlying
+     * delta instance; {@link #build()} hands back that same instance (it is not copied).
+     */
+    public static class Builder {
+        private final PersisterDeltaImpl result = new PersisterDeltaImpl();
+
+        /** Appends the given location mementos to the delta. */
+        public Builder locations(Collection<? extends LocationMemento> vals) {
+            result.locations.addAll(vals);
+            return this;
+        }
+
+        /** Appends the given entity mementos to the delta. */
+        public Builder entities(Collection<? extends EntityMemento> vals) {
+            result.entities.addAll(vals);
+            return this;
+        }
+
+        /** Appends the given policy mementos to the delta. */
+        public Builder policies(Collection<? extends PolicyMemento> vals) {
+            result.policies.addAll(vals);
+            return this;
+        }
+
+        /** Appends the given enricher mementos to the delta. */
+        public Builder enrichers(Collection<? extends EnricherMemento> vals) {
+            result.enrichers.addAll(vals);
+            return this;
+        }
+
+        /** Appends the given ids to the delta's removed-location ids. */
+        public Builder removedLocationIds(Collection<String> vals) {
+            result.removedLocationIds.addAll(vals);
+            return this;
+        }
+
+        /** Appends the given ids to the delta's removed-entity ids. */
+        public Builder removedEntityIds(Collection<String> vals) {
+            result.removedEntityIds.addAll(vals);
+            return this;
+        }
+
+        /** Appends the given ids to the delta's removed-policy ids. */
+        public Builder removedPolicyIds(Collection<String> vals) {
+            result.removedPolicyIds.addAll(vals);
+            return this;
+        }
+
+        /** Appends the given ids to the delta's removed-enricher ids. */
+        public Builder removedEnricherIds(Collection<String> vals) {
+            result.removedEnricherIds.addAll(vals);
+            return this;
+        }
+
+        /** Returns the delta populated so far. */
+        public Delta build() {
+            return result;
+        }
+    }
+
Collection<LocationMemento> locations = Sets.newLinkedHashSet();
Collection<EntityMemento> entities = Sets.newLinkedHashSet();
Collection<PolicyMemento> policies = Sets.newLinkedHashSet();
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/main/java/brooklyn/entity/rebind/RebindExceptionHandlerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/RebindExceptionHandlerImpl.java b/core/src/main/java/brooklyn/entity/rebind/RebindExceptionHandlerImpl.java
index 937d58d..eb4f8a3 100644
--- a/core/src/main/java/brooklyn/entity/rebind/RebindExceptionHandlerImpl.java
+++ b/core/src/main/java/brooklyn/entity/rebind/RebindExceptionHandlerImpl.java
@@ -2,6 +2,7 @@ package brooklyn.entity.rebind;
import static com.google.common.base.Preconditions.checkNotNull;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -29,17 +30,17 @@ public class RebindExceptionHandlerImpl implements RebindExceptionHandler {
protected final RebindFailureMode addPolicyFailureMode;
protected final RebindFailureMode loadPolicyFailureMode;
- protected final Set<String> missingEntities = Sets.newLinkedHashSet();
- protected final Set<String> missingLocations = Sets.newLinkedHashSet();
- protected final Set<String> missingPolicies = Sets.newLinkedHashSet();
- protected final Set<String> missingEnrichers = Sets.newLinkedHashSet();
- protected final Set<String> creationFailedEntities = Sets.newLinkedHashSet();
- protected final Set<String> creationFailedLocations = Sets.newLinkedHashSet();
- protected final Set<String> creationFailedPolicies = Sets.newLinkedHashSet();
- protected final Set<String> creationFailedEnrichers = Sets.newLinkedHashSet();
- protected final Set<Exception> addPolicyFailures = Sets.newLinkedHashSet();
- protected final Set<Exception> loadPolicyFailures = Sets.newLinkedHashSet();
- protected final List<Exception> exceptions = Lists.newArrayList();
+ protected final Set<String> missingEntities = Sets.newConcurrentHashSet();
+ protected final Set<String> missingLocations = Sets.newConcurrentHashSet();
+ protected final Set<String> missingPolicies = Sets.newConcurrentHashSet();
+ protected final Set<String> missingEnrichers = Sets.newConcurrentHashSet();
+ protected final Set<String> creationFailedEntities = Sets.newConcurrentHashSet();
+ protected final Set<String> creationFailedLocations = Sets.newConcurrentHashSet();
+ protected final Set<String> creationFailedPolicies = Sets.newConcurrentHashSet();
+ protected final Set<String> creationFailedEnrichers = Sets.newConcurrentHashSet();
+ protected final Set<Exception> addPolicyFailures = Sets.newConcurrentHashSet();
+ protected final Set<Exception> loadPolicyFailures = Sets.newConcurrentHashSet();
+ protected final List<Exception> exceptions = Collections.synchronizedList(Lists.<Exception>newArrayList());
public static Builder builder() {
return new Builder();
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java b/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
index 5775dc4..1f2ae81 100644
--- a/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
+++ b/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
@@ -536,7 +536,7 @@ public class RebindManagerImpl implements RebindManager {
T nodeinchain = node;
while (nodeinchain != null) {
tempchain.add(0, nodeinchain);
- nodeinchain = nodes.get(nodeinchain.getParent());
+ nodeinchain = (nodeinchain.getParent() == null) ? null : nodes.get(nodeinchain.getParent());
}
for (T n : tempchain) {
result.put(n.getId(), n);
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoImpl.java b/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoImpl.java
index b75a112..a04a75e 100644
--- a/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoImpl.java
+++ b/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoImpl.java
@@ -27,12 +27,12 @@ public class BrooklynMementoImpl implements BrooklynMemento, Serializable {
public static class Builder {
protected String brooklynVersion = BrooklynVersion.get();
- protected final List<String> applicationIds = Lists.newArrayList();
- protected final List<String> topLevelLocationIds = Lists.newArrayList();
- protected final Map<String, EntityMemento> entities = Maps.newLinkedHashMap();
- protected final Map<String, LocationMemento> locations = Maps.newLinkedHashMap();
- protected final Map<String, PolicyMemento> policies = Maps.newLinkedHashMap();
- protected final Map<String, EnricherMemento> enrichers = Maps.newLinkedHashMap();
+ protected final List<String> applicationIds = Collections.synchronizedList(Lists.<String>newArrayList());
+ protected final List<String> topLevelLocationIds = Collections.synchronizedList(Lists.<String>newArrayList());
+ protected final Map<String, EntityMemento> entities = Maps.newConcurrentMap();
+ protected final Map<String, LocationMemento> locations = Maps.newConcurrentMap();
+ protected final Map<String, PolicyMemento> policies = Maps.newConcurrentMap();
+ protected final Map<String, EnricherMemento> enrichers = Maps.newConcurrentMap();
public Builder brooklynVersion(String val) {
brooklynVersion = val; return this;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoManifestImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoManifestImpl.java b/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoManifestImpl.java
index e517f15..1ec617e 100644
--- a/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoManifestImpl.java
+++ b/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoManifestImpl.java
@@ -18,10 +18,10 @@ public class BrooklynMementoManifestImpl implements BrooklynMementoManifest, Ser
public static class Builder {
protected String brooklynVersion;
- protected final Map<String, String> entityIdToType = Maps.newLinkedHashMap();
- protected final Map<String, String> locationIdToType = Maps.newLinkedHashMap();
- protected final Map<String, String> policyIdToType = Maps.newLinkedHashMap();
- protected final Map<String, String> enricherIdToType = Maps.newLinkedHashMap();
+ protected final Map<String, String> entityIdToType = Maps.newConcurrentMap();
+ protected final Map<String, String> locationIdToType = Maps.newConcurrentMap();
+ protected final Map<String, String> policyIdToType = Maps.newConcurrentMap();
+ protected final Map<String, String> enricherIdToType = Maps.newConcurrentMap();
public Builder brooklynVersion(String val) {
brooklynVersion = val; return this;
@@ -55,10 +55,10 @@ public class BrooklynMementoManifestImpl implements BrooklynMementoManifest, Ser
}
}
- private Map<String, String> entityIdToType;
- private Map<String, String> locationIdToType;
- private Map<String, String> policyIdToType;
- private Map<String, String> enricherIdToType;
+ private final Map<String, String> entityIdToType;
+ private final Map<String, String> locationIdToType;
+ private final Map<String, String> policyIdToType;
+ private final Map<String, String> enricherIdToType;
private BrooklynMementoManifestImpl(Builder builder) {
entityIdToType = builder.entityIdToType;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
index 8746cc0..8fcb3b3 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
@@ -6,13 +6,23 @@ import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import brooklyn.config.BrooklynProperties;
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
import brooklyn.entity.rebind.PersistenceExceptionHandler;
+import brooklyn.entity.rebind.PersisterDeltaImpl;
import brooklyn.entity.rebind.RebindExceptionHandler;
import brooklyn.entity.rebind.dto.BrooklynMementoImpl;
import brooklyn.entity.rebind.dto.BrooklynMementoManifestImpl;
@@ -26,6 +36,7 @@ import brooklyn.mementos.EntityMemento;
import brooklyn.mementos.LocationMemento;
import brooklyn.mementos.Memento;
import brooklyn.mementos.PolicyMemento;
+import brooklyn.util.exceptions.CompoundRuntimeException;
import brooklyn.util.exceptions.Exceptions;
import brooklyn.util.time.Duration;
import brooklyn.util.time.Time;
@@ -33,29 +44,57 @@ import brooklyn.util.xstream.XmlUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
/** Implementation of the {@link BrooklynMementoPersister} backed by a pluggable
* {@link PersistenceObjectStore} such as a file system or a jclouds object store */
public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPersister {
+ // TODO Crazy amount of duplication between handling entity, location, policy + enricher;
+ // Need to remove that duplication.
+
+ // TODO Should stop() take a timeout, and shutdown the executor gracefully?
+
private static final Logger LOG = LoggerFactory.getLogger(BrooklynMementoPersisterToObjectStore.class);
- private static final int MAX_SERIALIZATION_ATTEMPTS = 5;
+ public static final ConfigKey<Integer> PERSISTER_MAX_THREAD_POOL_SIZE = ConfigKeys.newIntegerConfigKey(
+ "persister.threadpool.maxSize",
+ "Maximum number of concurrent operations for persistence (reads/writes/deletes of *different* objects)",
+ 10);
+
+ public static final ConfigKey<Integer> PERSISTER_MAX_SERIALIZATION_ATTEMPTS = ConfigKeys.newIntegerConfigKey(
+ "persister.maxSerializationAttempts",
+ "Maximum number of attempts to serialize a memento (e.g. if first attempts fail because of concurrent modifications of an entity)",
+ 5);
private final PersistenceObjectStore objectStore;
private final MementoSerializer<Object> serializer;
private final Map<String, StoreObjectAccessorWithLock> writers = new LinkedHashMap<String, PersistenceObjectStore.StoreObjectAccessorWithLock>();
+ private final ListeningExecutorService executor;
+
private volatile boolean running = true;
+ /**
+ * Lock used on writes (checkpoint + delta) so that {@link #waitForWritesCompleted(Duration)} can block
+ * for any concurrent call to complete.
+ */
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
- public BrooklynMementoPersisterToObjectStore(PersistenceObjectStore objectStore, ClassLoader classLoader) {
+ public BrooklynMementoPersisterToObjectStore(PersistenceObjectStore objectStore, BrooklynProperties brooklynProperties, ClassLoader classLoader) {
this.objectStore = checkNotNull(objectStore, "objectStore");
- MementoSerializer<Object> rawSerializer = new XmlMementoSerializer<Object>(classLoader);
- this.serializer = new RetryingMementoSerializer<Object>(rawSerializer, MAX_SERIALIZATION_ATTEMPTS);
- // TODO it's 95% the same code for each of these, throughout, so refactor to avoid repetition
+ int maxSerializationAttempts = brooklynProperties.getConfig(PERSISTER_MAX_SERIALIZATION_ATTEMPTS);
+ int maxThreadPoolSize = brooklynProperties.getConfig(PERSISTER_MAX_THREAD_POOL_SIZE);
+
+ MementoSerializer<Object> rawSerializer = new XmlMementoSerializer<Object>(classLoader);
+ this.serializer = new RetryingMementoSerializer<Object>(rawSerializer, maxSerializationAttempts);
+
objectStore.createSubPath("entities");
objectStore.createSubPath("locations");
objectStore.createSubPath("policies");
@@ -63,6 +102,11 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
// FIXME does it belong here or to ManagementPlaneSyncRecordPersisterToObjectStore ?
objectStore.createSubPath("plane");
+
+ executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(maxThreadPoolSize, new ThreadFactory() {
+ @Override public Thread newThread(Runnable r) {
+ return new Thread(r, "brooklyn-persister");
+ }}));
}
public PersistenceObjectStore getObjectStore() {
@@ -72,6 +116,9 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
@Override
public void stop() {
running = false;
+ if (executor != null) {
+ executor.shutdownNow();
+ }
}
protected StoreObjectAccessorWithLock getWriter(String path) {
@@ -87,7 +134,7 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
}
@Override
- public BrooklynMementoManifest loadMementoManifest(RebindExceptionHandler exceptionHandler) throws IOException {
+ public BrooklynMementoManifest loadMementoManifest(final RebindExceptionHandler exceptionHandler) throws IOException {
if (!running) {
throw new IllegalStateException("Persister not running; cannot load memento manifest from " + objectStore.getSummaryName());
}
@@ -110,67 +157,119 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
Stopwatch stopwatch = Stopwatch.createStarted();
LOG.debug("Scanning persisted state: {} entities, {} locations, {} policies, {} enrichers, from {}", new Object[]{
- entitySubPathList.size(), locationSubPathList.size(), policySubPathList.size(), enricherSubPathList.size(),
+ entitySubPathList/*.size()*/, locationSubPathList.size(), policySubPathList.size(), enricherSubPathList.size(),
objectStore.getSummaryName() });
- BrooklynMementoManifestImpl.Builder builder = BrooklynMementoManifestImpl.builder();
+ final BrooklynMementoManifestImpl.Builder builder = BrooklynMementoManifestImpl.builder();
- for (String subPath : entitySubPathList) {
- try {
- StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
- String contents = objectAccessor.get();
- String id = (String) XmlUtil.xpath(contents, "/entity/id");
- String type = (String) XmlUtil.xpath(contents, "/entity/type");
- builder.entity(id, type);
- } catch (Exception e) {
- Exceptions.propagateIfFatal(e);
- exceptionHandler.onLoadEntityMementoFailed("Memento "+subPath, e);
- }
+ List<ListenableFuture<?>> futures = Lists.newArrayList();
+
+ for (final String subPath : entitySubPathList) {
+ futures.add(executor.submit(new Runnable() {
+ public void run() {
+ try {
+ String contents = read(subPath);
+ String id = (String) XmlUtil.xpath(contents, "/entity/id");
+ String type = (String) XmlUtil.xpath(contents, "/entity/type");
+ builder.entity(id, type);
+ LOG.debug("Loaded manifest for entity "+subPath+"; id "+id+"; type "+type); // FIXME
+ } catch (Exception e) {
+ LOG.debug("Problem loading manifest for entity "+subPath); // FIXME
+ Exceptions.propagateIfFatal(e);
+ exceptionHandler.onLoadEntityMementoFailed("Memento "+subPath, e);
+ }
+ }}));
}
- for (String subPath : locationSubPathList) {
- try {
- StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
- String contents = objectAccessor.get();
- String id = (String) XmlUtil.xpath(contents, "/location/id");
- String type = (String) XmlUtil.xpath(contents, "/location/type");
- builder.location(id, type);
- } catch (Exception e) {
- Exceptions.propagateIfFatal(e);
- exceptionHandler.onLoadLocationMementoFailed("Memento "+subPath, e);
- }
+ for (final String subPath : locationSubPathList) {
+ futures.add(executor.submit(new Runnable() {
+ public void run() {
+ try {
+ String contents = read(subPath);
+ String id = (String) XmlUtil.xpath(contents, "/location/id");
+ String type = (String) XmlUtil.xpath(contents, "/location/type");
+ builder.location(id, type);
+ } catch (Exception e) {
+ Exceptions.propagateIfFatal(e);
+ exceptionHandler.onLoadLocationMementoFailed("Memento "+subPath, e);
+ }
+ }}));
}
- for (String subPath : policySubPathList) {
- try {
- StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
- String contents = objectAccessor.get();
- String id = (String) XmlUtil.xpath(contents, "/policy/id");
- String type = (String) XmlUtil.xpath(contents, "/policy/type");
- builder.policy(id, type);
- } catch (Exception e) {
- Exceptions.propagateIfFatal(e);
- exceptionHandler.onLoadPolicyMementoFailed("Memento "+subPath, e);
- }
+ for (final String subPath : policySubPathList) {
+ futures.add(executor.submit(new Runnable() {
+ public void run() {
+ try {
+ String contents = read(subPath);
+ String id = (String) XmlUtil.xpath(contents, "/policy/id");
+ String type = (String) XmlUtil.xpath(contents, "/policy/type");
+ builder.policy(id, type);
+ } catch (Exception e) {
+ Exceptions.propagateIfFatal(e);
+ exceptionHandler.onLoadPolicyMementoFailed("Memento "+subPath, e);
+ }
+ }}));
}
- for (String subPath : enricherSubPathList) {
- try {
- StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
- String contents = objectAccessor.get();
- String id = (String) XmlUtil.xpath(contents, "/enricher/id");
- String type = (String) XmlUtil.xpath(contents, "/enricher/type");
- builder.enricher(id, type);
- } catch (Exception e) {
- Exceptions.propagateIfFatal(e);
- exceptionHandler.onLoadEnricherMementoFailed("Memento "+subPath, e);
+ for (final String subPath : enricherSubPathList) {
+ futures.add(executor.submit(new Runnable() {
+ public void run() {
+ try {
+ String contents = read(subPath);
+ String id = (String) XmlUtil.xpath(contents, "/enricher/id");
+ String type = (String) XmlUtil.xpath(contents, "/enricher/type");
+ builder.enricher(id, type);
+ } catch (Exception e) {
+ Exceptions.propagateIfFatal(e);
+ exceptionHandler.onLoadEnricherMementoFailed("Memento "+subPath, e);
+ }
+ }}));
+ }
+
+ try {
+ // Wait for all, failing fast if any exceptions.
+ Futures.allAsList(futures).get();
+ } catch (Exception e) {
+ Exceptions.propagateIfFatal(e);
+
+ List<Exception> exceptions = Lists.newArrayList();
+
+ for (ListenableFuture<?> future : futures) {
+ if (future.isDone()) {
+ try {
+ future.get();
+ } catch (InterruptedException e2) {
+ throw Exceptions.propagate(e2);
+ } catch (ExecutionException e2) {
+ LOG.warn("Problem loading memento manifest", e2);
+ exceptions.add(e2);
+ }
+ future.cancel(true);
+ }
}
+ if (exceptions.isEmpty()) {
+ throw Exceptions.propagate(e);
+ } else {
+ // Normally there should be at least one failure; otherwise all.get() would not have failed.
+ throw new CompoundRuntimeException("Problem loading mementos", exceptions);
+ }
+ }
+
+ BrooklynMementoManifest result = builder.build();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Loaded memento manifest; took {}; {} entities, {} locations, {} policies, {} enrichers, from {}", new Object[]{
+ Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS)), result.getEntityIdToType().size(),
+ result.getLocationIdToType().size(), result.getPolicyIdToType().size(), result.getEnricherIdToType().size(),
+ objectStore.getSummaryName() });
+ }
+
+ if (result.getEntityIdToType().size() != entitySubPathList.size()) {
+ LOG.error("Lost an entity?!");
}
- if (LOG.isDebugEnabled())
- LOG.debug("Loaded memento manifest; took {}", Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS)));
- return builder.build();
+ return result;
}
@Override
- public BrooklynMemento loadMemento(LookupContext lookupContext, RebindExceptionHandler exceptionHandler) throws IOException {
+ public BrooklynMemento loadMemento(LookupContext lookupContext, final RebindExceptionHandler exceptionHandler) throws IOException {
if (!running) {
throw new IllegalStateException("Persister not running; cannot load memento from " + objectStore.getSummaryName());
}
@@ -195,71 +294,121 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
entitySubPathList.size(), locationSubPathList.size(), policySubPathList.size(), enricherSubPathList.size(),
objectStore.getSummaryName() });
- BrooklynMementoImpl.Builder builder = BrooklynMementoImpl.builder();
+ final BrooklynMementoImpl.Builder builder = BrooklynMementoImpl.builder();
serializer.setLookupContext(lookupContext);
+
+ List<ListenableFuture<?>> futures = Lists.newArrayList();
+
try {
- for (String subPath : entitySubPathList) {
- try {
- StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
- EntityMemento memento = (EntityMemento) serializer.fromString(objectAccessor.get());
- if (memento == null) {
- LOG.warn("No entity-memento deserialized from " + subPath + "; ignoring and continuing");
- } else {
- builder.entity(memento);
- if (memento.isTopLevelApp()) {
- builder.applicationId(memento.getId());
+ for (final String subPath : entitySubPathList) {
+ futures.add(executor.submit(new Runnable() {
+ public void run() {
+ try {
+ EntityMemento memento = (EntityMemento) serializer.fromString(read(subPath));
+ if (memento == null) {
+ LOG.warn("No entity-memento deserialized from " + subPath + "; ignoring and continuing");
+ } else {
+ builder.entity(memento);
+ if (memento.isTopLevelApp()) {
+ builder.applicationId(memento.getId());
+ }
+ }
+ } catch (Exception e) {
+ exceptionHandler.onLoadEntityMementoFailed("Memento "+subPath, e);
}
- }
- } catch (Exception e) {
- exceptionHandler.onLoadEntityMementoFailed("Memento "+subPath, e);
- }
+ }}));
}
- for (String subPath : locationSubPathList) {
- try {
- StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
- LocationMemento memento = (LocationMemento) serializer.fromString(objectAccessor.get());
- if (memento == null) {
- LOG.warn("No location-memento deserialized from " + subPath + "; ignoring and continuing");
- } else {
- builder.location(memento);
- }
- } catch (Exception e) {
- exceptionHandler.onLoadLocationMementoFailed("Memento "+subPath, e);
- }
+ for (final String subPath : locationSubPathList) {
+ futures.add(executor.submit(new Runnable() {
+ public void run() {
+ try {
+ LocationMemento memento = (LocationMemento) serializer.fromString(read(subPath));
+ if (memento == null) {
+ LOG.warn("No location-memento deserialized from " + subPath + "; ignoring and continuing");
+ } else {
+ builder.location(memento);
+ }
+ } catch (Exception e) {
+ exceptionHandler.onLoadLocationMementoFailed("Memento "+subPath, e);
+ }
+ }}));
}
- for (String subPath : policySubPathList) {
- try {
- StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
- PolicyMemento memento = (PolicyMemento) serializer.fromString(objectAccessor.get());
- if (memento == null) {
- LOG.warn("No policy-memento deserialized from " + subPath + "; ignoring and continuing");
- } else {
- builder.policy(memento);
- }
- } catch (Exception e) {
- exceptionHandler.onLoadPolicyMementoFailed("Memento "+subPath, e);
- }
+ for (final String subPath : policySubPathList) {
+ futures.add(executor.submit(new Runnable() {
+ public void run() {
+ try {
+ StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
+ PolicyMemento memento = (PolicyMemento) serializer.fromString(objectAccessor.get());
+ if (memento == null) {
+ LOG.warn("No policy-memento deserialized from " + subPath + "; ignoring and continuing");
+ } else {
+ builder.policy(memento);
+ }
+ } catch (Exception e) {
+ exceptionHandler.onLoadPolicyMementoFailed("Memento "+subPath, e);
+ }
+ }}));
}
- for (String subPath : enricherSubPathList) {
- try {
- StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
- EnricherMemento memento = (EnricherMemento) serializer.fromString(objectAccessor.get());
- if (memento == null) {
- LOG.warn("No enricher-memento deserialized from " + subPath + "; ignoring and continuing");
- } else {
- builder.enricher(memento);
+ for (final String subPath : enricherSubPathList) {
+ futures.add(executor.submit(new Runnable() {
+ public void run() {
+ try {
+ StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
+ EnricherMemento memento = (EnricherMemento) serializer.fromString(objectAccessor.get());
+ if (memento == null) {
+ LOG.warn("No enricher-memento deserialized from " + subPath + "; ignoring and continuing");
+ } else {
+ builder.enricher(memento);
+ }
+ } catch (Exception e) {
+ exceptionHandler.onLoadEnricherMementoFailed("Memento "+subPath, e);
+ }
+ }}));
+ }
+
+ try {
+ // Wait for all, failing fast if any exceptions.
+ Futures.allAsList(futures).get();
+ } catch (Exception e) {
+ Exceptions.propagateIfFatal(e);
+
+ List<Exception> exceptions = Lists.newArrayList();
+
+ for (ListenableFuture<?> future : futures) {
+ if (future.isDone()) {
+ try {
+ future.get();
+ } catch (InterruptedException e2) {
+ throw Exceptions.propagate(e2);
+ } catch (ExecutionException e2) {
+ LOG.warn("Problem loading memento", e2);
+ exceptions.add(e2);
+ }
+ future.cancel(true);
}
- } catch (Exception e) {
- exceptionHandler.onLoadEnricherMementoFailed("Memento "+subPath, e);
+ }
+ if (exceptions.isEmpty()) {
+ throw Exceptions.propagate(e);
+ } else {
+ // Normally there should be at least one failure; otherwise all.get() would not have failed.
+ throw new CompoundRuntimeException("Problem loading mementos", exceptions);
}
}
-
+
} finally {
serializer.unsetLookupContext();
}
- if (LOG.isDebugEnabled()) LOG.debug("Loaded memento; took {}", Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS)));
- return builder.build();
+ BrooklynMemento result = builder.build();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Loaded memento; took {}; {} entities, {} locations, {} policies, {} enrichers, from {}", new Object[]{
+ Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS)), result.getEntityIds().size(),
+ result.getLocationIds().size(), result.getPolicyIds().size(), result.getEnricherIds().size(),
+ objectStore.getSummaryName() });
+ }
+
+ return result;
}
@Override
@@ -268,67 +417,84 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
if (LOG.isDebugEnabled()) LOG.debug("Ignoring checkpointing entire memento, because not running");
return;
}
- objectStore.prepareForMasterUse();
- Stopwatch stopwatch = Stopwatch.createStarted();
-
- for (EntityMemento entity : newMemento.getEntityMementos().values()) {
- persist("entities", entity, exceptionHandler);
- }
- for (LocationMemento location : newMemento.getLocationMementos().values()) {
- persist("locations", location, exceptionHandler);
- }
- for (PolicyMemento policy : newMemento.getPolicyMementos().values()) {
- persist("policies", policy, exceptionHandler);
- }
- for (EnricherMemento enricher : newMemento.getEnricherMementos().values()) {
- persist("enrichers", enricher, exceptionHandler);
- }
+ Delta delta = PersisterDeltaImpl.builder()
+ .entities(newMemento.getEntityMementos().values())
+ .locations(newMemento.getLocationMementos().values())
+ .policies(newMemento.getPolicyMementos().values())
+ .enrichers(newMemento.getEnricherMementos().values())
+ .build();
+ Stopwatch stopwatch = deltaImpl(delta, exceptionHandler);
if (LOG.isDebugEnabled()) LOG.debug("Checkpointed entire memento in {}", Time.makeTimeStringRounded(stopwatch));
}
-
+
@Override
public void delta(Delta delta, PersistenceExceptionHandler exceptionHandler) {
if (!running) {
if (LOG.isDebugEnabled()) LOG.debug("Ignoring checkpointed delta of memento, because not running");
return;
}
- objectStore.prepareForMasterUse();
-
- Stopwatch stopwatch = Stopwatch.createStarted();
-
- for (EntityMemento entity : delta.entities()) {
- persist("entities", entity, exceptionHandler);
- }
- for (LocationMemento location : delta.locations()) {
- persist("locations", location, exceptionHandler);
- }
- for (PolicyMemento policy : delta.policies()) {
- persist("policies", policy, exceptionHandler);
- }
- for (EnricherMemento enricher : delta.enrichers()) {
- persist("enrichers", enricher, exceptionHandler);
- }
-
- for (String id : delta.removedEntityIds()) {
- delete("entities", id, exceptionHandler);
- }
- for (String id : delta.removedLocationIds()) {
- delete("locations", id, exceptionHandler);
- }
- for (String id : delta.removedPolicyIds()) {
- delete("policies", id, exceptionHandler);
- }
- for (String id : delta.removedEnricherIds()) {
- delete("enrichers", id, exceptionHandler);
- }
+ Stopwatch stopwatch = deltaImpl(delta, exceptionHandler);
if (LOG.isDebugEnabled()) LOG.debug("Checkpointed delta of memento in {}; updated {} entities, {} locations and {} policies; " +
"removing {} entities, {} locations and {} policies",
new Object[] {Time.makeTimeStringRounded(stopwatch), delta.entities(), delta.locations(), delta.policies(),
delta.removedEntityIds(), delta.removedLocationIds(), delta.removedPolicyIds()});
}
+
+ private Stopwatch deltaImpl(Delta delta, PersistenceExceptionHandler exceptionHandler) {
+ try {
+ lock.writeLock().lockInterruptibly();
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ }
+ try {
+ objectStore.prepareForMasterUse();
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ List<ListenableFuture<?>> futures = Lists.newArrayList();
+
+ for (EntityMemento entity : delta.entities()) {
+ futures.add(asyncPersist("entities", entity, exceptionHandler));
+ }
+ for (LocationMemento location : delta.locations()) {
+ futures.add(asyncPersist("locations", location, exceptionHandler));
+ }
+ for (PolicyMemento policy : delta.policies()) {
+ futures.add(asyncPersist("policies", policy, exceptionHandler));
+ }
+ for (EnricherMemento enricher : delta.enrichers()) {
+ futures.add(asyncPersist("enrichers", enricher, exceptionHandler));
+ }
+
+ for (String id : delta.removedEntityIds()) {
+ futures.add(asyncDelete("entities", id, exceptionHandler));
+ }
+ for (String id : delta.removedLocationIds()) {
+ futures.add(asyncDelete("locations", id, exceptionHandler));
+ }
+ for (String id : delta.removedPolicyIds()) {
+ futures.add(asyncDelete("policies", id, exceptionHandler));
+ }
+ for (String id : delta.removedEnricherIds()) {
+ futures.add(asyncDelete("enrichers", id, exceptionHandler));
+ }
+
+ try {
+ // Wait for all the tasks to complete or fail, rather than aborting on the first failure.
+ // But then propagate failure if any fail. (hence the two calls).
+ Futures.successfulAsList(futures).get();
+ Futures.allAsList(futures).get();
+ } catch (Exception e) {
+ throw Exceptions.propagate(e);
+ }
+
+ return stopwatch;
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
@Override
@VisibleForTesting
@@ -336,11 +502,20 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
waitForWritesCompleted(Duration.of(timeout, unit));
}
+ @Override
public void waitForWritesCompleted(Duration timeout) throws InterruptedException, TimeoutException {
+ lock.writeLock().lockInterruptibly();
+ lock.writeLock().unlock();
+
for (StoreObjectAccessorWithLock writer : writers.values())
writer.waitForCurrentWrites(timeout);
}
+ private String read(String subPath) {
+ StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
+ return objectAccessor.get();
+ }
+
private void persist(String subPath, Memento memento, PersistenceExceptionHandler exceptionHandler) {
try {
getWriter(getPath(subPath, memento.getId())).put(serializer.toString(memento));
@@ -348,7 +523,7 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
exceptionHandler.onPersistMementoFailed(memento, e);
}
}
-
+
private void delete(String subPath, String id, PersistenceExceptionHandler exceptionHandler) {
try {
StoreObjectAccessorWithLock w = getWriter(getPath(subPath, id));
@@ -361,6 +536,27 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
}
}
+ private ListenableFuture<String> asyncRead(final String subPath) {
+ return executor.submit(new Callable<String>() {
+ public String call() {
+ return read(subPath);
+ }});
+ }
+
+ private ListenableFuture<?> asyncPersist(final String subPath, final Memento memento, final PersistenceExceptionHandler exceptionHandler) {
+ return executor.submit(new Runnable() {
+ public void run() {
+ persist(subPath, memento, exceptionHandler);
+ }});
+ }
+
+ private ListenableFuture<?> asyncDelete(final String subPath, final String id, final PersistenceExceptionHandler exceptionHandler) {
+ return executor.submit(new Runnable() {
+ public void run() {
+ delete(subPath, id, exceptionHandler);
+ }});
+ }
+
private String getPath(String subPath, String id) {
return subPath+"/"+id;
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/test/java/brooklyn/entity/rebind/RebindEntityTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/rebind/RebindEntityTest.java b/core/src/test/java/brooklyn/entity/rebind/RebindEntityTest.java
index 290cc3b..ea9f9f6 100644
--- a/core/src/test/java/brooklyn/entity/rebind/RebindEntityTest.java
+++ b/core/src/test/java/brooklyn/entity/rebind/RebindEntityTest.java
@@ -175,6 +175,13 @@ public class RebindEntityTest extends RebindTestFixtureWithApp {
assertEquals(newE.getDisplayName(), "mydisplayname");
}
+ // Saw this fail during development (fixed now); but want at least one of these tests to be run
+ // many times for stress testing purposes
+ @Test(invocationCount=100, groups="Integeration")
+ public void testRestoresEntityIdAndDisplayNameManyTimes() throws Exception {
+ testRestoresEntityIdAndDisplayName();
+ }
+
@Test
public void testCanCustomizeRebind() throws Exception {
MyEntity2 origE = origApp.createAndManageChild(EntitySpec.create(MyEntity2.class).configure("myfield", "myval"));
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java b/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
index 4c8e8f7..2fd873e 100644
--- a/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
+++ b/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
@@ -16,6 +16,7 @@ import brooklyn.internal.BrooklynFeatureEnablement;
import brooklyn.management.ManagementContext;
import brooklyn.management.ha.HighAvailabilityMode;
import brooklyn.management.internal.LocalManagementContext;
+import brooklyn.management.internal.ManagementContextInternal;
import brooklyn.mementos.BrooklynMementoManifest;
import brooklyn.test.entity.LocalManagementContextForTests;
import brooklyn.util.os.Os;
@@ -111,7 +112,10 @@ public abstract class RebindTestFixture<T extends StartableApplication> {
FileBasedObjectStore objectStore = new FileBasedObjectStore(mementoDir);
objectStore.injectManagementContext(newManagementContext);
objectStore.prepareForSharedUse(PersistMode.AUTO, HighAvailabilityMode.DISABLED);
- BrooklynMementoPersisterToObjectStore persister = new BrooklynMementoPersisterToObjectStore(objectStore, classLoader);
+ BrooklynMementoPersisterToObjectStore persister = new BrooklynMementoPersisterToObjectStore(
+ objectStore,
+ ((ManagementContextInternal)newManagementContext).getBrooklynProperties(),
+ classLoader);
RebindExceptionHandler exceptionHandler = new RecordingRebindExceptionHandler(RebindManager.RebindFailureMode.FAIL_AT_END, RebindManager.RebindFailureMode.FAIL_AT_END);
BrooklynMementoManifest mementoManifest = persister.loadMementoManifest(exceptionHandler);
persister.stop();
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/test/java/brooklyn/entity/rebind/RebindTestUtils.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/rebind/RebindTestUtils.java b/core/src/test/java/brooklyn/entity/rebind/RebindTestUtils.java
index 4b4e54c..d402bb9 100644
--- a/core/src/test/java/brooklyn/entity/rebind/RebindTestUtils.java
+++ b/core/src/test/java/brooklyn/entity/rebind/RebindTestUtils.java
@@ -27,6 +27,7 @@ import brooklyn.location.Location;
import brooklyn.management.ManagementContext;
import brooklyn.management.ha.HighAvailabilityMode;
import brooklyn.management.internal.LocalManagementContext;
+import brooklyn.management.internal.ManagementContextInternal;
import brooklyn.mementos.BrooklynMemento;
import brooklyn.test.entity.LocalManagementContextForTests;
import brooklyn.util.javalang.Serializers;
@@ -170,7 +171,10 @@ public class RebindTestUtils {
objectStore.injectManagementContext(unstarted);
objectStore.prepareForSharedUse(PersistMode.AUTO, HighAvailabilityMode.DISABLED);
- BrooklynMementoPersisterToObjectStore newPersister = new BrooklynMementoPersisterToObjectStore(objectStore, classLoader);
+ BrooklynMementoPersisterToObjectStore newPersister = new BrooklynMementoPersisterToObjectStore(
+ objectStore,
+ unstarted.getBrooklynProperties(),
+ classLoader);
((RebindManagerImpl) unstarted.getRebindManager()).setPeriodicPersistPeriod(persistPeriod);
unstarted.getRebindManager().setPersister(newPersister, PersistenceExceptionHandlerImpl.builder().build());
return unstarted;
@@ -235,7 +239,10 @@ public class RebindTestUtils {
public static Collection<Application> rebindAll(ManagementContext newManagementContext, File mementoDir, ClassLoader classLoader, RebindExceptionHandler exceptionHandler, PersistenceObjectStore objectStore) throws Exception {
LOG.info("Rebinding app, using directory "+mementoDir);
- BrooklynMementoPersisterToObjectStore newPersister = new BrooklynMementoPersisterToObjectStore(objectStore, classLoader);
+ BrooklynMementoPersisterToObjectStore newPersister = new BrooklynMementoPersisterToObjectStore(
+ objectStore,
+ ((ManagementContextInternal)newManagementContext).getBrooklynProperties(),
+ classLoader);
newManagementContext.getRebindManager().setPersister(newPersister, PersistenceExceptionHandlerImpl.builder().build());
List<Application> newApps;
if (exceptionHandler == null) {
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerSplitBrainTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerSplitBrainTest.java b/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerSplitBrainTest.java
index e33ac4f..0183985 100644
--- a/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerSplitBrainTest.java
+++ b/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerSplitBrainTest.java
@@ -80,7 +80,7 @@ public class HighAvailabilityManagerSplitBrainTest {
objectStore.prepareForSharedUse(PersistMode.CLEAN, HighAvailabilityMode.DISABLED);
persister = new ManagementPlaneSyncRecordPersisterToObjectStore(mgmt, objectStore, classLoader);
((ManagementPlaneSyncRecordPersisterToObjectStore)persister).allowRemoteTimestampInMemento();
- BrooklynMementoPersisterToObjectStore persisterObj = new BrooklynMementoPersisterToObjectStore(objectStore, classLoader);
+ BrooklynMementoPersisterToObjectStore persisterObj = new BrooklynMementoPersisterToObjectStore(objectStore, mgmt.getBrooklynProperties(), classLoader);
mgmt.getRebindManager().setPersister(persisterObj, PersistenceExceptionHandlerImpl.builder().build());
ha = new HighAvailabilityManagerImpl(mgmt)
.setPollPeriod(Duration.PRACTICALLY_FOREVER)
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java b/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java
index b572a80..6e89c37 100644
--- a/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java
+++ b/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java
@@ -68,7 +68,10 @@ public abstract class HighAvailabilityManagerTestFixture {
objectStore.prepareForSharedUse(PersistMode.CLEAN, HighAvailabilityMode.DISABLED);
persister = new ManagementPlaneSyncRecordPersisterToObjectStore(managementContext, objectStore, classLoader);
((ManagementPlaneSyncRecordPersisterToObjectStore)persister).allowRemoteTimestampInMemento();
- BrooklynMementoPersisterToObjectStore persisterObj = new BrooklynMementoPersisterToObjectStore(objectStore, classLoader);
+ BrooklynMementoPersisterToObjectStore persisterObj = new BrooklynMementoPersisterToObjectStore(
+ objectStore,
+ managementContext.getBrooklynProperties(),
+ classLoader);
managementContext.getRebindManager().setPersister(persisterObj, PersistenceExceptionHandlerImpl.builder().build());
manager = new HighAvailabilityManagerImpl(managementContext)
.setPollPeriod(getPollPeriod())
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/usage/launcher/src/main/java/brooklyn/launcher/BrooklynLauncher.java
----------------------------------------------------------------------
diff --git a/usage/launcher/src/main/java/brooklyn/launcher/BrooklynLauncher.java b/usage/launcher/src/main/java/brooklyn/launcher/BrooklynLauncher.java
index 64ef089..638c029 100644
--- a/usage/launcher/src/main/java/brooklyn/launcher/BrooklynLauncher.java
+++ b/usage/launcher/src/main/java/brooklyn/launcher/BrooklynLauncher.java
@@ -541,7 +541,9 @@ public class BrooklynLauncher {
RebindManager rebindManager = managementContext.getRebindManager();
- BrooklynMementoPersisterToObjectStore persister = new BrooklynMementoPersisterToObjectStore(objectStore,
+ BrooklynMementoPersisterToObjectStore persister = new BrooklynMementoPersisterToObjectStore(
+ objectStore,
+ ((ManagementContextInternal)managementContext).getBrooklynProperties(),
managementContext.getCatalog().getRootClassLoader());
PersistenceExceptionHandler persistenceExceptionHandler = PersistenceExceptionHandlerImpl.builder().build();
((RebindManagerImpl) rebindManager).setPeriodicPersistPeriod(persistPeriod);
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/utils/common/src/main/java/brooklyn/util/exceptions/Exceptions.java
----------------------------------------------------------------------
diff --git a/utils/common/src/main/java/brooklyn/util/exceptions/Exceptions.java b/utils/common/src/main/java/brooklyn/util/exceptions/Exceptions.java
index 5f0fc8a..42fddb2 100644
--- a/utils/common/src/main/java/brooklyn/util/exceptions/Exceptions.java
+++ b/utils/common/src/main/java/brooklyn/util/exceptions/Exceptions.java
@@ -80,6 +80,8 @@ public class Exceptions {
public static void propagateIfFatal(Throwable throwable) {
if (throwable instanceof InterruptedException)
throw new RuntimeInterruptedException((InterruptedException) throwable);
+ if (throwable instanceof RuntimeInterruptedException)
+ throw (RuntimeInterruptedException) throwable;
if (throwable instanceof Error)
throw (Error) throwable;
}
[4/4] git commit: This closes #41
Posted by he...@apache.org.
This closes #41
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/73dcb1d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/73dcb1d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/73dcb1d8
Branch: refs/heads/master
Commit: 73dcb1d8d45647d365c7144b8f886b9b45500aa2
Parents: 1a9732e 8191d93
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Fri Jul 4 14:56:50 2014 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Fri Jul 4 14:56:50 2014 +0100
----------------------------------------------------------------------
.../mementos/BrooklynMementoPersister.java | 2 +-
.../rebind/PeriodicDeltaChangeListener.java | 5 +
.../entity/rebind/PersisterDeltaImpl.java | 47 +-
.../rebind/RebindExceptionHandlerImpl.java | 23 +-
.../entity/rebind/RebindManagerImpl.java | 4 +-
.../entity/rebind/dto/BrooklynMementoImpl.java | 12 +-
.../rebind/dto/BrooklynMementoManifestImpl.java | 16 +-
.../AbstractBrooklynMementoPersister.java | 2 +-
.../BrooklynMementoPersisterToMultiFile.java | 16 +-
.../BrooklynMementoPersisterToObjectStore.java | 529 +++++++++++++------
.../persister/StoreObjectAccessorLocking.java | 9 +-
.../entity/rebind/RebindEntityTest.java | 7 +
.../entity/rebind/RebindTestFixture.java | 14 +-
.../brooklyn/entity/rebind/RebindTestUtils.java | 33 +-
.../BrooklynMementoPersisterTestFixture.java | 2 +-
.../HighAvailabilityManagerSplitBrainTest.java | 2 +-
.../ha/HighAvailabilityManagerTestFixture.java | 5 +-
.../qa/performance/AbstractPerformanceTest.java | 3 +
.../FilePersistencePerformanceTest.java | 4 +-
.../BlobStorePersistencePerformanceTest.java | 103 ++++
...tyToBlobStorePersistencePerformanceTest.java | 47 ++
.../brooklyn/launcher/BrooklynLauncher.java | 4 +-
.../brooklyn/util/exceptions/Exceptions.java | 2 +
23 files changed, 680 insertions(+), 211 deletions(-)
----------------------------------------------------------------------
[3/4] git commit: Incorporate comments for PR #41: thread pool for
ObjectStore
Posted by he...@apache.org.
Incorporate comments for PR #41: thread pool for ObjectStore
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/8191d934
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/8191d934
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/8191d934
Branch: refs/heads/master
Commit: 8191d934c873681ed28950debd17dd1edeae2417
Parents: d04761e
Author: Aled Sage <al...@gmail.com>
Authored: Fri Jul 4 14:23:42 2014 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Fri Jul 4 14:38:12 2014 +0100
----------------------------------------------------------------------
.../mementos/BrooklynMementoPersister.java | 2 +-
.../entity/rebind/RebindManagerImpl.java | 2 +-
.../AbstractBrooklynMementoPersister.java | 2 +-
.../BrooklynMementoPersisterToMultiFile.java | 16 +++--
.../BrooklynMementoPersisterToObjectStore.java | 61 ++++++++++++--------
.../persister/StoreObjectAccessorLocking.java | 9 ++-
.../entity/rebind/RebindTestFixture.java | 2 +-
.../BrooklynMementoPersisterTestFixture.java | 2 +-
8 files changed, 60 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java b/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java
index 016c10f..579751d 100644
--- a/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java
+++ b/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java
@@ -42,7 +42,7 @@ public interface BrooklynMementoPersister {
void delta(Delta delta, PersistenceExceptionHandler exceptionHandler);
- void stop();
+ void stop(boolean graceful);
@VisibleForTesting
void waitForWritesCompleted(Duration timeout) throws InterruptedException, TimeoutException;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java b/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
index 1f2ae81..af25906 100644
--- a/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
+++ b/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
@@ -192,7 +192,7 @@ public class RebindManagerImpl implements RebindManager {
public void stop() {
running = false;
if (realChangeListener != null) realChangeListener.stop();
- if (persister != null) persister.stop();
+ if (persister != null) persister.stop(true);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java b/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java
index 04e69e7..a8b5f52 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java
@@ -47,7 +47,7 @@ public abstract class AbstractBrooklynMementoPersister implements BrooklynMement
}
@Override
- public void stop() {
+ public void stop(boolean graceful) {
// no-op
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java
index c2bbdd0..06791bc 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java
@@ -102,13 +102,17 @@ public class BrooklynMementoPersisterToMultiFile implements BrooklynMementoPersi
}
@Override
- public void stop() {
+ public void stop(boolean graceful) {
running = false;
- executor.shutdown();
- try {
- executor.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- throw Exceptions.propagate(e);
+ if (graceful) {
+ executor.shutdown();
+ try {
+ executor.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ }
+ } else {
+ executor.shutdownNow();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
index 8fcb3b3..d036648 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
@@ -6,7 +6,6 @@ import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@@ -21,6 +20,7 @@ import org.slf4j.LoggerFactory;
import brooklyn.config.BrooklynProperties;
import brooklyn.config.ConfigKey;
import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.rebind.PeriodicDeltaChangeListener;
import brooklyn.entity.rebind.PersistenceExceptionHandler;
import brooklyn.entity.rebind.PersisterDeltaImpl;
import brooklyn.entity.rebind.RebindExceptionHandler;
@@ -84,7 +84,7 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
* Lock used on writes (checkpoint + delta) so that {@link #waitForWritesCompleted(Duration)} can block
* for any concurrent call to complete.
*/
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
public BrooklynMementoPersisterToObjectStore(PersistenceObjectStore objectStore, BrooklynProperties brooklynProperties, ClassLoader classLoader) {
this.objectStore = checkNotNull(objectStore, "objectStore");
@@ -109,18 +109,29 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
}}));
}
- public PersistenceObjectStore getObjectStore() {
- return objectStore;
- }
-
@Override
- public void stop() {
+ public void stop(boolean graceful) {
running = false;
if (executor != null) {
- executor.shutdownNow();
+ if (graceful) {
+ // a very long timeout to ensure we don't lose state.
+ // If persisting thousands of entities over slow network to Object Store, could take minutes.
+ executor.shutdown();
+ try {
+ executor.awaitTermination(1, TimeUnit.HOURS);
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ }
+ } else {
+ executor.shutdownNow();
+ }
}
}
+ public PersistenceObjectStore getObjectStore() {
+ return objectStore;
+ }
+
protected StoreObjectAccessorWithLock getWriter(String path) {
String id = path.substring(path.lastIndexOf('/')+1);
synchronized (writers) {
@@ -157,7 +168,7 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
Stopwatch stopwatch = Stopwatch.createStarted();
LOG.debug("Scanning persisted state: {} entities, {} locations, {} policies, {} enrichers, from {}", new Object[]{
- entitySubPathList/*.size()*/, locationSubPathList.size(), policySubPathList.size(), enricherSubPathList.size(),
+ entitySubPathList.size(), locationSubPathList.size(), policySubPathList.size(), enricherSubPathList.size(),
objectStore.getSummaryName() });
final BrooklynMementoManifestImpl.Builder builder = BrooklynMementoManifestImpl.builder();
@@ -172,9 +183,7 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
String id = (String) XmlUtil.xpath(contents, "/entity/id");
String type = (String) XmlUtil.xpath(contents, "/entity/type");
builder.entity(id, type);
- LOG.debug("Loaded manifest for entity "+subPath+"; id "+id+"; type "+type); // FIXME
} catch (Exception e) {
- LOG.debug("Problem loading manifest for entity "+subPath); // FIXME
Exceptions.propagateIfFatal(e);
exceptionHandler.onLoadEntityMementoFailed("Memento "+subPath, e);
}
@@ -443,6 +452,13 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
delta.removedEntityIds(), delta.removedLocationIds(), delta.removedPolicyIds()});
}
+ /**
+ * Concurrent calls will queue up (the lock is "fair", which means an "approximately arrival-order policy").
+ * Current usage is with the {@link PeriodicDeltaChangeListener} so we expect only one call at a time.
+ *
+ * TODO Longer term, if we care more about concurrent calls we could merge the queued deltas so that we
+ * don't do unnecessary repeated writes of an entity.
+ */
private Stopwatch deltaImpl(Delta delta, PersistenceExceptionHandler exceptionHandler) {
try {
lock.writeLock().lockInterruptibly();
@@ -504,11 +520,17 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
@Override
public void waitForWritesCompleted(Duration timeout) throws InterruptedException, TimeoutException {
- lock.writeLock().lockInterruptibly();
- lock.writeLock().unlock();
-
- for (StoreObjectAccessorWithLock writer : writers.values())
- writer.waitForCurrentWrites(timeout);
+ boolean locked = lock.readLock().tryLock(timeout.toMillisecondsRoundingUp(), TimeUnit.MILLISECONDS);
+ if (locked) {
+ lock.readLock().unlock();
+
+ // Belt-and-braces: the lock above should be enough to ensure no outstanding writes, because
+ // each writer is now synchronous.
+ for (StoreObjectAccessorWithLock writer : writers.values())
+ writer.waitForCurrentWrites(timeout);
+ } else {
+ throw new TimeoutException("Timeout waiting for writes to "+objectStore);
+ }
}
private String read(String subPath) {
@@ -536,13 +558,6 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
}
}
- private ListenableFuture<String> asyncRead(final String subPath) {
- return executor.submit(new Callable<String>() {
- public String call() {
- return read(subPath);
- }});
- }
-
private ListenableFuture<?> asyncPersist(final String subPath, final Memento memento, final PersistenceExceptionHandler exceptionHandler) {
return executor.submit(new Runnable() {
public void run() {
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/core/src/main/java/brooklyn/entity/rebind/persister/StoreObjectAccessorLocking.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/StoreObjectAccessorLocking.java b/core/src/main/java/brooklyn/entity/rebind/persister/StoreObjectAccessorLocking.java
index 1ec2afd..c43df3b 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/StoreObjectAccessorLocking.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/StoreObjectAccessorLocking.java
@@ -4,6 +4,7 @@ import java.util.Comparator;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -152,8 +153,12 @@ public class StoreObjectAccessorLocking implements PersistenceObjectStore.StoreO
@Override
public void waitForCurrentWrites(Duration timeout) throws InterruptedException, TimeoutException {
try {
- lock.readLock().lockInterruptibly();
- lock.readLock().unlock();
+ boolean locked = lock.readLock().tryLock(timeout.toMillisecondsRoundingUp(), TimeUnit.MILLISECONDS);
+ if (locked) {
+ lock.readLock().unlock();
+ } else {
+ throw new TimeoutException("Timeout waiting for writes of "+delegate+" after "+timeout);
+ }
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java b/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
index 2fd873e..9e233d6 100644
--- a/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
+++ b/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
@@ -118,7 +118,7 @@ public abstract class RebindTestFixture<T extends StartableApplication> {
classLoader);
RebindExceptionHandler exceptionHandler = new RecordingRebindExceptionHandler(RebindManager.RebindFailureMode.FAIL_AT_END, RebindManager.RebindFailureMode.FAIL_AT_END);
BrooklynMementoManifest mementoManifest = persister.loadMementoManifest(exceptionHandler);
- persister.stop();
+ persister.stop(false);
return mementoManifest;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/core/src/test/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterTestFixture.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterTestFixture.java b/core/src/test/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterTestFixture.java
index 1946f75..73e1443 100644
--- a/core/src/test/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterTestFixture.java
+++ b/core/src/test/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterTestFixture.java
@@ -76,7 +76,7 @@ public abstract class BrooklynMementoPersisterTestFixture {
public void tearDown() throws Exception {
if (localManagementContext != null) Entities.destroyAll(localManagementContext);
if (app != null) Entities.destroyAll(app.getManagementContext());
- if (persister != null) persister.stop();
+ if (persister != null) persister.stop(false);
if (objectStore!=null) objectStore.deleteCompletely();
persister = null;
}