You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/06/16 09:40:33 UTC
svn commit: r955149 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/store/kahadb/
test/java/org/apache/activemq/broker/ft/
Author: rajdavies
Date: Wed Jun 16 07:40:33 2010
New Revision: 955149
URL: http://svn.apache.org/viewvc?rev=955149&view=rev
Log:
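Make concurrent store and dispatch optional for transacted operations (adds the concurrentStoreAndDispatchTransactions property to the KahaDB store and persistence adapter)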
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=955149&r1=955148&r2=955149&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Wed Jun 16 07:40:33 2010
@@ -31,6 +31,7 @@ import org.apache.activemq.store.Persist
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.usage.SystemUsage;
+
/**
* An implementation of {@link PersistenceAdapter} designed for use with a
* {@link Journal} and then check pointing asynchronously on a timeout with some
@@ -41,7 +42,6 @@ import org.apache.activemq.usage.SystemU
*/
public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
private final KahaDBStore letter = new KahaDBStore();
-
/**
* @param context
@@ -157,8 +157,6 @@ public class KahaDBPersistenceAdapter im
this.letter.setBrokerName(brokerName);
}
-
-
/**
* @param usageManager
* @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage)
@@ -193,6 +191,7 @@ public class KahaDBPersistenceAdapter im
/**
* Get the journalMaxFileLength
+ *
* @return the journalMaxFileLength
*/
public int getJournalMaxFileLength() {
@@ -200,8 +199,11 @@ public class KahaDBPersistenceAdapter im
}
/**
- * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
- * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
+ * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
+ * be used
+ *
+ * @org.apache.xbean.Property
+ * propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
*/
public void setJournalMaxFileLength(int journalMaxFileLength) {
this.letter.setJournalMaxFileLength(journalMaxFileLength);
@@ -209,6 +211,7 @@ public class KahaDBPersistenceAdapter im
/**
* Get the checkpointInterval
+ *
* @return the checkpointInterval
*/
public long getCheckpointInterval() {
@@ -217,7 +220,9 @@ public class KahaDBPersistenceAdapter im
/**
* Set the checkpointInterval
- * @param checkpointInterval the checkpointInterval to set
+ *
+ * @param checkpointInterval
+ * the checkpointInterval to set
*/
public void setCheckpointInterval(long checkpointInterval) {
this.letter.setCheckpointInterval(checkpointInterval);
@@ -225,6 +230,7 @@ public class KahaDBPersistenceAdapter im
/**
* Get the cleanupInterval
+ *
* @return the cleanupInterval
*/
public long getCleanupInterval() {
@@ -233,7 +239,9 @@ public class KahaDBPersistenceAdapter im
/**
* Set the cleanupInterval
- * @param cleanupInterval the cleanupInterval to set
+ *
+ * @param cleanupInterval
+ * the cleanupInterval to set
*/
public void setCleanupInterval(long cleanupInterval) {
this.letter.setCleanupInterval(cleanupInterval);
@@ -241,6 +249,7 @@ public class KahaDBPersistenceAdapter im
/**
* Get the indexWriteBatchSize
+ *
* @return the indexWriteBatchSize
*/
public int getIndexWriteBatchSize() {
@@ -249,7 +258,9 @@ public class KahaDBPersistenceAdapter im
/**
* Set the indexWriteBatchSize
- * @param indexWriteBatchSize the indexWriteBatchSize to set
+ *
+ * @param indexWriteBatchSize
+ * the indexWriteBatchSize to set
*/
public void setIndexWriteBatchSize(int indexWriteBatchSize) {
this.letter.setIndexWriteBatchSize(indexWriteBatchSize);
@@ -257,6 +268,7 @@ public class KahaDBPersistenceAdapter im
/**
* Get the journalMaxWriteBatchSize
+ *
* @return the journalMaxWriteBatchSize
*/
public int getJournalMaxWriteBatchSize() {
@@ -265,14 +277,17 @@ public class KahaDBPersistenceAdapter im
/**
* Set the journalMaxWriteBatchSize
- * @param journalMaxWriteBatchSize the journalMaxWriteBatchSize to set
+ *
+ * @param journalMaxWriteBatchSize
+ * the journalMaxWriteBatchSize to set
*/
public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize);
}
-
+
/**
* Get the enableIndexWriteAsync
+ *
* @return the enableIndexWriteAsync
*/
public boolean isEnableIndexWriteAsync() {
@@ -281,7 +296,9 @@ public class KahaDBPersistenceAdapter im
/**
* Set the enableIndexWriteAsync
- * @param enableIndexWriteAsync the enableIndexWriteAsync to set
+ *
+ * @param enableIndexWriteAsync
+ * the enableIndexWriteAsync to set
*/
public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync);
@@ -289,12 +306,13 @@ public class KahaDBPersistenceAdapter im
/**
* Get the directory
+ *
* @return the directory
*/
public File getDirectory() {
return this.letter.getDirectory();
}
-
+
/**
* @param dir
* @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File)
@@ -305,6 +323,7 @@ public class KahaDBPersistenceAdapter im
/**
* Get the enableJournalDiskSyncs
+ *
* @return the enableJournalDiskSyncs
*/
public boolean isEnableJournalDiskSyncs() {
@@ -313,14 +332,17 @@ public class KahaDBPersistenceAdapter im
/**
* Set the enableJournalDiskSyncs
- * @param enableJournalDiskSyncs the enableJournalDiskSyncs to set
+ *
+ * @param enableJournalDiskSyncs
+ * the enableJournalDiskSyncs to set
*/
public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) {
this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs);
}
-
+
/**
* Get the indexCacheSize
+ *
* @return the indexCacheSize
*/
public int getIndexCacheSize() {
@@ -329,14 +351,17 @@ public class KahaDBPersistenceAdapter im
/**
* Set the indexCacheSize
- * @param indexCacheSize the indexCacheSize to set
+ *
+ * @param indexCacheSize
+ * the indexCacheSize to set
*/
public void setIndexCacheSize(int indexCacheSize) {
this.letter.setIndexCacheSize(indexCacheSize);
}
-
+
/**
* Get the ignoreMissingJournalfiles
+ *
* @return the ignoreMissingJournalfiles
*/
public boolean isIgnoreMissingJournalfiles() {
@@ -345,7 +370,9 @@ public class KahaDBPersistenceAdapter im
/**
* Set the ignoreMissingJournalfiles
- * @param ignoreMissingJournalfiles the ignoreMissingJournalfiles to set
+ *
+ * @param ignoreMissingJournalfiles
+ * the ignoreMissingJournalfiles to set
*/
public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
@@ -367,9 +394,9 @@ public class KahaDBPersistenceAdapter im
letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
}
- public void setBrokerService(BrokerService brokerService) {
- letter.setBrokerService(brokerService);
- }
+ public void setBrokerService(BrokerService brokerService) {
+ letter.setBrokerService(brokerService);
+ }
public boolean isArchiveDataLogs() {
return letter.isArchiveDataLogs();
@@ -378,7 +405,7 @@ public class KahaDBPersistenceAdapter im
public void setArchiveDataLogs(boolean archiveDataLogs) {
letter.setArchiveDataLogs(archiveDataLogs);
}
-
+
public File getDirectoryArchive() {
return letter.getDirectoryArchive();
}
@@ -386,36 +413,52 @@ public class KahaDBPersistenceAdapter im
public void setDirectoryArchive(File directoryArchive) {
letter.setDirectoryArchive(directoryArchive);
}
-
+
public boolean isConcurrentStoreAndDispatchQueues() {
return letter.isConcurrentStoreAndDispatchQueues();
}
-
+
public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch);
- }
-
+ }
+
public boolean isConcurrentStoreAndDispatchTopics() {
return letter.isConcurrentStoreAndDispatchTopics();
}
-
+
public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch);
- }
-
+ }
+
+ /**
+ * @return the concurrentStoreAndDispatchTransactions
+ */
+ public boolean isConcurrentStoreAndDispatchTransactions() {
+ return letter.isConcurrentStoreAndDispatchTransactions();
+ }
+
+ /**
+ * @param concurrentStoreAndDispatchTransactions
+ * the concurrentStoreAndDispatchTransactions to set
+ */
+ public void setConcurrentStoreAndDispatchTransactions(boolean concurrentStoreAndDispatchTransactions) {
+ letter.setConcurrentStoreAndDispatchTransactions(concurrentStoreAndDispatchTransactions);
+ }
+
public int getMaxAsyncJobs() {
return letter.getMaxAsyncJobs();
}
/**
- * @param maxAsyncJobs the maxAsyncJobs to set
+ * @param maxAsyncJobs
+ * the maxAsyncJobs to set
*/
public void setMaxAsyncJobs(int maxAsyncJobs) {
- letter.setMaxAsyncJobs(maxAsyncJobs);
- }
-
+ letter.setMaxAsyncJobs(maxAsyncJobs);
+ }
+
@Override
public String toString() {
String path = getDirectory() != null ? getDirectory().toString() : "DIRECTORY_NOT_SET";
- return "KahaDBPersistenceAdapter[" + path +"]" ;
+ return "KahaDBPersistenceAdapter[" + path + "]";
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=955149&r1=955148&r2=955149&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Wed Jun 16 07:40:33 2010
@@ -96,6 +96,7 @@ public class KahaDBStore extends Message
Semaphore globalTopicSemaphore;
private boolean concurrentStoreAndDispatchQueues = true;
private boolean concurrentStoreAndDispatchTopics = true;
+ private boolean concurrentStoreAndDispatchTransactions = true;
private int maxAsyncJobs = MAX_ASYNC_JOBS;
private final KahaDBTransactionStore transactionStore;
@@ -143,6 +144,20 @@ public class KahaDBStore extends Message
public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
}
+
+ /**
+ * @return the concurrentStoreAndDispatchTransactions
+ */
+ public boolean isConcurrentStoreAndDispatchTransactions() {
+ return this.concurrentStoreAndDispatchTransactions;
+ }
+
+ /**
+ * @param concurrentStoreAndDispatchTransactions the concurrentStoreAndDispatchTransactions to set
+ */
+ public void setConcurrentStoreAndDispatchTransactions(boolean concurrentStoreAndDispatchTransactions) {
+ this.concurrentStoreAndDispatchTransactions = concurrentStoreAndDispatchTransactions;
+ }
/**
* @return the maxAsyncJobs
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java?rev=955149&r1=955148&r2=955149&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java Wed Jun 16 07:40:33 2010
@@ -49,6 +49,8 @@ import org.apache.activemq.store.kahadb.
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* Provides a TransactionStore implementation that can create transaction aware
@@ -57,7 +59,7 @@ import org.apache.activemq.wireformat.Wi
* @version $Revision: 1.4 $
*/
public class KahaDBTransactionStore implements TransactionStore {
-
+ static final Log LOG = LogFactory.getLog(KahaDBTransactionStore.class);
ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
private final WireFormat wireFormat = new OpenWireFormat();
private final KahaDBStore theStore;
@@ -222,7 +224,7 @@ public class KahaDBTransactionStore impl
public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
throws IOException {
if (txid != null) {
- if (!txid.isXATransaction()) {
+ if (!txid.isXATransaction() && theStore.isConcurrentStoreAndDispatchTransactions()) {
if (preCommit != null) {
preCommit.run();
}
@@ -262,7 +264,8 @@ public class KahaDBTransactionStore impl
KahaTransactionInfo info = getTransactionInfo(txid);
theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
}
-
+ }else {
+ LOG.error("Null transaction passed on commit");
}
}
@@ -271,11 +274,11 @@ public class KahaDBTransactionStore impl
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
public void rollback(TransactionId txid) throws IOException {
- if (txid.isXATransaction()) {
+ if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()) {
KahaTransactionInfo info = getTransactionInfo(txid);
theStore.store(new KahaRollbackCommand().setTransactionInfo(info), false, null, null);
} else {
- Object result = inflightTransactions.remove(txid);
+ inflightTransactions.remove(txid);
}
}
@@ -323,7 +326,7 @@ public class KahaDBTransactionStore impl
throws IOException {
if (message.getTransactionId() != null) {
- if (message.getTransactionId().isXATransaction()) {
+ if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
destination.addMessage(context, message);
} else {
Tx tx = getTx(message.getTransactionId());
@@ -349,8 +352,9 @@ public class KahaDBTransactionStore impl
throws IOException {
if (message.getTransactionId() != null) {
- if (message.getTransactionId().isXATransaction()) {
- return destination.asyncAddQueueMessage(context, message);
+ if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
+ destination.addMessage(context, message);
+ return AbstractMessageStore.FUTURE;
} else {
Tx tx = getTx(message.getTransactionId());
tx.add(new AddMessageCommand(context) {
@@ -375,8 +379,9 @@ public class KahaDBTransactionStore impl
throws IOException {
if (message.getTransactionId() != null) {
- if (message.getTransactionId().isXATransaction()) {
- return destination.asyncAddTopicMessage(context, message);
+ if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
+ destination.addMessage(context, message);
+ return AbstractMessageStore.FUTURE;
} else {
Tx tx = getTx(message.getTransactionId());
tx.add(new AddMessageCommand(context) {
@@ -405,7 +410,7 @@ public class KahaDBTransactionStore impl
throws IOException {
if (ack.isInTransaction()) {
- if (ack.getTransactionId().isXATransaction()) {
+ if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
destination.removeMessage(context, ack);
} else {
Tx tx = getTx(ack.getTransactionId());
@@ -431,7 +436,7 @@ public class KahaDBTransactionStore impl
throws IOException {
if (ack.isInTransaction()) {
- if (ack.getTransactionId().isXATransaction()) {
+ if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
destination.removeAsyncMessage(context, ack);
} else {
Tx tx = getTx(ack.getTransactionId());
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java?rev=955149&r1=955148&r2=955149&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java Wed Jun 16 07:40:33 2010
@@ -16,10 +16,12 @@
*/
package org.apache.activemq.broker.ft;
+import java.io.File;
import java.net.URISyntaxException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTopicTransactionTest;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.test.JmsResourceProvider;
/**
@@ -32,13 +34,19 @@ public class TransactedTopicMasterSlaveT
protected String uriString = "failover://(tcp://localhost:62001?soWriteTimeout=15000,tcp://localhost:62002?soWriteTimeout=15000)?randomize=false";
private boolean stopMaster = false;
+ @Override
protected void setUp() throws Exception {
failureCount = super.batchCount / 2;
// this will create the main (or master broker)
broker = createBroker();
+ File dir = new File ("target" + File.separator + "slave");
+ KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+ adapter.setDirectory(dir);
+ adapter.setConcurrentStoreAndDispatchTransactions(false);
broker.start();
slave = new BrokerService();
slave.setBrokerName("slave");
+ slave.setPersistenceAdapter(adapter);
slave.setDeleteAllMessagesOnStartup(true);
slave.setMasterConnectorURI("tcp://localhost:62001");
slave.addConnector("tcp://localhost:62002");
@@ -53,26 +61,35 @@ public class TransactedTopicMasterSlaveT
reconnect();
}
+ @Override
protected void tearDown() throws Exception {
slave.stop();
slave = null;
super.tearDown();
}
+ @Override
protected BrokerService createBroker() throws Exception, URISyntaxException {
+ File dir = new File ("target" + File.separator + "master");
+ KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+ adapter.setDirectory(dir);
+ adapter.setConcurrentStoreAndDispatchTransactions(false);
BrokerService broker = new BrokerService();
broker.setBrokerName("master");
+ broker.setPersistenceAdapter(adapter);
broker.setDeleteAllMessagesOnStartup(true);
broker.addConnector("tcp://localhost:62001");
return broker;
}
+ @Override
protected JmsResourceProvider getJmsResourceProvider() {
JmsResourceProvider p = super.getJmsResourceProvider();
p.setServerUri(uriString);
return p;
}
+ @Override
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory(uriString);
}
@@ -86,6 +103,7 @@ public class TransactedTopicMasterSlaveT
}
}
+ @Override
protected void messageSent() throws Exception {
if (stopMaster) {
if (++inflightMessageCount >= failureCount) {