You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2014/07/04 16:02:41 UTC

[3/4] git commit: Incorporate comments for PR #41: thread pool for ObjectStore

Incorporate comments for PR #41: thread pool for ObjectStore


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/8191d934
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/8191d934
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/8191d934

Branch: refs/heads/master
Commit: 8191d934c873681ed28950debd17dd1edeae2417
Parents: d04761e
Author: Aled Sage <al...@gmail.com>
Authored: Fri Jul 4 14:23:42 2014 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Fri Jul 4 14:38:12 2014 +0100

----------------------------------------------------------------------
 .../mementos/BrooklynMementoPersister.java      |  2 +-
 .../entity/rebind/RebindManagerImpl.java        |  2 +-
 .../AbstractBrooklynMementoPersister.java       |  2 +-
 .../BrooklynMementoPersisterToMultiFile.java    | 16 +++--
 .../BrooklynMementoPersisterToObjectStore.java  | 61 ++++++++++++--------
 .../persister/StoreObjectAccessorLocking.java   |  9 ++-
 .../entity/rebind/RebindTestFixture.java        |  2 +-
 .../BrooklynMementoPersisterTestFixture.java    |  2 +-
 8 files changed, 60 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java b/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java
index 016c10f..579751d 100644
--- a/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java
+++ b/api/src/main/java/brooklyn/mementos/BrooklynMementoPersister.java
@@ -42,7 +42,7 @@ public interface BrooklynMementoPersister {
 
     void delta(Delta delta, PersistenceExceptionHandler exceptionHandler);
 
-    void stop();
+    void stop(boolean graceful);
 
     @VisibleForTesting
     void waitForWritesCompleted(Duration timeout) throws InterruptedException, TimeoutException;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java b/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
index 1f2ae81..af25906 100644
--- a/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
+++ b/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
@@ -192,7 +192,7 @@ public class RebindManagerImpl implements RebindManager {
     public void stop() {
         running = false;
         if (realChangeListener != null) realChangeListener.stop();
-        if (persister != null) persister.stop();
+        if (persister != null) persister.stop(true);
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java b/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java
index 04e69e7..a8b5f52 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/AbstractBrooklynMementoPersister.java
@@ -47,7 +47,7 @@ public abstract class AbstractBrooklynMementoPersister implements BrooklynMement
     }
     
     @Override
-    public void stop() {
+    public void stop(boolean graceful) {
         // no-op
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java
index c2bbdd0..06791bc 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java
@@ -102,13 +102,17 @@ public class BrooklynMementoPersisterToMultiFile implements BrooklynMementoPersi
     }
     
     @Override
-    public void stop() {
+    public void stop(boolean graceful) {
         running = false;
-        executor.shutdown();
-        try {
-            executor.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-            throw Exceptions.propagate(e);
+        if (graceful) {
+            executor.shutdown();
+            try {
+                executor.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                throw Exceptions.propagate(e);
+            }
+        } else {
+            executor.shutdownNow();
         }
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
index 8fcb3b3..d036648 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
@@ -6,7 +6,6 @@ import java.io.IOException;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -21,6 +20,7 @@ import org.slf4j.LoggerFactory;
 import brooklyn.config.BrooklynProperties;
 import brooklyn.config.ConfigKey;
 import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.rebind.PeriodicDeltaChangeListener;
 import brooklyn.entity.rebind.PersistenceExceptionHandler;
 import brooklyn.entity.rebind.PersisterDeltaImpl;
 import brooklyn.entity.rebind.RebindExceptionHandler;
@@ -84,7 +84,7 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
      * Lock used on writes (checkpoint + delta) so that {@link #waitForWritesCompleted(Duration)} can block
      * for any concurrent call to complete.
      */
-    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+    private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
 
     public BrooklynMementoPersisterToObjectStore(PersistenceObjectStore objectStore, BrooklynProperties brooklynProperties, ClassLoader classLoader) {
         this.objectStore = checkNotNull(objectStore, "objectStore");
@@ -109,18 +109,29 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
             }}));
     }
 
-    public PersistenceObjectStore getObjectStore() {
-        return objectStore;
-    }
-
     @Override
-    public void stop() {
+    public void stop(boolean graceful) {
         running = false;
         if (executor != null) {
-            executor.shutdownNow();
+            if (graceful) {
+                // Use a very long timeout to ensure we don't lose state.
+                // Persisting thousands of entities over a slow network to the Object Store could take minutes.
+                executor.shutdown();
+                try {
+                    executor.awaitTermination(1, TimeUnit.HOURS);
+                } catch (InterruptedException e) {
+                    throw Exceptions.propagate(e);
+                }
+            } else {
+                executor.shutdownNow();
+            }
         }
     }
     
+    public PersistenceObjectStore getObjectStore() {
+        return objectStore;
+    }
+
     protected StoreObjectAccessorWithLock getWriter(String path) {
         String id = path.substring(path.lastIndexOf('/')+1);
         synchronized (writers) {
@@ -157,7 +168,7 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
         Stopwatch stopwatch = Stopwatch.createStarted();
 
         LOG.debug("Scanning persisted state: {} entities, {} locations, {} policies, {} enrichers, from {}", new Object[]{
-            entitySubPathList/*.size()*/, locationSubPathList.size(), policySubPathList.size(), enricherSubPathList.size(),
+            entitySubPathList.size(), locationSubPathList.size(), policySubPathList.size(), enricherSubPathList.size(),
             objectStore.getSummaryName() });
 
         final BrooklynMementoManifestImpl.Builder builder = BrooklynMementoManifestImpl.builder();
@@ -172,9 +183,7 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
                         String id = (String) XmlUtil.xpath(contents, "/entity/id");
                         String type = (String) XmlUtil.xpath(contents, "/entity/type");
                         builder.entity(id, type);
-                        LOG.debug("Loaded manifest for entity "+subPath+"; id "+id+"; type "+type); // FIXME
                     } catch (Exception e) {
-                        LOG.debug("Problem loading manifest for entity "+subPath); // FIXME
                         Exceptions.propagateIfFatal(e);
                         exceptionHandler.onLoadEntityMementoFailed("Memento "+subPath, e);
                     }
@@ -443,6 +452,13 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
                 delta.removedEntityIds(), delta.removedLocationIds(), delta.removedPolicyIds()});
     }
     
+    /**
+     * Concurrent calls will queue-up (the lock is "fair", which means an "approximately arrival-order policy").
+     * Current usage is with the {@link PeriodicDeltaChangeListener} so we expect only one call at a time.
+     * 
+     * TODO Longer term, if we care more about concurrent calls we could merge the queued deltas so that we
+     * don't do unnecessary repeated writes of an entity.
+     */
     private Stopwatch deltaImpl(Delta delta, PersistenceExceptionHandler exceptionHandler) {
         try {
             lock.writeLock().lockInterruptibly();
@@ -504,11 +520,17 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
     
     @Override
     public void waitForWritesCompleted(Duration timeout) throws InterruptedException, TimeoutException {
-        lock.writeLock().lockInterruptibly();
-        lock.writeLock().unlock();
-        
-        for (StoreObjectAccessorWithLock writer : writers.values())
-            writer.waitForCurrentWrites(timeout);
+        boolean locked = lock.readLock().tryLock(timeout.toMillisecondsRoundingUp(), TimeUnit.MILLISECONDS);
+        if (locked) {
+            lock.readLock().unlock();
+            
+            // Belt-and-braces: the lock above should be enough to ensure no outstanding writes, because
+            // each writer is now synchronous.
+            for (StoreObjectAccessorWithLock writer : writers.values())
+                writer.waitForCurrentWrites(timeout);
+        } else {
+            throw new TimeoutException("Timeout waiting for writes to "+objectStore);
+        }
     }
 
     private String read(String subPath) {
@@ -536,13 +558,6 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
         }
     }
 
-    private ListenableFuture<String> asyncRead(final String subPath) {
-        return executor.submit(new Callable<String>() {
-            public String call() {
-                return read(subPath);
-            }});
-    }
-
     private ListenableFuture<?> asyncPersist(final String subPath, final Memento memento, final PersistenceExceptionHandler exceptionHandler) {
         return executor.submit(new Runnable() {
             public void run() {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/core/src/main/java/brooklyn/entity/rebind/persister/StoreObjectAccessorLocking.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/StoreObjectAccessorLocking.java b/core/src/main/java/brooklyn/entity/rebind/persister/StoreObjectAccessorLocking.java
index 1ec2afd..c43df3b 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/StoreObjectAccessorLocking.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/StoreObjectAccessorLocking.java
@@ -4,6 +4,7 @@ import java.util.Comparator;
 import java.util.Date;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -152,8 +153,12 @@ public class StoreObjectAccessorLocking implements PersistenceObjectStore.StoreO
     @Override
     public void waitForCurrentWrites(Duration timeout) throws InterruptedException, TimeoutException {
         try {
-            lock.readLock().lockInterruptibly();
-            lock.readLock().unlock();
+            boolean locked = lock.readLock().tryLock(timeout.toMillisecondsRoundingUp(), TimeUnit.MILLISECONDS);
+            if (locked) {
+                lock.readLock().unlock();
+            } else {
+                throw new TimeoutException("Timeout waiting for writes of "+delegate+" after "+timeout);
+            }
         } catch (InterruptedException e) {
             throw Exceptions.propagate(e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java b/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
index 2fd873e..9e233d6 100644
--- a/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
+++ b/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
@@ -118,7 +118,7 @@ public abstract class RebindTestFixture<T extends StartableApplication> {
                 classLoader);
         RebindExceptionHandler exceptionHandler = new RecordingRebindExceptionHandler(RebindManager.RebindFailureMode.FAIL_AT_END, RebindManager.RebindFailureMode.FAIL_AT_END);
         BrooklynMementoManifest mementoManifest = persister.loadMementoManifest(exceptionHandler);
-        persister.stop();
+        persister.stop(false);
         return mementoManifest;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/8191d934/core/src/test/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterTestFixture.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterTestFixture.java b/core/src/test/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterTestFixture.java
index 1946f75..73e1443 100644
--- a/core/src/test/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterTestFixture.java
+++ b/core/src/test/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterTestFixture.java
@@ -76,7 +76,7 @@ public abstract class BrooklynMementoPersisterTestFixture {
     public void tearDown() throws Exception {
         if (localManagementContext != null) Entities.destroyAll(localManagementContext);
         if (app != null) Entities.destroyAll(app.getManagementContext());
-        if (persister != null) persister.stop();
+        if (persister != null) persister.stop(false);
         if (objectStore!=null) objectStore.deleteCompletely();
         persister = null;
     }