You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/09/13 17:01:38 UTC
svn commit: r1170201 [1/3] - in /activemq/trunk:
activemq-core/src/main/java/org/apache/activemq/filter/
activemq-core/src/main/java/org/apache/activemq/store/kahadb/
activemq-core/src/test/java/org/apache/activemq/broker/
activemq-core/src/test/java/o...
Author: gtully
Date: Tue Sep 13 15:01:37 2011
New Revision: 1170201
URL: http://svn.apache.org/viewvc?rev=1170201&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-2922 - rework, introduce new store 'mKahaDB' that contains multiple filtered kahadb persistence adapters, destinations match a store using destination wildcards in the same way as policy entries. Transactions that span multiple stores use a local xa variant to ensure consistency
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/AnyDestination.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TransactionIdConversion.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TransactionIdTransformer.java (with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/mKahaDBXARecoveryBrokerTest.java (with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/DefaultJournalManager.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/DestinationJournalManager.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/JournalManager.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/AnyDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/AnyDestination.java?rev=1170201&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/AnyDestination.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/AnyDestination.java Tue Sep 13 15:01:37 2011
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.filter;
+
+import java.lang.IllegalStateException;
+import javax.jms.*;
+import org.apache.activemq.command.ActiveMQDestination;
+
+/**
+ * A composite destination that allows a match against any of a set of
+ * destinations, both queues and topics.
+ * <p>
+ * This type is not for marshalling: {@link #getDataStructureType()} always
+ * throws.
+ */
+public class AnyDestination extends ActiveMQDestination {
+
+    /**
+     * Creates a destination composed of the given destinations.
+     *
+     * @param destinations the queues and/or topics to match
+     */
+    public AnyDestination(ActiveMQDestination[] destinations) {
+        super(destinations);
+        // ensure we are small when it comes to comparison in DestinationMap
+        physicalName = "0";
+    }
+
+    /**
+     * @return the prefix used to qualify the name of this destination
+     */
+    @Override
+    protected String getQualifiedPrefix() {
+        return "Any://";
+    }
+
+    /**
+     * Reports a destination type that carries both the queue and the topic
+     * flag, in line with {@link #isQueue()} and {@link #isTopic()} both
+     * returning {@code true}.
+     *
+     * @return the queue and topic type flags combined
+     */
+    @Override
+    public byte getDestinationType() {
+        // The flags must be OR-ed: AND-ing two different type flags drops both of them.
+        return (byte) (ActiveMQDestination.QUEUE_TYPE | ActiveMQDestination.TOPIC_TYPE);
+    }
+
+    /**
+     * Not supported, this destination is never marshalled.
+     *
+     * @throws IllegalStateException always
+     */
+    @Override
+    public byte getDataStructureType() {
+        throw new IllegalStateException("not for marshalling");
+    }
+
+    /**
+     * @return always {@code true}, queues are matched
+     */
+    @Override
+    public boolean isQueue() {
+        return true;
+    }
+
+    /**
+     * @return always {@code true}, topics are matched
+     */
+    @Override
+    public boolean isTopic() {
+        return true;
+    }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/AnyDestination.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/AnyDestination.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java?rev=1170201&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java Tue Sep 13 15:01:37 2011
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import org.apache.activemq.filter.DestinationMapEntry;
+
+/**
+ * A destination map entry that carries the {@link KahaDBPersistenceAdapter}
+ * to use for the destinations matching the entry.
+ *
+ * @org.apache.xbean.XBean element="filteredKahaDB"
+ *
+ */
+public class FilteredKahaDBPersistenceAdapter extends DestinationMapEntry {
+    private KahaDBPersistenceAdapter persistenceAdapter;
+
+    /**
+     * Performs no validation: having no destination is acceptable for this
+     * entry, it gets defaulted (the defaulting is not done in this class).
+     */
+    @Override
+    public void afterPropertiesSet() throws Exception {
+        // ok to have no destination, we default it
+    }
+
+    /**
+     * @param persistenceAdapter the adapter to associate with this entry
+     */
+    public void setPersistenceAdapter(KahaDBPersistenceAdapter persistenceAdapter) {
+        this.persistenceAdapter = persistenceAdapter;
+    }
+
+    /**
+     * @return the adapter associated with this entry, or null when none was set
+     */
+    public KahaDBPersistenceAdapter getPersistenceAdapter() {
+        return this.persistenceAdapter;
+    }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=1170201&r1=1170200&r2=1170201&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Tue Sep 13 15:01:37 2011
@@ -19,7 +19,6 @@ package org.apache.activemq.store.kahadb
import java.io.File;
import java.io.IOException;
import java.util.Set;
-
import org.apache.activeio.journal.Journal;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
@@ -27,11 +26,18 @@ import org.apache.activemq.broker.Connec
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
+import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
+import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
import org.apache.activemq.usage.SystemUsage;
/**
@@ -46,6 +52,8 @@ public class KahaDBPersistenceAdapter im
private final KahaDBStore letter = new KahaDBStore();
/**
+ * @param context
+ * @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext)
*/
public void beginTransaction(ConnectionContext context) throws IOException {
@@ -53,6 +61,8 @@ public class KahaDBPersistenceAdapter im
}
/**
+ * @param sync
+ * @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean)
*/
public void checkpoint(boolean sync) throws IOException {
@@ -60,6 +70,8 @@ public class KahaDBPersistenceAdapter im
}
/**
+ * @param context
+ * @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext)
*/
public void commitTransaction(ConnectionContext context) throws IOException {
@@ -67,7 +79,9 @@ public class KahaDBPersistenceAdapter im
}
/**
+ * @param destination
* @return MessageStore
+ * @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
*/
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
@@ -75,7 +89,9 @@ public class KahaDBPersistenceAdapter im
}
/**
+ * @param destination
* @return TopicMessageStore
+ * @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
*/
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
@@ -83,7 +99,8 @@ public class KahaDBPersistenceAdapter im
}
/**
- * @return TrandactionStore
+ * @return TransactionStore
+ * @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore()
*/
public TransactionStore createTransactionStore() throws IOException {
@@ -91,6 +108,7 @@ public class KahaDBPersistenceAdapter im
}
/**
+ * @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages()
*/
public void deleteAllMessages() throws IOException {
@@ -107,6 +125,7 @@ public class KahaDBPersistenceAdapter im
/**
* @return lastMessageBrokerSequenceId
+ * @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId()
*/
public long getLastMessageBrokerSequenceId() throws IOException {
@@ -118,6 +137,7 @@ public class KahaDBPersistenceAdapter im
}
/**
+ * @param destination
* @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
*/
public void removeQueueMessageStore(ActiveMQQueue destination) {
@@ -125,6 +145,7 @@ public class KahaDBPersistenceAdapter im
}
/**
+ * @param destination
* @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
*/
public void removeTopicMessageStore(ActiveMQTopic destination) {
@@ -132,6 +153,8 @@ public class KahaDBPersistenceAdapter im
}
/**
+ * @param context
+ * @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext)
*/
public void rollbackTransaction(ConnectionContext context) throws IOException {
@@ -139,6 +162,7 @@ public class KahaDBPersistenceAdapter im
}
/**
+ * @param brokerName
* @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String)
*/
public void setBrokerName(String brokerName) {
@@ -146,6 +170,7 @@ public class KahaDBPersistenceAdapter im
}
/**
+ * @param usageManager
* @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage)
*/
public void setUsageManager(SystemUsage usageManager) {
@@ -161,6 +186,7 @@ public class KahaDBPersistenceAdapter im
}
/**
+ * @throws Exception
* @see org.apache.activemq.Service#start()
*/
public void start() throws Exception {
@@ -168,6 +194,7 @@ public class KahaDBPersistenceAdapter im
}
/**
+ * @throws Exception
* @see org.apache.activemq.Service#stop()
*/
public void stop() throws Exception {
@@ -176,7 +203,7 @@ public class KahaDBPersistenceAdapter im
/**
* Get the journalMaxFileLength
- *
+ *
* @return the journalMaxFileLength
*/
public int getJournalMaxFileLength() {
@@ -186,6 +213,8 @@ public class KahaDBPersistenceAdapter im
/**
* When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
* be used
+ *
+ * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
*/
public void setJournalMaxFileLength(int journalMaxFileLength) {
this.letter.setJournalMaxFileLength(journalMaxFileLength);
@@ -197,7 +226,7 @@ public class KahaDBPersistenceAdapter im
public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack);
}
-
+
public int getMaxFailoverProducersToTrack() {
return this.letter.getMaxFailoverProducersToTrack();
}
@@ -209,14 +238,14 @@ public class KahaDBPersistenceAdapter im
public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth);
}
-
+
/**
 * Get the failoverProducersAuditDepth of the underlying store.
 *
 * @return the failoverProducersAuditDepth
 */
public int getFailoverProducersAuditDepth() {
    // Delegate to the store like the matching setter does; calling this
    // method on 'this' would recurse until the stack overflows.
    return this.letter.getFailoverProducersAuditDepth();
}
-
+
/**
* Get the checkpointInterval
- *
+ *
* @return the checkpointInterval
*/
public long getCheckpointInterval() {
@@ -225,8 +254,9 @@ public class KahaDBPersistenceAdapter im
/**
* Set the checkpointInterval
- *
- * @param checkpointInterval the checkpointInterval to set
+ *
+ * @param checkpointInterval
+ * the checkpointInterval to set
*/
public void setCheckpointInterval(long checkpointInterval) {
this.letter.setCheckpointInterval(checkpointInterval);
@@ -234,7 +264,7 @@ public class KahaDBPersistenceAdapter im
/**
* Get the cleanupInterval
- *
+ *
* @return the cleanupInterval
*/
public long getCleanupInterval() {
@@ -243,8 +273,9 @@ public class KahaDBPersistenceAdapter im
/**
* Set the cleanupInterval
- *
- * @param cleanupInterval the cleanupInterval to set
+ *
+ * @param cleanupInterval
+ * the cleanupInterval to set
*/
public void setCleanupInterval(long cleanupInterval) {
this.letter.setCleanupInterval(cleanupInterval);
@@ -252,7 +283,7 @@ public class KahaDBPersistenceAdapter im
/**
* Get the indexWriteBatchSize
- *
+ *
* @return the indexWriteBatchSize
*/
public int getIndexWriteBatchSize() {
@@ -262,8 +293,9 @@ public class KahaDBPersistenceAdapter im
/**
* Set the indexWriteBatchSize
* When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
- *
- * @param indexWriteBatchSize the indexWriteBatchSize to set
+ * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
+ * @param indexWriteBatchSize
+ * the indexWriteBatchSize to set
*/
public void setIndexWriteBatchSize(int indexWriteBatchSize) {
this.letter.setIndexWriteBatchSize(indexWriteBatchSize);
@@ -271,7 +303,7 @@ public class KahaDBPersistenceAdapter im
/**
* Get the journalMaxWriteBatchSize
- *
+ *
* @return the journalMaxWriteBatchSize
*/
public int getJournalMaxWriteBatchSize() {
@@ -280,9 +312,10 @@ public class KahaDBPersistenceAdapter im
/**
* Set the journalMaxWriteBatchSize
- * * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
- *
- * @param journalMaxWriteBatchSize the journalMaxWriteBatchSize to set
+ * * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
+ * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
+ * @param journalMaxWriteBatchSize
+ * the journalMaxWriteBatchSize to set
*/
public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize);
@@ -290,7 +323,7 @@ public class KahaDBPersistenceAdapter im
/**
* Get the enableIndexWriteAsync
- *
+ *
* @return the enableIndexWriteAsync
*/
public boolean isEnableIndexWriteAsync() {
@@ -299,8 +332,9 @@ public class KahaDBPersistenceAdapter im
/**
* Set the enableIndexWriteAsync
- *
- * @param enableIndexWriteAsync the enableIndexWriteAsync to set
+ *
+ * @param enableIndexWriteAsync
+ * the enableIndexWriteAsync to set
*/
public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync);
@@ -308,7 +342,7 @@ public class KahaDBPersistenceAdapter im
/**
* Get the directory
- *
+ *
* @return the directory
*/
public File getDirectory() {
@@ -316,6 +350,7 @@ public class KahaDBPersistenceAdapter im
}
/**
+ * @param dir
* @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File)
*/
public void setDirectory(File dir) {
@@ -324,7 +359,7 @@ public class KahaDBPersistenceAdapter im
/**
* Get the enableJournalDiskSyncs
- *
+ *
* @return the enableJournalDiskSyncs
*/
public boolean isEnableJournalDiskSyncs() {
@@ -333,8 +368,9 @@ public class KahaDBPersistenceAdapter im
/**
* Set the enableJournalDiskSyncs
- *
- * @param enableJournalDiskSyncs the enableJournalDiskSyncs to set
+ *
+ * @param enableJournalDiskSyncs
+ * the enableJournalDiskSyncs to set
*/
public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) {
this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs);
@@ -342,7 +378,7 @@ public class KahaDBPersistenceAdapter im
/**
* Get the indexCacheSize
- *
+ *
* @return the indexCacheSize
*/
public int getIndexCacheSize() {
@@ -352,8 +388,9 @@ public class KahaDBPersistenceAdapter im
/**
* Set the indexCacheSize
* When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
- *
- * @param indexCacheSize the indexCacheSize to set
+ * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
+ * @param indexCacheSize
+ * the indexCacheSize to set
*/
public void setIndexCacheSize(int indexCacheSize) {
this.letter.setIndexCacheSize(indexCacheSize);
@@ -361,7 +398,7 @@ public class KahaDBPersistenceAdapter im
/**
* Get the ignoreMissingJournalfiles
- *
+ *
* @return the ignoreMissingJournalfiles
*/
public boolean isIgnoreMissingJournalfiles() {
@@ -370,8 +407,9 @@ public class KahaDBPersistenceAdapter im
/**
* Set the ignoreMissingJournalfiles
- *
- * @param ignoreMissingJournalfiles the ignoreMissingJournalfiles to set
+ *
+ * @param ignoreMissingJournalfiles
+ * the ignoreMissingJournalfiles to set
*/
public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
@@ -432,14 +470,14 @@ public class KahaDBPersistenceAdapter im
public int getMaxAsyncJobs() {
return letter.getMaxAsyncJobs();
}
-
/**
- * @param maxAsyncJobs the maxAsyncJobs to set
+ * @param maxAsyncJobs
+ * the maxAsyncJobs to set
*/
public void setMaxAsyncJobs(int maxAsyncJobs) {
letter.setMaxAsyncJobs(maxAsyncJobs);
}
-
+
/**
* @return the databaseLockedWaitDelay
*/
@@ -451,7 +489,7 @@ public class KahaDBPersistenceAdapter im
* @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
*/
public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) {
- letter.setDatabaseLockedWaitDelay(databaseLockedWaitDelay);
+ letter.setDatabaseLockedWaitDelay(databaseLockedWaitDelay);
}
public boolean getForceRecoverIndex() {
@@ -462,19 +500,33 @@ public class KahaDBPersistenceAdapter im
letter.setForceRecoverIndex(forceRecoverIndex);
}
- public boolean isJournalPerDestination() {
- return letter.isJournalPerDestination();
- }
-
- public void setJournalPerDestination(boolean journalPerDestination) {
- letter.setJournalPerDestination(journalPerDestination);
- }
-
- // for testing
public KahaDBStore getStore() {
return letter;
}
+ /**
+  * Builds the KahaDB transaction info that corresponds to a transaction id.
+  *
+  * @param txid
+  *            the transaction id to convert, may be null
+  * @return a KahaTransactionInfo holding either a local or an XA transaction
+  *         id, or null when txid is null
+  */
+ public KahaTransactionInfo createTransactionInfo(TransactionId txid) {
+     if (txid == null) {
+         return null;
+     }
+     final KahaTransactionInfo info = new KahaTransactionInfo();
+
+     if (!txid.isLocalTransaction()) {
+         // every id that is not local is handled as an XA transaction id
+         final XATransactionId xaId = (XATransactionId) txid;
+         final KahaXATransactionId kahaXaId = new KahaXATransactionId();
+         kahaXaId.setBranchQualifier(new Buffer(xaId.getBranchQualifier()));
+         kahaXaId.setGlobalTransactionId(new Buffer(xaId.getGlobalTransactionId()));
+         kahaXaId.setFormatId(xaId.getFormatId());
+         info.setXaTransacitonId(kahaXaId);
+         return info;
+     }
+
+     final LocalTransactionId localId = (LocalTransactionId) txid;
+     final KahaLocalTransactionId kahaLocalId = new KahaLocalTransactionId();
+     kahaLocalId.setConnectionId(localId.getConnectionId().getValue());
+     kahaLocalId.setTransacitonId(localId.getValue());
+     info.setLocalTransacitonId(kahaLocalId);
+     return info;
+ }
+
@Override
public String toString() {
String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1170201&r1=1170200&r2=1170201&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Tue Sep 13 15:01:37 2011
@@ -26,22 +26,23 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.Map.Entry;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.*;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.command.TransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.AbstractMessageStore;
@@ -52,20 +53,20 @@ import org.apache.activemq.store.TopicMe
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
-import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
import org.apache.activemq.store.kahadb.data.KahaLocation;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
+import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
+import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
-import org.apache.kahadb.journal.Journal;
-import org.apache.kahadb.journal.Location;
-import org.apache.kahadb.page.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.page.Transaction;
public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
@@ -76,7 +77,7 @@ public class KahaDBStore extends Message
PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
- PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);
+ PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
protected ExecutorService queueExecutor;
protected ExecutorService topicExecutor;
@@ -95,9 +96,16 @@ public class KahaDBStore extends Message
private boolean concurrentStoreAndDispatchTransactions = false;
private int maxAsyncJobs = MAX_ASYNC_JOBS;
private final KahaDBTransactionStore transactionStore;
+ private TransactionIdTransformer transactionIdTransformer;
public KahaDBStore() {
this.transactionStore = new KahaDBTransactionStore(this);
+ this.transactionIdTransformer = new TransactionIdTransformer() {
+ @Override
+ public KahaTransactionInfo transform(TransactionId txid) {
+ return TransactionIdConversion.convert(txid);
+ }
+ };
}
@Override
@@ -124,7 +132,8 @@ public class KahaDBStore extends Message
}
/**
- * @param concurrentStoreAndDispatch the concurrentStoreAndDispatch to set
+ * @param concurrentStoreAndDispatch
+ * the concurrentStoreAndDispatch to set
*/
public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
@@ -138,7 +147,8 @@ public class KahaDBStore extends Message
}
/**
- * @param concurrentStoreAndDispatch the concurrentStoreAndDispatch to set
+ * @param concurrentStoreAndDispatch
+ * the concurrentStoreAndDispatch to set
*/
public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
@@ -147,16 +157,16 @@ public class KahaDBStore extends Message
public boolean isConcurrentStoreAndDispatchTransactions() {
return this.concurrentStoreAndDispatchTransactions;
}
-
+
/**
* @return the maxAsyncJobs
*/
public int getMaxAsyncJobs() {
return this.maxAsyncJobs;
}
-
/**
- * @param maxAsyncJobs the maxAsyncJobs to set
+ * @param maxAsyncJobs
+ * the maxAsyncJobs to set
*/
public void setMaxAsyncJobs(int maxAsyncJobs) {
this.maxAsyncJobs = maxAsyncJobs;
@@ -171,20 +181,20 @@ public class KahaDBStore extends Message
this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
asyncQueueJobQueue, new ThreadFactory() {
- public Thread newThread(Runnable runnable) {
- Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
- thread.setDaemon(true);
- return thread;
- }
- });
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
asyncTopicJobQueue, new ThreadFactory() {
- public Thread newThread(Runnable runnable) {
- Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
- thread.setDaemon(true);
- return thread;
- }
- });
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
}
@Override
@@ -281,16 +291,14 @@ public class KahaDBStore extends Message
protected KahaDestination dest;
private final int maxAsyncJobs;
private final Semaphore localDestinationSemaphore;
- private final Journal journal;
double doneTasks, canceledTasks = 0;
- public KahaDBMessageStore(ActiveMQDestination destination) throws IOException {
+ public KahaDBMessageStore(ActiveMQDestination destination) {
super(destination);
this.dest = convert(destination);
this.maxAsyncJobs = getMaxAsyncJobs();
this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
- this.journal = getJournalManager().getJournal(destination);
}
@Override
@@ -347,30 +355,30 @@ public class KahaDBStore extends Message
KahaAddMessageCommand command = new KahaAddMessageCommand();
command.setDestination(dest);
command.setMessageId(message.getMessageId().toString());
- command.setTransactionInfo(createTransactionInfo(message.getTransactionId()));
+ command.setTransactionInfo(transactionIdTransformer.transform(message.getTransactionId()));
command.setPriority(message.getPriority());
command.setPrioritySupported(isPrioritizedMessages());
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
- store(journal, command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
-
+ store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
+
}
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
command.setDestination(dest);
command.setMessageId(ack.getLastMessageId().toString());
- command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()));
+ command.setTransactionInfo(transactionIdTransformer.transform(ack.getTransactionId()));
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
- store(journal, command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
+ store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
}
public void removeAllMessages(ConnectionContext context) throws IOException {
KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
command.setDestination(dest);
- store(journal, command, true, null, null);
+ store(command, true, null, null);
}
public Message getMessage(MessageId identity) throws IOException {
@@ -392,14 +400,14 @@ public class KahaDBStore extends Message
return sd.orderIndex.get(tx, sequence).location;
}
});
- } finally {
+ }finally {
indexLock.readLock().unlock();
}
if (location == null) {
return null;
}
- return loadMessage(journal, location);
+ return loadMessage(location);
}
public int getMessageCount() throws IOException {
@@ -415,14 +423,14 @@ public class KahaDBStore extends Message
StoredDestination sd = getStoredDestination(dest, tx);
int rc = 0;
for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator
- .hasNext(); ) {
+ .hasNext();) {
iterator.next();
rc++;
}
return rc;
}
});
- } finally {
+ }finally {
indexLock.readLock().unlock();
}
} finally {
@@ -442,7 +450,7 @@ public class KahaDBStore extends Message
return sd.locationIndex.isEmpty(tx);
}
});
- } finally {
+ }finally {
indexLock.readLock().unlock();
}
}
@@ -461,17 +469,17 @@ public class KahaDBStore extends Message
if (ackedAndPrepared.contains(entry.getValue().messageId)) {
continue;
}
- Message msg = loadMessage(journal, entry.getValue().location);
+ Message msg = loadMessage(entry.getValue().location);
listener.recoverMessage(msg);
}
}
});
- } finally {
+ }finally {
indexLock.writeLock().unlock();
}
}
-
+
public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
indexLock.readLock().lock();
try {
@@ -486,7 +494,7 @@ public class KahaDBStore extends Message
if (ackedAndPrepared.contains(entry.getValue().messageId)) {
continue;
}
- Message msg = loadMessage(journal, entry.getValue().location);
+ Message msg = loadMessage(entry.getValue().location);
listener.recoverMessage(msg);
counter++;
if (counter >= maxReturned) {
@@ -496,23 +504,24 @@ public class KahaDBStore extends Message
sd.orderIndex.stoppedIterating();
}
});
- } finally {
+ }finally {
indexLock.readLock().unlock();
}
}
public void resetBatching() {
- try {
- pageFile.tx().execute(new Transaction.Closure<Exception>() {
- public void execute(Transaction tx) throws Exception {
- StoredDestination sd = getExistingStoredDestination(dest, tx);
- if (sd != null) {
- sd.orderIndex.resetCursorPosition();
- }
- }
- });
- } catch (Exception e) {
- LOG.error("Failed to reset batching", e);
+ if (pageFile.isLoaded()) {
+ try {
+ pageFile.tx().execute(new Transaction.Closure<Exception>() {
+ public void execute(Transaction tx) throws Exception {
+ StoredDestination sd = getExistingStoredDestination(dest, tx);
+ if (sd != null) {
+ sd.orderIndex.resetCursorPosition();}
+ }
+ });
+ } catch (Exception e) {
+ LOG.error("Failed to reset batching",e);
+ }
}
}
@@ -525,10 +534,10 @@ public class KahaDBStore extends Message
// Hopefully one day the page file supports concurrent read
// operations... but for now we must
// externally synchronize...
-
+
indexLock.writeLock().lock();
try {
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
Long location = sd.messageIdIndex.get(tx, key);
@@ -537,10 +546,10 @@ public class KahaDBStore extends Message
}
}
});
- } finally {
+ }finally {
indexLock.writeLock().unlock();
}
-
+
} finally {
unlockAsyncJobQueue();
}
@@ -550,21 +559,15 @@ public class KahaDBStore extends Message
@Override
public void setMemoryUsage(MemoryUsage memoeyUSage) {
}
-
@Override
public void start() throws Exception {
super.start();
}
-
@Override
public void stop() throws Exception {
super.stop();
}
- public Journal getJournal() {
- return this.journal;
- }
-
protected void lockAsyncJobQueue() {
try {
this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
@@ -593,7 +596,6 @@ public class KahaDBStore extends Message
class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
private final AtomicInteger subscriptionCount = new AtomicInteger();
-
public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
super(destination);
this.subscriptionCount.set(getAllSubscriptions().length);
@@ -646,11 +648,11 @@ public class KahaDBStore extends Message
command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey);
command.setMessageId(messageId.toString());
- command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()));
+ command.setTransactionInfo(transactionIdTransformer.transform(ack.getTransactionId()));
if (ack != null && ack.isUnmatchedAck()) {
command.setAck(UNMATCHED);
}
- store(getJournal(), command, false, null, null);
+ store(command, false, null, null);
}
public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
@@ -662,7 +664,7 @@ public class KahaDBStore extends Message
command.setRetroactive(retroactive);
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
- store(getJournal(), command, isEnableJournalDiskSyncs() && true, null, null);
+ store(command, isEnableJournalDiskSyncs() && true, null, null);
this.subscriptionCount.incrementAndGet();
}
@@ -670,7 +672,7 @@ public class KahaDBStore extends Message
KahaSubscriptionCommand command = new KahaSubscriptionCommand();
command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
- store(getJournal(), command, isEnableJournalDiskSyncs() && true, null, null);
+ store(command, isEnableJournalDiskSyncs() && true, null, null);
this.subscriptionCount.decrementAndGet();
}
@@ -683,7 +685,7 @@ public class KahaDBStore extends Message
public void execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
- .hasNext(); ) {
+ .hasNext();) {
Entry<String, KahaSubscriptionCommand> entry = iterator.next();
SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
.getValue().getSubscriptionInfo().newInput()));
@@ -692,7 +694,7 @@ public class KahaDBStore extends Message
}
}
});
- } finally {
+ }finally {
indexLock.readLock().unlock();
}
@@ -716,7 +718,7 @@ public class KahaDBStore extends Message
.getSubscriptionInfo().newInput()));
}
});
- } finally {
+ }finally {
indexLock.readLock().unlock();
}
}
@@ -736,7 +738,7 @@ public class KahaDBStore extends Message
int counter = 0;
for (Iterator<Entry<Long, HashSet<String>>> iterator =
- sd.ackPositions.iterator(tx, cursorPos.lastAckedSequence); iterator.hasNext(); ) {
+ sd.ackPositions.iterator(tx, cursorPos.lastAckedSequence); iterator.hasNext();) {
Entry<Long, HashSet<String>> entry = iterator.next();
if (entry.getValue().contains(subscriptionKey)) {
counter++;
@@ -745,7 +747,7 @@ public class KahaDBStore extends Message
return counter;
}
});
- } finally {
+ }finally {
indexLock.writeLock().unlock();
}
}
@@ -762,20 +764,20 @@ public class KahaDBStore extends Message
LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
sd.orderIndex.setBatch(tx, cursorPos);
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
- .hasNext(); ) {
+ .hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next();
- listener.recoverMessage(loadMessage(getJournal(), entry.getValue().location));
+ listener.recoverMessage(loadMessage(entry.getValue().location));
}
sd.orderIndex.resetCursorPosition();
}
});
- } finally {
+ }finally {
indexLock.writeLock().unlock();
}
}
public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
- final MessageRecoveryListener listener) throws Exception {
+ final MessageRecoveryListener listener) throws Exception {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
indexLock.writeLock().lock();
@@ -800,9 +802,9 @@ public class KahaDBStore extends Message
Entry<Long, MessageKeys> entry = null;
int counter = 0;
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
- .hasNext(); ) {
+ .hasNext();) {
entry = iterator.next();
- if (listener.recoverMessage(loadMessage(getJournal(), entry.getValue().location))) {
+ if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
counter++;
}
if (counter >= maxReturned || listener.hasSpace() == false) {
@@ -816,7 +818,7 @@ public class KahaDBStore extends Message
}
}
});
- } finally {
+ }finally {
indexLock.writeLock().unlock();
}
}
@@ -832,7 +834,7 @@ public class KahaDBStore extends Message
sd.subscriptionCursors.remove(subscriptionKey);
}
});
- } finally {
+ }finally {
indexLock.writeLock().unlock();
}
} catch (IOException e) {
@@ -856,8 +858,9 @@ public class KahaDBStore extends Message
/**
* Cleanup method to remove any state associated with the given destination.
* This method does not stop the message store (it might not be cached).
- *
- * @param destination Destination to forget
+ *
+ * @param destination
+ * Destination to forget
*/
public void removeQueueMessageStore(ActiveMQQueue destination) {
}
@@ -865,8 +868,9 @@ public class KahaDBStore extends Message
/**
* Cleanup method to remove any state associated with the given destination
* This method does not stop the message store (it might not be cached).
- *
- * @param destination Destination to forget
+ *
+ * @param destination
+ * Destination to forget
*/
public void removeTopicMessageStore(ActiveMQTopic destination) {
}
@@ -883,7 +887,7 @@ public class KahaDBStore extends Message
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
- .hasNext(); ) {
+ .hasNext();) {
Entry<String, StoredDestination> entry = iterator.next();
if (!isEmptyTopic(entry, tx)) {
rc.add(convert(entry.getKey()));
@@ -904,7 +908,7 @@ public class KahaDBStore extends Message
return isEmptyTopic;
}
});
- } finally {
+ }finally {
indexLock.readLock().unlock();
}
return rc;
@@ -916,7 +920,7 @@ public class KahaDBStore extends Message
public long getLastMessageBrokerSequenceId() throws IOException {
return 0;
}
-
+
public long getLastProducerSequenceId(ProducerId id) {
indexLock.readLock().lock();
try {
@@ -933,11 +937,9 @@ public class KahaDBStore extends Message
public void beginTransaction(ConnectionContext context) throws IOException {
throw new IOException("Not yet implemented.");
}
-
public void commitTransaction(ConnectionContext context) throws IOException {
throw new IOException("Not yet implemented.");
}
-
public void rollbackTransaction(ConnectionContext context) throws IOException {
throw new IOException("Not yet implemented.");
}
@@ -955,8 +957,8 @@ public class KahaDBStore extends Message
* @return
* @throws IOException
*/
- Message loadMessage(Journal journal, Location location) throws IOException {
- KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(journal, location);
+ Message loadMessage(Location location) throws IOException {
+ KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
return msg;
}
@@ -976,20 +978,20 @@ public class KahaDBStore extends Message
KahaDestination rc = new KahaDestination();
rc.setName(dest.getPhysicalName());
switch (dest.getDestinationType()) {
- case ActiveMQDestination.QUEUE_TYPE:
- rc.setType(DestinationType.QUEUE);
- return rc;
- case ActiveMQDestination.TOPIC_TYPE:
- rc.setType(DestinationType.TOPIC);
- return rc;
- case ActiveMQDestination.TEMP_QUEUE_TYPE:
- rc.setType(DestinationType.TEMP_QUEUE);
- return rc;
- case ActiveMQDestination.TEMP_TOPIC_TYPE:
- rc.setType(DestinationType.TEMP_TOPIC);
- return rc;
- default:
- return null;
+ case ActiveMQDestination.QUEUE_TYPE:
+ rc.setType(DestinationType.QUEUE);
+ return rc;
+ case ActiveMQDestination.TOPIC_TYPE:
+ rc.setType(DestinationType.TOPIC);
+ return rc;
+ case ActiveMQDestination.TEMP_QUEUE_TYPE:
+ rc.setType(DestinationType.TEMP_QUEUE);
+ return rc;
+ case ActiveMQDestination.TEMP_TOPIC_TYPE:
+ rc.setType(DestinationType.TEMP_TOPIC);
+ return rc;
+ default:
+ return null;
}
}
@@ -1002,19 +1004,27 @@ public class KahaDBStore extends Message
String name = dest.substring(p + 1);
switch (KahaDestination.DestinationType.valueOf(type)) {
- case QUEUE:
- return new ActiveMQQueue(name);
- case TOPIC:
- return new ActiveMQTopic(name);
- case TEMP_QUEUE:
- return new ActiveMQTempQueue(name);
- case TEMP_TOPIC:
- return new ActiveMQTempTopic(name);
- default:
- throw new IllegalArgumentException("Not in the valid destination format");
+ case QUEUE:
+ return new ActiveMQQueue(name);
+ case TOPIC:
+ return new ActiveMQTopic(name);
+ case TEMP_QUEUE:
+ return new ActiveMQTempQueue(name);
+ case TEMP_TOPIC:
+ return new ActiveMQTempTopic(name);
+ default:
+ throw new IllegalArgumentException("Not in the valid destination format");
}
}
+ public TransactionIdTransformer getTransactionIdTransformer() {
+ return transactionIdTransformer;
+ }
+
+ public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
+ this.transactionIdTransformer = transactionIdTransformer;
+ }
+
static class AsyncJobKey {
MessageId id;
ActiveMQDestination destination;
@@ -1141,9 +1151,8 @@ public class KahaDBStore extends Message
private final int subscriptionCount;
private final List<String> subscriptionKeys = new ArrayList<String>(1);
private final KahaDBTopicMessageStore topicStore;
-
public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
- int subscriptionCount) {
+ int subscriptionCount) {
super(store, context, message);
this.topicStore = store;
this.subscriptionCount = subscriptionCount;
@@ -1175,7 +1184,8 @@ public class KahaDBStore extends Message
/**
* add a key
- *
+ *
+ * @param key
* @return true if all acknowledgements received
*/
public boolean addSubscriptionKey(String key) {
@@ -1221,7 +1231,7 @@ public class KahaDBStore extends Message
super.afterExecute(runnable, throwable);
if (runnable instanceof StoreTask) {
- ((StoreTask) runnable).releaseLocks();
+ ((StoreTask)runnable).releaseLocks();
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java?rev=1170201&r1=1170200&r2=1170201&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java Tue Sep 13 15:01:37 2011
@@ -19,7 +19,6 @@ package org.apache.activemq.store.kahadb
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -27,9 +26,7 @@ import java.util.concurrent.Cancellation
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-
import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
@@ -52,13 +49,14 @@ import org.apache.activemq.store.kahadb.
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.wireformat.WireFormat;
-import org.apache.kahadb.journal.Journal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Provides a TransactionStore implementation that can create transaction aware
* MessageStore objects from non transaction aware MessageStore objects.
+ *
+ *
*/
public class KahaDBTransactionStore implements TransactionStore {
static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class);
@@ -72,23 +70,21 @@ public class KahaDBTransactionStore impl
public class Tx {
private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
+
private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
- private final HashSet<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>();
public void add(AddMessageCommand msg) {
messages.add(msg);
- destinations.add(msg.getMessage().getDestination());
}
public void add(RemoveMessageCommand ack) {
acks.add(ack);
- destinations.add(ack.getMessageAck().getDestination());
}
public Message[] getMessages() {
Message rc[] = new Message[messages.size()];
int count = 0;
- for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext(); ) {
+ for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
AddMessageCommand cmd = iter.next();
rc[count++] = cmd.getMessage();
}
@@ -98,7 +94,7 @@ public class KahaDBTransactionStore impl
public MessageAck[] getAcks() {
MessageAck rc[] = new MessageAck[acks.size()];
int count = 0;
- for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext(); ) {
+ for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
RemoveMessageCommand cmd = iter.next();
rc[count++] = cmd.getMessageAck();
}
@@ -107,56 +103,49 @@ public class KahaDBTransactionStore impl
/**
* @return true if something to commit
+ * @throws IOException
*/
public List<Future<Object>> commit() throws IOException {
List<Future<Object>> results = new ArrayList<Future<Object>>();
// Do all the message adds.
- for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext(); ) {
+ for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
AddMessageCommand cmd = iter.next();
results.add(cmd.run());
}
// And removes..
- for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext(); ) {
+ for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
RemoveMessageCommand cmd = iter.next();
cmd.run();
results.add(cmd.run());
}
-
+
return results;
}
}
public abstract class AddMessageCommand {
private final ConnectionContext ctx;
-
AddMessageCommand(ConnectionContext ctx) {
this.ctx = ctx;
}
-
abstract Message getMessage();
-
Future<Object> run() throws IOException {
return run(this.ctx);
}
-
abstract Future<Object> run(ConnectionContext ctx) throws IOException;
}
public abstract class RemoveMessageCommand {
private final ConnectionContext ctx;
-
RemoveMessageCommand(ConnectionContext ctx) {
this.ctx = ctx;
}
-
abstract MessageAck getMessageAck();
-
Future<Object> run() throws IOException {
return run(this.ctx);
}
-
abstract Future<Object> run(ConnectionContext context) throws IOException;
}
@@ -208,8 +197,8 @@ public class KahaDBTransactionStore impl
@Override
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
- MessageId messageId, MessageAck ack) throws IOException {
- KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore) getDelegate(), clientId,
+ MessageId messageId, MessageAck ack) throws IOException {
+ KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore)getDelegate(), clientId,
subscriptionName, messageId, ack);
}
@@ -217,20 +206,17 @@ public class KahaDBTransactionStore impl
}
/**
+ * @throws IOException
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
public void prepare(TransactionId txid) throws IOException {
KahaTransactionInfo info = getTransactionInfo(txid);
if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
- for (Journal journal : theStore.getJournalManager().getJournals()) {
- theStore.store(journal, new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
- }
+ theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
} else {
Tx tx = inflightTransactions.remove(txid);
if (tx != null) {
- for (Journal journal : theStore.getJournalManager().getJournals(tx.destinations)) {
- theStore.store(journal, new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
- }
+ theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
}
}
}
@@ -262,7 +248,7 @@ public class KahaDBTransactionStore impl
theStore.brokerService.handleIOException(new IOException(e.getMessage()));
} catch (ExecutionException e) {
theStore.brokerService.handleIOException(new IOException(e.getMessage()));
- } catch (CancellationException e) {
+ }catch(CancellationException e) {
}
if (!result.isCancelled()) {
doneSomething = true;
@@ -273,11 +259,9 @@ public class KahaDBTransactionStore impl
}
if (doneSomething) {
KahaTransactionInfo info = getTransactionInfo(txid);
- for (Journal journal : theStore.getJournalManager().getJournals(tx.destinations)) {
- theStore.store(journal, new KahaCommitCommand().setTransactionInfo(info), true, null, null);
- }
+ theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, null, null);
}
- } else {
+ }else {
//The Tx will be null for failed over clients - lets run their post commits
if (postCommit != null) {
postCommit.run();
@@ -286,25 +270,22 @@ public class KahaDBTransactionStore impl
} else {
KahaTransactionInfo info = getTransactionInfo(txid);
- for (Journal journal : theStore.getJournalManager().getJournals()) {
- theStore.store(journal, new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
- }
- forgetRecoveredAcks(txid);
+ theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
+ forgetRecoveredAcks(txid);
}
- } else {
- LOG.error("Null transaction passed on commit");
+ }else {
+ LOG.error("Null transaction passed on commit");
}
}
/**
+ * @throws IOException
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
public void rollback(TransactionId txid) throws IOException {
if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
KahaTransactionInfo info = getTransactionInfo(txid);
- for (Journal journal : theStore.getJournalManager().getJournals()) {
- theStore.store(journal, new KahaRollbackCommand().setTransactionInfo(info), false, null, null);
- }
+ theStore.store(new KahaRollbackCommand().setTransactionInfo(info), false, null, null);
forgetRecoveredAcks(txid);
} else {
inflightTransactions.remove(txid);
@@ -371,7 +352,6 @@ public class KahaDBTransactionStore impl
public Message getMessage() {
return message;
}
-
@Override
public Future<Object> run(ConnectionContext ctx) throws IOException {
destination.addMessage(ctx, message);
@@ -399,7 +379,6 @@ public class KahaDBTransactionStore impl
public Message getMessage() {
return message;
}
-
@Override
public Future<Object> run(ConnectionContext ctx) throws IOException {
return destination.asyncAddQueueMessage(ctx, message);
@@ -417,7 +396,7 @@ public class KahaDBTransactionStore impl
throws IOException {
if (message.getTransactionId() != null) {
- if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
+ if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
destination.addMessage(context, message);
return AbstractMessageStore.FUTURE;
} else {
@@ -427,7 +406,6 @@ public class KahaDBTransactionStore impl
public Message getMessage() {
return message;
}
-
@Override
public Future run(ConnectionContext ctx) throws IOException {
return destination.asyncAddTopicMessage(ctx, message);
@@ -449,7 +427,7 @@ public class KahaDBTransactionStore impl
throws IOException {
if (ack.isInTransaction()) {
- if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
+ if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
destination.removeMessage(context, ack);
} else {
Tx tx = getTx(ack.getTransactionId());
@@ -475,7 +453,7 @@ public class KahaDBTransactionStore impl
throws IOException {
if (ack.isInTransaction()) {
- if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
+ if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
destination.removeAsyncMessage(context, ack);
} else {
Tx tx = getTx(ack.getTransactionId());
@@ -501,7 +479,7 @@ public class KahaDBTransactionStore impl
final MessageId messageId, final MessageAck ack) throws IOException {
if (ack.isInTransaction()) {
- if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
+ if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
} else {
Tx tx = getTx(ack.getTransactionId());
@@ -523,7 +501,6 @@ public class KahaDBTransactionStore impl
private KahaTransactionInfo getTransactionInfo(TransactionId txid) {
- return theStore.createTransactionInfo(txid);
+ return theStore.getTransactionIdTransformer().transform(txid);
}
-
}