You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/09/13 17:01:38 UTC

svn commit: r1170201 [1/3] - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/filter/ activemq-core/src/main/java/org/apache/activemq/store/kahadb/ activemq-core/src/test/java/org/apache/activemq/broker/ activemq-core/src/test/java/o...

Author: gtully
Date: Tue Sep 13 15:01:37 2011
New Revision: 1170201

URL: http://svn.apache.org/viewvc?rev=1170201&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-2922 - rework, introduce new store 'mKahaDB' that contains multiple filtered kahadb persistence adapters, destinations match a store using destination wildcards in the same way as policy entries. Transactions that span multiple stores use a local xa variant to ensure consistency

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/AnyDestination.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TransactionIdConversion.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TransactionIdTransformer.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/mKahaDBXARecoveryBrokerTest.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/DefaultJournalManager.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/DestinationJournalManager.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/JournalManager.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/AnyDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/AnyDestination.java?rev=1170201&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/AnyDestination.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/AnyDestination.java Tue Sep 13 15:01:37 2011
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.filter;
+
+import java.lang.IllegalStateException;
+import javax.jms.*;
+import org.apache.activemq.command.ActiveMQDestination;
+
+/*
+  * allow match to any set  of composite destinations, both queues and topics
+ */
+public class AnyDestination extends ActiveMQDestination {
+
+    public AnyDestination(ActiveMQDestination[] destinations) {
+        super(destinations);
+        // ensure we are small when it comes to comparison in DestinationMap
+        physicalName = "0";
+    }
+
+    @Override
+    protected String getQualifiedPrefix() {
+        return "Any://";
+    }
+
+    @Override
+    public byte getDestinationType() {
+        return ActiveMQDestination.QUEUE_TYPE & ActiveMQDestination.TOPIC_TYPE;
+    }
+
+    @Override
+    public byte getDataStructureType() {
+        throw new IllegalStateException("not for marshalling");
+    }
+
+    @Override
+    public boolean isQueue() {
+        return true;
+    }
+
+    @Override
+    public boolean isTopic() {
+        return true;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/AnyDestination.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/AnyDestination.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java?rev=1170201&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java Tue Sep 13 15:01:37 2011
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import org.apache.activemq.filter.DestinationMapEntry;
+
+/**
+ * @org.apache.xbean.XBean element="filteredKahaDB"
+ *
+ */
+public class FilteredKahaDBPersistenceAdapter extends DestinationMapEntry {
+    private KahaDBPersistenceAdapter persistenceAdapter;
+
+    public KahaDBPersistenceAdapter getPersistenceAdapter() {
+        return persistenceAdapter;
+    }
+
+    public void setPersistenceAdapter(KahaDBPersistenceAdapter persistenceAdapter) {
+        this.persistenceAdapter = persistenceAdapter;
+    }
+
+    @Override
+    public void afterPropertiesSet() throws Exception {
+        // ok to have no destination, we default it
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=1170201&r1=1170200&r2=1170201&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Tue Sep 13 15:01:37 2011
@@ -19,7 +19,6 @@ package org.apache.activemq.store.kahadb
 import java.io.File;
 import java.io.IOException;
 import java.util.Set;
-
 import org.apache.activeio.journal.Journal;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
@@ -27,11 +26,18 @@ import org.apache.activemq.broker.Connec
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
+import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
+import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
 import org.apache.activemq.usage.SystemUsage;
 
 /**
@@ -46,6 +52,8 @@ public class KahaDBPersistenceAdapter im
     private final KahaDBStore letter = new KahaDBStore();
 
     /**
+     * @param context
+     * @throws IOException
      * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext)
      */
     public void beginTransaction(ConnectionContext context) throws IOException {
@@ -53,6 +61,8 @@ public class KahaDBPersistenceAdapter im
     }
 
     /**
+     * @param sync
+     * @throws IOException
      * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean)
      */
     public void checkpoint(boolean sync) throws IOException {
@@ -60,6 +70,8 @@ public class KahaDBPersistenceAdapter im
     }
 
     /**
+     * @param context
+     * @throws IOException
      * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext)
      */
     public void commitTransaction(ConnectionContext context) throws IOException {
@@ -67,7 +79,9 @@ public class KahaDBPersistenceAdapter im
     }
 
     /**
+     * @param destination
      * @return MessageStore
+     * @throws IOException
      * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
      */
     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
@@ -75,7 +89,9 @@ public class KahaDBPersistenceAdapter im
     }
 
     /**
+     * @param destination
      * @return TopicMessageStore
+     * @throws IOException
      * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
      */
     public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
@@ -83,7 +99,8 @@ public class KahaDBPersistenceAdapter im
     }
 
     /**
-     * @return TrandactionStore
+     * @return TransactionStore
+     * @throws IOException
      * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore()
      */
     public TransactionStore createTransactionStore() throws IOException {
@@ -91,6 +108,7 @@ public class KahaDBPersistenceAdapter im
     }
 
     /**
+     * @throws IOException
      * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages()
      */
     public void deleteAllMessages() throws IOException {
@@ -107,6 +125,7 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * @return lastMessageBrokerSequenceId
+     * @throws IOException
      * @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId()
      */
     public long getLastMessageBrokerSequenceId() throws IOException {
@@ -118,6 +137,7 @@ public class KahaDBPersistenceAdapter im
     }
 
     /**
+     * @param destination
      * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
      */
     public void removeQueueMessageStore(ActiveMQQueue destination) {
@@ -125,6 +145,7 @@ public class KahaDBPersistenceAdapter im
     }
 
     /**
+     * @param destination
      * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
      */
     public void removeTopicMessageStore(ActiveMQTopic destination) {
@@ -132,6 +153,8 @@ public class KahaDBPersistenceAdapter im
     }
 
     /**
+     * @param context
+     * @throws IOException
      * @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext)
      */
     public void rollbackTransaction(ConnectionContext context) throws IOException {
@@ -139,6 +162,7 @@ public class KahaDBPersistenceAdapter im
     }
 
     /**
+     * @param brokerName
      * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String)
      */
     public void setBrokerName(String brokerName) {
@@ -146,6 +170,7 @@ public class KahaDBPersistenceAdapter im
     }
 
     /**
+     * @param usageManager
      * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage)
      */
     public void setUsageManager(SystemUsage usageManager) {
@@ -161,6 +186,7 @@ public class KahaDBPersistenceAdapter im
     }
 
     /**
+     * @throws Exception
      * @see org.apache.activemq.Service#start()
      */
     public void start() throws Exception {
@@ -168,6 +194,7 @@ public class KahaDBPersistenceAdapter im
     }
 
     /**
+     * @throws Exception
      * @see org.apache.activemq.Service#stop()
      */
     public void stop() throws Exception {
@@ -176,7 +203,7 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Get the journalMaxFileLength
-     *
+     * 
      * @return the journalMaxFileLength
      */
     public int getJournalMaxFileLength() {
@@ -186,6 +213,8 @@ public class KahaDBPersistenceAdapter im
     /**
      * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
      * be used
+     * 
+     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
      */
     public void setJournalMaxFileLength(int journalMaxFileLength) {
         this.letter.setJournalMaxFileLength(journalMaxFileLength);
@@ -197,7 +226,7 @@ public class KahaDBPersistenceAdapter im
     public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
         this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack);
     }
-
+    
     public int getMaxFailoverProducersToTrack() {
         return this.letter.getMaxFailoverProducersToTrack();
     }
@@ -209,14 +238,14 @@ public class KahaDBPersistenceAdapter im
     public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
         this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth);
     }
-
+    
     public int getFailoverProducersAuditDepth() {
         return this.getFailoverProducersAuditDepth();
     }
-
+    
     /**
      * Get the checkpointInterval
-     *
+     * 
      * @return the checkpointInterval
      */
     public long getCheckpointInterval() {
@@ -225,8 +254,9 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Set the checkpointInterval
-     *
-     * @param checkpointInterval the checkpointInterval to set
+     * 
+     * @param checkpointInterval
+     *            the checkpointInterval to set
      */
     public void setCheckpointInterval(long checkpointInterval) {
         this.letter.setCheckpointInterval(checkpointInterval);
@@ -234,7 +264,7 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Get the cleanupInterval
-     *
+     * 
      * @return the cleanupInterval
      */
     public long getCleanupInterval() {
@@ -243,8 +273,9 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Set the cleanupInterval
-     *
-     * @param cleanupInterval the cleanupInterval to set
+     * 
+     * @param cleanupInterval
+     *            the cleanupInterval to set
      */
     public void setCleanupInterval(long cleanupInterval) {
         this.letter.setCleanupInterval(cleanupInterval);
@@ -252,7 +283,7 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Get the indexWriteBatchSize
-     *
+     * 
      * @return the indexWriteBatchSize
      */
     public int getIndexWriteBatchSize() {
@@ -262,8 +293,9 @@ public class KahaDBPersistenceAdapter im
     /**
      * Set the indexWriteBatchSize
      * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
-     *
-     * @param indexWriteBatchSize the indexWriteBatchSize to set
+     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
+     * @param indexWriteBatchSize
+     *            the indexWriteBatchSize to set
      */
     public void setIndexWriteBatchSize(int indexWriteBatchSize) {
         this.letter.setIndexWriteBatchSize(indexWriteBatchSize);
@@ -271,7 +303,7 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Get the journalMaxWriteBatchSize
-     *
+     * 
      * @return the journalMaxWriteBatchSize
      */
     public int getJournalMaxWriteBatchSize() {
@@ -280,9 +312,10 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Set the journalMaxWriteBatchSize
-     * * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
-     *
-     * @param journalMaxWriteBatchSize the journalMaxWriteBatchSize to set
+     *  * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
+     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
+     * @param journalMaxWriteBatchSize
+     *            the journalMaxWriteBatchSize to set
      */
     public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
         this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize);
@@ -290,7 +323,7 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Get the enableIndexWriteAsync
-     *
+     * 
      * @return the enableIndexWriteAsync
      */
     public boolean isEnableIndexWriteAsync() {
@@ -299,8 +332,9 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Set the enableIndexWriteAsync
-     *
-     * @param enableIndexWriteAsync the enableIndexWriteAsync to set
+     * 
+     * @param enableIndexWriteAsync
+     *            the enableIndexWriteAsync to set
      */
     public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
         this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync);
@@ -308,7 +342,7 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Get the directory
-     *
+     * 
      * @return the directory
      */
     public File getDirectory() {
@@ -316,6 +350,7 @@ public class KahaDBPersistenceAdapter im
     }
 
     /**
+     * @param dir
      * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File)
      */
     public void setDirectory(File dir) {
@@ -324,7 +359,7 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Get the enableJournalDiskSyncs
-     *
+     * 
      * @return the enableJournalDiskSyncs
      */
     public boolean isEnableJournalDiskSyncs() {
@@ -333,8 +368,9 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Set the enableJournalDiskSyncs
-     *
-     * @param enableJournalDiskSyncs the enableJournalDiskSyncs to set
+     * 
+     * @param enableJournalDiskSyncs
+     *            the enableJournalDiskSyncs to set
      */
     public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) {
         this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs);
@@ -342,7 +378,7 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Get the indexCacheSize
-     *
+     * 
      * @return the indexCacheSize
      */
     public int getIndexCacheSize() {
@@ -352,8 +388,9 @@ public class KahaDBPersistenceAdapter im
     /**
      * Set the indexCacheSize
      * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
-     *
-     * @param indexCacheSize the indexCacheSize to set
+     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
+     * @param indexCacheSize
+     *            the indexCacheSize to set
      */
     public void setIndexCacheSize(int indexCacheSize) {
         this.letter.setIndexCacheSize(indexCacheSize);
@@ -361,7 +398,7 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Get the ignoreMissingJournalfiles
-     *
+     * 
      * @return the ignoreMissingJournalfiles
      */
     public boolean isIgnoreMissingJournalfiles() {
@@ -370,8 +407,9 @@ public class KahaDBPersistenceAdapter im
 
     /**
      * Set the ignoreMissingJournalfiles
-     *
-     * @param ignoreMissingJournalfiles the ignoreMissingJournalfiles to set
+     * 
+     * @param ignoreMissingJournalfiles
+     *            the ignoreMissingJournalfiles to set
      */
     public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
         this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
@@ -432,14 +470,14 @@ public class KahaDBPersistenceAdapter im
     public int getMaxAsyncJobs() {
         return letter.getMaxAsyncJobs();
     }
-
     /**
-     * @param maxAsyncJobs the maxAsyncJobs to set
+     * @param maxAsyncJobs
+     *            the maxAsyncJobs to set
      */
     public void setMaxAsyncJobs(int maxAsyncJobs) {
         letter.setMaxAsyncJobs(maxAsyncJobs);
     }
-
+    
     /**
      * @return the databaseLockedWaitDelay
      */
@@ -451,7 +489,7 @@ public class KahaDBPersistenceAdapter im
      * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
      */
     public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) {
-        letter.setDatabaseLockedWaitDelay(databaseLockedWaitDelay);
+       letter.setDatabaseLockedWaitDelay(databaseLockedWaitDelay);
     }
 
     public boolean getForceRecoverIndex() {
@@ -462,19 +500,33 @@ public class KahaDBPersistenceAdapter im
         letter.setForceRecoverIndex(forceRecoverIndex);
     }
 
-    public boolean isJournalPerDestination() {
-        return letter.isJournalPerDestination();
-    }
-
-    public void setJournalPerDestination(boolean journalPerDestination) {
-        letter.setJournalPerDestination(journalPerDestination);
-    }
-
-    //  for testing
     public KahaDBStore getStore() {
         return letter;
     }
 
+    public KahaTransactionInfo createTransactionInfo(TransactionId txid) {
+        if (txid == null) {
+            return null;
+        }
+        KahaTransactionInfo rc = new KahaTransactionInfo();
+
+        if (txid.isLocalTransaction()) {
+            LocalTransactionId t = (LocalTransactionId) txid;
+            KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
+            kahaTxId.setConnectionId(t.getConnectionId().getValue());
+            kahaTxId.setTransacitonId(t.getValue());
+            rc.setLocalTransacitonId(kahaTxId);
+        } else {
+            XATransactionId t = (XATransactionId) txid;
+            KahaXATransactionId kahaTxId = new KahaXATransactionId();
+            kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
+            kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
+            kahaTxId.setFormatId(t.getFormatId());
+            rc.setXaTransacitonId(kahaTxId);
+        }
+        return rc;
+    }
+
     @Override
     public String toString() {
         String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1170201&r1=1170200&r2=1170201&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Tue Sep 13 15:01:37 2011
@@ -26,22 +26,23 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.Map.Entry;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.*;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.store.AbstractMessageStore;
@@ -52,20 +53,20 @@ import org.apache.activemq.store.TopicMe
 import org.apache.activemq.store.TransactionStore;
 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
 import org.apache.activemq.store.kahadb.data.KahaDestination;
-import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
 import org.apache.activemq.store.kahadb.data.KahaLocation;
 import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
 import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
 import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
+import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
+import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.wireformat.WireFormat;
-import org.apache.kahadb.journal.Journal;
-import org.apache.kahadb.journal.Location;
-import org.apache.kahadb.page.Transaction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.page.Transaction;
 
 public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
     static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
@@ -76,7 +77,7 @@ public class KahaDBStore extends Message
             PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
     public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
     private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
-            PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);
+            PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
 
     protected ExecutorService queueExecutor;
     protected ExecutorService topicExecutor;
@@ -95,9 +96,16 @@ public class KahaDBStore extends Message
     private boolean concurrentStoreAndDispatchTransactions = false;
     private int maxAsyncJobs = MAX_ASYNC_JOBS;
     private final KahaDBTransactionStore transactionStore;
+    private TransactionIdTransformer transactionIdTransformer;
 
     public KahaDBStore() {
         this.transactionStore = new KahaDBTransactionStore(this);
+        this.transactionIdTransformer = new TransactionIdTransformer() {
+            @Override
+            public KahaTransactionInfo transform(TransactionId txid) {
+                return TransactionIdConversion.convert(txid);
+            }
+        };
     }
 
     @Override
@@ -124,7 +132,8 @@ public class KahaDBStore extends Message
     }
 
     /**
-     * @param concurrentStoreAndDispatch the concurrentStoreAndDispatch to set
+     * @param concurrentStoreAndDispatch
+     *            the concurrentStoreAndDispatch to set
      */
     public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
         this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
@@ -138,7 +147,8 @@ public class KahaDBStore extends Message
     }
 
     /**
-     * @param concurrentStoreAndDispatch the concurrentStoreAndDispatch to set
+     * @param concurrentStoreAndDispatch
+     *            the concurrentStoreAndDispatch to set
      */
     public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
         this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
@@ -147,16 +157,16 @@ public class KahaDBStore extends Message
     public boolean isConcurrentStoreAndDispatchTransactions() {
         return this.concurrentStoreAndDispatchTransactions;
     }
-
+    
     /**
      * @return the maxAsyncJobs
      */
     public int getMaxAsyncJobs() {
         return this.maxAsyncJobs;
     }
-
     /**
-     * @param maxAsyncJobs the maxAsyncJobs to set
+     * @param maxAsyncJobs
+     *            the maxAsyncJobs to set
      */
     public void setMaxAsyncJobs(int maxAsyncJobs) {
         this.maxAsyncJobs = maxAsyncJobs;
@@ -171,20 +181,20 @@ public class KahaDBStore extends Message
         this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
         this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
                 asyncQueueJobQueue, new ThreadFactory() {
-            public Thread newThread(Runnable runnable) {
-                Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
-                thread.setDaemon(true);
-                return thread;
-            }
-        });
+                    public Thread newThread(Runnable runnable) {
+                        Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
+                        thread.setDaemon(true);
+                        return thread;
+                    }
+                });
         this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
                 asyncTopicJobQueue, new ThreadFactory() {
-            public Thread newThread(Runnable runnable) {
-                Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
-                thread.setDaemon(true);
-                return thread;
-            }
-        });
+                    public Thread newThread(Runnable runnable) {
+                        Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
+                        thread.setDaemon(true);
+                        return thread;
+                    }
+                });
     }
 
     @Override
@@ -281,16 +291,14 @@ public class KahaDBStore extends Message
         protected KahaDestination dest;
         private final int maxAsyncJobs;
         private final Semaphore localDestinationSemaphore;
-        private final Journal journal;
 
         double doneTasks, canceledTasks = 0;
 
-        public KahaDBMessageStore(ActiveMQDestination destination) throws IOException {
+        public KahaDBMessageStore(ActiveMQDestination destination) {
             super(destination);
             this.dest = convert(destination);
             this.maxAsyncJobs = getMaxAsyncJobs();
             this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
-            this.journal = getJournalManager().getJournal(destination);
         }
 
         @Override
@@ -347,30 +355,30 @@ public class KahaDBStore extends Message
             KahaAddMessageCommand command = new KahaAddMessageCommand();
             command.setDestination(dest);
             command.setMessageId(message.getMessageId().toString());
-            command.setTransactionInfo(createTransactionInfo(message.getTransactionId()));
+            command.setTransactionInfo(transactionIdTransformer.transform(message.getTransactionId()));
             command.setPriority(message.getPriority());
             command.setPrioritySupported(isPrioritizedMessages());
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
             command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
-            store(journal, command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
-
+            store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
+            
         }
 
         public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
             KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
             command.setDestination(dest);
             command.setMessageId(ack.getLastMessageId().toString());
-            command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()));
+            command.setTransactionInfo(transactionIdTransformer.transform(ack.getTransactionId()));
 
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
             command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
-            store(journal, command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
+            store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
         }
 
         public void removeAllMessages(ConnectionContext context) throws IOException {
             KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
             command.setDestination(dest);
-            store(journal, command, true, null, null);
+            store(command, true, null, null);
         }
 
         public Message getMessage(MessageId identity) throws IOException {
@@ -392,14 +400,14 @@ public class KahaDBStore extends Message
                         return sd.orderIndex.get(tx, sequence).location;
                     }
                 });
-            } finally {
+            }finally {
                 indexLock.readLock().unlock();
             }
             if (location == null) {
                 return null;
             }
 
-            return loadMessage(journal, location);
+            return loadMessage(location);
         }
 
         public int getMessageCount() throws IOException {
@@ -415,14 +423,14 @@ public class KahaDBStore extends Message
                             StoredDestination sd = getStoredDestination(dest, tx);
                             int rc = 0;
                             for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator
-                                    .hasNext(); ) {
+                                    .hasNext();) {
                                 iterator.next();
                                 rc++;
                             }
                             return rc;
                         }
                     });
-                } finally {
+                }finally {
                     indexLock.readLock().unlock();
                 }
             } finally {
@@ -442,7 +450,7 @@ public class KahaDBStore extends Message
                         return sd.locationIndex.isEmpty(tx);
                     }
                 });
-            } finally {
+            }finally {
                 indexLock.readLock().unlock();
             }
         }
@@ -461,17 +469,17 @@ public class KahaDBStore extends Message
                             if (ackedAndPrepared.contains(entry.getValue().messageId)) {
                                 continue;
                             }
-                            Message msg = loadMessage(journal, entry.getValue().location);
+                            Message msg = loadMessage(entry.getValue().location);
                             listener.recoverMessage(msg);
                         }
                     }
                 });
-            } finally {
+            }finally {
                 indexLock.writeLock().unlock();
             }
         }
 
-
+        
         public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
             indexLock.readLock().lock();
             try {
@@ -486,7 +494,7 @@ public class KahaDBStore extends Message
                             if (ackedAndPrepared.contains(entry.getValue().messageId)) {
                                 continue;
                             }
-                            Message msg = loadMessage(journal, entry.getValue().location);
+                            Message msg = loadMessage(entry.getValue().location);
                             listener.recoverMessage(msg);
                             counter++;
                             if (counter >= maxReturned) {
@@ -496,23 +504,24 @@ public class KahaDBStore extends Message
                         sd.orderIndex.stoppedIterating();
                     }
                 });
-            } finally {
+            }finally {
                 indexLock.readLock().unlock();
             }
         }
 
         public void resetBatching() {
-            try {
-                pageFile.tx().execute(new Transaction.Closure<Exception>() {
-                    public void execute(Transaction tx) throws Exception {
-                        StoredDestination sd = getExistingStoredDestination(dest, tx);
-                        if (sd != null) {
-                            sd.orderIndex.resetCursorPosition();
-                        }
-                    }
-                });
-            } catch (Exception e) {
-                LOG.error("Failed to reset batching", e);
+            if (pageFile.isLoaded()) {
+                try {
+                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
+                        public void execute(Transaction tx) throws Exception {
+                            StoredDestination sd = getExistingStoredDestination(dest, tx);
+                            if (sd != null) {
+                                sd.orderIndex.resetCursorPosition();}
+                            }
+                        });
+                } catch (Exception e) {
+                    LOG.error("Failed to reset batching",e);
+                }
             }
         }
 
@@ -525,10 +534,10 @@ public class KahaDBStore extends Message
                 // Hopefully one day the page file supports concurrent read
                 // operations... but for now we must
                 // externally synchronize...
-
+               
                 indexLock.writeLock().lock();
                 try {
-                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
                         public void execute(Transaction tx) throws IOException {
                             StoredDestination sd = getStoredDestination(dest, tx);
                             Long location = sd.messageIdIndex.get(tx, key);
@@ -537,10 +546,10 @@ public class KahaDBStore extends Message
                             }
                         }
                     });
-                } finally {
+                }finally {
                     indexLock.writeLock().unlock();
                 }
-
+                
             } finally {
                 unlockAsyncJobQueue();
             }
@@ -550,21 +559,15 @@ public class KahaDBStore extends Message
         @Override
         public void setMemoryUsage(MemoryUsage memoeyUSage) {
         }
-
         @Override
         public void start() throws Exception {
             super.start();
         }
-
         @Override
         public void stop() throws Exception {
             super.stop();
         }
 
-        public Journal getJournal() {
-            return this.journal;
-        }
-
         protected void lockAsyncJobQueue() {
             try {
                 this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
@@ -593,7 +596,6 @@ public class KahaDBStore extends Message
 
     class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
         private final AtomicInteger subscriptionCount = new AtomicInteger();
-
         public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
             super(destination);
             this.subscriptionCount.set(getAllSubscriptions().length);
@@ -646,11 +648,11 @@ public class KahaDBStore extends Message
             command.setDestination(dest);
             command.setSubscriptionKey(subscriptionKey);
             command.setMessageId(messageId.toString());
-            command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()));
+            command.setTransactionInfo(transactionIdTransformer.transform(ack.getTransactionId()));
             if (ack != null && ack.isUnmatchedAck()) {
                 command.setAck(UNMATCHED);
             }
-            store(getJournal(), command, false, null, null);
+            store(command, false, null, null);
         }
 
         public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
@@ -662,7 +664,7 @@ public class KahaDBStore extends Message
             command.setRetroactive(retroactive);
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
             command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
-            store(getJournal(), command, isEnableJournalDiskSyncs() && true, null, null);
+            store(command, isEnableJournalDiskSyncs() && true, null, null);
             this.subscriptionCount.incrementAndGet();
         }
 
@@ -670,7 +672,7 @@ public class KahaDBStore extends Message
             KahaSubscriptionCommand command = new KahaSubscriptionCommand();
             command.setDestination(dest);
             command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
-            store(getJournal(), command, isEnableJournalDiskSyncs() && true, null, null);
+            store(command, isEnableJournalDiskSyncs() && true, null, null);
             this.subscriptionCount.decrementAndGet();
         }
 
@@ -683,7 +685,7 @@ public class KahaDBStore extends Message
                     public void execute(Transaction tx) throws IOException {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
-                                .hasNext(); ) {
+                                .hasNext();) {
                             Entry<String, KahaSubscriptionCommand> entry = iterator.next();
                             SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
                                     .getValue().getSubscriptionInfo().newInput()));
@@ -692,7 +694,7 @@ public class KahaDBStore extends Message
                         }
                     }
                 });
-            } finally {
+            }finally {
                 indexLock.readLock().unlock();
             }
 
@@ -716,7 +718,7 @@ public class KahaDBStore extends Message
                                 .getSubscriptionInfo().newInput()));
                     }
                 });
-            } finally {
+            }finally {
                 indexLock.readLock().unlock();
             }
         }
@@ -736,7 +738,7 @@ public class KahaDBStore extends Message
 
                         int counter = 0;
                         for (Iterator<Entry<Long, HashSet<String>>> iterator =
-                                     sd.ackPositions.iterator(tx, cursorPos.lastAckedSequence); iterator.hasNext(); ) {
+                                sd.ackPositions.iterator(tx, cursorPos.lastAckedSequence); iterator.hasNext();) {
                             Entry<Long, HashSet<String>> entry = iterator.next();
                             if (entry.getValue().contains(subscriptionKey)) {
                                 counter++;
@@ -745,7 +747,7 @@ public class KahaDBStore extends Message
                         return counter;
                     }
                 });
-            } finally {
+            }finally {
                 indexLock.writeLock().unlock();
             }
         }
@@ -762,20 +764,20 @@ public class KahaDBStore extends Message
                         LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
                         sd.orderIndex.setBatch(tx, cursorPos);
                         for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
-                                .hasNext(); ) {
+                                .hasNext();) {
                             Entry<Long, MessageKeys> entry = iterator.next();
-                            listener.recoverMessage(loadMessage(getJournal(), entry.getValue().location));
+                            listener.recoverMessage(loadMessage(entry.getValue().location));
                         }
                         sd.orderIndex.resetCursorPosition();
                     }
                 });
-            } finally {
+            }finally {
                 indexLock.writeLock().unlock();
             }
         }
 
         public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
-                                        final MessageRecoveryListener listener) throws Exception {
+                final MessageRecoveryListener listener) throws Exception {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
             final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
             indexLock.writeLock().lock();
@@ -800,9 +802,9 @@ public class KahaDBStore extends Message
                         Entry<Long, MessageKeys> entry = null;
                         int counter = 0;
                         for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
-                                .hasNext(); ) {
+                                .hasNext();) {
                             entry = iterator.next();
-                            if (listener.recoverMessage(loadMessage(getJournal(), entry.getValue().location))) {
+                            if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
                                 counter++;
                             }
                             if (counter >= maxReturned || listener.hasSpace() == false) {
@@ -816,7 +818,7 @@ public class KahaDBStore extends Message
                         }
                     }
                 });
-            } finally {
+            }finally {
                 indexLock.writeLock().unlock();
             }
         }
@@ -832,7 +834,7 @@ public class KahaDBStore extends Message
                             sd.subscriptionCursors.remove(subscriptionKey);
                         }
                     });
-                } finally {
+                }finally {
                     indexLock.writeLock().unlock();
                 }
             } catch (IOException e) {
@@ -856,8 +858,9 @@ public class KahaDBStore extends Message
     /**
      * Cleanup method to remove any state associated with the given destination.
      * This method does not stop the message store (it might not be cached).
-     *
-     * @param destination Destination to forget
+     * 
+     * @param destination
+     *            Destination to forget
      */
     public void removeQueueMessageStore(ActiveMQQueue destination) {
     }
@@ -865,8 +868,9 @@ public class KahaDBStore extends Message
     /**
      * Cleanup method to remove any state associated with the given destination
      * This method does not stop the message store (it might not be cached).
-     *
-     * @param destination Destination to forget
+     * 
+     * @param destination
+     *            Destination to forget
      */
     public void removeTopicMessageStore(ActiveMQTopic destination) {
     }
@@ -883,7 +887,7 @@ public class KahaDBStore extends Message
                 pageFile.tx().execute(new Transaction.Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
                         for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
-                                .hasNext(); ) {
+                                .hasNext();) {
                             Entry<String, StoredDestination> entry = iterator.next();
                             if (!isEmptyTopic(entry, tx)) {
                                 rc.add(convert(entry.getKey()));
@@ -904,7 +908,7 @@ public class KahaDBStore extends Message
                         return isEmptyTopic;
                     }
                 });
-            } finally {
+            }finally {
                 indexLock.readLock().unlock();
             }
             return rc;
@@ -916,7 +920,7 @@ public class KahaDBStore extends Message
     public long getLastMessageBrokerSequenceId() throws IOException {
         return 0;
     }
-
+    
     public long getLastProducerSequenceId(ProducerId id) {
         indexLock.readLock().lock();
         try {
@@ -933,11 +937,9 @@ public class KahaDBStore extends Message
     public void beginTransaction(ConnectionContext context) throws IOException {
         throw new IOException("Not yet implemented.");
     }
-
     public void commitTransaction(ConnectionContext context) throws IOException {
         throw new IOException("Not yet implemented.");
     }
-
     public void rollbackTransaction(ConnectionContext context) throws IOException {
         throw new IOException("Not yet implemented.");
     }
@@ -955,8 +957,8 @@ public class KahaDBStore extends Message
      * @return
      * @throws IOException
      */
-    Message loadMessage(Journal journal, Location location) throws IOException {
-        KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(journal, location);
+    Message loadMessage(Location location) throws IOException {
+        KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
         Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
         return msg;
     }
@@ -976,20 +978,20 @@ public class KahaDBStore extends Message
         KahaDestination rc = new KahaDestination();
         rc.setName(dest.getPhysicalName());
         switch (dest.getDestinationType()) {
-            case ActiveMQDestination.QUEUE_TYPE:
-                rc.setType(DestinationType.QUEUE);
-                return rc;
-            case ActiveMQDestination.TOPIC_TYPE:
-                rc.setType(DestinationType.TOPIC);
-                return rc;
-            case ActiveMQDestination.TEMP_QUEUE_TYPE:
-                rc.setType(DestinationType.TEMP_QUEUE);
-                return rc;
-            case ActiveMQDestination.TEMP_TOPIC_TYPE:
-                rc.setType(DestinationType.TEMP_TOPIC);
-                return rc;
-            default:
-                return null;
+        case ActiveMQDestination.QUEUE_TYPE:
+            rc.setType(DestinationType.QUEUE);
+            return rc;
+        case ActiveMQDestination.TOPIC_TYPE:
+            rc.setType(DestinationType.TOPIC);
+            return rc;
+        case ActiveMQDestination.TEMP_QUEUE_TYPE:
+            rc.setType(DestinationType.TEMP_QUEUE);
+            return rc;
+        case ActiveMQDestination.TEMP_TOPIC_TYPE:
+            rc.setType(DestinationType.TEMP_TOPIC);
+            return rc;
+        default:
+            return null;
         }
     }
 
@@ -1002,19 +1004,27 @@ public class KahaDBStore extends Message
         String name = dest.substring(p + 1);
 
         switch (KahaDestination.DestinationType.valueOf(type)) {
-            case QUEUE:
-                return new ActiveMQQueue(name);
-            case TOPIC:
-                return new ActiveMQTopic(name);
-            case TEMP_QUEUE:
-                return new ActiveMQTempQueue(name);
-            case TEMP_TOPIC:
-                return new ActiveMQTempTopic(name);
-            default:
-                throw new IllegalArgumentException("Not in the valid destination format");
+        case QUEUE:
+            return new ActiveMQQueue(name);
+        case TOPIC:
+            return new ActiveMQTopic(name);
+        case TEMP_QUEUE:
+            return new ActiveMQTempQueue(name);
+        case TEMP_TOPIC:
+            return new ActiveMQTempTopic(name);
+        default:
+            throw new IllegalArgumentException("Not in the valid destination format");
         }
     }
 
+    public TransactionIdTransformer getTransactionIdTransformer() {
+        return transactionIdTransformer;
+    }
+
+    public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
+        this.transactionIdTransformer = transactionIdTransformer;
+    }
+
     static class AsyncJobKey {
         MessageId id;
         ActiveMQDestination destination;
@@ -1141,9 +1151,8 @@ public class KahaDBStore extends Message
         private final int subscriptionCount;
         private final List<String> subscriptionKeys = new ArrayList<String>(1);
         private final KahaDBTopicMessageStore topicStore;
-
         public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
-                              int subscriptionCount) {
+                int subscriptionCount) {
             super(store, context, message);
             this.topicStore = store;
             this.subscriptionCount = subscriptionCount;
@@ -1175,7 +1184,8 @@ public class KahaDBStore extends Message
 
         /**
          * add a key
-         *
+         * 
+         * @param key
          * @return true if all acknowledgements received
          */
         public boolean addSubscriptionKey(String key) {
@@ -1221,7 +1231,7 @@ public class KahaDBStore extends Message
             super.afterExecute(runnable, throwable);
 
             if (runnable instanceof StoreTask) {
-                ((StoreTask) runnable).releaseLocks();
+               ((StoreTask)runnable).releaseLocks();
             }
 
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java?rev=1170201&r1=1170200&r2=1170201&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java Tue Sep 13 15:01:37 2011
@@ -19,7 +19,6 @@ package org.apache.activemq.store.kahadb
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -27,9 +26,7 @@ import java.util.concurrent.Cancellation
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-
 import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
@@ -52,13 +49,14 @@ import org.apache.activemq.store.kahadb.
 import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
 import org.apache.activemq.wireformat.WireFormat;
-import org.apache.kahadb.journal.Journal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Provides a TransactionStore implementation that can create transaction aware
  * MessageStore objects from non transaction aware MessageStore objects.
+ * 
+ * 
  */
 public class KahaDBTransactionStore implements TransactionStore {
     static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class);
@@ -72,23 +70,21 @@ public class KahaDBTransactionStore impl
 
     public class Tx {
         private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
+
         private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
-        private final HashSet<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>();
 
         public void add(AddMessageCommand msg) {
             messages.add(msg);
-            destinations.add(msg.getMessage().getDestination());
         }
 
         public void add(RemoveMessageCommand ack) {
             acks.add(ack);
-            destinations.add(ack.getMessageAck().getDestination());
         }
 
         public Message[] getMessages() {
             Message rc[] = new Message[messages.size()];
             int count = 0;
-            for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext(); ) {
+            for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
                 AddMessageCommand cmd = iter.next();
                 rc[count++] = cmd.getMessage();
             }
@@ -98,7 +94,7 @@ public class KahaDBTransactionStore impl
         public MessageAck[] getAcks() {
             MessageAck rc[] = new MessageAck[acks.size()];
             int count = 0;
-            for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext(); ) {
+            for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
                 RemoveMessageCommand cmd = iter.next();
                 rc[count++] = cmd.getMessageAck();
             }
@@ -107,56 +103,49 @@ public class KahaDBTransactionStore impl
 
         /**
          * @return true if something to commit
+         * @throws IOException
          */
         public List<Future<Object>> commit() throws IOException {
             List<Future<Object>> results = new ArrayList<Future<Object>>();
             // Do all the message adds.
-            for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext(); ) {
+            for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
                 AddMessageCommand cmd = iter.next();
                 results.add(cmd.run());
 
             }
             // And removes..
-            for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext(); ) {
+            for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
                 RemoveMessageCommand cmd = iter.next();
                 cmd.run();
                 results.add(cmd.run());
             }
-
+            
             return results;
         }
     }
 
     public abstract class AddMessageCommand {
         private final ConnectionContext ctx;
-
         AddMessageCommand(ConnectionContext ctx) {
             this.ctx = ctx;
         }
-
         abstract Message getMessage();
-
         Future<Object> run() throws IOException {
             return run(this.ctx);
         }
-
         abstract Future<Object> run(ConnectionContext ctx) throws IOException;
     }
 
     public abstract class RemoveMessageCommand {
 
         private final ConnectionContext ctx;
-
         RemoveMessageCommand(ConnectionContext ctx) {
             this.ctx = ctx;
         }
-
         abstract MessageAck getMessageAck();
-
         Future<Object> run() throws IOException {
             return run(this.ctx);
         }
-
         abstract Future<Object> run(ConnectionContext context) throws IOException;
     }
 
@@ -208,8 +197,8 @@ public class KahaDBTransactionStore impl
 
             @Override
             public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
-                                    MessageId messageId, MessageAck ack) throws IOException {
-                KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore) getDelegate(), clientId,
+                            MessageId messageId, MessageAck ack) throws IOException {
+                KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore)getDelegate(), clientId,
                         subscriptionName, messageId, ack);
             }
 
@@ -217,20 +206,17 @@ public class KahaDBTransactionStore impl
     }
 
     /**
+     * @throws IOException
      * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
      */
     public void prepare(TransactionId txid) throws IOException {
         KahaTransactionInfo info = getTransactionInfo(txid);
         if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
-            for (Journal journal : theStore.getJournalManager().getJournals()) {
-                theStore.store(journal, new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
-            }
+            theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
         } else {
             Tx tx = inflightTransactions.remove(txid);
             if (tx != null) {
-                for (Journal journal : theStore.getJournalManager().getJournals(tx.destinations)) {
-                    theStore.store(journal, new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
-                }
+               theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
             }
         }
     }
@@ -262,7 +248,7 @@ public class KahaDBTransactionStore impl
                             theStore.brokerService.handleIOException(new IOException(e.getMessage()));
                         } catch (ExecutionException e) {
                             theStore.brokerService.handleIOException(new IOException(e.getMessage()));
-                        } catch (CancellationException e) {
+                        }catch(CancellationException e) {
                         }
                         if (!result.isCancelled()) {
                             doneSomething = true;
@@ -273,11 +259,9 @@ public class KahaDBTransactionStore impl
                     }
                     if (doneSomething) {
                         KahaTransactionInfo info = getTransactionInfo(txid);
-                        for (Journal journal : theStore.getJournalManager().getJournals(tx.destinations)) {
-                            theStore.store(journal, new KahaCommitCommand().setTransactionInfo(info), true, null, null);
-                        }
+                        theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, null, null);
                     }
-                } else {
+                }else {
                     //The Tx will be null for failed over clients - lets run their post commits
                     if (postCommit != null) {
                         postCommit.run();
@@ -286,25 +270,22 @@ public class KahaDBTransactionStore impl
 
             } else {
                 KahaTransactionInfo info = getTransactionInfo(txid);
-                    for (Journal journal : theStore.getJournalManager().getJournals()) {
-                        theStore.store(journal, new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
-                    }
-                    forgetRecoveredAcks(txid);
+                theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
+                forgetRecoveredAcks(txid);
             }
-        } else {
-            LOG.error("Null transaction passed on commit");
+        }else {
+           LOG.error("Null transaction passed on commit");
         }
     }
 
     /**
+     * @throws IOException
      * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
      */
     public void rollback(TransactionId txid) throws IOException {
         if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
             KahaTransactionInfo info = getTransactionInfo(txid);
-            for (Journal journal : theStore.getJournalManager().getJournals()) {
-                theStore.store(journal, new KahaRollbackCommand().setTransactionInfo(info), false, null, null);
-            }
+            theStore.store(new KahaRollbackCommand().setTransactionInfo(info), false, null, null);
             forgetRecoveredAcks(txid);
         } else {
             inflightTransactions.remove(txid);
@@ -371,7 +352,6 @@ public class KahaDBTransactionStore impl
                     public Message getMessage() {
                         return message;
                     }
-
                     @Override
                     public Future<Object> run(ConnectionContext ctx) throws IOException {
                         destination.addMessage(ctx, message);
@@ -399,7 +379,6 @@ public class KahaDBTransactionStore impl
                     public Message getMessage() {
                         return message;
                     }
-
                     @Override
                     public Future<Object> run(ConnectionContext ctx) throws IOException {
                         return destination.asyncAddQueueMessage(ctx, message);
@@ -417,7 +396,7 @@ public class KahaDBTransactionStore impl
             throws IOException {
 
         if (message.getTransactionId() != null) {
-            if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
+            if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
                 destination.addMessage(context, message);
                 return AbstractMessageStore.FUTURE;
             } else {
@@ -427,7 +406,6 @@ public class KahaDBTransactionStore impl
                     public Message getMessage() {
                         return message;
                     }
-
                     @Override
                     public Future run(ConnectionContext ctx) throws IOException {
                         return destination.asyncAddTopicMessage(ctx, message);
@@ -449,7 +427,7 @@ public class KahaDBTransactionStore impl
             throws IOException {
 
         if (ack.isInTransaction()) {
-            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
+            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
                 destination.removeMessage(context, ack);
             } else {
                 Tx tx = getTx(ack.getTransactionId());
@@ -475,7 +453,7 @@ public class KahaDBTransactionStore impl
             throws IOException {
 
         if (ack.isInTransaction()) {
-            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
+            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
                 destination.removeAsyncMessage(context, ack);
             } else {
                 Tx tx = getTx(ack.getTransactionId());
@@ -501,7 +479,7 @@ public class KahaDBTransactionStore impl
                            final MessageId messageId, final MessageAck ack) throws IOException {
 
         if (ack.isInTransaction()) {
-            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
+            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
                 destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
             } else {
                 Tx tx = getTx(ack.getTransactionId());
@@ -523,7 +501,6 @@ public class KahaDBTransactionStore impl
 
 
     private KahaTransactionInfo getTransactionInfo(TransactionId txid) {
-        return theStore.createTransactionInfo(txid);
+        return theStore.getTransactionIdTransformer().transform(txid);
     }
-
 }