You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/01/22 13:29:07 UTC

svn commit: r1560333 - in /qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore: jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/ jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/ src/main/java/org/apache/qpid/server/store/ber...

Author: kwall
Date: Wed Jan 22 12:29:07 2014
New Revision: 1560333

URL: http://svn.apache.org/r1560333
Log:
QPID-5409: Refactoring to move commit thread back to BDBMessageStore.

Added:
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
      - copied, changed from r1560101, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
      - copied, changed from r1560101, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
Removed:
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
Modified:
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java?rev=1560333&r1=1560332&r2=1560333&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java Wed Jan 22 12:29:07 2014
@@ -43,12 +43,6 @@ import org.apache.qpid.server.store.berk
 
 /**
  * Management mbean for BDB HA.
- * <p>
- * At runtime, the classloader loading this clas must have visibility of the other Qpid JMX classes. This is
- * currently arranged through OSGI using the <b>fragment</b> feature so that this bundle shares the
- * same classloader as broker-plugins-management-jmx.  See the <b>Fragment-Host:</b> header within the MANIFEST.MF
- * of this bundle.
- * </p>
  */
 public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements ManagedBDBHAMessageStore
 {
@@ -80,19 +74,21 @@ public class BDBHAMessageStoreManagerMBe
     }
 
     private final ReplicatedEnvironmentFacade _replicatedEnvironmentFacade;
+    private final String _objectName;
 
-    protected BDBHAMessageStoreManagerMBean(ReplicatedEnvironmentFacade replicatedEnvironmentFacade, ManagedObject parent) throws JMException
+    protected BDBHAMessageStoreManagerMBean(String virtualHostName, ReplicatedEnvironmentFacade replicatedEnvironmentFacade, ManagedObject parent) throws JMException
     {
         super(ManagedBDBHAMessageStore.class, ManagedBDBHAMessageStore.TYPE, ((AMQManagedObject)parent).getRegistry());
-        LOGGER.debug("Creating BDBHAMessageStoreManagerMBean");
+        LOGGER.debug("Creating BDBHAMessageStoreManagerMBean for " + virtualHostName);
         _replicatedEnvironmentFacade = replicatedEnvironmentFacade;
+        _objectName = ObjectName.quote(virtualHostName);
         register();
     }
 
     @Override
     public String getObjectInstanceName()
     {
-        return ObjectName.quote(_replicatedEnvironmentFacade.getName());
+        return _objectName;
     }
 
     @Override

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java?rev=1560333&r1=1560332&r2=1560333&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanProvider.java Wed Jan 22 12:29:07 2014
@@ -65,7 +65,7 @@ public class BDBHAMessageStoreManagerMBe
         }
 
         ReplicatedEnvironmentFacade replicatedEnvironmentFacade = (ReplicatedEnvironmentFacade)messageStore.getEnvironmentFacade();
-        return new BDBHAMessageStoreManagerMBean(replicatedEnvironmentFacade, (ManagedObject) parent);
+        return new BDBHAMessageStoreManagerMBean(virtualHostChild.getName(), replicatedEnvironmentFacade, (ManagedObject) parent);
     }
 
     @Override

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java?rev=1560333&r1=1560332&r2=1560333&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java Wed Jan 22 12:29:07 2014
@@ -71,7 +71,7 @@ public class BDBHAMessageStoreManagerMBe
         _replicatedEnvironmentFacadee = mock(ReplicatedEnvironmentFacade.class);
         _mBeanParent = mock(AMQManagedObject.class);
         when(_mBeanParent.getRegistry()).thenReturn(mock(ManagedObjectRegistry.class));
-        _mBean = new BDBHAMessageStoreManagerMBean(_replicatedEnvironmentFacadee, _mBeanParent);
+        _mBean = new BDBHAMessageStoreManagerMBean(TEST_STORE_NAME, _replicatedEnvironmentFacadee, _mBeanParent);
     }
 
     @Override
@@ -83,8 +83,6 @@ public class BDBHAMessageStoreManagerMBe
 
     public void testObjectName() throws Exception
     {
-        when(_replicatedEnvironmentFacadee.getName()).thenReturn(TEST_STORE_NAME);
-
         String expectedObjectName = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(TEST_STORE_NAME);
         assertEquals(expectedObjectName, _mBean.getObjectName().toString());
     }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java?rev=1560333&r1=1560332&r2=1560333&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java Wed Jan 22 12:29:07 2014
@@ -110,6 +110,8 @@ public class BDBMessageStore implements 
 
     private final EnvironmentFacadeFactory _environmentFacadeFactory;
 
+    private volatile Committer _committer;
+
     public BDBMessageStore()
     {
         this(TYPE, new StandardEnvironmentFacadeFactory());
@@ -151,6 +153,7 @@ public class BDBMessageStore implements 
         _tlogRecoveryHandler = tlogRecoveryHandler;
         _virtualHost = virtualHost;
 
+
         completeInitialisation();
     }
 
@@ -263,7 +266,10 @@ public class BDBMessageStore implements 
         _storeLocation = storeLocation;
 
         LOGGER.info("Setting up environment");
-        _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(name, storeLocation, virtualHost);
+        _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeLocation, virtualHost);
+
+        _committer = _environmentFacade.createCommitter(null);
+        _committer.start();
     }
 
     @Override
@@ -287,16 +293,23 @@ public class BDBMessageStore implements 
     {
         if (_closed.compareAndSet(false, true))
         {
-	    _stateManager.attainState(State.CLOSING);
-	    try
-	    {
-		closeEnvironment();
-	    }
-	    catch(DatabaseException e)
-	    {
-		throw new AMQStoreException("Exception occured on message store close", e);
-	    }
-	    _stateManager.attainState(State.CLOSED);
+            _stateManager.attainState(State.CLOSING);
+            try
+            {
+                try
+                {
+                    _committer.stop();
+                }
+                finally
+                {
+                    closeEnvironment();
+                }
+            }
+            catch(DatabaseException e)
+            {
+                throw new AMQStoreException("Exception occured on message store close", e);
+            }
+            _stateManager.attainState(State.CLOSED);
         }
     }
 
@@ -598,7 +611,9 @@ public class BDBMessageStore implements 
                         LOGGER.debug("Deleted content for message " + messageId);
                     }
 
-                    _environmentFacade.commit(tx, sync);
+                    _environmentFacade.commit(tx);
+                    _committer.commit(tx, sync);
+
                     complete = true;
                     tx = null;
                 }
@@ -980,7 +995,8 @@ public class BDBMessageStore implements 
             throw new AMQStoreException("Fatal internal error: transactional is null at commitTran");
         }
 
-        StoreFuture result = _environmentFacade.commit(tx, syncCommit);
+        _environmentFacade.commit(tx);
+        StoreFuture result =  _committer.commit(tx, syncCommit);
 
         if (LOGGER.isDebugEnabled())
         {
@@ -1485,7 +1501,9 @@ public class BDBMessageStore implements 
                         throw _environmentFacade.handleDatabaseException("failed to begin transaction", e);
                     }
                     store(txn);
-                    _environmentFacade.commit(txn,true);
+                    _environmentFacade.commit(txn);
+                    _committer.commit(txn, true);
+
                     storedSizeChangeOccured(getMetaData().getContentSize());
                 }
                 catch (AMQStoreException e)

Copied: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java (from r1560101, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java?p2=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java&p1=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java&r1=1560101&r2=1560333&rev=1560333&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java Wed Jan 22 12:29:07 2014
@@ -27,36 +27,40 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.store.StoreFuture;
 
-import com.sleepycat.je.CheckpointConfig;
 import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Environment;
 import com.sleepycat.je.Transaction;
 
-public class CommitThreadWrapper
+public class CoalescingCommiter implements Committer
 {
     private final CommitThread _commitThread;
-    
-    public CommitThreadWrapper(String name, Environment env)
-    {
-        _commitThread = new CommitThread(name, env);
-    }
 
-    public void startCommitThread()
+    public CoalescingCommiter(String name, EnvironmentFacade environmentFacade)
     {
-        _commitThread.start();
+        _commitThread = new CommitThread("Commit-Thread-" + name, environmentFacade);
     }
 
-    public void stopCommitThread(RuntimeException e) throws InterruptedException
+    @Override
+    public void start()
     {
-        _commitThread.close(e);
-        _commitThread.join();
+        _commitThread.start();
     }
 
-    public void stopCommitThread() throws InterruptedException
+    @Override
+    public void stop()
     {
-        stopCommitThread(new RuntimeException("Stopping commit thread"));
+        _commitThread.close();
+        try
+        {
+            _commitThread.join();
+        }
+        catch (InterruptedException ie)
+        {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Commit thread has not shutdown", ie);
+        }
     }
 
+    @Override
     public StoreFuture commit(Transaction tx, boolean syncCommit)
     {
         BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
@@ -70,9 +74,9 @@ public class CommitThreadWrapper
 
         private final CommitThread _commitThread;
         private final Transaction _tx;
+        private final boolean _syncCommit;
         private RuntimeException _databaseException;
         private boolean _complete;
-        private boolean _syncCommit;
 
         public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit)
         {
@@ -170,15 +174,13 @@ public class CommitThreadWrapper
 
         private final AtomicBoolean _stopped = new AtomicBoolean(false);
         private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
-        private final CheckpointConfig _config = new CheckpointConfig();
         private final Object _lock = new Object();
-        private Environment _environment;
+        private final EnvironmentFacade _environmentFacade;
 
-        public CommitThread(String name, Environment env)
+        public CommitThread(String name, EnvironmentFacade environmentFacade)
         {
             super(name);
-            _config.setForce(true);
-            _environment = env;
+            _environmentFacade = environmentFacade;
         }
 
         public void explicitNotify()
@@ -199,7 +201,7 @@ public class CommitThreadWrapper
                     {
                         try
                         {
-                            // RHM-7 Periodically wake up and check, just in case we
+                            // Periodically wake up and check, just in case we
                             // missed a notification. Don't want to lock the broker hard.
                             _lock.wait(1000);
                         }
@@ -224,7 +226,7 @@ public class CommitThreadWrapper
                     startTime = System.currentTimeMillis();
                 }
 
-                _environment.flushLog(true);
+                _environmentFacade.getEnvironment().flushLog(true);
 
                 if(LOGGER.isDebugEnabled())
                 {
@@ -257,7 +259,7 @@ public class CommitThreadWrapper
 
                     try
                     {
-                        _environment.close();
+                        _environmentFacade.close();
                     }
                     catch (DatabaseException ex)
                     {
@@ -288,8 +290,9 @@ public class CommitThreadWrapper
             }
         }
 
-        public void close(RuntimeException e)
+        public void close()
         {
+            RuntimeException e = new RuntimeException("Commit thread has been closed, transaction aborted");
             synchronized (_lock)
             {
                 _stopped.set(true);

Copied: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java (from r1560101, qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java?p2=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java&p1=qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java&r1=1560101&r2=1560333&rev=1560333&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java Wed Jan 22 12:29:07 2014
@@ -20,11 +20,36 @@
  */
 package org.apache.qpid.server.store.berkeleydb;
 
-import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.StoreFuture;
 
-public interface EnvironmentFacadeFactory
+import com.sleepycat.je.Transaction;
+
+public interface Committer
 {
+    void start();
+
+    StoreFuture commit(Transaction tx, boolean syncCommit);
+
+    void stop();
+
+    Committer IMMEDIATE_FUTURE_COMMITTER = new Committer()
+    {
+
+        @Override
+        public void start()
+        {
+        }
+
+        @Override
+        public StoreFuture commit(Transaction tx, boolean syncCommit)
+        {
+            return StoreFuture.IMMEDIATE_FUTURE;
+        }
 
-    EnvironmentFacade createEnvironmentFacade(String name, String storeLocation, VirtualHost virtualHost);
+        @Override
+        public void stop()
+        {
+        }
+    };
 
-}
+}
\ No newline at end of file

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java?rev=1560333&r1=1560332&r2=1560333&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java Wed Jan 22 12:29:07 2014
@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.server.store.StoreFuture;
 
 import com.sleepycat.je.Database;
 import com.sleepycat.je.DatabaseConfig;
@@ -45,13 +44,16 @@ public interface EnvironmentFacade
 
     Environment getEnvironment();
 
-    StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws AMQStoreException;
-
-    AMQStoreException handleDatabaseException(String contextMessage, DatabaseException e);
+    Committer createCommitter(String name);
 
     void openDatabases(DatabaseConfig dbConfig, String... databaseNames);
 
+    Database getOpenDatabase(String name);
+
+    void commit(com.sleepycat.je.Transaction tx) throws AMQStoreException;
+
+    AMQStoreException handleDatabaseException(String contextMessage, DatabaseException e);
+
     void close();
 
-    Database getOpenDatabase(String name);
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java?rev=1560333&r1=1560332&r2=1560333&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacadeFactory.java Wed Jan 22 12:29:07 2014
@@ -25,6 +25,6 @@ import org.apache.qpid.server.model.Virt
 public interface EnvironmentFacadeFactory
 {
 
-    EnvironmentFacade createEnvironmentFacade(String name, String storeLocation, VirtualHost virtualHost);
+    EnvironmentFacade createEnvironmentFacade(String storeLocation, VirtualHost virtualHost);
 
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java?rev=1560333&r1=1560332&r2=1560333&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacade.java Wed Jan 22 12:29:07 2014
@@ -53,7 +53,6 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.server.replication.ReplicationGroupListener;
-import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.store.berkeleydb.replication.DatabasePinger;
 import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNode;
 import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNodeFactory;
@@ -145,7 +144,6 @@ public class ReplicatedEnvironmentFacade
     private final String _environmentPath;
     private final Map<String, String> _environmentParameters;
     private final Map<String, String> _replicationEnvironmentParameters;
-    private final String _name;
     private final ExecutorService _restartEnvironmentExecutor;
     private final ScheduledExecutorService _groupChangeExecutor;
     private final AtomicReference<State> _state = new AtomicReference<State>(State.INITIAL);
@@ -155,17 +153,14 @@ public class ReplicatedEnvironmentFacade
 
     private final AtomicReference<ReplicationGroupListener> _replicationGroupListener = new AtomicReference<ReplicationGroupListener>();
     private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>();
-    private volatile CommitThreadWrapper _commitThreadWrapper;
     private volatile ReplicatedEnvironment _environment;
     private long _joinTime;
     private String _lastKnownReplicationTransactionId;
 
     @SuppressWarnings("unchecked")
-    public ReplicatedEnvironmentFacade(String virtualHostName, String environmentPath,
-            org.apache.qpid.server.model.ReplicationNode replicationNode,
+    public ReplicatedEnvironmentFacade(String environmentPath, org.apache.qpid.server.model.ReplicationNode replicationNode,
             RemoteReplicationNodeFactory remoteReplicationNodeFactory)
     {
-         _name = virtualHostName;
         _environmentPath = environmentPath;
         _groupName = (String)replicationNode.getAttribute(GROUP_NAME);
         _nodeName = replicationNode.getName();
@@ -220,28 +215,16 @@ public class ReplicatedEnvironmentFacade
             throw new RuntimeException("JE environment has not been created in due time");
         }
         populateExistingRemoteReplicationNodes();
-        _commitThreadWrapper = startCommitThread(_name, _environment);
     }
 
     @Override
-    public StoreFuture commit(final Transaction tx, final boolean syncCommit) throws AMQStoreException
+    public void commit(final Transaction tx) throws AMQStoreException
     {
         try
         {
-            // Using commit() instead of commitNoSync() for the HA store
-            // to allow
-            // the HA durability configuration to influence resulting
-            // behaviour.
+            // Using commit() instead of commitNoSync() for the HA store to allow
+            // the HA durability configuration to influence resulting behaviour.
             tx.commit();
-
-            if (_coalescingSync)
-            {
-                return _commitThreadWrapper.commit(tx, syncCommit);
-            }
-            else
-            {
-                return StoreFuture.IMMEDIATE_FUTURE;
-            }
         }
         catch (DatabaseException de)
         {
@@ -260,7 +243,6 @@ public class ReplicatedEnvironmentFacade
                 LOGGER.debug("Closing replicated environment facade for " + _prettyGroupNodeName);
                 _restartEnvironmentExecutor.shutdown();
                 _groupChangeExecutor.shutdown();
-                stopCommitThread();
                 closeDatabases();
                 closeEnvironment();
             }
@@ -398,7 +380,6 @@ public class ReplicatedEnvironmentFacade
         if (state == ReplicatedEnvironment.State.MASTER)
         {
             reopenDatabases();
-            _commitThreadWrapper = startCommitThread(_name, _environment);
             StateChangeListener listener = _stateChangeListener.get();
             LOGGER.debug("Application state change listener " + listener);
             if (listener != null)
@@ -433,11 +414,6 @@ public class ReplicatedEnvironmentFacade
         }
     }
 
-    public String getName()
-    {
-        return _name;
-    }
-
     public String getGroupName()
     {
         return _groupName;
@@ -670,55 +646,10 @@ public class ReplicatedEnvironmentFacade
         }
     }
 
-    private CommitThreadWrapper startCommitThread(String name, Environment environment)
-    {
-        CommitThreadWrapper commitThreadWrapper = null;
-        if (_coalescingSync)
-        {
-            commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, environment);
-            commitThreadWrapper.startCommitThread();
-        }
-        return commitThreadWrapper;
-    }
-
-    private void stopCommitThread(RuntimeException dbe)
-    {
-        if (_coalescingSync)
-        {
-            try
-            {
-                _commitThreadWrapper.stopCommitThread(dbe);
-            }
-            catch (InterruptedException e)
-            {
-                LOGGER.warn("Stopping of commit thread is interrupted", e);
-                Thread.interrupted();
-            }
-        }
-    }
-
-    private void stopCommitThread()
-    {
-        if (_coalescingSync)
-        {
-            try
-            {
-                _commitThreadWrapper.stopCommitThread();
-            }
-            catch (InterruptedException e)
-            {
-                LOGGER.warn("Stopping of commit thread is interrupted", e);
-                Thread.interrupted();
-            }
-        }
-    }
-
     private void restartEnvironment(DatabaseException dbe) throws AMQStoreException
     {
         LOGGER.info("Restarting environment");
 
-        stopCommitThread(dbe);
-
         closeEnvironmentSafely();
 
         _environment = createEnvironment();
@@ -865,6 +796,19 @@ public class ReplicatedEnvironmentFacade
         return environment;
     }
 
+    @Override
+    public Committer createCommitter(String name)
+    {
+        if (_coalescingSync)
+        {
+            return new CoalescingCommiter(name, this);
+        }
+        else
+        {
+            return Committer.IMMEDIATE_FUTURE_COMMITTER;
+        }
+    }
+
     private final class GroupChangeLearner implements Runnable
     {
         @Override

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java?rev=1560333&r1=1560332&r2=1560333&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeFactory.java Wed Jan 22 12:29:07 2014
@@ -38,7 +38,7 @@ public class ReplicatedEnvironmentFacade
 {
 
     @Override
-    public EnvironmentFacade createEnvironmentFacade(String name, String storeLocation, VirtualHost virtualHost)
+    public EnvironmentFacade createEnvironmentFacade(String storeLocation, VirtualHost virtualHost)
     {
         Collection<ReplicationNode> replicationNodes = virtualHost.getChildren(ReplicationNode.class);
         if (replicationNodes == null || replicationNodes.size() != 1)
@@ -59,7 +59,7 @@ public class ReplicatedEnvironmentFacade
                     + "! Please set highAvailability.coalescingSync to false in store configuration.");
         }
 
-        ReplicatedEnvironmentFacade facade =  new ReplicatedEnvironmentFacade(name, storeLocation, localNode, new RemoteReplicationNodeFactoryImpl(virtualHost));
+        ReplicatedEnvironmentFacade facade =  new ReplicatedEnvironmentFacade(storeLocation, localNode, new RemoteReplicationNodeFactoryImpl(virtualHost));
         ((LocalReplicationNode)localNode).setReplicatedEnvironmentFacade(facade);
         return facade;
     }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java?rev=1560333&r1=1560332&r2=1560333&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java Wed Jan 22 12:29:07 2014
@@ -26,7 +26,6 @@ import java.util.Map;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.server.store.StoreFuture;
 
 import com.sleepycat.je.Database;
 import com.sleepycat.je.DatabaseConfig;
@@ -42,10 +41,9 @@ public class StandardEnvironmentFacade i
     public static final String TYPE = "BDB";
 
     private Environment _environment;
-    private CommitThreadWrapper _commitThreadWrapper;
     private final Map<String, Database> _databases = new HashMap<String, Database>();
 
-    public StandardEnvironmentFacade(String name, String storePath, Map<String, String> attributes)
+    public StandardEnvironmentFacade(String storePath, Map<String, String> attributes)
     {
 
         LOGGER.info("BDB message store using environment path " + storePath);
@@ -79,13 +77,11 @@ public class StandardEnvironmentFacade i
                 throw de;
             }
         }
-        _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, _environment);
-        _commitThreadWrapper.startCommitThread();
     }
 
 
     @Override
-    public StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws AMQStoreException
+    public void commit(com.sleepycat.je.Transaction tx) throws AMQStoreException
     {
         try
         {
@@ -99,14 +95,11 @@ public class StandardEnvironmentFacade i
 
             throw handleDatabaseException("Got DatabaseException on commit", de);
         }
-
-        return _commitThreadWrapper.commit(tx, syncCommit);
     }
 
     @Override
     public void close()
     {
-        stopCommitThread();
         closeDatabases();
         closeEnvironment();
     }
@@ -136,7 +129,6 @@ public class StandardEnvironmentFacade i
 
     private void closeEnvironmentSafely()
     {
-        stopCommitThread();
         if (_environment != null)
         {
             if (_environment.isValid())
@@ -194,26 +186,6 @@ public class StandardEnvironmentFacade i
         }
     }
 
-    private void stopCommitThread()
-    {
-        if (_commitThreadWrapper != null)
-        {
-            try
-            {
-                _commitThreadWrapper.stopCommitThread();
-            }
-            catch (InterruptedException e)
-            {
-                LOGGER.warn("Stopping of commit thread is interrupted", e);
-                Thread.interrupted();
-            }
-            finally
-            {
-                _commitThreadWrapper = null;
-            }
-        }
-    }
-
     @Override
     public AMQStoreException handleDatabaseException(String contextMessage, DatabaseException e)
     {
@@ -254,4 +226,10 @@ public class StandardEnvironmentFacade i
         return database;
     }
 
+    @Override
+    public Committer createCommitter(String name)
+    {
+        return new CoalescingCommiter(name, this);
+    }
+
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java?rev=1560333&r1=1560332&r2=1560333&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java Wed Jan 22 12:29:07 2014
@@ -30,7 +30,7 @@ public class StandardEnvironmentFacadeFa
 
     @SuppressWarnings("unchecked")
     @Override
-    public EnvironmentFacade createEnvironmentFacade(String name, String storePath, VirtualHost virtualHost)
+    public EnvironmentFacade createEnvironmentFacade(String storePath, VirtualHost virtualHost)
     {
         Map<String, String> envConfigMap = new HashMap<String, String>();
         envConfigMap.putAll(EnvironmentFacade.ENVCONFIG_DEFAULTS);
@@ -40,7 +40,7 @@ public class StandardEnvironmentFacadeFa
         {
             envConfigMap.putAll((Map<String, String>) bdbEnvConfigAttr);
         }
-        return new StandardEnvironmentFacade(name, storePath, envConfigMap);
+        return new StandardEnvironmentFacade(storePath, envConfigMap);
     }
 
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java?rev=1560333&r1=1560332&r2=1560333&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ReplicatedEnvironmentFacadeTest.java Wed Jan 22 12:29:07 2014
@@ -166,11 +166,6 @@ public class ReplicatedEnvironmentFacade
         }
     }
 
-    public void testGetName() throws Exception
-    {
-        assertEquals("Unexpected name", getName(), ((ReplicatedEnvironmentFacade) createMaster()).getName());
-    }
-
     public void testGetGroupName() throws Exception
     {
         assertEquals("Unexpected group name", TEST_GROUP_NAME, ((ReplicatedEnvironmentFacade) createMaster()).getGroupName());
@@ -562,7 +557,7 @@ public class ReplicatedEnvironmentFacade
     {
         final String nodePath = createNodeWorkingFolder(nodeName);
         ReplicationNode node = createReplicationNodeMock(nodeName, nodeHostPort, designatedPrimary);
-        ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(getName(), nodePath, node, _remoteReplicationNodeFactory);
+        ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(nodePath, node, _remoteReplicationNodeFactory);
         ref.setReplicationGroupListener(replicationGroupListener);
         ref.setStateChangeListener(stateChangeListener);
         _nodes.put(nodeName, ref);

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java?rev=1560333&r1=1560332&r2=1560333&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeTest.java Wed Jan 22 12:29:07 2014
@@ -123,7 +123,7 @@ public class StandardEnvironmentFacadeTe
 
     EnvironmentFacade createEnvironmentFacade()
     {
-        return new StandardEnvironmentFacade(getName(), _storePath.getAbsolutePath(), Collections.<String, String>emptyMap());
+        return new StandardEnvironmentFacade(_storePath.getAbsolutePath(), Collections.<String, String>emptyMap());
     }
 
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org