You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2018/06/05 08:44:34 UTC

[3/5] brooklyn-server git commit: For tests, support RebindManager.hasPending()

For tests, support RebindManager.hasPending()

Similar concept to waitForPendingComplete().


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/68d88ebd
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/68d88ebd
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/68d88ebd

Branch: refs/heads/master
Commit: 68d88ebde64ca03a2c808723a6a32df8c2e443c6
Parents: dedaffd
Author: Aled Sage <al...@gmail.com>
Authored: Thu May 24 21:49:27 2018 +0100
Committer: Aled Sage <al...@gmail.com>
Committed: Thu May 24 21:54:52 2018 +0100

----------------------------------------------------------------------
 .../brooklyn/api/mgmt/rebind/RebindManager.java |  6 +++++
 .../mementos/BrooklynMementoPersister.java      |  3 +++
 .../NonDeploymentManagementContext.java         |  4 +++
 .../BrooklynMementoPersisterToObjectStore.java  | 27 ++++++++++++++++++++
 .../mgmt/persist/PersistenceObjectStore.java    |  7 +++++
 .../persist/StoreObjectAccessorLocking.java     | 15 +++++++++++
 .../rebind/PeriodicDeltaChangeListener.java     | 22 ++++++++++++++++
 .../core/mgmt/rebind/RebindManagerImpl.java     |  7 +++++
 .../core/mgmt/rebind/RebindTestUtils.java       |  4 +++
 9 files changed, 95 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/68d88ebd/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/RebindManager.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/RebindManager.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/RebindManager.java
index 9dcda12..c702837 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/RebindManager.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/RebindManager.java
@@ -113,6 +113,12 @@ public interface RebindManager {
     /** waits for any needed or pending writes to complete */
     public void waitForPendingComplete(Duration duration, boolean canTrigger) throws InterruptedException, TimeoutException;
 
+    /**
+     * Whether there are any needed or pending writes (see also {@link #waitForPendingComplete(Duration, boolean)}).
+     */
+    @VisibleForTesting
+    public boolean hasPending();
+
     /** Forcibly performs persistence, in the foreground, either full (all entities) or incremental;
      * if no exception handler specified, the default one from the persister is used.
      * <p>

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/68d88ebd/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/mementos/BrooklynMementoPersister.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/mementos/BrooklynMementoPersister.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/mementos/BrooklynMementoPersister.java
index 8cf00b8..14793fb 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/mementos/BrooklynMementoPersister.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/rebind/mementos/BrooklynMementoPersister.java
@@ -111,6 +111,9 @@ public interface BrooklynMementoPersister {
     @VisibleForTesting
     void waitForWritesCompleted(Duration timeout) throws InterruptedException, TimeoutException;
 
+    @VisibleForTesting
+    public boolean isWriting();
+
     String getBackingStoreDescription();
     
     /** All methods on this interface are unmodifiable by the caller. Sub-interfaces may introduce modifiers. */

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/68d88ebd/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
index 9c9d366..d517513 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java
@@ -612,6 +612,10 @@ public class NonDeploymentManagementContext implements ManagementContextInternal
             throw new IllegalStateException("Non-deployment context "+NonDeploymentManagementContext.this+" is not valid for this operation.");
         }
         @Override
+        public boolean hasPending() {
+            throw new IllegalStateException("Non-deployment context "+NonDeploymentManagementContext.this+" is not valid for this operation.");
+        }
+        @Override
         public void forcePersistNow(boolean full, PersistenceExceptionHandler exceptionHandler) {
             throw new IllegalStateException("Non-deployment context "+NonDeploymentManagementContext.this+" is not valid for this operation.");
         }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/68d88ebd/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynMementoPersisterToObjectStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynMementoPersisterToObjectStore.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynMementoPersisterToObjectStore.java
index 260e2e2..aab8777 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynMementoPersisterToObjectStore.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynMementoPersisterToObjectStore.java
@@ -758,6 +758,33 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
         }
     }
 
+    @Override
+    public boolean isWriting() {
+        boolean acquired;
+        try {
+            acquired = lock.readLock().tryLock(0, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            throw Exceptions.propagate(e);
+        }
+        // read lock unavailable: conservatively report a write in progress
+        if (!acquired) return true;
+        ImmutableSet<StoreObjectAccessorWithLock> snapshot;
+        try {
+            synchronized (writers) {
+                snapshot = ImmutableSet.copyOf(writers.values());
+            }
+        } finally {
+            // always release the read lock, even if taking the snapshot fails
+            lock.readLock().unlock();
+        }
+        for (StoreObjectAccessorWithLock writer : snapshot) {
+            if (writer.isWriting()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     private String read(String subPath) {
         StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
         return objectAccessor.get();

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/68d88ebd/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/PersistenceObjectStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/PersistenceObjectStore.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/PersistenceObjectStore.java
index 18a5b64..cbb046b 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/PersistenceObjectStore.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/PersistenceObjectStore.java
@@ -28,6 +28,7 @@ import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityMode;
 import org.apache.brooklyn.util.time.Duration;
 
 import com.google.common.annotations.Beta;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.ByteSource;
 
 /**
@@ -60,6 +61,12 @@ public interface PersistenceObjectStore {
          * and ensure they subsequently <code>unlock()</code> it of course. see {@link #getLockObject()}. */
         void waitForCurrentWrites(Duration timeout) throws InterruptedException, TimeoutException;
         
+        /**
+         * Whether there are any scheduled write lock operations, either queued or executing.
+         */
+        @VisibleForTesting
+        boolean isWriting();
+        
         /** returns the underlying lock in case callers need more complex synchronization control */ 
         ReadWriteLock getLockObject();
     }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/68d88ebd/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/StoreObjectAccessorLocking.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/StoreObjectAccessorLocking.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/StoreObjectAccessorLocking.java
index 49de188..ac4383b 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/StoreObjectAccessorLocking.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/StoreObjectAccessorLocking.java
@@ -226,6 +226,21 @@ public class StoreObjectAccessorLocking implements PersistenceObjectStore.StoreO
     }
     
     @Override
+    public boolean isWriting() {
+        boolean acquired;
+        try {
+            acquired = lock.readLock().tryLock(0, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            throw Exceptions.propagate(e);
+        }
+        // read lock not immediately available: report as writing
+        if (!acquired) return true;
+        // the read lock was only taken as a probe; give it straight back
+        lock.readLock().unlock();
+        return false;
+    }
+    
+    @Override
     public Date getLastModifiedDate() {
         return delegate.getLastModifiedDate();
     }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/68d88ebd/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java
index e4b2d7d..1a971b6 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java
@@ -338,6 +338,28 @@ public class PeriodicDeltaChangeListener implements ChangeListener {
         }
     }
 
+    /** Whether changes are pending: still queued in the collector, or the persisting mutex is currently held. */
+    @VisibleForTesting
+    public boolean hasPending() {
+        if (!isActive() && state != ListenerState.STOPPING) return false;
+        
+        // zero-timeout attempt: if can't get mutex, then some changes are being applied.
+        try {
+            if (persistingMutex.tryAcquire(0, TimeUnit.MILLISECONDS)) {
+                try {
+                    // now no one else is writing
+                    return !deltaCollector.isEmpty();
+                } finally {
+                    persistingMutex.release();
+                }
+            } else {
+                return true;
+            }
+        } catch (InterruptedException e) {
+            throw Exceptions.propagate(e);
+        }
+    }
+
     /**
      * Indicates whether persistence is active. 
      * Even when not active, changes will still be tracked unless {@link #isStopped()}.

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/68d88ebd/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
index 20e8f2a..53d5d6a 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
@@ -460,6 +460,13 @@ public class RebindManagerImpl implements RebindManager {
 
     @Override
     @VisibleForTesting
+    public boolean hasPending()  {
+        if (persistenceStoreAccess == null || !persistenceRunning) return false;
+        return persistenceRealChangeListener.hasPending() || persistenceStoreAccess.isWriting();
+    }
+
+    @Override
+    @VisibleForTesting
     public void forcePersistNow(boolean full, PersistenceExceptionHandler exceptionHandler) {
         if (persistenceStoreAccess == null || persistenceRealChangeListener == null) {
             LOG.info("Skipping forced persist; no persistence mechanism available");

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/68d88ebd/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindTestUtils.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindTestUtils.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindTestUtils.java
index 607631d..4107735 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindTestUtils.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindTestUtils.java
@@ -443,6 +443,10 @@ public class RebindTestUtils {
         managementContext.getRebindManager().waitForPendingComplete(TIMEOUT, true);
     }
 
+    public static boolean hasPendingPersists(ManagementContext managementContext) {
+        return managementContext.getRebindManager().hasPending();
+    }
+
     public static void stopPersistence(Application origApp) throws InterruptedException, TimeoutException {
         stopPersistence(origApp.getManagementContext());
     }