You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2014/07/21 17:42:33 UTC

svn commit: r1612322 - in /qpid/trunk/qpid/java/bdbstore/src: main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java

Author: orudyy
Date: Mon Jul 21 15:42:32 2014
New Revision: 1612322

URL: http://svn.apache.org/r1612322
Log:
QPID-5909: Allow setting of BDB HA message store durability many times

Modified:
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
    qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1612322&r1=1612321&r2=1612322&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java Mon Jul 21 15:42:32 2014
@@ -163,13 +163,13 @@ public class ReplicatedEnvironmentFacade
     private final AtomicReference<ReplicationGroupListener> _replicationGroupListener = new AtomicReference<ReplicationGroupListener>();
     private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>();
     private final Durability _defaultDurability;
-    private final AtomicReference<Durability> _messageStoreDurability = new AtomicReference<Durability>();
 
     private volatile Durability _realMessageStoreDurability = null;
     private volatile CoalescingCommiter _coalescingCommiter = null;
     private volatile ReplicatedEnvironment _environment;
     private volatile long _joinTime;
     private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
+    private volatile Durability _messageStoreDurability;
 
     private final ConcurrentHashMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>();
     private final ConcurrentHashMap<DatabaseEntry, Sequence> _cachedSequences = new ConcurrentHashMap<>();
@@ -208,7 +208,7 @@ public class ReplicatedEnvironmentFacade
     @Override
     public Transaction beginTransaction()
     {
-        if (_messageStoreDurability.get() == null)
+        if (_messageStoreDurability == null)
         {
             throw new IllegalStateException("Message store durability is not set");
         }
@@ -240,7 +240,7 @@ public class ReplicatedEnvironmentFacade
         }
 
         if (_coalescingCommiter != null && _realMessageStoreDurability.getLocalSync() == SyncPolicy.NO_SYNC
-                && _messageStoreDurability.get().getLocalSync() == SyncPolicy.SYNC)
+                && _messageStoreDurability.getLocalSync() == SyncPolicy.SYNC)
         {
             return _coalescingCommiter.commit(tx, syncCommit);
         }
@@ -545,7 +545,7 @@ public class ReplicatedEnvironmentFacade
 
     public Durability getMessageStoreDurability()
     {
-        return _messageStoreDurability.get();
+        return _messageStoreDurability;
     }
 
     public boolean isCoalescingSync()
@@ -1105,10 +1105,24 @@ public class ReplicatedEnvironmentFacade
         }
     }
 
+    /**
+     * This method should only be invoked from the configuration thread on virtual host activation.
+     * Otherwise, invoking this method whilst the coalescing committer is committing transactions might result in transaction aborts.
+     */
     public void setMessageStoreDurability(SyncPolicy localTransactionSynchronizationPolicy, SyncPolicy remoteTransactionSynchronizationPolicy, ReplicaAckPolicy replicaAcknowledgmentPolicy)
     {
-        if (_messageStoreDurability.compareAndSet(null, new Durability(localTransactionSynchronizationPolicy, remoteTransactionSynchronizationPolicy, replicaAcknowledgmentPolicy )))
+        if (_messageStoreDurability == null || localTransactionSynchronizationPolicy != _messageStoreDurability.getLocalSync()
+                || remoteTransactionSynchronizationPolicy != _messageStoreDurability.getReplicaSync()
+                || replicaAcknowledgmentPolicy != _messageStoreDurability.getReplicaAck())
         {
+            _messageStoreDurability = new Durability(localTransactionSynchronizationPolicy, remoteTransactionSynchronizationPolicy, replicaAcknowledgmentPolicy);
+
+            if (_coalescingCommiter != null)
+            {
+                _coalescingCommiter.stop();
+                _coalescingCommiter = null;
+            }
+
             if (localTransactionSynchronizationPolicy == LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY)
             {
                 localTransactionSynchronizationPolicy = SyncPolicy.NO_SYNC;
@@ -1117,10 +1131,6 @@ public class ReplicatedEnvironmentFacade
             }
             _realMessageStoreDurability = new Durability(localTransactionSynchronizationPolicy, remoteTransactionSynchronizationPolicy, replicaAcknowledgmentPolicy);
         }
-        else
-        {
-            throw new IllegalStateException("Message store durability is already set to " + _messageStoreDurability.get());
-        }
     }
 
     public void setPermittedNodes(Collection<String> permittedNodes)

Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java?rev=1612322&r1=1612321&r2=1612322&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java Mon Jul 21 15:42:32 2014
@@ -176,17 +176,13 @@ public class ReplicatedEnvironmentFacade
                 new Durability(Durability.SyncPolicy.NO_SYNC, Durability.SyncPolicy.NO_SYNC, Durability.ReplicaAckPolicy.SIMPLE_MAJORITY),
                 master.getRealMessageStoreDurability());
         assertEquals("Unexpected durability", TEST_DURABILITY, master.getMessageStoreDurability());
-        assertTrue("Unexpected coalescing syn", master.isCoalescingSync());
+        assertTrue("Unexpected coalescing sync", master.isCoalescingSync());
 
-        try
-        {
-            master.setMessageStoreDurability(TEST_DURABILITY.getLocalSync(), TEST_DURABILITY.getReplicaSync(), TEST_DURABILITY.getReplicaAck());
-            fail("Cannot set message store durability twice");
-        }
-        catch(IllegalStateException e)
-        {
-            // pass
-        }
+        master.setMessageStoreDurability(Durability.SyncPolicy.WRITE_NO_SYNC, Durability.SyncPolicy.SYNC, Durability.ReplicaAckPolicy.ALL);
+        assertEquals("Unexpected message store durability",
+                new Durability(Durability.SyncPolicy.WRITE_NO_SYNC, Durability.SyncPolicy.SYNC, Durability.ReplicaAckPolicy.ALL),
+                master.getRealMessageStoreDurability());
+        assertFalse("Coalescing sync committer is still running", master.isCoalescingSync());
     }
 
     public void testGetNodeState() throws Exception



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org