You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2014/07/04 16:02:41 UTC
[3/4] git commit: Incorporate comments for PR #41: thread pool for
ObjectStore
Incorporate comments for PR #41: thread pool for ObjectStore
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/8191d934
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/8191d934
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/8191d934
Branch: refs/heads/master
Commit: 8191d934c873681ed28950debd17dd1edeae2417
Parents: d04761e
Author: Aled Sage <al...@gmail.com>
Authored: Fri Jul 4 14:23:42 2014 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Fri Jul 4 14:38:12 2014 +0100
----------------------------------------------------------------------
.../mementos/BrooklynMementoPersister.java | 2 +-
.../entity/rebind/RebindManagerImpl.java | 2 +-
.../AbstractBrooklynMementoPersister.java | 2 +-
.../BrooklynMementoPersisterToMultiFile.java | 16 +++--
.../BrooklynMementoPersisterToObjectStore.java | 61 ++++++++++++--------
.../persister/StoreObjectAccessorLocking.java | 9 ++-
.../entity/rebind/RebindTestFixture.java | 2 +-
.../BrooklynMementoPersisterTestFixture.java | 2 +-
8 files changed, 60 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java b/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java
index 016c10f..579751d 100644
--- a/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java
+++ b/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java
@@ -42,7 +42,7 @@ public interface BrooklynMementoPersister {
void delta(Delta delta, PersistenceExceptionHandler exceptionHandler);
- void stop();
+ void stop(boolean graceful);
@VisibleForTesting
void waitForWritesCompleted(Duration timeout) throws InterruptedException, TimeoutException;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java b/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
index 1f2ae81..af25906 100644
--- a/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
+++ b/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
@@ -192,7 +192,7 @@ public class RebindManagerImpl implements RebindManager {
public void stop() {
running = false;
if (realChangeListener != null) realChangeListener.stop();
- if (persister != null) persister.stop();
+ if (persister != null) persister.stop(true);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java b/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java
index 04e69e7..a8b5f52 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java
@@ -47,7 +47,7 @@ public abstract class AbstractBrooklynMementoPersister implements BrooklynMement
}
@Override
- public void stop() {
+ public void stop(boolean graceful) {
// no-op
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java
index c2bbdd0..06791bc 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java
@@ -102,13 +102,17 @@ public class BrooklynMementoPersisterToMultiFile implements BrooklynMementoPersi
}
@Override
- public void stop() {
+ public void stop(boolean graceful) {
running = false;
- executor.shutdown();
- try {
- executor.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- throw Exceptions.propagate(e);
+ if (graceful) {
+ executor.shutdown();
+ try {
+ executor.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ }
+ } else {
+ executor.shutdownNow();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
index 8fcb3b3..d036648 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
@@ -6,7 +6,6 @@ import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@@ -21,6 +20,7 @@ import org.slf4j.LoggerFactory;
import brooklyn.config.BrooklynProperties;
import brooklyn.config.ConfigKey;
import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.rebind.PeriodicDeltaChangeListener;
import brooklyn.entity.rebind.PersistenceExceptionHandler;
import brooklyn.entity.rebind.PersisterDeltaImpl;
import brooklyn.entity.rebind.RebindExceptionHandler;
@@ -84,7 +84,7 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
* Lock used on writes (checkpoint + delta) so that {@link #waitForWritesCompleted(Duration)} can block
* for any concurrent call to complete.
*/
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
public BrooklynMementoPersisterToObjectStore(PersistenceObjectStore objectStore, BrooklynProperties brooklynProperties, ClassLoader classLoader) {
this.objectStore = checkNotNull(objectStore, "objectStore");
@@ -109,18 +109,29 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
}}));
}
- public PersistenceObjectStore getObjectStore() {
- return objectStore;
- }
-
@Override
- public void stop() {
+ public void stop(boolean graceful) {
running = false;
if (executor != null) {
- executor.shutdownNow();
+ if (graceful) {
+ // a very long timeout to ensure we don't lose state.
+ // If persisting thousands of entities over slow network to Object Store, could take minutes.
+ executor.shutdown();
+ try {
+ executor.awaitTermination(1, TimeUnit.HOURS);
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ }
+ } else {
+ executor.shutdownNow();
+ }
}
}
+ public PersistenceObjectStore getObjectStore() {
+ return objectStore;
+ }
+
protected StoreObjectAccessorWithLock getWriter(String path) {
String id = path.substring(path.lastIndexOf('/')+1);
synchronized (writers) {
@@ -157,7 +168,7 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
Stopwatch stopwatch = Stopwatch.createStarted();
LOG.debug("Scanning persisted state: {} entities, {} locations, {} policies, {} enrichers, from {}", new Object[]{
- entitySubPathList/*.size()*/, locationSubPathList.size(), policySubPathList.size(), enricherSubPathList.size(),
+ entitySubPathList.size(), locationSubPathList.size(), policySubPathList.size(), enricherSubPathList.size(),
objectStore.getSummaryName() });
final BrooklynMementoManifestImpl.Builder builder = BrooklynMementoManifestImpl.builder();
@@ -172,9 +183,7 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
String id = (String) XmlUtil.xpath(contents, "/entity/id");
String type = (String) XmlUtil.xpath(contents, "/entity/type");
builder.entity(id, type);
- LOG.debug("Loaded manifest for entity "+subPath+"; id "+id+"; type "+type); // FIXME
} catch (Exception e) {
- LOG.debug("Problem loading manifest for entity "+subPath); // FIXME
Exceptions.propagateIfFatal(e);
exceptionHandler.onLoadEntityMementoFailed("Memento "+subPath, e);
}
@@ -443,6 +452,13 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
delta.removedEntityIds(), delta.removedLocationIds(), delta.removedPolicyIds()});
}
+ /**
+ * Concurrent calls will queue-up (the lock is "fair", which means an "approximately arrival-order policy").
+ * Current usage is with the {@link PeriodicDeltaChangeListener} so we expect only one call at a time.
+ *
+ * TODO Longer term, if we care more about concurrent calls we could merge the queued deltas so that we
+ * don't do unnecessary repeated writes of an entity.
+ */
private Stopwatch deltaImpl(Delta delta, PersistenceExceptionHandler exceptionHandler) {
try {
lock.writeLock().lockInterruptibly();
@@ -504,11 +520,17 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
@Override
public void waitForWritesCompleted(Duration timeout) throws InterruptedException, TimeoutException {
- lock.writeLock().lockInterruptibly();
- lock.writeLock().unlock();
-
- for (StoreObjectAccessorWithLock writer : writers.values())
- writer.waitForCurrentWrites(timeout);
+ boolean locked = lock.readLock().tryLock(timeout.toMillisecondsRoundingUp(), TimeUnit.MILLISECONDS);
+ if (locked) {
+ lock.readLock().unlock();
+
+ // Belt-and-braces: the lock above should be enough to ensure no outstanding writes, because
+ // each writer is now synchronous.
+ for (StoreObjectAccessorWithLock writer : writers.values())
+ writer.waitForCurrentWrites(timeout);
+ } else {
+ throw new TimeoutException("Timeout waiting for writes to "+objectStore);
+ }
}
private String read(String subPath) {
@@ -536,13 +558,6 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
}
}
- private ListenableFuture<String> asyncRead(final String subPath) {
- return executor.submit(new Callable<String>() {
- public String call() {
- return read(subPath);
- }});
- }
-
private ListenableFuture<?> asyncPersist(final String subPath, final Memento memento, final PersistenceExceptionHandler exceptionHandler) {
return executor.submit(new Runnable() {
public void run() {
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/core/src/main/java/brooklyn/entity/rebind/persister/StoreObjectAccessorLocking.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/StoreObjectAccessorLocking.java b/core/src/main/java/brooklyn/entity/rebind/persister/StoreObjectAccessorLocking.java
index 1ec2afd..c43df3b 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/StoreObjectAccessorLocking.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/StoreObjectAccessorLocking.java
@@ -4,6 +4,7 @@ import java.util.Comparator;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -152,8 +153,12 @@ public class StoreObjectAccessorLocking implements PersistenceObjectStore.StoreO
@Override
public void waitForCurrentWrites(Duration timeout) throws InterruptedException, TimeoutException {
try {
- lock.readLock().lockInterruptibly();
- lock.readLock().unlock();
+ boolean locked = lock.readLock().tryLock(timeout.toMillisecondsRoundingUp(), TimeUnit.MILLISECONDS);
+ if (locked) {
+ lock.readLock().unlock();
+ } else {
+ throw new TimeoutException("Timeout waiting for writes of "+delegate+" after "+timeout);
+ }
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java b/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
index 2fd873e..9e233d6 100644
--- a/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
+++ b/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
@@ -118,7 +118,7 @@ public abstract class RebindTestFixture<T extends StartableApplication> {
classLoader);
RebindExceptionHandler exceptionHandler = new RecordingRebindExceptionHandler(RebindManager.RebindFailureMode.FAIL_AT_END, RebindManager.RebindFailureMode.FAIL_AT_END);
BrooklynMementoManifest mementoManifest = persister.loadMementoManifest(exceptionHandler);
- persister.stop();
+ persister.stop(false);
return mementoManifest;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/core/src/test/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterTestFixture.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterTestFixture.java b/core/src/test/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterTestFixture.java
index 1946f75..73e1443 100644
--- a/core/src/test/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterTestFixture.java
+++ b/core/src/test/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterTestFixture.java
@@ -76,7 +76,7 @@ public abstract class BrooklynMementoPersisterTestFixture {
public void tearDown() throws Exception {
if (localManagementContext != null) Entities.destroyAll(localManagementContext);
if (app != null) Entities.destroyAll(app.getManagementContext());
- if (persister != null) persister.stop();
+ if (persister != null) persister.stop(false);
if (objectStore!=null) objectStore.deleteCompletely();
persister = null;
}