You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2011/07/15 15:45:03 UTC
svn commit: r1147149 [1/2] - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/store/kahadb/
test/java/org/apache/activemq/bugs/ test/java/org/apache/activemq/perf/
test/java/org/apache/activemq/usecases/
Author: rajdavies
Date: Fri Jul 15 13:45:01 2011
New Revision: 1147149
URL: http://svn.apache.org/viewvc?rev=1147149&view=rev
Log:
Fix for https://issues.apache.org/jira/browse/AMQ-2922
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/DefaultJournalManager.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/DestinationJournalManager.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/JournalManager.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/DefaultJournalManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/DefaultJournalManager.java?rev=1147149&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/DefaultJournalManager.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/DefaultJournalManager.java Fri Jul 15 13:45:01 2011
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.kahadb.journal.DataFile;
+import org.apache.kahadb.journal.Journal;
+
+
+public class DefaultJournalManager implements JournalManager {
+
+ private final Journal journal;
+ private final List<Journal> journals;
+
+ public DefaultJournalManager() {
+ this.journal = new Journal();
+ List<Journal> list = new ArrayList<Journal>(1);
+ list.add(this.journal);
+ this.journals = Collections.unmodifiableList(list);
+ }
+
+ public void start() throws IOException {
+ journal.start();
+ }
+
+ public void close() throws IOException {
+ journal.close();
+ }
+
+ public Journal getJournal(ActiveMQDestination destination) {
+ return journal;
+ }
+
+ public void setDirectory(File directory) {
+ journal.setDirectory(directory);
+ }
+
+ public void setMaxFileLength(int maxFileLength) {
+ journal.setMaxFileLength(maxFileLength);
+ }
+
+ public void setCheckForCorruptionOnStartup(boolean checkForCorruptJournalFiles) {
+ journal.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
+ }
+
+ public void setChecksum(boolean checksum) {
+ journal.setChecksum(checksum);
+ }
+
+ public void setWriteBatchSize(int batchSize) {
+ journal.setWriteBatchSize(batchSize);
+ }
+
+ public void setArchiveDataLogs(boolean archiveDataLogs) {
+ journal.setArchiveDataLogs(archiveDataLogs);
+ }
+
+ public void setStoreSize(AtomicLong storeSize) {
+ journal.setSizeAccumulator(storeSize);
+ }
+
+ public void setDirectoryArchive(File directoryArchive) {
+ journal.setDirectoryArchive(directoryArchive);
+ }
+
+ public void delete() throws IOException {
+ journal.delete();
+ }
+
+ public Map<Integer, DataFile> getFileMap() {
+ return journal.getFileMap();
+ }
+
+ public Collection<Journal> getJournals() {
+ return journals;
+ }
+
+ public Collection<Journal> getJournals(Set<ActiveMQDestination> set) {
+ return journals;
+ }
+}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/DestinationJournalManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/DestinationJournalManager.java?rev=1147149&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/DestinationJournalManager.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/DestinationJournalManager.java Fri Jul 15 13:45:01 2011
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.IOHelper;
+import org.apache.kahadb.journal.DataFile;
+import org.apache.kahadb.journal.Journal;
+
+public class DestinationJournalManager implements JournalManager {
+ private static final String PREPEND = "JournalDest-";
+ private static final String QUEUE_PREPEND = PREPEND + "Queue-";
+ private static final String TOPIC_PREPEND = PREPEND + "Topic-";
+ private AtomicBoolean started = new AtomicBoolean();
+ private final Map<ActiveMQDestination, Journal> journalMap = new ConcurrentHashMap<ActiveMQDestination, Journal>();
+ private File directory = new File("KahaDB");
+ private File directoryArchive;
+ private int maxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
+ private boolean checkForCorruptionOnStartup;
+ private boolean checksum = false;
+ private int writeBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
+ private boolean archiveDataLogs;
+ private AtomicLong storeSize = new AtomicLong(0);
+
+
+ public AtomicBoolean getStarted() {
+ return started;
+ }
+
+ public void setStarted(AtomicBoolean started) {
+ this.started = started;
+ }
+
+ public File getDirectory() {
+ return directory;
+ }
+
+ public void setDirectory(File directory) {
+ this.directory = directory;
+ }
+
+ public File getDirectoryArchive() {
+ return directoryArchive;
+ }
+
+ public void setDirectoryArchive(File directoryArchive) {
+ this.directoryArchive = directoryArchive;
+ }
+
+ public int getMaxFileLength() {
+ return maxFileLength;
+ }
+
+ public void setMaxFileLength(int maxFileLength) {
+ this.maxFileLength = maxFileLength;
+ }
+
+ public boolean isCheckForCorruptionOnStartup() {
+ return checkForCorruptionOnStartup;
+ }
+
+ public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
+ this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
+ }
+
+ public boolean isChecksum() {
+ return checksum;
+ }
+
+ public void setChecksum(boolean checksum) {
+ this.checksum = checksum;
+ }
+
+ public int getWriteBatchSize() {
+ return writeBatchSize;
+ }
+
+ public void setWriteBatchSize(int writeBatchSize) {
+ this.writeBatchSize = writeBatchSize;
+ }
+
+ public boolean isArchiveDataLogs() {
+ return archiveDataLogs;
+ }
+
+ public void setArchiveDataLogs(boolean archiveDataLogs) {
+ this.archiveDataLogs = archiveDataLogs;
+ }
+
+ public AtomicLong getStoreSize() {
+ return storeSize;
+ }
+
+ public void setStoreSize(AtomicLong storeSize) {
+ this.storeSize = storeSize;
+ }
+
+
+ public void start() throws IOException {
+ if (started.compareAndSet(false, true)) {
+ File[] files = getDirectory().listFiles(new FilenameFilter() {
+ public boolean accept(File file, String s) {
+ if (file.isDirectory() && s != null && s.startsWith(PREPEND)) {
+ return true;
+ }
+ return false;
+ }
+ });
+ if (files != null) {
+ for (File file : files) {
+ ActiveMQDestination destination;
+ if (file.getName().startsWith(TOPIC_PREPEND)) {
+ String destinationName = file.getName().substring(TOPIC_PREPEND.length());
+ destination = new ActiveMQTopic(destinationName);
+ } else {
+ String destinationName = file.getName().substring(QUEUE_PREPEND.length());
+ destination = new ActiveMQQueue(destinationName);
+ }
+
+ Journal journal = new Journal();
+ journal.setDirectory(file);
+ if (getDirectoryArchive() != null) {
+ IOHelper.mkdirs(getDirectoryArchive());
+ File archive = new File(getDirectoryArchive(), file.getName());
+ IOHelper.mkdirs(archive);
+ journal.setDirectoryArchive(archive);
+ }
+ configure(journal);
+ journalMap.put(destination, journal);
+ }
+ }
+ for (Journal journal : journalMap.values()) {
+ journal.start();
+ }
+ }
+
+ }
+
+ public void close() throws IOException {
+ started.set(false);
+ for (Journal journal : journalMap.values()) {
+ journal.close();
+ }
+ journalMap.clear();
+ }
+
+
+ public void delete() throws IOException {
+ for (Journal journal : journalMap.values()) {
+ journal.delete();
+ }
+ journalMap.clear();
+ }
+
+ public Journal getJournal(ActiveMQDestination destination) throws IOException {
+ Journal journal = journalMap.get(destination);
+ if (journal == null && !destination.isTemporary()) {
+ journal = new Journal();
+ String fileName;
+ if (destination.isTopic()) {
+ fileName = TOPIC_PREPEND + destination.getPhysicalName();
+ } else {
+ fileName = QUEUE_PREPEND + destination.getPhysicalName();
+ }
+ File file = new File(getDirectory(), fileName);
+ IOHelper.mkdirs(file);
+ journal.setDirectory(file);
+ if (getDirectoryArchive() != null) {
+ IOHelper.mkdirs(getDirectoryArchive());
+ File archive = new File(getDirectoryArchive(), fileName);
+ IOHelper.mkdirs(archive);
+ journal.setDirectoryArchive(archive);
+ }
+ configure(journal);
+ if (started.get()) {
+ journal.start();
+ }
+ return journal;
+ } else {
+ return journal;
+ }
+ }
+
+ public Map<Integer, DataFile> getFileMap() {
+ throw new RuntimeException("Not supported");
+ }
+
+ public Collection<Journal> getJournals() {
+ return journalMap.values();
+ }
+
+ public Collection<Journal> getJournals(Set<ActiveMQDestination> set) {
+ List<Journal> list = new ArrayList<Journal>();
+ for (ActiveMQDestination destination : set) {
+ Journal j = journalMap.get(destination);
+ if (j != null) {
+ list.add(j);
+ }
+ }
+ return list;
+ }
+
+ protected void configure(Journal journal) {
+ journal.setMaxFileLength(getMaxFileLength());
+ journal.setCheckForCorruptionOnStartup(isCheckForCorruptionOnStartup());
+ journal.setChecksum(isChecksum() || isCheckForCorruptionOnStartup());
+ journal.setWriteBatchSize(getWriteBatchSize());
+ journal.setArchiveDataLogs(isArchiveDataLogs());
+ journal.setSizeAccumulator(getStoreSize());
+ }
+}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/JournalManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/JournalManager.java?rev=1147149&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/JournalManager.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/JournalManager.java Fri Jul 15 13:45:01 2011
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.kahadb.journal.DataFile;
+import org.apache.kahadb.journal.Journal;
+
+public interface JournalManager {
+
+ void start() throws IOException;
+
+ void close() throws IOException;
+
+ Journal getJournal(ActiveMQDestination destination) throws IOException;
+
+ void setDirectory(File directory);
+
+ void setMaxFileLength(int maxFileLength);
+
+ void setCheckForCorruptionOnStartup(boolean checkForCorruptJournalFiles);
+
+ void setChecksum(boolean checksum);
+
+ void setWriteBatchSize(int batchSize);
+
+ void setArchiveDataLogs(boolean archiveDataLogs);
+
+ void setStoreSize(AtomicLong storeSize);
+
+ void setDirectoryArchive(File directoryArchive);
+
+ void delete() throws IOException;
+
+ Map<Integer, DataFile> getFileMap();
+
+ Collection<Journal> getJournals();
+
+ Collection<Journal> getJournals(Set<ActiveMQDestination> set);
+}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=1147149&r1=1147148&r2=1147149&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Fri Jul 15 13:45:01 2011
@@ -19,6 +19,7 @@ package org.apache.activemq.store.kahadb
import java.io.File;
import java.io.IOException;
import java.util.Set;
+
import org.apache.activeio.journal.Journal;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
@@ -37,16 +38,11 @@ import org.apache.activemq.usage.SystemU
* An implementation of {@link PersistenceAdapter} designed for use with a
* {@link Journal} and then check pointing asynchronously on a timeout with some
* other long term persistent storage.
- *
- * @org.apache.xbean.XBean element="kahaDB"
- *
*/
public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
private final KahaDBStore letter = new KahaDBStore();
/**
- * @param context
- * @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext)
*/
public void beginTransaction(ConnectionContext context) throws IOException {
@@ -54,8 +50,6 @@ public class KahaDBPersistenceAdapter im
}
/**
- * @param sync
- * @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean)
*/
public void checkpoint(boolean sync) throws IOException {
@@ -63,8 +57,6 @@ public class KahaDBPersistenceAdapter im
}
/**
- * @param context
- * @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext)
*/
public void commitTransaction(ConnectionContext context) throws IOException {
@@ -72,9 +64,7 @@ public class KahaDBPersistenceAdapter im
}
/**
- * @param destination
* @return MessageStore
- * @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
*/
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
@@ -82,9 +72,7 @@ public class KahaDBPersistenceAdapter im
}
/**
- * @param destination
* @return TopicMessageStore
- * @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
*/
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
@@ -93,7 +81,6 @@ public class KahaDBPersistenceAdapter im
/**
* @return TrandactionStore
- * @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore()
*/
public TransactionStore createTransactionStore() throws IOException {
@@ -101,7 +88,6 @@ public class KahaDBPersistenceAdapter im
}
/**
- * @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages()
*/
public void deleteAllMessages() throws IOException {
@@ -118,7 +104,6 @@ public class KahaDBPersistenceAdapter im
/**
* @return lastMessageBrokerSequenceId
- * @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId()
*/
public long getLastMessageBrokerSequenceId() throws IOException {
@@ -130,7 +115,6 @@ public class KahaDBPersistenceAdapter im
}
/**
- * @param destination
* @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
*/
public void removeQueueMessageStore(ActiveMQQueue destination) {
@@ -138,7 +122,6 @@ public class KahaDBPersistenceAdapter im
}
/**
- * @param destination
* @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
*/
public void removeTopicMessageStore(ActiveMQTopic destination) {
@@ -146,8 +129,6 @@ public class KahaDBPersistenceAdapter im
}
/**
- * @param context
- * @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext)
*/
public void rollbackTransaction(ConnectionContext context) throws IOException {
@@ -155,7 +136,6 @@ public class KahaDBPersistenceAdapter im
}
/**
- * @param brokerName
* @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String)
*/
public void setBrokerName(String brokerName) {
@@ -163,7 +143,6 @@ public class KahaDBPersistenceAdapter im
}
/**
- * @param usageManager
* @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage)
*/
public void setUsageManager(SystemUsage usageManager) {
@@ -179,7 +158,6 @@ public class KahaDBPersistenceAdapter im
}
/**
- * @throws Exception
* @see org.apache.activemq.Service#start()
*/
public void start() throws Exception {
@@ -187,7 +165,6 @@ public class KahaDBPersistenceAdapter im
}
/**
- * @throws Exception
* @see org.apache.activemq.Service#stop()
*/
public void stop() throws Exception {
@@ -196,7 +173,7 @@ public class KahaDBPersistenceAdapter im
/**
* Get the journalMaxFileLength
- *
+ *
* @return the journalMaxFileLength
*/
public int getJournalMaxFileLength() {
@@ -206,8 +183,6 @@ public class KahaDBPersistenceAdapter im
/**
* When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
* be used
- *
- * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
*/
public void setJournalMaxFileLength(int journalMaxFileLength) {
this.letter.setJournalMaxFileLength(journalMaxFileLength);
@@ -219,7 +194,7 @@ public class KahaDBPersistenceAdapter im
public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack);
}
-
+
public int getMaxFailoverProducersToTrack() {
return this.letter.getMaxFailoverProducersToTrack();
}
@@ -231,14 +206,14 @@ public class KahaDBPersistenceAdapter im
public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth);
}
-
+
public int getFailoverProducersAuditDepth() {
return this.getFailoverProducersAuditDepth();
}
-
+
/**
* Get the checkpointInterval
- *
+ *
* @return the checkpointInterval
*/
public long getCheckpointInterval() {
@@ -247,9 +222,8 @@ public class KahaDBPersistenceAdapter im
/**
* Set the checkpointInterval
- *
- * @param checkpointInterval
- * the checkpointInterval to set
+ *
+ * @param checkpointInterval the checkpointInterval to set
*/
public void setCheckpointInterval(long checkpointInterval) {
this.letter.setCheckpointInterval(checkpointInterval);
@@ -257,7 +231,7 @@ public class KahaDBPersistenceAdapter im
/**
* Get the cleanupInterval
- *
+ *
* @return the cleanupInterval
*/
public long getCleanupInterval() {
@@ -266,9 +240,8 @@ public class KahaDBPersistenceAdapter im
/**
* Set the cleanupInterval
- *
- * @param cleanupInterval
- * the cleanupInterval to set
+ *
+ * @param cleanupInterval the cleanupInterval to set
*/
public void setCleanupInterval(long cleanupInterval) {
this.letter.setCleanupInterval(cleanupInterval);
@@ -276,7 +249,7 @@ public class KahaDBPersistenceAdapter im
/**
* Get the indexWriteBatchSize
- *
+ *
* @return the indexWriteBatchSize
*/
public int getIndexWriteBatchSize() {
@@ -286,9 +259,8 @@ public class KahaDBPersistenceAdapter im
/**
* Set the indexWriteBatchSize
* When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
- * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
- * @param indexWriteBatchSize
- * the indexWriteBatchSize to set
+ *
+ * @param indexWriteBatchSize the indexWriteBatchSize to set
*/
public void setIndexWriteBatchSize(int indexWriteBatchSize) {
this.letter.setIndexWriteBatchSize(indexWriteBatchSize);
@@ -296,7 +268,7 @@ public class KahaDBPersistenceAdapter im
/**
* Get the journalMaxWriteBatchSize
- *
+ *
* @return the journalMaxWriteBatchSize
*/
public int getJournalMaxWriteBatchSize() {
@@ -305,10 +277,9 @@ public class KahaDBPersistenceAdapter im
/**
* Set the journalMaxWriteBatchSize
- * * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
- * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
- * @param journalMaxWriteBatchSize
- * the journalMaxWriteBatchSize to set
+ * * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
+ *
+ * @param journalMaxWriteBatchSize the journalMaxWriteBatchSize to set
*/
public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize);
@@ -316,7 +287,7 @@ public class KahaDBPersistenceAdapter im
/**
* Get the enableIndexWriteAsync
- *
+ *
* @return the enableIndexWriteAsync
*/
public boolean isEnableIndexWriteAsync() {
@@ -325,9 +296,8 @@ public class KahaDBPersistenceAdapter im
/**
* Set the enableIndexWriteAsync
- *
- * @param enableIndexWriteAsync
- * the enableIndexWriteAsync to set
+ *
+ * @param enableIndexWriteAsync the enableIndexWriteAsync to set
*/
public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync);
@@ -335,7 +305,7 @@ public class KahaDBPersistenceAdapter im
/**
* Get the directory
- *
+ *
* @return the directory
*/
public File getDirectory() {
@@ -343,7 +313,6 @@ public class KahaDBPersistenceAdapter im
}
/**
- * @param dir
* @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File)
*/
public void setDirectory(File dir) {
@@ -352,7 +321,7 @@ public class KahaDBPersistenceAdapter im
/**
* Get the enableJournalDiskSyncs
- *
+ *
* @return the enableJournalDiskSyncs
*/
public boolean isEnableJournalDiskSyncs() {
@@ -361,9 +330,8 @@ public class KahaDBPersistenceAdapter im
/**
* Set the enableJournalDiskSyncs
- *
- * @param enableJournalDiskSyncs
- * the enableJournalDiskSyncs to set
+ *
+ * @param enableJournalDiskSyncs the enableJournalDiskSyncs to set
*/
public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) {
this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs);
@@ -371,7 +339,7 @@ public class KahaDBPersistenceAdapter im
/**
* Get the indexCacheSize
- *
+ *
* @return the indexCacheSize
*/
public int getIndexCacheSize() {
@@ -381,9 +349,8 @@ public class KahaDBPersistenceAdapter im
/**
* Set the indexCacheSize
* When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
- * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
- * @param indexCacheSize
- * the indexCacheSize to set
+ *
+ * @param indexCacheSize the indexCacheSize to set
*/
public void setIndexCacheSize(int indexCacheSize) {
this.letter.setIndexCacheSize(indexCacheSize);
@@ -391,7 +358,7 @@ public class KahaDBPersistenceAdapter im
/**
* Get the ignoreMissingJournalfiles
- *
+ *
* @return the ignoreMissingJournalfiles
*/
public boolean isIgnoreMissingJournalfiles() {
@@ -400,9 +367,8 @@ public class KahaDBPersistenceAdapter im
/**
* Set the ignoreMissingJournalfiles
- *
- * @param ignoreMissingJournalfiles
- * the ignoreMissingJournalfiles to set
+ *
+ * @param ignoreMissingJournalfiles the ignoreMissingJournalfiles to set
*/
public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
@@ -463,14 +429,14 @@ public class KahaDBPersistenceAdapter im
public int getMaxAsyncJobs() {
return letter.getMaxAsyncJobs();
}
+
/**
- * @param maxAsyncJobs
- * the maxAsyncJobs to set
+ * @param maxAsyncJobs the maxAsyncJobs to set
*/
public void setMaxAsyncJobs(int maxAsyncJobs) {
letter.setMaxAsyncJobs(maxAsyncJobs);
}
-
+
/**
* @return the databaseLockedWaitDelay
*/
@@ -482,7 +448,7 @@ public class KahaDBPersistenceAdapter im
* @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
*/
public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) {
- letter.setDatabaseLockedWaitDelay(databaseLockedWaitDelay);
+ letter.setDatabaseLockedWaitDelay(databaseLockedWaitDelay);
}
public boolean getForceRecoverIndex() {
@@ -493,6 +459,14 @@ public class KahaDBPersistenceAdapter im
letter.setForceRecoverIndex(forceRecoverIndex);
}
+ public boolean isJournalPerDestination() {
+ return letter.isJournalPerDestination();
+ }
+
+ public void setJournalPerDestination(boolean journalPerDestination) {
+ letter.setJournalPerDestination(journalPerDestination);
+ }
+
// for testing
public KahaDBStore getStore() {
return letter;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1147149&r1=1147148&r2=1147149&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Fri Jul 15 13:45:01 2011
@@ -26,30 +26,24 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.Map.Entry;
-import java.util.concurrent.*;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTempQueue;
-import org.apache.activemq.command.ActiveMQTempTopic;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.LocalTransactionId;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.SubscriptionInfo;
-import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.filter.BooleanExpression;
-import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.command.*;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
@@ -58,23 +52,20 @@ import org.apache.activemq.store.TopicMe
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
-import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
+import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
import org.apache.activemq.store.kahadb.data.KahaLocation;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
-import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
-import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
-import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
-import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.kahadb.journal.Journal;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
@@ -85,7 +76,7 @@ public class KahaDBStore extends Message
PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
- PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
+ PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);
protected ExecutorService queueExecutor;
protected ExecutorService topicExecutor;
@@ -128,8 +119,7 @@ public class KahaDBStore extends Message
}
/**
- * @param concurrentStoreAndDispatch
- * the concurrentStoreAndDispatch to set
+ * @param concurrentStoreAndDispatch the concurrentStoreAndDispatch to set
*/
public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
@@ -143,8 +133,7 @@ public class KahaDBStore extends Message
}
/**
- * @param concurrentStoreAndDispatch
- * the concurrentStoreAndDispatch to set
+ * @param concurrentStoreAndDispatch the concurrentStoreAndDispatch to set
*/
public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
@@ -153,16 +142,16 @@ public class KahaDBStore extends Message
public boolean isConcurrentStoreAndDispatchTransactions() {
return this.concurrentStoreAndDispatchTransactions;
}
-
+
/**
* @return the maxAsyncJobs
*/
public int getMaxAsyncJobs() {
return this.maxAsyncJobs;
}
+
/**
- * @param maxAsyncJobs
- * the maxAsyncJobs to set
+ * @param maxAsyncJobs the maxAsyncJobs to set
*/
public void setMaxAsyncJobs(int maxAsyncJobs) {
this.maxAsyncJobs = maxAsyncJobs;
@@ -177,20 +166,20 @@ public class KahaDBStore extends Message
this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
asyncQueueJobQueue, new ThreadFactory() {
- public Thread newThread(Runnable runnable) {
- Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
- thread.setDaemon(true);
- return thread;
- }
- });
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
asyncTopicJobQueue, new ThreadFactory() {
- public Thread newThread(Runnable runnable) {
- Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
- thread.setDaemon(true);
- return thread;
- }
- });
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
}
@Override
@@ -287,14 +276,16 @@ public class KahaDBStore extends Message
protected KahaDestination dest;
private final int maxAsyncJobs;
private final Semaphore localDestinationSemaphore;
+ private final Journal journal;
double doneTasks, canceledTasks = 0;
- public KahaDBMessageStore(ActiveMQDestination destination) {
+ public KahaDBMessageStore(ActiveMQDestination destination) throws IOException {
super(destination);
this.dest = convert(destination);
this.maxAsyncJobs = getMaxAsyncJobs();
this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
+ this.journal = getJournalManager().getJournal(destination);
}
@Override
@@ -356,8 +347,8 @@ public class KahaDBStore extends Message
command.setPrioritySupported(isPrioritizedMessages());
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
- store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
-
+ store(journal, command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
+
}
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
@@ -368,13 +359,13 @@ public class KahaDBStore extends Message
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
- store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
+ store(journal, command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
}
public void removeAllMessages(ConnectionContext context) throws IOException {
KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
command.setDestination(dest);
- store(command, true, null, null);
+ store(journal, command, true, null, null);
}
public Message getMessage(MessageId identity) throws IOException {
@@ -396,14 +387,14 @@ public class KahaDBStore extends Message
return sd.orderIndex.get(tx, sequence).location;
}
});
- }finally {
+ } finally {
indexLock.readLock().unlock();
}
if (location == null) {
return null;
}
- return loadMessage(location);
+ return loadMessage(journal, location);
}
public int getMessageCount() throws IOException {
@@ -419,14 +410,14 @@ public class KahaDBStore extends Message
StoredDestination sd = getStoredDestination(dest, tx);
int rc = 0;
for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator
- .hasNext();) {
+ .hasNext(); ) {
iterator.next();
rc++;
}
return rc;
}
});
- }finally {
+ } finally {
indexLock.readLock().unlock();
}
} finally {
@@ -446,7 +437,7 @@ public class KahaDBStore extends Message
return sd.locationIndex.isEmpty(tx);
}
});
- }finally {
+ } finally {
indexLock.readLock().unlock();
}
}
@@ -460,22 +451,22 @@ public class KahaDBStore extends Message
StoredDestination sd = getStoredDestination(dest, tx);
sd.orderIndex.resetCursorPosition();
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
- .hasNext();) {
+ .hasNext(); ) {
Entry<Long, MessageKeys> entry = iterator.next();
if (ackedAndPrepared.contains(entry.getValue().messageId)) {
continue;
}
- Message msg = loadMessage(entry.getValue().location);
+ Message msg = loadMessage(journal, entry.getValue().location);
listener.recoverMessage(msg);
}
}
});
- }finally {
+ } finally {
indexLock.writeLock().unlock();
}
}
-
+
public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
indexLock.readLock().lock();
try {
@@ -490,7 +481,7 @@ public class KahaDBStore extends Message
if (ackedAndPrepared.contains(entry.getValue().messageId)) {
continue;
}
- Message msg = loadMessage(entry.getValue().location);
+ Message msg = loadMessage(journal, entry.getValue().location);
listener.recoverMessage(msg);
counter++;
if (counter >= maxReturned) {
@@ -500,7 +491,7 @@ public class KahaDBStore extends Message
sd.orderIndex.stoppedIterating();
}
});
- }finally {
+ } finally {
indexLock.readLock().unlock();
}
}
@@ -511,11 +502,12 @@ public class KahaDBStore extends Message
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getExistingStoredDestination(dest, tx);
if (sd != null) {
- sd.orderIndex.resetCursorPosition();}
+ sd.orderIndex.resetCursorPosition();
}
- });
+ }
+ });
} catch (Exception e) {
- LOG.error("Failed to reset batching",e);
+ LOG.error("Failed to reset batching", e);
}
}
@@ -528,10 +520,10 @@ public class KahaDBStore extends Message
// Hopefully one day the page file supports concurrent read
// operations... but for now we must
// externally synchronize...
-
+
indexLock.writeLock().lock();
try {
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
Long location = sd.messageIdIndex.get(tx, key);
@@ -540,10 +532,10 @@ public class KahaDBStore extends Message
}
}
});
- }finally {
+ } finally {
indexLock.writeLock().unlock();
}
-
+
} finally {
unlockAsyncJobQueue();
}
@@ -553,15 +545,21 @@ public class KahaDBStore extends Message
@Override
public void setMemoryUsage(MemoryUsage memoeyUSage) {
}
+
@Override
public void start() throws Exception {
super.start();
}
+
@Override
public void stop() throws Exception {
super.stop();
}
+ public Journal getJournal() {
+ return this.journal;
+ }
+
protected void lockAsyncJobQueue() {
try {
this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
@@ -590,6 +588,7 @@ public class KahaDBStore extends Message
class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
private final AtomicInteger subscriptionCount = new AtomicInteger();
+
public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
super(destination);
this.subscriptionCount.set(getAllSubscriptions().length);
@@ -646,7 +645,7 @@ public class KahaDBStore extends Message
if (ack != null && ack.isUnmatchedAck()) {
command.setAck(UNMATCHED);
}
- store(command, false, null, null);
+ store(getJournal(), command, false, null, null);
}
public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
@@ -658,7 +657,7 @@ public class KahaDBStore extends Message
command.setRetroactive(retroactive);
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
- store(command, isEnableJournalDiskSyncs() && true, null, null);
+ store(getJournal(), command, isEnableJournalDiskSyncs() && true, null, null);
this.subscriptionCount.incrementAndGet();
}
@@ -666,7 +665,7 @@ public class KahaDBStore extends Message
KahaSubscriptionCommand command = new KahaSubscriptionCommand();
command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
- store(command, isEnableJournalDiskSyncs() && true, null, null);
+ store(getJournal(), command, isEnableJournalDiskSyncs() && true, null, null);
this.subscriptionCount.decrementAndGet();
}
@@ -679,7 +678,7 @@ public class KahaDBStore extends Message
public void execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
- .hasNext();) {
+ .hasNext(); ) {
Entry<String, KahaSubscriptionCommand> entry = iterator.next();
SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
.getValue().getSubscriptionInfo().newInput()));
@@ -688,7 +687,7 @@ public class KahaDBStore extends Message
}
}
});
- }finally {
+ } finally {
indexLock.readLock().unlock();
}
@@ -712,7 +711,7 @@ public class KahaDBStore extends Message
.getSubscriptionInfo().newInput()));
}
});
- }finally {
+ } finally {
indexLock.readLock().unlock();
}
}
@@ -732,7 +731,7 @@ public class KahaDBStore extends Message
int counter = 0;
for (Iterator<Entry<Long, HashSet<String>>> iterator =
- sd.ackPositions.iterator(tx, cursorPos.lastAckedSequence); iterator.hasNext();) {
+ sd.ackPositions.iterator(tx, cursorPos.lastAckedSequence); iterator.hasNext(); ) {
Entry<Long, HashSet<String>> entry = iterator.next();
if (entry.getValue().contains(subscriptionKey)) {
counter++;
@@ -741,7 +740,7 @@ public class KahaDBStore extends Message
return counter;
}
});
- }finally {
+ } finally {
indexLock.writeLock().unlock();
}
}
@@ -758,20 +757,20 @@ public class KahaDBStore extends Message
LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
sd.orderIndex.setBatch(tx, cursorPos);
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
- .hasNext();) {
+ .hasNext(); ) {
Entry<Long, MessageKeys> entry = iterator.next();
- listener.recoverMessage(loadMessage(entry.getValue().location));
+ listener.recoverMessage(loadMessage(getJournal(), entry.getValue().location));
}
sd.orderIndex.resetCursorPosition();
}
});
- }finally {
+ } finally {
indexLock.writeLock().unlock();
}
}
public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
- final MessageRecoveryListener listener) throws Exception {
+ final MessageRecoveryListener listener) throws Exception {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
indexLock.writeLock().lock();
@@ -796,9 +795,9 @@ public class KahaDBStore extends Message
Entry<Long, MessageKeys> entry = null;
int counter = 0;
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
- .hasNext();) {
+ .hasNext(); ) {
entry = iterator.next();
- if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
+ if (listener.recoverMessage(loadMessage(getJournal(), entry.getValue().location))) {
counter++;
}
if (counter >= maxReturned || listener.hasSpace() == false) {
@@ -812,7 +811,7 @@ public class KahaDBStore extends Message
}
}
});
- }finally {
+ } finally {
indexLock.writeLock().unlock();
}
}
@@ -828,7 +827,7 @@ public class KahaDBStore extends Message
sd.subscriptionCursors.remove(subscriptionKey);
}
});
- }finally {
+ } finally {
indexLock.writeLock().unlock();
}
} catch (IOException e) {
@@ -852,9 +851,8 @@ public class KahaDBStore extends Message
/**
* Cleanup method to remove any state associated with the given destination.
* This method does not stop the message store (it might not be cached).
- *
- * @param destination
- * Destination to forget
+ *
+ * @param destination Destination to forget
*/
public void removeQueueMessageStore(ActiveMQQueue destination) {
}
@@ -862,9 +860,8 @@ public class KahaDBStore extends Message
/**
* Cleanup method to remove any state associated with the given destination
* This method does not stop the message store (it might not be cached).
- *
- * @param destination
- * Destination to forget
+ *
+ * @param destination Destination to forget
*/
public void removeTopicMessageStore(ActiveMQTopic destination) {
}
@@ -881,7 +878,7 @@ public class KahaDBStore extends Message
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
- .hasNext();) {
+ .hasNext(); ) {
Entry<String, StoredDestination> entry = iterator.next();
if (!isEmptyTopic(entry, tx)) {
rc.add(convert(entry.getKey()));
@@ -902,7 +899,7 @@ public class KahaDBStore extends Message
return isEmptyTopic;
}
});
- }finally {
+ } finally {
indexLock.readLock().unlock();
}
return rc;
@@ -914,7 +911,7 @@ public class KahaDBStore extends Message
public long getLastMessageBrokerSequenceId() throws IOException {
return 0;
}
-
+
public long getLastProducerSequenceId(ProducerId id) {
indexLock.readLock().lock();
try {
@@ -931,9 +928,11 @@ public class KahaDBStore extends Message
public void beginTransaction(ConnectionContext context) throws IOException {
throw new IOException("Not yet implemented.");
}
+
public void commitTransaction(ConnectionContext context) throws IOException {
throw new IOException("Not yet implemented.");
}
+
public void rollbackTransaction(ConnectionContext context) throws IOException {
throw new IOException("Not yet implemented.");
}
@@ -951,8 +950,8 @@ public class KahaDBStore extends Message
* @return
* @throws IOException
*/
- Message loadMessage(Location location) throws IOException {
- KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
+ Message loadMessage(Journal journal, Location location) throws IOException {
+ KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(journal, location);
Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
return msg;
}
@@ -972,20 +971,20 @@ public class KahaDBStore extends Message
KahaDestination rc = new KahaDestination();
rc.setName(dest.getPhysicalName());
switch (dest.getDestinationType()) {
- case ActiveMQDestination.QUEUE_TYPE:
- rc.setType(DestinationType.QUEUE);
- return rc;
- case ActiveMQDestination.TOPIC_TYPE:
- rc.setType(DestinationType.TOPIC);
- return rc;
- case ActiveMQDestination.TEMP_QUEUE_TYPE:
- rc.setType(DestinationType.TEMP_QUEUE);
- return rc;
- case ActiveMQDestination.TEMP_TOPIC_TYPE:
- rc.setType(DestinationType.TEMP_TOPIC);
- return rc;
- default:
- return null;
+ case ActiveMQDestination.QUEUE_TYPE:
+ rc.setType(DestinationType.QUEUE);
+ return rc;
+ case ActiveMQDestination.TOPIC_TYPE:
+ rc.setType(DestinationType.TOPIC);
+ return rc;
+ case ActiveMQDestination.TEMP_QUEUE_TYPE:
+ rc.setType(DestinationType.TEMP_QUEUE);
+ return rc;
+ case ActiveMQDestination.TEMP_TOPIC_TYPE:
+ rc.setType(DestinationType.TEMP_TOPIC);
+ return rc;
+ default:
+ return null;
}
}
@@ -998,16 +997,16 @@ public class KahaDBStore extends Message
String name = dest.substring(p + 1);
switch (KahaDestination.DestinationType.valueOf(type)) {
- case QUEUE:
- return new ActiveMQQueue(name);
- case TOPIC:
- return new ActiveMQTopic(name);
- case TEMP_QUEUE:
- return new ActiveMQTempQueue(name);
- case TEMP_TOPIC:
- return new ActiveMQTempTopic(name);
- default:
- throw new IllegalArgumentException("Not in the valid destination format");
+ case QUEUE:
+ return new ActiveMQQueue(name);
+ case TOPIC:
+ return new ActiveMQTopic(name);
+ case TEMP_QUEUE:
+ return new ActiveMQTempQueue(name);
+ case TEMP_TOPIC:
+ return new ActiveMQTempTopic(name);
+ default:
+ throw new IllegalArgumentException("Not in the valid destination format");
}
}
@@ -1137,8 +1136,9 @@ public class KahaDBStore extends Message
private final int subscriptionCount;
private final List<String> subscriptionKeys = new ArrayList<String>(1);
private final KahaDBTopicMessageStore topicStore;
+
public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
- int subscriptionCount) {
+ int subscriptionCount) {
super(store, context, message);
this.topicStore = store;
this.subscriptionCount = subscriptionCount;
@@ -1170,8 +1170,7 @@ public class KahaDBStore extends Message
/**
* add a key
- *
- * @param key
+ *
* @return true if all acknowledgements received
*/
public boolean addSubscriptionKey(String key) {
@@ -1217,7 +1216,7 @@ public class KahaDBStore extends Message
super.afterExecute(runnable, throwable);
if (runnable instanceof StoreTask) {
- ((StoreTask)runnable).releaseLocks();
+ ((StoreTask) runnable).releaseLocks();
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java?rev=1147149&r1=1147148&r2=1147149&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java Fri Jul 15 13:45:01 2011
@@ -19,6 +19,7 @@ package org.apache.activemq.store.kahadb
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -26,7 +27,9 @@ import java.util.concurrent.Cancellation
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
@@ -49,14 +52,13 @@ import org.apache.activemq.store.kahadb.
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.wireformat.WireFormat;
+import org.apache.kahadb.journal.Journal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Provides a TransactionStore implementation that can create transaction aware
* MessageStore objects from non transaction aware MessageStore objects.
- *
- *
*/
public class KahaDBTransactionStore implements TransactionStore {
static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class);
@@ -70,21 +72,23 @@ public class KahaDBTransactionStore impl
public class Tx {
private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
-
private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
+ private final HashSet<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>();
public void add(AddMessageCommand msg) {
messages.add(msg);
+ destinations.add(msg.getMessage().getDestination());
}
public void add(RemoveMessageCommand ack) {
acks.add(ack);
+ destinations.add(ack.getMessageAck().getDestination());
}
public Message[] getMessages() {
Message rc[] = new Message[messages.size()];
int count = 0;
- for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
+ for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext(); ) {
AddMessageCommand cmd = iter.next();
rc[count++] = cmd.getMessage();
}
@@ -94,7 +98,7 @@ public class KahaDBTransactionStore impl
public MessageAck[] getAcks() {
MessageAck rc[] = new MessageAck[acks.size()];
int count = 0;
- for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
+ for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext(); ) {
RemoveMessageCommand cmd = iter.next();
rc[count++] = cmd.getMessageAck();
}
@@ -103,49 +107,56 @@ public class KahaDBTransactionStore impl
/**
* @return true if something to commit
- * @throws IOException
*/
public List<Future<Object>> commit() throws IOException {
List<Future<Object>> results = new ArrayList<Future<Object>>();
// Do all the message adds.
- for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
+ for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext(); ) {
AddMessageCommand cmd = iter.next();
results.add(cmd.run());
}
// And removes..
- for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
+ for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext(); ) {
RemoveMessageCommand cmd = iter.next();
cmd.run();
results.add(cmd.run());
}
-
+
return results;
}
}
public abstract class AddMessageCommand {
private final ConnectionContext ctx;
+
AddMessageCommand(ConnectionContext ctx) {
this.ctx = ctx;
}
+
abstract Message getMessage();
+
Future<Object> run() throws IOException {
return run(this.ctx);
}
+
abstract Future<Object> run(ConnectionContext ctx) throws IOException;
}
public abstract class RemoveMessageCommand {
private final ConnectionContext ctx;
+
RemoveMessageCommand(ConnectionContext ctx) {
this.ctx = ctx;
}
+
abstract MessageAck getMessageAck();
+
Future<Object> run() throws IOException {
return run(this.ctx);
}
+
abstract Future<Object> run(ConnectionContext context) throws IOException;
}
@@ -197,8 +208,8 @@ public class KahaDBTransactionStore impl
@Override
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
- MessageId messageId, MessageAck ack) throws IOException {
- KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore)getDelegate(), clientId,
+ MessageId messageId, MessageAck ack) throws IOException {
+ KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore) getDelegate(), clientId,
subscriptionName, messageId, ack);
}
@@ -206,13 +217,17 @@ public class KahaDBTransactionStore impl
}
/**
- * @throws IOException
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
public void prepare(TransactionId txid) throws IOException {
inflightTransactions.remove(txid);
KahaTransactionInfo info = getTransactionInfo(txid);
- theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
+ Tx tx = inflightTransactions.get(txid);
+ if (tx != null) {
+ for (Journal journal : theStore.getJournalManager().getJournals(tx.destinations)) {
+ theStore.store(journal, new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
+ }
+ }
}
public Tx getTx(Object txid) {
@@ -242,7 +257,7 @@ public class KahaDBTransactionStore impl
theStore.brokerService.handleIOException(new IOException(e.getMessage()));
} catch (ExecutionException e) {
theStore.brokerService.handleIOException(new IOException(e.getMessage()));
- }catch(CancellationException e) {
+ } catch (CancellationException e) {
}
if (!result.isCancelled()) {
doneSomething = true;
@@ -253,9 +268,11 @@ public class KahaDBTransactionStore impl
}
if (doneSomething) {
KahaTransactionInfo info = getTransactionInfo(txid);
- theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, null, null);
+ for (Journal journal : theStore.getJournalManager().getJournals(tx.destinations)) {
+ theStore.store(journal, new KahaCommitCommand().setTransactionInfo(info), true, null, null);
+ }
}
- }else {
+ } else {
//The Tx will be null for failed over clients - lets run their post commits
if (postCommit != null) {
postCommit.run();
@@ -266,23 +283,26 @@ public class KahaDBTransactionStore impl
KahaTransactionInfo info = getTransactionInfo(txid);
// ensure message order w.r.t to cursor and store for setBatch()
synchronized (this) {
- theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
+ for (Journal journal : theStore.getJournalManager().getJournals()) {
+ theStore.store(journal, new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
+ }
forgetRecoveredAcks(txid);
}
}
- }else {
- LOG.error("Null transaction passed on commit");
+ } else {
+ LOG.error("Null transaction passed on commit");
}
}
/**
- * @throws IOException
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
public void rollback(TransactionId txid) throws IOException {
if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
KahaTransactionInfo info = getTransactionInfo(txid);
- theStore.store(new KahaRollbackCommand().setTransactionInfo(info), false, null, null);
+ for (Journal journal : theStore.getJournalManager().getJournals()) {
+ theStore.store(journal, new KahaRollbackCommand().setTransactionInfo(info), false, null, null);
+ }
forgetRecoveredAcks(txid);
} else {
inflightTransactions.remove(txid);
@@ -349,6 +369,7 @@ public class KahaDBTransactionStore impl
public Message getMessage() {
return message;
}
+
@Override
public Future<Object> run(ConnectionContext ctx) throws IOException {
destination.addMessage(ctx, message);
@@ -376,6 +397,7 @@ public class KahaDBTransactionStore impl
public Message getMessage() {
return message;
}
+
@Override
public Future<Object> run(ConnectionContext ctx) throws IOException {
return destination.asyncAddQueueMessage(ctx, message);
@@ -393,7 +415,7 @@ public class KahaDBTransactionStore impl
throws IOException {
if (message.getTransactionId() != null) {
- if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
+ if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
destination.addMessage(context, message);
return AbstractMessageStore.FUTURE;
} else {
@@ -403,6 +425,7 @@ public class KahaDBTransactionStore impl
public Message getMessage() {
return message;
}
+
@Override
public Future run(ConnectionContext ctx) throws IOException {
return destination.asyncAddTopicMessage(ctx, message);
@@ -424,7 +447,7 @@ public class KahaDBTransactionStore impl
throws IOException {
if (ack.isInTransaction()) {
- if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
+ if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
destination.removeMessage(context, ack);
} else {
Tx tx = getTx(ack.getTransactionId());
@@ -450,7 +473,7 @@ public class KahaDBTransactionStore impl
throws IOException {
if (ack.isInTransaction()) {
- if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
+ if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
destination.removeAsyncMessage(context, ack);
} else {
Tx tx = getTx(ack.getTransactionId());
@@ -476,7 +499,7 @@ public class KahaDBTransactionStore impl
final MessageId messageId, final MessageAck ack) throws IOException {
if (ack.isInTransaction()) {
- if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
+ if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
} else {
Tx tx = getTx(ack.getTransactionId());