You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/01/22 13:29:07 UTC
svn commit: r1560333 - in
/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore:
jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/
jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/
src/main/java/org/apache/qpid/server/store/ber...
Author: kwall
Date: Wed Jan 22 12:29:07 2014
New Revision: 1560333
URL: http://svn.apache.org/r1560333
Log:
QPID-5409: Refactoring to move commit thread back to BDBMessageStore.
Added:
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
- copied, changed from r1560101, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
- copied, changed from r1560101, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
Removed:
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
Modified:
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java?rev=1560333&r1=1560332&r2=1560333&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java Wed Jan 22 12:29:07 2014
@@ -43,12 +43,6 @@ import org.apache.qpid.server.store.berk
/**
* Management mbean for BDB HA.
- * <p>
- * At runtime, the classloader loading this clas must have visibility of the other Qpid JMX classes. This is
- * currently arranged through OSGI using the <b>fragment</b> feature so that this bundle shares the
- * same classloader as broker-plugins-management-jmx. See the <b>Fragment-Host:</b> header within the MANIFEST.MF
- * of this bundle.
- * </p>
*/
public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements ManagedBDBHAMessageStore
{
@@ -80,19 +74,21 @@ public class BDBHAMessageStoreManagerMBe
}
private final ReplicatedEnvironmentFacade _replicatedEnvironmentFacade;
+ private final String _objectName;
- protected BDBHAMessageStoreManagerMBean(ReplicatedEnvironmentFacade replicatedEnvironmentFacade, ManagedObject parent) throws JMException
+ protected BDBHAMessageStoreManagerMBean(String virtualHostName, ReplicatedEnvironmentFacade replicatedEnvironmentFacade, ManagedObject parent) throws JMException
{
super(ManagedBDBHAMessageStore.class, ManagedBDBHAMessageStore.TYPE, ((AMQManagedObject)parent).getRegistry());
- LOGGER.debug("Creating BDBHAMessageStoreManagerMBean");
+ LOGGER.debug("Creating BDBHAMessageStoreManagerMBean for " + virtualHostName);
_replicatedEnvironmentFacade = replicatedEnvironmentFacade;
+ _objectName = ObjectName.quote(virtualHostName);
register();
}
@Override
public String getObjectInstanceName()
{
- return ObjectName.quote(_replicatedEnvironmentFacade.getName());
+ return _objectName;
}
@Override
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java?rev=1560333&r1=1560332&r2=1560333&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java Wed Jan 22 12:29:07 2014
@@ -65,7 +65,7 @@ public class BDBHAMessageStoreManagerMBe
}
ReplicatedEnvironmentFacade replicatedEnvironmentFacade = (ReplicatedEnvironmentFacade)messageStore.getEnvironmentFacade();
- return new BDBHAMessageStoreManagerMBean(replicatedEnvironmentFacade, (ManagedObject) parent);
+ return new BDBHAMessageStoreManagerMBean(virtualHostChild.getName(), replicatedEnvironmentFacade, (ManagedObject) parent);
}
@Override
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java?rev=1560333&r1=1560332&r2=1560333&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java Wed Jan 22 12:29:07 2014
@@ -71,7 +71,7 @@ public class BDBHAMessageStoreManagerMBe
_replicatedEnvironmentFacadee = mock(ReplicatedEnvironmentFacade.class);
_mBeanParent = mock(AMQManagedObject.class);
when(_mBeanParent.getRegistry()).thenReturn(mock(ManagedObjectRegistry.class));
- _mBean = new BDBHAMessageStoreManagerMBean(_replicatedEnvironmentFacadee, _mBeanParent);
+ _mBean = new BDBHAMessageStoreManagerMBean(TEST_STORE_NAME, _replicatedEnvironmentFacadee, _mBeanParent);
}
@Override
@@ -83,8 +83,6 @@ public class BDBHAMessageStoreManagerMBe
public void testObjectName() throws Exception
{
- when(_replicatedEnvironmentFacadee.getName()).thenReturn(TEST_STORE_NAME);
-
String expectedObjectName = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(TEST_STORE_NAME);
assertEquals(expectedObjectName, _mBean.getObjectName().toString());
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java?rev=1560333&r1=1560332&r2=1560333&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java Wed Jan 22 12:29:07 2014
@@ -110,6 +110,8 @@ public class BDBMessageStore implements
private final EnvironmentFacadeFactory _environmentFacadeFactory;
+ private volatile Committer _committer;
+
public BDBMessageStore()
{
this(TYPE, new StandardEnvironmentFacadeFactory());
@@ -151,6 +153,7 @@ public class BDBMessageStore implements
_tlogRecoveryHandler = tlogRecoveryHandler;
_virtualHost = virtualHost;
+
completeInitialisation();
}
@@ -263,7 +266,10 @@ public class BDBMessageStore implements
_storeLocation = storeLocation;
LOGGER.info("Setting up environment");
- _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(name, storeLocation, virtualHost);
+ _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeLocation, virtualHost);
+
+ _committer = _environmentFacade.createCommitter(null);
+ _committer.start();
}
@Override
@@ -287,16 +293,23 @@ public class BDBMessageStore implements
{
if (_closed.compareAndSet(false, true))
{
- _stateManager.attainState(State.CLOSING);
- try
- {
- closeEnvironment();
- }
- catch(DatabaseException e)
- {
- throw new AMQStoreException("Exception occured on message store close", e);
- }
- _stateManager.attainState(State.CLOSED);
+ _stateManager.attainState(State.CLOSING);
+ try
+ {
+ try
+ {
+ _committer.stop();
+ }
+ finally
+ {
+ closeEnvironment();
+ }
+ }
+ catch(DatabaseException e)
+ {
+ throw new AMQStoreException("Exception occured on message store close", e);
+ }
+ _stateManager.attainState(State.CLOSED);
}
}
@@ -598,7 +611,9 @@ public class BDBMessageStore implements
LOGGER.debug("Deleted content for message " + messageId);
}
- _environmentFacade.commit(tx, sync);
+ _environmentFacade.commit(tx);
+ _committer.commit(tx, sync);
+
complete = true;
tx = null;
}
@@ -980,7 +995,8 @@ public class BDBMessageStore implements
throw new AMQStoreException("Fatal internal error: transactional is null at commitTran");
}
- StoreFuture result = _environmentFacade.commit(tx, syncCommit);
+ _environmentFacade.commit(tx);
+ StoreFuture result = _committer.commit(tx, syncCommit);
if (LOGGER.isDebugEnabled())
{
@@ -1485,7 +1501,9 @@ public class BDBMessageStore implements
throw _environmentFacade.handleDatabaseException("failed to begin transaction", e);
}
store(txn);
- _environmentFacade.commit(txn,true);
+ _environmentFacade.commit(txn);
+ _committer.commit(txn, true);
+
storedSizeChangeOccured(getMetaData().getContentSize());
}
catch (AMQStoreException e)
Copied: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java (from r1560101, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java?p2=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java&p1=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java&r1=1560101&r2=1560333&rev=1560333&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java Wed Jan 22 12:29:07 2014
@@ -27,36 +27,40 @@ import java.util.concurrent.atomic.Atomi
import org.apache.log4j.Logger;
import org.apache.qpid.server.store.StoreFuture;
-import com.sleepycat.je.CheckpointConfig;
import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Environment;
import com.sleepycat.je.Transaction;
-public class CommitThreadWrapper
+public class CoalescingCommiter implements Committer
{
private final CommitThread _commitThread;
-
- public CommitThreadWrapper(String name, Environment env)
- {
- _commitThread = new CommitThread(name, env);
- }
- public void startCommitThread()
+ public CoalescingCommiter(String name, EnvironmentFacade environmentFacade)
{
- _commitThread.start();
+ _commitThread = new CommitThread("Commit-Thread-" + name, environmentFacade);
}
- public void stopCommitThread(RuntimeException e) throws InterruptedException
+ @Override
+ public void start()
{
- _commitThread.close(e);
- _commitThread.join();
+ _commitThread.start();
}
- public void stopCommitThread() throws InterruptedException
+ @Override
+ public void stop()
{
- stopCommitThread(new RuntimeException("Stopping commit thread"));
+ _commitThread.close();
+ try
+ {
+ _commitThread.join();
+ }
+ catch (InterruptedException ie)
+ {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Commit thread has not shutdown", ie);
+ }
}
+ @Override
public StoreFuture commit(Transaction tx, boolean syncCommit)
{
BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
@@ -70,9 +74,9 @@ public class CommitThreadWrapper
private final CommitThread _commitThread;
private final Transaction _tx;
+ private final boolean _syncCommit;
private RuntimeException _databaseException;
private boolean _complete;
- private boolean _syncCommit;
public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit)
{
@@ -170,15 +174,13 @@ public class CommitThreadWrapper
private final AtomicBoolean _stopped = new AtomicBoolean(false);
private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
- private final CheckpointConfig _config = new CheckpointConfig();
private final Object _lock = new Object();
- private Environment _environment;
+ private final EnvironmentFacade _environmentFacade;
- public CommitThread(String name, Environment env)
+ public CommitThread(String name, EnvironmentFacade environmentFacade)
{
super(name);
- _config.setForce(true);
- _environment = env;
+ _environmentFacade = environmentFacade;
}
public void explicitNotify()
@@ -199,7 +201,7 @@ public class CommitThreadWrapper
{
try
{
- // RHM-7 Periodically wake up and check, just in case we
+ // Periodically wake up and check, just in case we
// missed a notification. Don't want to lock the broker hard.
_lock.wait(1000);
}
@@ -224,7 +226,7 @@ public class CommitThreadWrapper
startTime = System.currentTimeMillis();
}
- _environment.flushLog(true);
+ _environmentFacade.getEnvironment().flushLog(true);
if(LOGGER.isDebugEnabled())
{
@@ -257,7 +259,7 @@ public class CommitThreadWrapper
try
{
- _environment.close();
+ _environmentFacade.close();
}
catch (DatabaseException ex)
{
@@ -288,8 +290,9 @@ public class CommitThreadWrapper
}
}
- public void close(RuntimeException e)
+ public void close()
{
+ RuntimeException e = new RuntimeException("Commit thread has been closed, transaction aborted");
synchronized (_lock)
{
_stopped.set(true);
Copied: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java (from r1560101, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java?p2=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java&p1=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java&r1=1560101&r2=1560333&rev=1560333&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java Wed Jan 22 12:29:07 2014
@@ -20,11 +20,36 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.StoreFuture;
-public interface EnvironmentFacadeFactory
+import com.sleepycat.je.Transaction;
+
+public interface Committer
{
+ void start();
+
+ StoreFuture commit(Transaction tx, boolean syncCommit);
+
+ void stop();
+
+ Committer IMMEDIATE_FUTURE_COMMITTER = new Committer()
+ {
+
+ @Override
+ public void start()
+ {
+ }
+
+ @Override
+ public StoreFuture commit(Transaction tx, boolean syncCommit)
+ {
+ return StoreFuture.IMMEDIATE_FUTURE;
+ }
- EnvironmentFacade createEnvironmentFacade(String name, String storeLocation, VirtualHost virtualHost);
+ @Override
+ public void stop()
+ {
+ }
+ };
-}
+}
\ No newline at end of file
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java?rev=1560333&r1=1560332&r2=1560333&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java Wed Jan 22 12:29:07 2014
@@ -25,7 +25,6 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.server.store.StoreFuture;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
@@ -45,13 +44,16 @@ public interface EnvironmentFacade
Environment getEnvironment();
- StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws AMQStoreException;
-
- AMQStoreException handleDatabaseException(String contextMessage, DatabaseException e);
+ Committer createCommitter(String name);
void openDatabases(DatabaseConfig dbConfig, String... databaseNames);
+ Database getOpenDatabase(String name);
+
+ void commit(com.sleepycat.je.Transaction tx) throws AMQStoreException;
+
+ AMQStoreException handleDatabaseException(String contextMessage, DatabaseException e);
+
void close();
- Database getOpenDatabase(String name);
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java?rev=1560333&r1=1560332&r2=1560333&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java Wed Jan 22 12:29:07 2014
@@ -25,6 +25,6 @@ import org.apache.qpid.server.model.Virt
public interface EnvironmentFacadeFactory
{
- EnvironmentFacade createEnvironmentFacade(String name, String storeLocation, VirtualHost virtualHost);
+ EnvironmentFacade createEnvironmentFacade(String storeLocation, VirtualHost virtualHost);
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java?rev=1560333&r1=1560332&r2=1560333&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java Wed Jan 22 12:29:07 2014
@@ -53,7 +53,6 @@ import java.util.concurrent.atomic.Atomi
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.replication.ReplicationGroupListener;
-import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.berkeleydb.replication.DatabasePinger;
import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNode;
import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNodeFactory;
@@ -145,7 +144,6 @@ public class ReplicatedEnvironmentFacade
private final String _environmentPath;
private final Map<String, String> _environmentParameters;
private final Map<String, String> _replicationEnvironmentParameters;
- private final String _name;
private final ExecutorService _restartEnvironmentExecutor;
private final ScheduledExecutorService _groupChangeExecutor;
private final AtomicReference<State> _state = new AtomicReference<State>(State.INITIAL);
@@ -155,17 +153,14 @@ public class ReplicatedEnvironmentFacade
private final AtomicReference<ReplicationGroupListener> _replicationGroupListener = new AtomicReference<ReplicationGroupListener>();
private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>();
- private volatile CommitThreadWrapper _commitThreadWrapper;
private volatile ReplicatedEnvironment _environment;
private long _joinTime;
private String _lastKnownReplicationTransactionId;
@SuppressWarnings("unchecked")
- public ReplicatedEnvironmentFacade(String virtualHostName, String environmentPath,
- org.apache.qpid.server.model.ReplicationNode replicationNode,
+ public ReplicatedEnvironmentFacade(String environmentPath, org.apache.qpid.server.model.ReplicationNode replicationNode,
RemoteReplicationNodeFactory remoteReplicationNodeFactory)
{
- _name = virtualHostName;
_environmentPath = environmentPath;
_groupName = (String)replicationNode.getAttribute(GROUP_NAME);
_nodeName = replicationNode.getName();
@@ -220,28 +215,16 @@ public class ReplicatedEnvironmentFacade
throw new RuntimeException("JE environment has not been created in due time");
}
populateExistingRemoteReplicationNodes();
- _commitThreadWrapper = startCommitThread(_name, _environment);
}
@Override
- public StoreFuture commit(final Transaction tx, final boolean syncCommit) throws AMQStoreException
+ public void commit(final Transaction tx) throws AMQStoreException
{
try
{
- // Using commit() instead of commitNoSync() for the HA store
- // to allow
- // the HA durability configuration to influence resulting
- // behaviour.
+ // Using commit() instead of commitNoSync() for the HA store to allow
+ // the HA durability configuration to influence resulting behaviour.
tx.commit();
-
- if (_coalescingSync)
- {
- return _commitThreadWrapper.commit(tx, syncCommit);
- }
- else
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
}
catch (DatabaseException de)
{
@@ -260,7 +243,6 @@ public class ReplicatedEnvironmentFacade
LOGGER.debug("Closing replicated environment facade for " + _prettyGroupNodeName);
_restartEnvironmentExecutor.shutdown();
_groupChangeExecutor.shutdown();
- stopCommitThread();
closeDatabases();
closeEnvironment();
}
@@ -398,7 +380,6 @@ public class ReplicatedEnvironmentFacade
if (state == ReplicatedEnvironment.State.MASTER)
{
reopenDatabases();
- _commitThreadWrapper = startCommitThread(_name, _environment);
StateChangeListener listener = _stateChangeListener.get();
LOGGER.debug("Application state change listener " + listener);
if (listener != null)
@@ -433,11 +414,6 @@ public class ReplicatedEnvironmentFacade
}
}
- public String getName()
- {
- return _name;
- }
-
public String getGroupName()
{
return _groupName;
@@ -670,55 +646,10 @@ public class ReplicatedEnvironmentFacade
}
}
- private CommitThreadWrapper startCommitThread(String name, Environment environment)
- {
- CommitThreadWrapper commitThreadWrapper = null;
- if (_coalescingSync)
- {
- commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, environment);
- commitThreadWrapper.startCommitThread();
- }
- return commitThreadWrapper;
- }
-
- private void stopCommitThread(RuntimeException dbe)
- {
- if (_coalescingSync)
- {
- try
- {
- _commitThreadWrapper.stopCommitThread(dbe);
- }
- catch (InterruptedException e)
- {
- LOGGER.warn("Stopping of commit thread is interrupted", e);
- Thread.interrupted();
- }
- }
- }
-
- private void stopCommitThread()
- {
- if (_coalescingSync)
- {
- try
- {
- _commitThreadWrapper.stopCommitThread();
- }
- catch (InterruptedException e)
- {
- LOGGER.warn("Stopping of commit thread is interrupted", e);
- Thread.interrupted();
- }
- }
- }
-
private void restartEnvironment(DatabaseException dbe) throws AMQStoreException
{
LOGGER.info("Restarting environment");
- stopCommitThread(dbe);
-
closeEnvironmentSafely();
_environment = createEnvironment();
@@ -865,6 +796,19 @@ public class ReplicatedEnvironmentFacade
return environment;
}
+ @Override
+ public Committer createCommitter(String name)
+ {
+ if (_coalescingSync)
+ {
+ return new CoalescingCommiter(name, this);
+ }
+ else
+ {
+ return Committer.IMMEDIATE_FUTURE_COMMITTER;
+ }
+ }
+
private final class GroupChangeLearner implements Runnable
{
@Override
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java?rev=1560333&r1=1560332&r2=1560333&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java Wed Jan 22 12:29:07 2014
@@ -38,7 +38,7 @@ public class ReplicatedEnvironmentFacade
{
@Override
- public EnvironmentFacade createEnvironmentFacade(String name, String storeLocation, VirtualHost virtualHost)
+ public EnvironmentFacade createEnvironmentFacade(String storeLocation, VirtualHost virtualHost)
{
Collection<ReplicationNode> replicationNodes = virtualHost.getChildren(ReplicationNode.class);
if (replicationNodes == null || replicationNodes.size() != 1)
@@ -59,7 +59,7 @@ public class ReplicatedEnvironmentFacade
+ "! Please set highAvailability.coalescingSync to false in store configuration.");
}
- ReplicatedEnvironmentFacade facade = new ReplicatedEnvironmentFacade(name, storeLocation, localNode, new RemoteReplicationNodeFactoryImpl(virtualHost));
+ ReplicatedEnvironmentFacade facade = new ReplicatedEnvironmentFacade(storeLocation, localNode, new RemoteReplicationNodeFactoryImpl(virtualHost));
((LocalReplicationNode)localNode).setReplicatedEnvironmentFacade(facade);
return facade;
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java?rev=1560333&r1=1560332&r2=1560333&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java Wed Jan 22 12:29:07 2014
@@ -26,7 +26,6 @@ import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.server.store.StoreFuture;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
@@ -42,10 +41,9 @@ public class StandardEnvironmentFacade i
public static final String TYPE = "BDB";
private Environment _environment;
- private CommitThreadWrapper _commitThreadWrapper;
private final Map<String, Database> _databases = new HashMap<String, Database>();
- public StandardEnvironmentFacade(String name, String storePath, Map<String, String> attributes)
+ public StandardEnvironmentFacade(String storePath, Map<String, String> attributes)
{
LOGGER.info("BDB message store using environment path " + storePath);
@@ -79,13 +77,11 @@ public class StandardEnvironmentFacade i
throw de;
}
}
- _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, _environment);
- _commitThreadWrapper.startCommitThread();
}
@Override
- public StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws AMQStoreException
+ public void commit(com.sleepycat.je.Transaction tx) throws AMQStoreException
{
try
{
@@ -99,14 +95,11 @@ public class StandardEnvironmentFacade i
throw handleDatabaseException("Got DatabaseException on commit", de);
}
-
- return _commitThreadWrapper.commit(tx, syncCommit);
}
@Override
public void close()
{
- stopCommitThread();
closeDatabases();
closeEnvironment();
}
@@ -136,7 +129,6 @@ public class StandardEnvironmentFacade i
private void closeEnvironmentSafely()
{
- stopCommitThread();
if (_environment != null)
{
if (_environment.isValid())
@@ -194,26 +186,6 @@ public class StandardEnvironmentFacade i
}
}
- private void stopCommitThread()
- {
- if (_commitThreadWrapper != null)
- {
- try
- {
- _commitThreadWrapper.stopCommitThread();
- }
- catch (InterruptedException e)
- {
- LOGGER.warn("Stopping of commit thread is interrupted", e);
- Thread.interrupted();
- }
- finally
- {
- _commitThreadWrapper = null;
- }
- }
- }
-
@Override
public AMQStoreException handleDatabaseException(String contextMessage, DatabaseException e)
{
@@ -254,4 +226,10 @@ public class StandardEnvironmentFacade i
return database;
}
+ @Override
+ public Committer createCommitter(String name)
+ {
+ return new CoalescingCommiter(name, this);
+ }
+
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java?rev=1560333&r1=1560332&r2=1560333&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java Wed Jan 22 12:29:07 2014
@@ -30,7 +30,7 @@ public class StandardEnvironmentFacadeFa
@SuppressWarnings("unchecked")
@Override
- public EnvironmentFacade createEnvironmentFacade(String name, String storePath, VirtualHost virtualHost)
+ public EnvironmentFacade createEnvironmentFacade(String storePath, VirtualHost virtualHost)
{
Map<String, String> envConfigMap = new HashMap<String, String>();
envConfigMap.putAll(EnvironmentFacade.ENVCONFIG_DEFAULTS);
@@ -40,7 +40,7 @@ public class StandardEnvironmentFacadeFa
{
envConfigMap.putAll((Map<String, String>) bdbEnvConfigAttr);
}
- return new StandardEnvironmentFacade(name, storePath, envConfigMap);
+ return new StandardEnvironmentFacade(storePath, envConfigMap);
}
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java?rev=1560333&r1=1560332&r2=1560333&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java Wed Jan 22 12:29:07 2014
@@ -166,11 +166,6 @@ public class ReplicatedEnvironmentFacade
}
}
- public void testGetName() throws Exception
- {
- assertEquals("Unexpected name", getName(), ((ReplicatedEnvironmentFacade) createMaster()).getName());
- }
-
public void testGetGroupName() throws Exception
{
assertEquals("Unexpected group name", TEST_GROUP_NAME, ((ReplicatedEnvironmentFacade) createMaster()).getGroupName());
@@ -562,7 +557,7 @@ public class ReplicatedEnvironmentFacade
{
final String nodePath = createNodeWorkingFolder(nodeName);
ReplicationNode node = createReplicationNodeMock(nodeName, nodeHostPort, designatedPrimary);
- ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(getName(), nodePath, node, _remoteReplicationNodeFactory);
+ ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(nodePath, node, _remoteReplicationNodeFactory);
ref.setReplicationGroupListener(replicationGroupListener);
ref.setStateChangeListener(stateChangeListener);
_nodes.put(nodeName, ref);
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java?rev=1560333&r1=1560332&r2=1560333&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java Wed Jan 22 12:29:07 2014
@@ -123,7 +123,7 @@ public class StandardEnvironmentFacadeTe
EnvironmentFacade createEnvironmentFacade()
{
- return new StandardEnvironmentFacade(getName(), _storePath.getAbsolutePath(), Collections.<String, String>emptyMap());
+ return new StandardEnvironmentFacade(_storePath.getAbsolutePath(), Collections.<String, String>emptyMap());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org