You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2014/07/21 17:42:33 UTC
svn commit: r1612322 - in /qpid/trunk/qpid/java/bdbstore/src:
main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
Author: orudyy
Date: Mon Jul 21 15:42:32 2014
New Revision: 1612322
URL: http://svn.apache.org/r1612322
Log:
QPID-5909: Allow setting of BDB HA message store durability many times
Modified:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1612322&r1=1612321&r2=1612322&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java Mon Jul 21 15:42:32 2014
@@ -163,13 +163,13 @@ public class ReplicatedEnvironmentFacade
private final AtomicReference<ReplicationGroupListener> _replicationGroupListener = new AtomicReference<ReplicationGroupListener>();
private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>();
private final Durability _defaultDurability;
- private final AtomicReference<Durability> _messageStoreDurability = new AtomicReference<Durability>();
private volatile Durability _realMessageStoreDurability = null;
private volatile CoalescingCommiter _coalescingCommiter = null;
private volatile ReplicatedEnvironment _environment;
private volatile long _joinTime;
private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
+ private volatile Durability _messageStoreDurability;
private final ConcurrentHashMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>();
private final ConcurrentHashMap<DatabaseEntry, Sequence> _cachedSequences = new ConcurrentHashMap<>();
@@ -208,7 +208,7 @@ public class ReplicatedEnvironmentFacade
@Override
public Transaction beginTransaction()
{
- if (_messageStoreDurability.get() == null)
+ if (_messageStoreDurability == null)
{
throw new IllegalStateException("Message store durability is not set");
}
@@ -240,7 +240,7 @@ public class ReplicatedEnvironmentFacade
}
if (_coalescingCommiter != null && _realMessageStoreDurability.getLocalSync() == SyncPolicy.NO_SYNC
- && _messageStoreDurability.get().getLocalSync() == SyncPolicy.SYNC)
+ && _messageStoreDurability.getLocalSync() == SyncPolicy.SYNC)
{
return _coalescingCommiter.commit(tx, syncCommit);
}
@@ -545,7 +545,7 @@ public class ReplicatedEnvironmentFacade
public Durability getMessageStoreDurability()
{
- return _messageStoreDurability.get();
+ return _messageStoreDurability;
}
public boolean isCoalescingSync()
@@ -1105,10 +1105,24 @@ public class ReplicatedEnvironmentFacade
}
}
+ /**
+ * This method should only be invoked from configuration thread on virtual host activation.
+ * Otherwise, invocation of this method whilst coalescing committer is committing transactions might result in transaction aborts.
+ */
public void setMessageStoreDurability(SyncPolicy localTransactionSynchronizationPolicy, SyncPolicy remoteTransactionSynchronizationPolicy, ReplicaAckPolicy replicaAcknowledgmentPolicy)
{
- if (_messageStoreDurability.compareAndSet(null, new Durability(localTransactionSynchronizationPolicy, remoteTransactionSynchronizationPolicy, replicaAcknowledgmentPolicy )))
+ if (_messageStoreDurability == null || localTransactionSynchronizationPolicy != _messageStoreDurability.getLocalSync()
+ || remoteTransactionSynchronizationPolicy != _messageStoreDurability.getReplicaSync()
+ || replicaAcknowledgmentPolicy != _messageStoreDurability.getReplicaAck())
{
+ _messageStoreDurability = new Durability(localTransactionSynchronizationPolicy, remoteTransactionSynchronizationPolicy, replicaAcknowledgmentPolicy);
+
+ if (_coalescingCommiter != null)
+ {
+ _coalescingCommiter.stop();
+ _coalescingCommiter = null;
+ }
+
if (localTransactionSynchronizationPolicy == LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY)
{
localTransactionSynchronizationPolicy = SyncPolicy.NO_SYNC;
@@ -1117,10 +1131,6 @@ public class ReplicatedEnvironmentFacade
}
_realMessageStoreDurability = new Durability(localTransactionSynchronizationPolicy, remoteTransactionSynchronizationPolicy, replicaAcknowledgmentPolicy);
}
- else
- {
- throw new IllegalStateException("Message store durability is already set to " + _messageStoreDurability.get());
- }
}
public void setPermittedNodes(Collection<String> permittedNodes)
Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java?rev=1612322&r1=1612321&r2=1612322&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java Mon Jul 21 15:42:32 2014
@@ -176,17 +176,13 @@ public class ReplicatedEnvironmentFacade
new Durability(Durability.SyncPolicy.NO_SYNC, Durability.SyncPolicy.NO_SYNC, Durability.ReplicaAckPolicy.SIMPLE_MAJORITY),
master.getRealMessageStoreDurability());
assertEquals("Unexpected durability", TEST_DURABILITY, master.getMessageStoreDurability());
- assertTrue("Unexpected coalescing syn", master.isCoalescingSync());
+ assertTrue("Unexpected coalescing sync", master.isCoalescingSync());
- try
- {
- master.setMessageStoreDurability(TEST_DURABILITY.getLocalSync(), TEST_DURABILITY.getReplicaSync(), TEST_DURABILITY.getReplicaAck());
- fail("Cannot set message store durability twice");
- }
- catch(IllegalStateException e)
- {
- // pass
- }
+ master.setMessageStoreDurability(Durability.SyncPolicy.WRITE_NO_SYNC, Durability.SyncPolicy.SYNC, Durability.ReplicaAckPolicy.ALL);
+ assertEquals("Unexpected message store durability",
+ new Durability(Durability.SyncPolicy.WRITE_NO_SYNC, Durability.SyncPolicy.SYNC, Durability.ReplicaAckPolicy.ALL),
+ master.getRealMessageStoreDurability());
+ assertFalse("Coalescing sync committer is still running", master.isCoalescingSync());
}
public void testGetNodeState() throws Exception
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org