You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/06/16 09:40:33 UTC

svn commit: r955149 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/broker/ft/

Author: rajdavies
Date: Wed Jun 16 07:40:33 2010
New Revision: 955149

URL: http://svn.apache.org/viewvc?rev=955149&view=rev
Log:
make concurrent store and dispatch with transactions optional

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=955149&r1=955148&r2=955149&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Wed Jun 16 07:40:33 2010
@@ -31,6 +31,7 @@ import org.apache.activemq.store.Persist
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionStore;
 import org.apache.activemq.usage.SystemUsage;
+
 /**
  * An implementation of {@link PersistenceAdapter} designed for use with a
  * {@link Journal} and then check pointing asynchronously on a timeout with some
@@ -41,7 +42,6 @@ import org.apache.activemq.usage.SystemU
  */
 public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
     private final KahaDBStore letter = new KahaDBStore();
-    
 
     /**
      * @param context
@@ -157,8 +157,6 @@ public class KahaDBPersistenceAdapter im
         this.letter.setBrokerName(brokerName);
     }
 
-    
-
     /**
      * @param usageManager
      * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage)
@@ -193,6 +191,7 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Get the journalMaxFileLength
+     * 
      * @return the journalMaxFileLength
      */
     public int getJournalMaxFileLength() {
@@ -200,8 +199,11 @@ public class KahaDBPersistenceAdapter im
     }
 
     /**
-     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
-     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
+     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
+     * be used
+     * 
+     * @org.apache.xbean.Property 
+     *                            propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
      */
     public void setJournalMaxFileLength(int journalMaxFileLength) {
         this.letter.setJournalMaxFileLength(journalMaxFileLength);
@@ -209,6 +211,7 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Get the checkpointInterval
+     * 
      * @return the checkpointInterval
      */
     public long getCheckpointInterval() {
@@ -217,7 +220,9 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Set the checkpointInterval
-     * @param checkpointInterval the checkpointInterval to set
+     * 
+     * @param checkpointInterval
+     *            the checkpointInterval to set
      */
     public void setCheckpointInterval(long checkpointInterval) {
         this.letter.setCheckpointInterval(checkpointInterval);
@@ -225,6 +230,7 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Get the cleanupInterval
+     * 
      * @return the cleanupInterval
      */
     public long getCleanupInterval() {
@@ -233,7 +239,9 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Set the cleanupInterval
-     * @param cleanupInterval the cleanupInterval to set
+     * 
+     * @param cleanupInterval
+     *            the cleanupInterval to set
      */
     public void setCleanupInterval(long cleanupInterval) {
         this.letter.setCleanupInterval(cleanupInterval);
@@ -241,6 +249,7 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Get the indexWriteBatchSize
+     * 
      * @return the indexWriteBatchSize
      */
     public int getIndexWriteBatchSize() {
@@ -249,7 +258,9 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Set the indexWriteBatchSize
-     * @param indexWriteBatchSize the indexWriteBatchSize to set
+     * 
+     * @param indexWriteBatchSize
+     *            the indexWriteBatchSize to set
      */
     public void setIndexWriteBatchSize(int indexWriteBatchSize) {
         this.letter.setIndexWriteBatchSize(indexWriteBatchSize);
@@ -257,6 +268,7 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Get the journalMaxWriteBatchSize
+     * 
      * @return the journalMaxWriteBatchSize
      */
     public int getJournalMaxWriteBatchSize() {
@@ -265,14 +277,17 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Set the journalMaxWriteBatchSize
-     * @param journalMaxWriteBatchSize the journalMaxWriteBatchSize to set
+     * 
+     * @param journalMaxWriteBatchSize
+     *            the journalMaxWriteBatchSize to set
      */
     public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
         this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize);
     }
-    
+
     /**
      * Get the enableIndexWriteAsync
+     * 
      * @return the enableIndexWriteAsync
      */
     public boolean isEnableIndexWriteAsync() {
@@ -281,7 +296,9 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Set the enableIndexWriteAsync
-     * @param enableIndexWriteAsync the enableIndexWriteAsync to set
+     * 
+     * @param enableIndexWriteAsync
+     *            the enableIndexWriteAsync to set
      */
     public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
         this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync);
@@ -289,12 +306,13 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Get the directory
+     * 
      * @return the directory
      */
     public File getDirectory() {
         return this.letter.getDirectory();
     }
-    
+
     /**
      * @param dir
      * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File)
@@ -305,6 +323,7 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Get the enableJournalDiskSyncs
+     * 
      * @return the enableJournalDiskSyncs
      */
     public boolean isEnableJournalDiskSyncs() {
@@ -313,14 +332,17 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Set the enableJournalDiskSyncs
-     * @param enableJournalDiskSyncs the enableJournalDiskSyncs to set
+     * 
+     * @param enableJournalDiskSyncs
+     *            the enableJournalDiskSyncs to set
      */
     public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) {
         this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs);
     }
-    
+
     /**
      * Get the indexCacheSize
+     * 
      * @return the indexCacheSize
      */
     public int getIndexCacheSize() {
@@ -329,14 +351,17 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Set the indexCacheSize
-     * @param indexCacheSize the indexCacheSize to set
+     * 
+     * @param indexCacheSize
+     *            the indexCacheSize to set
      */
     public void setIndexCacheSize(int indexCacheSize) {
         this.letter.setIndexCacheSize(indexCacheSize);
     }
-    
+
     /**
      * Get the ignoreMissingJournalfiles
+     * 
      * @return the ignoreMissingJournalfiles
      */
     public boolean isIgnoreMissingJournalfiles() {
@@ -345,7 +370,9 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Set the ignoreMissingJournalfiles
-     * @param ignoreMissingJournalfiles the ignoreMissingJournalfiles to set
+     * 
+     * @param ignoreMissingJournalfiles
+     *            the ignoreMissingJournalfiles to set
      */
     public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
         this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
@@ -367,9 +394,9 @@ public class KahaDBPersistenceAdapter im
         letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
     }
 
-	public void setBrokerService(BrokerService brokerService) {
-		letter.setBrokerService(brokerService);
-	}
+    public void setBrokerService(BrokerService brokerService) {
+        letter.setBrokerService(brokerService);
+    }
 
     public boolean isArchiveDataLogs() {
         return letter.isArchiveDataLogs();
@@ -378,7 +405,7 @@ public class KahaDBPersistenceAdapter im
     public void setArchiveDataLogs(boolean archiveDataLogs) {
         letter.setArchiveDataLogs(archiveDataLogs);
     }
-    
+
     public File getDirectoryArchive() {
         return letter.getDirectoryArchive();
     }
@@ -386,36 +413,52 @@ public class KahaDBPersistenceAdapter im
     public void setDirectoryArchive(File directoryArchive) {
         letter.setDirectoryArchive(directoryArchive);
     }
-    
+
     public boolean isConcurrentStoreAndDispatchQueues() {
         return letter.isConcurrentStoreAndDispatchQueues();
     }
-    
+
     public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
         letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch);
-    }    
-    
+    }
+
     public boolean isConcurrentStoreAndDispatchTopics() {
         return letter.isConcurrentStoreAndDispatchTopics();
     }
-    
+
     public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
         letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch);
-    }    
-    
+    }
+
+    /**
+     * @return the concurrentStoreAndDispatchTransactions
+     */
+    public boolean isConcurrentStoreAndDispatchTransactions() {
+        return letter.isConcurrentStoreAndDispatchTransactions();
+    }
+
+    /**
+     * @param concurrentStoreAndDispatchTransactions
+     *            the concurrentStoreAndDispatchTransactions to set
+     */
+    public void setConcurrentStoreAndDispatchTransactions(boolean concurrentStoreAndDispatchTransactions) {
+        letter.setConcurrentStoreAndDispatchTransactions(concurrentStoreAndDispatchTransactions);
+    }
+
     public int getMaxAsyncJobs() {
         return letter.getMaxAsyncJobs();
     }
     /**
-     * @param maxAsyncJobs the maxAsyncJobs to set
+     * @param maxAsyncJobs
+     *            the maxAsyncJobs to set
      */
     public void setMaxAsyncJobs(int maxAsyncJobs) {
-       letter.setMaxAsyncJobs(maxAsyncJobs);
-    }   
-    
+        letter.setMaxAsyncJobs(maxAsyncJobs);
+    }
+
     @Override
     public String toString() {
         String path = getDirectory() != null ? getDirectory().toString() : "DIRECTORY_NOT_SET";
-        return "KahaDBPersistenceAdapter[" + path +"]" ;
+        return "KahaDBPersistenceAdapter[" + path + "]";
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=955149&r1=955148&r2=955149&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Wed Jun 16 07:40:33 2010
@@ -96,6 +96,7 @@ public class KahaDBStore extends Message
     Semaphore globalTopicSemaphore;
     private boolean concurrentStoreAndDispatchQueues = true;
     private boolean concurrentStoreAndDispatchTopics = true;
+    private boolean concurrentStoreAndDispatchTransactions = true;
     private int maxAsyncJobs = MAX_ASYNC_JOBS;
     private final KahaDBTransactionStore transactionStore;
 
@@ -143,6 +144,20 @@ public class KahaDBStore extends Message
     public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
         this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
     }
+    
+    /**
+     * @return the concurrentStoreAndDispatchTransactions
+     */
+    public boolean isConcurrentStoreAndDispatchTransactions() {
+        return this.concurrentStoreAndDispatchTransactions;
+    }
+
+    /**
+     * @param concurrentStoreAndDispatchTransactions the concurrentStoreAndDispatchTransactions to set
+     */
+    public void setConcurrentStoreAndDispatchTransactions(boolean concurrentStoreAndDispatchTransactions) {
+        this.concurrentStoreAndDispatchTransactions = concurrentStoreAndDispatchTransactions;
+    }
 
     /**
      * @return the maxAsyncJobs

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java?rev=955149&r1=955148&r2=955149&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java Wed Jun 16 07:40:33 2010
@@ -49,6 +49,8 @@ import org.apache.activemq.store.kahadb.
 import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
 import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * Provides a TransactionStore implementation that can create transaction aware
@@ -57,7 +59,7 @@ import org.apache.activemq.wireformat.Wi
  * @version $Revision: 1.4 $
  */
 public class KahaDBTransactionStore implements TransactionStore {
-
+    static final Log LOG = LogFactory.getLog(KahaDBTransactionStore.class);
     ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
     private final WireFormat wireFormat = new OpenWireFormat();
     private final KahaDBStore theStore;
@@ -222,7 +224,7 @@ public class KahaDBTransactionStore impl
     public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
             throws IOException {
         if (txid != null) {
-            if (!txid.isXATransaction()) {
+            if (!txid.isXATransaction() && theStore.isConcurrentStoreAndDispatchTransactions()) {
                 if (preCommit != null) {
                     preCommit.run();
                 }
@@ -262,7 +264,8 @@ public class KahaDBTransactionStore impl
                 KahaTransactionInfo info = getTransactionInfo(txid);
                 theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
             }
-
+        }else {
+           LOG.error("Null transaction passed on commit");
         }
     }
 
@@ -271,11 +274,11 @@ public class KahaDBTransactionStore impl
      * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
      */
     public void rollback(TransactionId txid) throws IOException {
-        if (txid.isXATransaction()) {
+        if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()) {
             KahaTransactionInfo info = getTransactionInfo(txid);
             theStore.store(new KahaRollbackCommand().setTransactionInfo(info), false, null, null);
         } else {
-            Object result = inflightTransactions.remove(txid);
+            inflightTransactions.remove(txid);
         }
     }
 
@@ -323,7 +326,7 @@ public class KahaDBTransactionStore impl
             throws IOException {
 
         if (message.getTransactionId() != null) {
-            if (message.getTransactionId().isXATransaction()) {
+            if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
                 destination.addMessage(context, message);
             } else {
                 Tx tx = getTx(message.getTransactionId());
@@ -349,8 +352,9 @@ public class KahaDBTransactionStore impl
             throws IOException {
 
         if (message.getTransactionId() != null) {
-            if (message.getTransactionId().isXATransaction()) {
-                return destination.asyncAddQueueMessage(context, message);
+            if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
+                destination.addMessage(context, message);
+                return AbstractMessageStore.FUTURE;
             } else {
                 Tx tx = getTx(message.getTransactionId());
                 tx.add(new AddMessageCommand(context) {
@@ -375,8 +379,9 @@ public class KahaDBTransactionStore impl
             throws IOException {
 
         if (message.getTransactionId() != null) {
-            if (message.getTransactionId().isXATransaction()) {
-                return destination.asyncAddTopicMessage(context, message);
+            if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
+                destination.addMessage(context, message);
+                return AbstractMessageStore.FUTURE;
             } else {
                 Tx tx = getTx(message.getTransactionId());
                 tx.add(new AddMessageCommand(context) {
@@ -405,7 +410,7 @@ public class KahaDBTransactionStore impl
             throws IOException {
 
         if (ack.isInTransaction()) {
-            if (ack.getTransactionId().isXATransaction()) {
+            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
                 destination.removeMessage(context, ack);
             } else {
                 Tx tx = getTx(ack.getTransactionId());
@@ -431,7 +436,7 @@ public class KahaDBTransactionStore impl
             throws IOException {
 
         if (ack.isInTransaction()) {
-            if (ack.getTransactionId().isXATransaction()) {
+            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
                 destination.removeAsyncMessage(context, ack);
             } else {
                 Tx tx = getTx(ack.getTransactionId());

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java?rev=955149&r1=955148&r2=955149&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java Wed Jun 16 07:40:33 2010
@@ -16,10 +16,12 @@
  */
 package org.apache.activemq.broker.ft;
 
+import java.io.File;
 import java.net.URISyntaxException;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.JmsTopicTransactionTest;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.activemq.test.JmsResourceProvider;
 
 /**
@@ -32,13 +34,19 @@ public class TransactedTopicMasterSlaveT
     protected String uriString = "failover://(tcp://localhost:62001?soWriteTimeout=15000,tcp://localhost:62002?soWriteTimeout=15000)?randomize=false";
     private boolean stopMaster = false;
 
+    @Override
     protected void setUp() throws Exception {
         failureCount = super.batchCount / 2;
         // this will create the main (or master broker)
         broker = createBroker();
+        File dir = new File ("target" + File.separator + "slave");
+        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+        adapter.setDirectory(dir);
+        adapter.setConcurrentStoreAndDispatchTransactions(false);
         broker.start();
         slave = new BrokerService();
         slave.setBrokerName("slave");
+        slave.setPersistenceAdapter(adapter);
         slave.setDeleteAllMessagesOnStartup(true);
         slave.setMasterConnectorURI("tcp://localhost:62001");
         slave.addConnector("tcp://localhost:62002");
@@ -53,26 +61,35 @@ public class TransactedTopicMasterSlaveT
         reconnect();
     }
 
+    @Override
     protected void tearDown() throws Exception {
         slave.stop();
         slave = null;
         super.tearDown();
     }
 
+    @Override
     protected BrokerService createBroker() throws Exception, URISyntaxException {
+        File dir = new File ("target" + File.separator + "master");
+        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+        adapter.setDirectory(dir);
+        adapter.setConcurrentStoreAndDispatchTransactions(false);
         BrokerService broker = new BrokerService();
         broker.setBrokerName("master");
+        broker.setPersistenceAdapter(adapter);
         broker.setDeleteAllMessagesOnStartup(true);
         broker.addConnector("tcp://localhost:62001");
         return broker;
     }
 
+    @Override
     protected JmsResourceProvider getJmsResourceProvider() {
         JmsResourceProvider p = super.getJmsResourceProvider();
         p.setServerUri(uriString);
         return p;
     }
 
+    @Override
     protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
         return new ActiveMQConnectionFactory(uriString);
     }
@@ -86,6 +103,7 @@ public class TransactedTopicMasterSlaveT
         }
     }
     
+    @Override
     protected void messageSent() throws Exception {
         if (stopMaster) {
             if (++inflightMessageCount >= failureCount) {