You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/05/17 13:53:28 UTC
svn commit: r945102 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq:
broker/region/ store/ store/kahadb/
Author: rajdavies
Date: Mon May 17 11:53:28 2010
New Revision: 945102
URL: http://svn.apache.org/viewvc?rev=945102&view=rev
Log:
added support for concurrent dispatch and store of persistent messages in KahaDB
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=945102&r1=945101&r2=945102&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Mon May 17 11:53:28 2010
@@ -29,18 +29,18 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.ResourceAllocationException;
-
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -105,13 +105,13 @@ public class Queue extends BaseDestinati
private final Object dispatchMutex = new Object();
private boolean useConsumerPriority = true;
private boolean strictOrderDispatch = false;
- private QueueDispatchSelector dispatchSelector;
+ private final QueueDispatchSelector dispatchSelector;
private boolean optimizedDispatch = false;
private boolean firstConsumer = false;
private int timeBeforeDispatchStarts = 0;
private int consumersBeforeDispatchStarts = 0;
private CountDownLatch consumersBeforeStartsLatch;
- private AtomicLong pendingWakeups = new AtomicLong();
+ private final AtomicLong pendingWakeups = new AtomicLong();
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
public void run() {
@@ -163,6 +163,7 @@ public class Queue extends BaseDestinati
class FlowControlTimeoutTask extends Thread {
+ @Override
public void run() {
TimeoutMessage timeout;
try {
@@ -220,6 +221,7 @@ public class Queue extends BaseDestinati
}
}
+ @Override
public void initialize() throws Exception {
if (this.messages == null) {
if (destination.isTemporary() || broker == null || store == null) {
@@ -554,6 +556,7 @@ public class Queue extends BaseDestinati
void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
+ Future<Object> result = null;
synchronized (sendLock) {
if (store != null && message.isPersistent()) {
if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
@@ -568,8 +571,11 @@ public class Queue extends BaseDestinati
waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
}
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
- store.addMessage(context, message);
-
+ if (context.isInTransaction()) {
+ store.addMessage(context, message);
+ }else {
+ result = store.asyncAddQueueMessage(context, message);
+ }
}
}
if (context.isInTransaction()) {
@@ -578,6 +584,7 @@ public class Queue extends BaseDestinati
// our memory. This increment is decremented once the tx finishes..
message.incrementReferenceCount();
context.getTransaction().addSynchronization(new Synchronization() {
+ @Override
public void afterCommit() throws Exception {
try {
// It could take while before we receive the commit
@@ -603,6 +610,14 @@ public class Queue extends BaseDestinati
// usage manager.
sendMessage(context, message);
}
+ if (result != null && !result.isCancelled()) {
+ try {
+ result.get();
+ }catch(CancellationException e) {
+ //ignore - the task has been cancelled if the message
+ // has already been deleted
+ }
+ }
}
private void expireMessages() {
@@ -651,7 +666,7 @@ public class Queue extends BaseDestinati
ack.setLastMessageId(node.getMessageId());
ack.setMessageCount(1);
}
- store.removeMessage(context, ack);
+ store.removeAsyncMessage(context, ack);
}
}
@@ -666,6 +681,7 @@ public class Queue extends BaseDestinati
return msg;
}
+ @Override
public String toString() {
int size = 0;
synchronized (messages) {
@@ -725,6 +741,7 @@ public class Queue extends BaseDestinati
// Properties
// -------------------------------------------------------------------------
+ @Override
public ActiveMQDestination getActiveMQDestination() {
return destination;
}
@@ -936,7 +953,7 @@ public class Queue extends BaseDestinati
for (MessageReference ref : list) {
try {
QueueMessageReference r = (QueueMessageReference) ref;
- removeMessage(c, (IndirectMessageReference) r);
+ removeMessage(c, r);
} catch (IOException e) {
}
}
@@ -1273,6 +1290,7 @@ public class Queue extends BaseDestinati
return messageId.equals(r.getMessageId().toString());
}
+ @Override
public String toString() {
return "MessageIdFilter: " + messageId;
}
@@ -1326,12 +1344,14 @@ public class Queue extends BaseDestinati
} finally {
context.getTransaction().addSynchronization(new Synchronization() {
+ @Override
public void afterCommit() throws Exception {
getDestinationStatistics().getDequeues().increment();
dropMessage(reference);
wakeup();
}
+ @Override
public void afterRollback() throws Exception {
reference.setAcked(false);
}
@@ -1634,6 +1654,7 @@ public class Queue extends BaseDestinati
* org.apache.activemq.broker.region.BaseDestination#processDispatchNotification
* (org.apache.activemq.command.MessageDispatchNotification)
*/
+ @Override
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
// do dispatch
Subscription sub = getMatchingSubscription(messageDispatchNotification);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=945102&r1=945101&r2=945102&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Mon May 17 11:53:28 2010
@@ -21,10 +21,11 @@ import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
-
+import java.util.concurrent.Future;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -87,6 +88,7 @@ public class Topic extends BaseDestinati
this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName());
}
+ @Override
public void initialize() throws Exception {
super.initialize();
if (store != null) {
@@ -402,6 +404,7 @@ public class Topic extends BaseDestinati
final ConnectionContext context = producerExchange.getConnectionContext();
message.setRegionDestination(this);
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
+ Future<Object> result = null;
if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
@@ -413,13 +416,18 @@ public class Topic extends BaseDestinati
waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
}
- topicStore.addMessage(context, message);
+ if (context.isInTransaction()) {
+ topicStore.addMessage(context, message);
+ }else {
+ result = topicStore.asyncAddTopicMessage(context, message);
+ }
}
message.incrementReferenceCount();
if (context.isInTransaction()) {
context.getTransaction().addSynchronization(new Synchronization() {
+ @Override
public void afterCommit() throws Exception {
// It could take while before we receive the commit
// operration.. by that time the message could have
@@ -445,6 +453,14 @@ public class Topic extends BaseDestinati
message.decrementReferenceCount();
}
}
+ if (result != null && !result.isCancelled()) {
+ try {
+ result.get();
+ }catch(CancellationException e) {
+ //ignore - the task has been cancelled if the message
+ // has already been deleted
+ }
+ }
}
@@ -452,6 +468,7 @@ public class Topic extends BaseDestinati
return durableSubcribers.size() == 0;
}
+ @Override
public String toString() {
return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java?rev=945102&r1=945101&r2=945102&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java Mon May 17 11:53:28 2010
@@ -17,18 +17,24 @@
package org.apache.activemq.store;
import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.usage.MemoryUsage;
abstract public class AbstractMessageStore implements MessageStore {
+ static final FutureTask<Object> FUTURE;
protected final ActiveMQDestination destination;
public AbstractMessageStore(ActiveMQDestination destination) {
this.destination = destination;
}
-
+
public void dispose(ConnectionContext context) {
}
@@ -44,16 +50,43 @@ abstract public class AbstractMessageSto
public void setMemoryUsage(MemoryUsage memoryUsage) {
}
-
+
public void setBatch(MessageId messageId) throws IOException, Exception {
}
-
+
/**
* flag to indicate if the store is empty
+ *
* @return true if the message count is 0
- * @throws Exception
+ * @throws Exception
*/
- public boolean isEmpty() throws Exception{
- return getMessageCount()==0;
- }
+ public boolean isEmpty() throws Exception {
+ return getMessageCount() == 0;
+ }
+
+ public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) throws IOException {
+ addMessage(context, message);
+ return FUTURE;
+ }
+
+
+ public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) throws IOException {
+ addMessage(context, message);
+ return FUTURE;
+ }
+
+ public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
+ removeMessage(context, ack);
+ }
+
+ static class CallableImplementation implements Callable<Object> {
+ public Object call() throws Exception {
+ return null;
+ }
+ }
+
+ static {
+ FUTURE = new FutureTask<Object>(new CallableImplementation());
+ FUTURE.run();
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java?rev=945102&r1=945101&r2=945102&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java Mon May 17 11:53:28 2010
@@ -17,6 +17,7 @@
package org.apache.activemq.store;
import java.io.IOException;
+import java.util.concurrent.Future;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@@ -40,6 +41,30 @@ public interface MessageStore extends Se
* @throws IOException
*/
void addMessage(ConnectionContext context, Message message) throws IOException;
+
+ /**
+ * Adds a message to the message store
+ *
+ * @param context context
+ * @param message
+ * @param l
+ * @return a Future to track when this is complete
+ * @throws IOException
+ * @throws IOException
+ */
+ Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException;
+
+ /**
+ * Adds a message to the message store
+ *
+ * @param context context
+ * @param message
+ * @param l
+ * @return a Future to track when this is complete
+ * @throws IOException
+ * @throws IOException
+ */
+ Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException;
/**
* Looks up a message using either the String messageID or the
@@ -62,6 +87,8 @@ public interface MessageStore extends Se
* @throws IOException
*/
void removeMessage(ConnectionContext context, MessageAck ack) throws IOException;
+
+ void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException;
/**
* Removes all the messages from the message store.
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java?rev=945102&r1=945101&r2=945102&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java Mon May 17 11:53:28 2010
@@ -17,6 +17,7 @@
package org.apache.activemq.store;
import java.io.IOException;
+import java.util.concurrent.Future;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@@ -100,4 +101,16 @@ public class ProxyMessageStore implement
public boolean isEmpty() throws Exception {
return delegate.isEmpty();
}
+
+ public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
+ return delegate.asyncAddQueueMessage(context, message);
+ }
+
+ public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
+ return delegate.asyncAddTopicMessage(context, message);
+ }
+
+ public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
+ delegate.removeAsyncMessage(context, ack);
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?rev=945102&r1=945101&r2=945102&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java Mon May 17 11:53:28 2010
@@ -17,6 +17,7 @@
package org.apache.activemq.store;
import java.io.IOException;
+import java.util.concurrent.Future;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@@ -140,4 +141,16 @@ public class ProxyTopicMessageStore impl
public boolean isEmpty() throws Exception {
return delegate.isEmpty();
}
+
+ public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
+ return delegate.asyncAddTopicMessage(context, message);
+ }
+
+ public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
+ return delegate.asyncAddQueueMessage(context, message);
+ }
+
+ public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
+ delegate.removeAsyncMessage(context, ack);
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java?rev=945102&r1=945101&r2=945102&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java Mon May 17 11:53:28 2010
@@ -17,9 +17,7 @@
package org.apache.activemq.store;
import java.io.IOException;
-
import javax.jms.JMSException;
-
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
@@ -42,7 +40,7 @@ public interface TopicMessageStore exten
* @throws IOException
*/
void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException;
-
+
/**
* @param clientId
* @param subscriptionName
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=945102&r1=945101&r2=945102&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Mon May 17 11:53:28 2010
@@ -386,4 +386,35 @@ public class KahaDBPersistenceAdapter im
public void setDirectoryArchive(File directoryArchive) {
letter.setDirectoryArchive(directoryArchive);
}
+
+ public boolean isConcurrentStoreAndDispatchQueues() {
+ return letter.isConcurrentStoreAndDispatchQueues();
+ }
+
+ public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
+ letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch);
+ }
+
+ public boolean isConcurrentStoreAndDispatchTopics() {
+ return letter.isConcurrentStoreAndDispatchTopics();
+ }
+
+ public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
+ letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch);
+ }
+
+ public int getMaxAsyncJobs() {
+ return letter.getMaxAsyncJobs();
+ }
+ /**
+ * @param maxAsyncJobs the maxAsyncJobs to set
+ */
+ public void setMaxAsyncJobs(int maxAsyncJobs) {
+ letter.setMaxAsyncJobs(maxAsyncJobs);
+ }
+
+ @Override
+ public String toString() {
+ return "KahaDBPersistenceAdapter";
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=945102&r1=945101&r2=945102&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Mon May 17 11:53:28 2010
@@ -18,12 +18,25 @@ package org.apache.activemq.store.kahadb
import java.io.DataInputStream;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@@ -46,7 +59,6 @@ import org.apache.activemq.store.Persist
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
-import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
@@ -62,23 +74,167 @@ import org.apache.activemq.store.kahadb.
import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Transaction;
-
public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
-
+ private static final int MAX_ASYNC_JOBS = 10000;
+ protected ExecutorService queueExecutor;
+ protected ExecutorService topicExecutor;
+ protected final Map<MessageId, StoreQueueTask> asyncQueueMap = new HashMap<MessageId, StoreQueueTask>();
+ protected final Map<MessageId, StoreTopicTask> asyncTopicMap = new HashMap<MessageId, StoreTopicTask>();
private final WireFormat wireFormat = new OpenWireFormat();
+ private SystemUsage usageManager;
+ private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
+ private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
+ private Semaphore queueSemaphore;
+ private Semaphore topicSemaphore;
+ private boolean concurrentStoreAndDispatchQueues = true;
+ private boolean concurrentStoreAndDispatchTopics = true;
+ private int maxAsyncJobs = MAX_ASYNC_JOBS;
+
+ public KahaDBStore() {
+ }
public void setBrokerName(String brokerName) {
}
+
public void setUsageManager(SystemUsage usageManager) {
+ this.usageManager = usageManager;
+ }
+
+ public SystemUsage getUsageManager() {
+ return this.usageManager;
+ }
+
+ /**
+ * @return the concurrentStoreAndDispatch
+ */
+ public boolean isConcurrentStoreAndDispatchQueues() {
+ return this.concurrentStoreAndDispatchQueues;
+ }
+
+ /**
+ * @param concurrentStoreAndDispatch
+ * the concurrentStoreAndDispatch to set
+ */
+ public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
+ this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
+ }
+
+ /**
+ * @return the concurrentStoreAndDispatch
+ */
+ public boolean isConcurrentStoreAndDispatchTopics() {
+ return this.concurrentStoreAndDispatchTopics;
+ }
+
+ /**
+ * @param concurrentStoreAndDispatch
+ * the concurrentStoreAndDispatch to set
+ */
+ public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
+ this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
+ }
+
+ /**
+ * @return the maxAsyncJobs
+ */
+ public int getMaxAsyncJobs() {
+ return this.maxAsyncJobs;
+ }
+ /**
+ * @param maxAsyncJobs
+ * the maxAsyncJobs to set
+ */
+ public void setMaxAsyncJobs(int maxAsyncJobs) {
+ this.maxAsyncJobs = maxAsyncJobs;
+ }
+
+ @Override
+ public void doStart() throws Exception {
+ this.queueSemaphore = new Semaphore(getMaxAsyncJobs());
+ this.topicSemaphore = new Semaphore(getMaxAsyncJobs());
+ this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
+ this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
+ this.queueExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, asyncQueueJobQueue,
+ new ThreadFactory() {
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
+ this.topicExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, asyncTopicJobQueue,
+ new ThreadFactory() {
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
+ super.doStart();
+
+ }
+
+ @Override
+ public void doStop(ServiceStopper stopper) throws Exception {
+ this.queueSemaphore.drainPermits();
+ this.topicSemaphore.drainPermits();
+ if (this.queueExecutor != null) {
+ this.queueExecutor.shutdownNow();
+ }
+ if (this.topicExecutor != null) {
+ this.topicExecutor.shutdownNow();
+ }
+ super.doStop(stopper);
+ }
+
+ protected StoreQueueTask removeQueueTask(MessageId id) {
+ StoreQueueTask task = this.asyncQueueMap.remove(id);
+ if (task != null) {
+ task.getMessage().decrementReferenceCount();
+ this.queueSemaphore.release();
+ }
+ return task;
+ }
+
+ protected void addQueueTask(StoreQueueTask task) throws IOException {
+ try {
+ this.queueSemaphore.acquire();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException(e.getMessage());
+ }
+ this.asyncQueueMap.put(task.getMessage().getMessageId(), task);
+ task.getMessage().incrementReferenceCount();
+ this.queueExecutor.execute(task);
+ }
+
+ protected StoreTopicTask removeTopicTask(MessageId id) {
+ StoreTopicTask task = this.asyncTopicMap.remove(id);
+ if (task != null) {
+ task.getMessage().decrementReferenceCount();
+ this.topicSemaphore.release();
+ }
+ return task;
+ }
+
+ protected void addTopicTask(StoreTopicTask task) throws IOException {
+ try {
+ this.topicSemaphore.acquire();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException(e.getMessage());
+ }
+ this.asyncTopicMap.put(task.getMessage().getMessageId(), task);
+ task.getMessage().incrementReferenceCount();
+ this.topicExecutor.execute(task);
}
public TransactionStore createTransactionStore() throws IOException {
- return new TransactionStore(){
-
+ return new TransactionStore() {
+
public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
store(new KahaCommitCommand().setTransactionInfo(createTransactionInfo(txid)), true);
}
@@ -90,22 +246,24 @@ public class KahaDBStore extends Message
}
public void recover(TransactionRecoveryListener listener) throws IOException {
for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) {
- XATransactionId xid = (XATransactionId)entry.getKey();
+ XATransactionId xid = (XATransactionId) entry.getKey();
ArrayList<Message> messageList = new ArrayList<Message>();
ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
-
+
for (Operation op : entry.getValue()) {
- if( op.getClass() == AddOpperation.class ) {
- AddOpperation addOp = (AddOpperation)op;
- Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addOp.getCommand().getMessage().newInput()) );
+ if (op.getClass() == AddOpperation.class) {
+ AddOpperation addOp = (AddOpperation) op;
+ Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addOp.getCommand()
+ .getMessage().newInput()));
messageList.add(msg);
} else {
- RemoveOpperation rmOp = (RemoveOpperation)op;
- MessageAck ack = (MessageAck)wireFormat.unmarshal( new DataInputStream(rmOp.getCommand().getAck().newInput()) );
+ RemoveOpperation rmOp = (RemoveOpperation) op;
+ MessageAck ack = (MessageAck) wireFormat.unmarshal(new DataInputStream(rmOp.getCommand()
+ .getAck().newInput()));
ackList.add(ack);
}
}
-
+
Message[] addedMessages = new Message[messageList.size()];
MessageAck[] acks = new MessageAck[ackList.size()];
messageList.toArray(addedMessages);
@@ -125,7 +283,7 @@ public class KahaDBStore extends Message
public KahaDBMessageStore(ActiveMQDestination destination) {
super(destination);
- this.dest = convert( destination );
+ this.dest = convert(destination);
}
@Override
@@ -133,24 +291,52 @@ public class KahaDBStore extends Message
return destination;
}
+ @Override
+ public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
+ throws IOException {
+ if (isConcurrentStoreAndDispatchQueues()) {
+ StoreQueueTask result = new StoreQueueTask(this, context, message);
+ addQueueTask(result);
+ return result.getFuture();
+ } else {
+ return super.asyncAddQueueMessage(context, message);
+ }
+ }
+
+ @Override
+ public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
+ if (isConcurrentStoreAndDispatchQueues()) {
+ StoreQueueTask task = removeQueueTask(ack.getLastMessageId());
+ if (task != null) {
+ if (!task.cancel()) {
+ removeMessage(context, ack);
+ }
+ } else {
+ removeMessage(context, ack);
+ }
+ } else {
+ removeMessage(context, ack);
+ }
+ }
+
public void addMessage(ConnectionContext context, Message message) throws IOException {
KahaAddMessageCommand command = new KahaAddMessageCommand();
command.setDestination(dest);
command.setMessageId(message.getMessageId().toString());
- command.setTransactionInfo( createTransactionInfo(message.getTransactionId()) );
+ command.setTransactionInfo(createTransactionInfo(message.getTransactionId()));
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
store(command, isEnableJournalDiskSyncs() && message.isResponseRequired());
-
+
}
-
+
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
command.setDestination(dest);
command.setMessageId(ack.getLastMessageId().toString());
- command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()) );
+ command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()));
store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired());
}
@@ -162,37 +348,40 @@ public class KahaDBStore extends Message
public Message getMessage(MessageId identity) throws IOException {
final String key = identity.toString();
-
- // Hopefully one day the page file supports concurrent read operations... but for now we must
+
+ // Hopefully one day the page file supports concurrent read
+ // operations... but for now we must
// externally synchronize...
Location location;
- synchronized(indexMutex) {
- location = pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>(){
+ synchronized (indexMutex) {
+ location = pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
public Location execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
Long sequence = sd.messageIdIndex.get(tx, key);
- if( sequence ==null ) {
+ if (sequence == null) {
return null;
}
return sd.orderIndex.get(tx, sequence).location;
}
});
}
- if( location == null ) {
+ if (location == null) {
return null;
}
-
+
return loadMessage(location);
}
-
+
public int getMessageCount() throws IOException {
- synchronized(indexMutex) {
- return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
+ synchronized (indexMutex) {
+ return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
public Integer execute(Transaction tx) throws IOException {
- // Iterate through all index entries to get a count of messages in the destination.
+ // Iterate through all index entries to get a count of
+ // messages in the destination.
StoredDestination sd = getStoredDestination(dest, tx);
- int rc=0;
- for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
+ int rc = 0;
+ for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator
+ .hasNext();) {
iterator.next();
rc++;
}
@@ -201,12 +390,14 @@ public class KahaDBStore extends Message
});
}
}
-
+
+ @Override
public boolean isEmpty() throws IOException {
- synchronized(indexMutex) {
- return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>(){
+ synchronized (indexMutex) {
+ return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
public Boolean execute(Transaction tx) throws IOException {
- // Iterate through all index entries to get a count of messages in the destination.
+ // Iterate through all index entries to get a count of
+ // messages in the destination.
StoredDestination sd = getStoredDestination(dest, tx);
return sd.locationIndex.isEmpty(tx);
}
@@ -214,40 +405,41 @@ public class KahaDBStore extends Message
}
}
-
public void recover(final MessageRecoveryListener listener) throws Exception {
- synchronized(indexMutex) {
- pageFile.tx().execute(new Transaction.Closure<Exception>(){
+ synchronized (indexMutex) {
+ pageFile.tx().execute(new Transaction.Closure<Exception>() {
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
- for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
+ for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
+ .hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next();
- listener.recoverMessage( loadMessage(entry.getValue().location) );
+ listener.recoverMessage(loadMessage(entry.getValue().location));
}
}
});
}
}
- long cursorPos=0;
-
+ long cursorPos = 0;
+
public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
- synchronized(indexMutex) {
- pageFile.tx().execute(new Transaction.Closure<Exception>(){
+ synchronized (indexMutex) {
+ pageFile.tx().execute(new Transaction.Closure<Exception>() {
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
- Entry<Long, MessageKeys> entry=null;
+ Entry<Long, MessageKeys> entry = null;
int counter = 0;
- for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
+ for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator
+ .hasNext();) {
entry = iterator.next();
- listener.recoverMessage( loadMessage(entry.getValue().location ) );
+ listener.recoverMessage(loadMessage(entry.getValue().location));
counter++;
- if( counter >= maxReturned ) {
+ if (counter >= maxReturned) {
break;
}
}
- if( entry!=null ) {
- cursorPos = entry.getKey()+1;
+ if (entry != null) {
+ cursorPos = entry.getKey() + 1;
}
}
});
@@ -255,29 +447,29 @@ public class KahaDBStore extends Message
}
public void resetBatching() {
- cursorPos=0;
+ cursorPos = 0;
}
-
@Override
public void setBatch(MessageId identity) throws IOException {
final String key = identity.toString();
-
- // Hopefully one day the page file supports concurrent read operations... but for now we must
+
+ // Hopefully one day the page file supports concurrent read
+ // operations... but for now we must
// externally synchronize...
Long location;
- synchronized(indexMutex) {
- location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>(){
+ synchronized (indexMutex) {
+ location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() {
public Long execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
return sd.messageIdIndex.get(tx, key);
}
});
}
- if( location!=null ) {
- cursorPos=location+1;
+ if (location != null) {
+ cursorPos = location + 1;
}
-
+
}
@Override
@@ -285,32 +477,65 @@ public class KahaDBStore extends Message
}
@Override
public void start() throws Exception {
+ super.start();
}
@Override
public void stop() throws Exception {
+ super.stop();
}
-
+
}
-
+
class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
- public KahaDBTopicMessageStore(ActiveMQTopic destination) {
+ private final AtomicInteger subscriptionCount = new AtomicInteger();
+ public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
super(destination);
+ this.subscriptionCount.set(getAllSubscriptions().length);
+ }
+
+ @Override
+ public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
+ throws IOException {
+ if (isConcurrentStoreAndDispatchTopics()) {
+ StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
+ addTopicTask(result);
+ return result.getFuture();
+ } else {
+ return super.asyncAddTopicMessage(context, message);
+ }
}
-
- public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
+
+ public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId)
+ throws IOException {
+ String subscriptionKey = subscriptionKey(clientId, subscriptionName);
+ if (isConcurrentStoreAndDispatchTopics()) {
+ StoreTopicTask task = asyncTopicMap.get(messageId);
+ if (task != null) {
+
+ if (task.addSubscriptionKey(subscriptionKey)) {
+ removeTopicTask(messageId);
+ task.cancel();
+ }
+ } else {
+ doAcknowledge(context, subscriptionKey, messageId);
+ }
+ } else {
+ doAcknowledge(context, subscriptionKey, messageId);
+ }
+ }
+
+ protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId)
+ throws IOException {
KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
command.setDestination(dest);
- command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
+ command.setSubscriptionKey(subscriptionKey);
command.setMessageId(messageId.toString());
- // We are not passed a transaction info.. so we can't participate in a transaction.
- // Looks like a design issue with the TopicMessageStore interface. Also we can't recover the original ack
- // to pass back to the XA recover method.
- // command.setTransactionInfo();
store(command, false);
}
public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
- String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName());
+ String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo
+ .getSubscriptionName());
KahaSubscriptionCommand command = new KahaSubscriptionCommand();
command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey);
@@ -318,6 +543,7 @@ public class KahaDBStore extends Message
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
store(command, isEnableJournalDiskSyncs() && true);
+ this.subscriptionCount.incrementAndGet();
}
public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
@@ -325,111 +551,120 @@ public class KahaDBStore extends Message
command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
store(command, isEnableJournalDiskSyncs() && true);
+ this.subscriptionCount.decrementAndGet();
}
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
-
+
final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
- synchronized(indexMutex) {
- pageFile.tx().execute(new Transaction.Closure<IOException>(){
+ synchronized (indexMutex) {
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
- for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator.hasNext();) {
+ for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
+ .hasNext();) {
Entry<String, KahaSubscriptionCommand> entry = iterator.next();
- SubscriptionInfo info = (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(entry.getValue().getSubscriptionInfo().newInput()) );
+ SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
+ .getValue().getSubscriptionInfo().newInput()));
subscriptions.add(info);
}
}
});
}
-
- SubscriptionInfo[]rc=new SubscriptionInfo[subscriptions.size()];
+
+ SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
subscriptions.toArray(rc);
return rc;
}
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
- synchronized(indexMutex) {
- return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>(){
+ synchronized (indexMutex) {
+ return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
public SubscriptionInfo execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
- if( command ==null ) {
+ if (command == null) {
return null;
}
- return (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(command.getSubscriptionInfo().newInput()) );
+ return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command
+ .getSubscriptionInfo().newInput()));
}
});
}
}
-
+
public int getMessageCount(String clientId, String subscriptionName) throws IOException {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
- synchronized(indexMutex) {
- return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
+ synchronized (indexMutex) {
+ return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
public Integer execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
- if ( cursorPos==null ) {
+ if (cursorPos == null) {
// The subscription might not exist.
return 0;
}
cursorPos += 1;
-
+
int counter = 0;
- for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
+ for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator
+ .hasNext();) {
iterator.next();
counter++;
}
return counter;
}
});
- }
+ }
}
- public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
+ public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
+ throws Exception {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
- synchronized(indexMutex) {
- pageFile.tx().execute(new Transaction.Closure<Exception>(){
+ synchronized (indexMutex) {
+ pageFile.tx().execute(new Transaction.Closure<Exception>() {
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
cursorPos += 1;
-
- for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
+
+ for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator
+ .hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next();
- listener.recoverMessage( loadMessage(entry.getValue().location ) );
+ listener.recoverMessage(loadMessage(entry.getValue().location));
}
}
});
}
}
- public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
+ public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
+ final MessageRecoveryListener listener) throws Exception {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
- synchronized(indexMutex) {
- pageFile.tx().execute(new Transaction.Closure<Exception>(){
+ synchronized (indexMutex) {
+ pageFile.tx().execute(new Transaction.Closure<Exception>() {
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
Long cursorPos = sd.subscriptionCursors.get(subscriptionKey);
- if( cursorPos == null ) {
+ if (cursorPos == null) {
cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
cursorPos += 1;
}
-
- Entry<Long, MessageKeys> entry=null;
+
+ Entry<Long, MessageKeys> entry = null;
int counter = 0;
- for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
+ for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator
+ .hasNext();) {
entry = iterator.next();
- listener.recoverMessage( loadMessage(entry.getValue().location ) );
+ listener.recoverMessage(loadMessage(entry.getValue().location));
counter++;
- if( counter >= maxReturned ) {
+ if (counter >= maxReturned) {
break;
}
}
- if( entry!=null ) {
+ if (entry != null) {
sd.subscriptionCursors.put(subscriptionKey, entry.getKey() + 1);
}
}
@@ -440,8 +675,8 @@ public class KahaDBStore extends Message
public void resetBatching(String clientId, String subscriptionName) {
try {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
- synchronized(indexMutex) {
- pageFile.tx().execute(new Transaction.Closure<IOException>(){
+ synchronized (indexMutex) {
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
sd.subscriptionCursors.remove(subscriptionKey);
@@ -454,10 +689,10 @@ public class KahaDBStore extends Message
}
}
- String subscriptionKey(String clientId, String subscriptionName){
- return clientId+":"+subscriptionName;
+ String subscriptionKey(String clientId, String subscriptionName) {
+ return clientId + ":" + subscriptionName;
}
-
+
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
return new KahaDBMessageStore(destination);
}
@@ -469,8 +704,9 @@ public class KahaDBStore extends Message
/**
* Cleanup method to remove any state associated with the given destination.
* This method does not stop the message store (it might not be cached).
- *
- * @param destination Destination to forget
+ *
+ * @param destination
+ * Destination to forget
*/
public void removeQueueMessageStore(ActiveMQQueue destination) {
}
@@ -478,24 +714,25 @@ public class KahaDBStore extends Message
/**
* Cleanup method to remove any state associated with the given destination
* This method does not stop the message store (it might not be cached).
- *
- * @param destination Destination to forget
+ *
+ * @param destination
+ * Destination to forget
*/
public void removeTopicMessageStore(ActiveMQTopic destination) {
}
public void deleteAllMessages() throws IOException {
- deleteAllMessages=true;
+ deleteAllMessages = true;
}
-
-
+
public Set<ActiveMQDestination> getDestinations() {
try {
final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
- synchronized(indexMutex) {
- pageFile.tx().execute(new Transaction.Closure<IOException>(){
+ synchronized (indexMutex) {
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
- for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
+ for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
+ .hasNext();) {
Entry<String, StoredDestination> entry = iterator.next();
if (!isEmptyTopic(entry, tx)) {
rc.add(convert(entry.getKey()));
@@ -503,7 +740,8 @@ public class KahaDBStore extends Message
}
}
- private boolean isEmptyTopic(Entry<String, StoredDestination> entry, Transaction tx) throws IOException {
+ private boolean isEmptyTopic(Entry<String, StoredDestination> entry, Transaction tx)
+ throws IOException {
boolean isEmptyTopic = false;
ActiveMQDestination dest = convert(entry.getKey());
if (dest.isTopic()) {
@@ -521,13 +759,13 @@ public class KahaDBStore extends Message
throw new RuntimeException(e);
}
}
-
+
public long getLastMessageBrokerSequenceId() throws IOException {
return 0;
}
-
+
public long size() {
- if ( !started.get() ) {
+ if (!started.get()) {
return 0;
}
try {
@@ -546,15 +784,14 @@ public class KahaDBStore extends Message
public void rollbackTransaction(ConnectionContext context) throws IOException {
throw new IOException("Not yet implemented.");
}
-
+
public void checkpoint(boolean sync) throws IOException {
super.checkpointCleanup(false);
}
-
-
- ///////////////////////////////////////////////////////////////////
+
+ // /////////////////////////////////////////////////////////////////
// Internal helper methods.
- ///////////////////////////////////////////////////////////////////
+ // /////////////////////////////////////////////////////////////////
/**
* @param location
@@ -562,35 +799,35 @@ public class KahaDBStore extends Message
* @throws IOException
*/
Message loadMessage(Location location) throws IOException {
- KahaAddMessageCommand addMessage = (KahaAddMessageCommand)load(location);
- Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addMessage.getMessage().newInput()) );
+ KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
+ Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
return msg;
}
- ///////////////////////////////////////////////////////////////////
+ // /////////////////////////////////////////////////////////////////
// Internal conversion methods.
- ///////////////////////////////////////////////////////////////////
-
+ // /////////////////////////////////////////////////////////////////
+
KahaTransactionInfo createTransactionInfo(TransactionId txid) {
- if( txid ==null ) {
+ if (txid == null) {
return null;
}
KahaTransactionInfo rc = new KahaTransactionInfo();
-
+
// Link it up to the previous record that was part of the transaction.
ArrayList<Operation> tx = inflightTransactions.get(txid);
- if( tx!=null ) {
- rc.setPreviousEntry(convert(tx.get(tx.size()-1).location));
+ if (tx != null) {
+ rc.setPreviousEntry(convert(tx.get(tx.size() - 1).location));
}
-
- if( txid.isLocalTransaction() ) {
- LocalTransactionId t = (LocalTransactionId)txid;
+
+ if (txid.isLocalTransaction()) {
+ LocalTransactionId t = (LocalTransactionId) txid;
KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
kahaTxId.setConnectionId(t.getConnectionId().getValue());
kahaTxId.setTransacitonId(t.getValue());
rc.setLocalTransacitonId(kahaTxId);
} else {
- XATransactionId t = (XATransactionId)txid;
+ XATransactionId t = (XATransactionId) txid;
KahaXATransactionId kahaTxId = new KahaXATransactionId();
kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
@@ -599,18 +836,18 @@ public class KahaDBStore extends Message
}
return rc;
}
-
+
KahaLocation convert(Location location) {
KahaLocation rc = new KahaLocation();
rc.setLogId(location.getDataFileId());
rc.setOffset(location.getOffset());
return rc;
}
-
+
KahaDestination convert(ActiveMQDestination dest) {
KahaDestination rc = new KahaDestination();
rc.setName(dest.getPhysicalName());
- switch( dest.getDestinationType() ) {
+ switch (dest.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
rc.setType(DestinationType.QUEUE);
return rc;
@@ -630,13 +867,13 @@ public class KahaDBStore extends Message
ActiveMQDestination convert(String dest) {
int p = dest.indexOf(":");
- if( p<0 ) {
+ if (p < 0) {
throw new IllegalArgumentException("Not in the valid destination format");
}
int type = Integer.parseInt(dest.substring(0, p));
- String name = dest.substring(p+1);
-
- switch( KahaDestination.DestinationType.valueOf(type) ) {
+ String name = dest.substring(p + 1);
+
+ switch (KahaDestination.DestinationType.valueOf(type)) {
case QUEUE:
return new ActiveMQQueue(name);
case TOPIC:
@@ -645,9 +882,113 @@ public class KahaDBStore extends Message
return new ActiveMQTempQueue(name);
case TEMP_TOPIC:
return new ActiveMQTempTopic(name);
- default:
+ default:
throw new IllegalArgumentException("Not in the valid destination format");
}
}
-
+
+ class StoreQueueTask implements Runnable {
+ protected final Message message;
+ protected final ConnectionContext context;
+ protected final MessageStore store;
+ protected final InnerFutureTask future;
+ protected final AtomicBoolean done = new AtomicBoolean();
+
+ public StoreQueueTask(MessageStore store, ConnectionContext context, Message message) {
+ this.store = store;
+ this.context = context;
+ this.message = message;
+ this.future = new InnerFutureTask(this);
+
+ }
+
+ public Future<Object> getFuture() {
+ return this.future;
+ }
+
+ public boolean cancel() {
+ if (this.done.compareAndSet(false, true)) {
+ this.future.cancel(false);
+ return true;
+ }
+ return false;
+ }
+
+ public void run() {
+ try {
+ if (this.done.compareAndSet(false, true)) {
+ this.store.addMessage(context, message);
+ removeQueueTask(this.message.getMessageId());
+ this.future.complete();
+ }
+ } catch (Exception e) {
+ this.future.setException(e);
+ }
+ }
+
+ protected Message getMessage() {
+ return this.message;
+ }
+
+ private class InnerFutureTask extends FutureTask<Object> {
+
+ public InnerFutureTask(Runnable runnable) {
+ super(runnable, null);
+
+ }
+
+ public void setException(final Exception e) {
+ super.setException(e);
+ }
+
+ public void complete() {
+ super.set(null);
+ }
+ }
+ }
+
+ class StoreTopicTask extends StoreQueueTask {
+ private final int subscriptionCount;
+ private final List<String> subscriptionKeys = new ArrayList<String>(1);
+ private final KahaDBTopicMessageStore topicStore;
+ public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
+ int subscriptionCount) {
+ super(store, context, message);
+ this.topicStore = store;
+ this.subscriptionCount = subscriptionCount;
+
+ }
+
+ /**
+ * add a key
+ *
+ * @param key
+ * @return true if all acknowledgements received
+ */
+ public boolean addSubscriptionKey(String key) {
+ synchronized (this.subscriptionKeys) {
+ this.subscriptionKeys.add(key);
+ }
+ return this.subscriptionKeys.size() >= this.subscriptionCount;
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (this.done.compareAndSet(false, true)) {
+ this.topicStore.addMessage(context, message);
+ // apply any acks we have
+ synchronized (this.subscriptionKeys) {
+ for (String key : this.subscriptionKeys) {
+ this.topicStore.doAcknowledge(context, key, this.message.getMessageId());
+ }
+ }
+ removeQueueTask(this.message.getMessageId());
+ this.future.complete();
+ }
+ } catch (Exception e) {
+ this.future.setException(e);
+ }
+ }
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=945102&r1=945101&r2=945102&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Mon May 17 11:53:28 2010
@@ -57,6 +57,8 @@ import org.apache.activemq.store.kahadb.
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.index.BTreeIndex;
@@ -78,7 +80,7 @@ import org.apache.kahadb.util.SequenceSe
import org.apache.kahadb.util.StringMarshaller;
import org.apache.kahadb.util.VariableMarshaller;
-public class MessageDatabase implements BrokerServiceAware {
+public class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
private BrokerService brokerService;
@@ -171,7 +173,7 @@ public class MessageDatabase implements
protected AtomicBoolean opened = new AtomicBoolean();
private LockFile lockFile;
private boolean ignoreMissingJournalfiles = false;
- private int indexCacheSize = 100;
+ private int indexCacheSize = 10000;
private boolean checkForCorruptJournalFiles = false;
private boolean checksumJournalFiles = false;
@@ -179,16 +181,14 @@ public class MessageDatabase implements
public MessageDatabase() {
}
- public void start() throws Exception {
- if (started.compareAndSet(false, true)) {
- load();
- }
+ @Override
+ public void doStart() throws Exception {
+ load();
}
- public void stop() throws Exception {
- if (started.compareAndSet(true, false)) {
- unload();
- }
+ @Override
+ public void doStop(ServiceStopper stopper) throws Exception {
+ unload();
}
private void loadPageFile() throws IOException {