You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/06/15 22:33:41 UTC
svn commit: r955039 [1/2] - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq:
broker/region/ command/ store/ store/amq/ store/journal/ store/kahadaptor/
store/kahadb/ store/memory/ transaction/
Author: rajdavies
Date: Tue Jun 15 20:33:41 2010
New Revision: 955039
URL: http://svn.apache.org/viewvc?rev=955039&view=rev
Log:
Transactions dispatch and commit to the store asynchronously, though the commit only returns to the producer when they both complete for KahaDB
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TransactionStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Synchronization.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Jun 15 20:33:41 2010
@@ -93,7 +93,8 @@ public class Queue extends BaseDestinati
protected final List<Subscription> consumers = new ArrayList<Subscription>(50);
protected PendingMessageCursor messages;
private final LinkedHashMap<MessageId, QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId, QueueMessageReference>();
- // Messages that are paged in but have not yet been targeted at a subscription
+ // Messages that are paged in but have not yet been targeted at a
+ // subscription
private List<QueueMessageReference> pagedInPendingDispatch = new ArrayList<QueueMessageReference>(100);
private List<QueueMessageReference> redeliveredWaitingDispatch = new ArrayList<QueueMessageReference>();
private MessageGroupMap messageGroupOwners;
@@ -101,7 +102,8 @@ public class Queue extends BaseDestinati
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
private final Object sendLock = new Object();
private ExecutorService executor;
- protected final Map<MessageId, Runnable> messagesWaitingForSpace = Collections.synchronizedMap(new LinkedHashMap<MessageId, Runnable>());
+ protected final Map<MessageId, Runnable> messagesWaitingForSpace = Collections
+ .synchronizedMap(new LinkedHashMap<MessageId, Runnable>());
private final Object dispatchMutex = new Object();
private boolean useConsumerPriority = true;
private boolean strictOrderDispatch = false;
@@ -112,7 +114,7 @@ public class Queue extends BaseDestinati
private int consumersBeforeDispatchStarts = 0;
private CountDownLatch consumersBeforeStartsLatch;
private final AtomicLong pendingWakeups = new AtomicLong();
-
+
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
public void run() {
asyncWakeup();
@@ -123,46 +125,47 @@ public class Queue extends BaseDestinati
expireMessages();
}
};
-
- private final Object iteratingMutex = new Object() {};
+
+ private final Object iteratingMutex = new Object() {
+ };
private final Scheduler scheduler;
-
+
class TimeoutMessage implements Delayed {
Message message;
ConnectionContext context;
long trigger;
-
+
public TimeoutMessage(Message message, ConnectionContext context, long delay) {
this.message = message;
this.context = context;
this.trigger = System.currentTimeMillis() + delay;
}
-
+
public long getDelay(TimeUnit unit) {
long n = trigger - System.currentTimeMillis();
return unit.convert(n, TimeUnit.MILLISECONDS);
}
public int compareTo(Delayed delayed) {
- long other = ((TimeoutMessage)delayed).trigger;
+ long other = ((TimeoutMessage) delayed).trigger;
int returnValue;
if (this.trigger < other) {
- returnValue = -1;
+ returnValue = -1;
} else if (this.trigger > other) {
- returnValue = 1;
+ returnValue = 1;
} else {
- returnValue = 0;
+ returnValue = 0;
}
return returnValue;
}
-
+
}
-
+
DelayQueue<TimeoutMessage> flowControlTimeoutMessages = new DelayQueue<TimeoutMessage>();
-
+
class FlowControlTimeoutTask extends Thread {
-
+
@Override
public void run() {
TimeoutMessage timeout;
@@ -172,8 +175,14 @@ public class Queue extends BaseDestinati
if (timeout != null) {
synchronized (messagesWaitingForSpace) {
if (messagesWaitingForSpace.remove(timeout.message.getMessageId()) != null) {
- ExceptionResponse response = new ExceptionResponse(new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + timeout.message.getProducerId() + ") to prevent flooding "
- + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"));
+ ExceptionResponse response = new ExceptionResponse(
+ new ResourceAllocationException(
+ "Usage Manager Memory Limit reached. Stopping producer ("
+ + timeout.message.getProducerId()
+ + ") to prevent flooding "
+ + getActiveMQDestination().getQualifiedName()
+ + "."
+ + " See http://activemq.apache.org/producer-flow-control.html for more info"));
response.setCorrelationId(timeout.message.getCommandId());
timeout.context.getConnection().dispatchAsync(response);
}
@@ -187,19 +196,19 @@ public class Queue extends BaseDestinati
}
}
};
-
+
private final FlowControlTimeoutTask flowControlTimeoutTask = new FlowControlTimeoutTask();
-
private static final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() {
public int compare(Subscription s1, Subscription s2) {
- //We want the list sorted in descending order
+ // We want the list sorted in descending order
return s2.getConsumerInfo().getPriority() - s1.getConsumerInfo().getPriority();
}
};
- public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
+ public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store,
+ DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
super(brokerService, store, destination, parentStats);
this.taskFactory = taskFactory;
this.dispatchSelector = new QueueDispatchSelector(destination);
@@ -212,7 +221,8 @@ public class Queue extends BaseDestinati
}
}
- // make the queue easily visible in the debugger from its task runner threads
+ // make the queue easily visible in the debugger from its task runner
+ // threads
final class QueueThread extends Thread {
final Queue queue;
@@ -231,9 +241,12 @@ public class Queue extends BaseDestinati
this.messages = new StoreQueueCursor(broker, this);
}
}
- // If a VMPendingMessageCursor don't use the default Producer System Usage
- // since it turns into a shared blocking queue which can lead to a network deadlock.
- // If we are cursoring to disk..it's not and issue because it does not block due
+ // If a VMPendingMessageCursor don't use the default Producer System
+ // Usage
+ // since it turns into a shared blocking queue which can lead to a
+ // network deadlock.
+ // If we are cursoring to disk..it's not and issue because it does not
+ // block due
// to large disk sizes.
if (messages instanceof VMPendingMessageCursor) {
this.systemUsage = brokerService.getSystemUsage();
@@ -260,7 +273,8 @@ public class Queue extends BaseDestinati
if (message.isExpired()) {
if (broker.isExpired(message)) {
messageExpired(createConnectionContext(), createMessageReference(message));
- // drop message will decrement so counter balance here
+ // drop message will decrement so counter
+ // balance here
destinationStatistics.getMessages().increment();
}
return true;
@@ -300,8 +314,9 @@ public class Queue extends BaseDestinati
}
/*
- * Holder for subscription that needs attention on next iterate
- * browser needs access to existing messages in the queue that have already been dispatched
+ * Holder for subscription that needs attention on next iterate browser
+ * needs access to existing messages in the queue that have already been
+ * dispatched
*/
class BrowserDispatch {
QueueBrowserSubscription browser;
@@ -370,26 +385,30 @@ public class Queue extends BaseDestinati
browserDispatches.addLast(browserDispatch);
}
}
-
+
if (!(this.optimizedDispatch || isSlave())) {
wakeup();
}
}
if (this.optimizedDispatch || isSlave()) {
// Outside of dispatchLock() to maintain the lock hierarchy of
- // iteratingMutex -> dispatchLock. - see https://issues.apache.org/activemq/browse/AMQ-1878
+ // iteratingMutex -> dispatchLock. - see
+ // https://issues.apache.org/activemq/browse/AMQ-1878
wakeup();
}
}
-
- public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId) throws Exception {
+
+ public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId)
+ throws Exception {
destinationStatistics.getConsumers().decrement();
// synchronize with dispatch method so that no new messages are sent
// while removing up a subscription.
synchronized (dispatchMutex) {
if (LOG.isDebugEnabled()) {
- LOG.debug("remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId + ", dequeues: " + getDestinationStatistics().getDequeues().getCount() + ", dispatched: "
- + getDestinationStatistics().getDispatched().getCount() + ", inflight: " + getDestinationStatistics().getInflight().getCount());
+ LOG.debug("remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId + ", dequeues: "
+ + getDestinationStatistics().getDequeues().getCount() + ", dispatched: "
+ + getDestinationStatistics().getDispatched().getCount() + ", inflight: "
+ + getDestinationStatistics().getInflight().getCount());
}
synchronized (consumers) {
removeFromConsumerList(sub);
@@ -398,7 +417,9 @@ public class Queue extends BaseDestinati
if (exclusiveConsumer == sub) {
exclusiveConsumer = null;
for (Subscription s : consumers) {
- if (s.getConsumerInfo().isExclusive() && (exclusiveConsumer == null || s.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority())) {
+ if (s.getConsumerInfo().isExclusive()
+ && (exclusiveConsumer == null || s.getConsumerInfo().getPriority() > exclusiveConsumer
+ .getConsumerInfo().getPriority())) {
exclusiveConsumer = s;
}
@@ -410,13 +431,15 @@ public class Queue extends BaseDestinati
getMessageGroupOwners().removeConsumer(consumerId);
// redeliver inflight messages
-
+
for (MessageReference ref : sub.remove(context, this)) {
QueueMessageReference qmr = (QueueMessageReference) ref;
if (qmr.getLockOwner() == sub) {
qmr.unlock();
- // only increment redelivery if it was delivered or we have no delivery information
- if (lastDeiveredSequenceId == 0 || qmr.getMessageId().getBrokerSequenceId() <= lastDeiveredSequenceId) {
+ // only increment redelivery if it was delivered or we
+ // have no delivery information
+ if (lastDeiveredSequenceId == 0
+ || qmr.getMessageId().getBrokerSequenceId() <= lastDeiveredSequenceId) {
qmr.incrementRedeliveryCounter();
}
}
@@ -432,7 +455,8 @@ public class Queue extends BaseDestinati
}
if (this.optimizedDispatch || isSlave()) {
// Outside of dispatchLock() to maintain the lock hierarchy of
- // iteratingMutex -> dispatchLock. - see https://issues.apache.org/activemq/browse/AMQ-1878
+ // iteratingMutex -> dispatchLock. - see
+ // https://issues.apache.org/activemq/browse/AMQ-1878
wakeup();
}
}
@@ -443,9 +467,10 @@ public class Queue extends BaseDestinati
// destination.. it may have expired.
message.setRegionDestination(this);
final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
- final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 && !context.isInRecoveryMode();
+ final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
+ && !context.isInRecoveryMode();
if (message.isExpired()) {
- //message not stored - or added to stats yet - so chuck here
+ // message not stored - or added to stats yet - so chuck here
broker.getRoot().messageExpired(context, message);
if (sendProducerAck) {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
@@ -459,20 +484,28 @@ public class Queue extends BaseDestinati
if (isProducerFlowControl() && context.isProducerFlowControl()) {
if (warnOnProducerFlowControl) {
warnOnProducerFlowControl = false;
- LOG.info("Usage Manager Memory Limit ("+ memoryUsage.getLimit() + ") reached on " + getActiveMQDestination().getQualifiedName()
- + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
- + " See http://activemq.apache.org/producer-flow-control.html for more info");
+ LOG
+ .info("Usage Manager Memory Limit ("
+ + memoryUsage.getLimit()
+ + ") reached on "
+ + getActiveMQDestination().getQualifiedName()
+ + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
+ + " See http://activemq.apache.org/producer-flow-control.html for more info");
}
if (systemUsage.isSendFailIfNoSpace()) {
- throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
- + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
+ throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer ("
+ + message.getProducerId() + ") to prevent flooding "
+ + getActiveMQDestination().getQualifiedName() + "."
+ + " See http://activemq.apache.org/producer-flow-control.html for more info");
}
- // We can avoid blocking due to low usage if the producer is sending
+ // We can avoid blocking due to low usage if the producer is
+ // sending
// a sync message or if it is using a producer window
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
- // copy the exchange state since the context will be modified while we are waiting
+ // copy the exchange state since the context will be
+ // modified while we are waiting
// for space.
final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy();
synchronized (messagesWaitingForSpace) {
@@ -491,7 +524,8 @@ public class Queue extends BaseDestinati
}
if (sendProducerAck) {
- ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
+ ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
+ .getSize());
context.getConnection().dispatchAsync(ack);
} else {
Response response = new Response();
@@ -510,9 +544,10 @@ public class Queue extends BaseDestinati
}
}
});
-
+
if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
- flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage.getSendFailIfNoSpaceAfterTimeout()));
+ flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage
+ .getSendFailIfNoSpaceAfterTimeout()));
}
registerCallbackForNotFullNotification();
@@ -523,8 +558,10 @@ public class Queue extends BaseDestinati
} else {
if (memoryUsage.isFull()) {
- waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer (" + message.getProducerId() + ") stopped to prevent flooding "
- + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
+ waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
+ + message.getProducerId() + ") stopped to prevent flooding "
+ + getActiveMQDestination().getQualifiedName() + "."
+ + " See http://activemq.apache.org/producer-flow-control.html for more info");
}
// The usage manager could have delayed us by the time
@@ -555,14 +592,18 @@ public class Queue extends BaseDestinati
}
}
- void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
+ void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException,
+ Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
Future<Object> result = null;
synchronized (sendLock) {
if (store != null && message.isPersistent()) {
if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
- String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark() + "% of " + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+ String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark() + "% of "
+ + systemUsage.getStoreUsage().getLimit() + ". Stopping producer ("
+ + message.getProducerId() + ") to prevent flooding "
+ + getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
if (systemUsage.isSendFailIfNoSpace()) {
@@ -572,11 +613,7 @@ public class Queue extends BaseDestinati
waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
}
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
- if (context.isInTransaction()) {
- store.addMessage(context, message);
- }else {
- result = store.asyncAddQueueMessage(context, message);
- }
+ result = store.asyncAddQueueMessage(context, message);
}
}
if (context.isInTransaction()) {
@@ -613,10 +650,10 @@ public class Queue extends BaseDestinati
}
if (result != null && !result.isCancelled()) {
try {
- result.get();
- }catch(CancellationException e) {
- //ignore - the task has been cancelled if the message
- // has already been deleted
+ result.get();
+ } catch (CancellationException e) {
+ // ignore - the task has been cancelled if the message
+ // has already been deleted
}
}
}
@@ -652,10 +689,12 @@ public class Queue extends BaseDestinati
public void gc() {
}
- public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
+ public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node)
+ throws IOException {
messageConsumed(context, node);
if (store != null && node.isPersistent()) {
- // the original ack may be a ranged ack, but we are trying to delete a specific
+ // the original ack may be a ranged ack, but we are trying to delete
+ // a specific
// message store here so we need to convert to a non ranged ack.
if (ack.getMessageCount() > 0) {
// Dup the ack
@@ -692,7 +731,8 @@ public class Queue extends BaseDestinati
synchronized (messages) {
size = messages.size();
}
- return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size() + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size + ", in flight groups="
+ return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size()
+ + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size + ", in flight groups="
+ messageGroupOwners;
}
@@ -705,15 +745,15 @@ public class Queue extends BaseDestinati
if (getExpireMessagesPeriod() > 0) {
scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
}
-
+
flowControlTimeoutTask.setName("Producer Flow Control Timeout Task");
-
+
// Start flow control timeout task
// Prevent trying to start it multiple times
if (!flowControlTimeoutTask.isAlive()) {
flowControlTimeoutTask.start();
}
-
+
doPageIn(false);
}
@@ -726,7 +766,7 @@ public class Queue extends BaseDestinati
}
scheduler.cancel(expireMessagesTask);
-
+
if (flowControlTimeoutTask.isAlive()) {
flowControlTimeoutTask.interrupt();
}
@@ -897,7 +937,8 @@ public class Queue extends BaseDestinati
}
}
- private void addAll(Collection<QueueMessageReference> refs, List<Message> l, int maxBrowsePageSize, List<MessageReference> toExpire) throws Exception {
+ private void addAll(Collection<QueueMessageReference> refs, List<Message> l, int maxBrowsePageSize,
+ List<MessageReference> toExpire) throws Exception {
for (Iterator<QueueMessageReference> i = refs.iterator(); i.hasNext() && l.size() < getMaxBrowsePageSize();) {
QueueMessageReference ref = i.next();
if (ref.isExpired()) {
@@ -962,10 +1003,13 @@ public class Queue extends BaseDestinati
} catch (IOException e) {
}
}
- // don't spin/hang if stats are out and there is nothing left in the store
+ // don't spin/hang if stats are out and there is nothing left in the
+ // store
} while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0);
if (this.destinationStatistics.getMessages().getCount() > 0) {
- LOG.warn(getActiveMQDestination().getQualifiedName() + " after purge complete, message count stats report: " + this.destinationStatistics.getMessages().getCount());
+ LOG.warn(getActiveMQDestination().getQualifiedName()
+ + " after purge complete, message count stats report: "
+ + this.destinationStatistics.getMessages().getCount());
}
gc();
this.destinationStatistics.getMessages().setCount(0);
@@ -1032,7 +1076,8 @@ public class Queue extends BaseDestinati
/**
* Copies the message matching the given messageId
*/
- public boolean copyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest) throws Exception {
+ public boolean copyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest)
+ throws Exception {
return copyMatchingMessages(context, createMessageIdFilter(messageId), dest, 1) > 0;
}
@@ -1041,7 +1086,8 @@ public class Queue extends BaseDestinati
*
* @return the number of messages copied
*/
- public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) throws Exception {
+ public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
+ throws Exception {
return copyMatchingMessagesTo(context, selector, dest, -1);
}
@@ -1051,7 +1097,8 @@ public class Queue extends BaseDestinati
*
* @return the number of messages copied
*/
- public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest, int maximumMessages) throws Exception {
+ public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest,
+ int maximumMessages) throws Exception {
return copyMatchingMessages(context, createSelectorFilter(selector), dest, maximumMessages);
}
@@ -1061,7 +1108,8 @@ public class Queue extends BaseDestinati
*
* @return the number of messages copied
*/
- public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception {
+ public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest,
+ int maximumMessages) throws Exception {
int movedCounter = 0;
int count = 0;
Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
@@ -1098,9 +1146,12 @@ public class Queue extends BaseDestinati
/**
* Move a message
*
- * @param context connection context
- * @param m message
- * @param dest ActiveMQDestination
+ * @param context
+ * connection context
+ * @param m
+ * message
+ * @param dest
+ * ActiveMQDestination
* @throws Exception
*/
public boolean moveMessageTo(ConnectionContext context, Message m, ActiveMQDestination dest) throws Exception {
@@ -1116,7 +1167,8 @@ public class Queue extends BaseDestinati
/**
* Moves the message matching the given messageId
*/
- public boolean moveMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest) throws Exception {
+ public boolean moveMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest)
+ throws Exception {
return moveMatchingMessagesTo(context, createMessageIdFilter(messageId), dest, 1) > 0;
}
@@ -1125,7 +1177,8 @@ public class Queue extends BaseDestinati
*
* @return the number of messages removed
*/
- public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) throws Exception {
+ public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
+ throws Exception {
return moveMatchingMessagesTo(context, selector, dest, Integer.MAX_VALUE);
}
@@ -1133,7 +1186,8 @@ public class Queue extends BaseDestinati
* Moves the messages matching the given selector up to the maximum number
* of matched messages
*/
- public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest, int maximumMessages) throws Exception {
+ public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest,
+ int maximumMessages) throws Exception {
return moveMatchingMessagesTo(context, createSelectorFilter(selector), dest, maximumMessages);
}
@@ -1141,7 +1195,8 @@ public class Queue extends BaseDestinati
* Moves the messages matching the given filter up to the maximum number of
* matched messages
*/
- public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception {
+ public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter,
+ ActiveMQDestination dest, int maximumMessages) throws Exception {
int movedCounter = 0;
Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
do {
@@ -1180,7 +1235,7 @@ public class Queue extends BaseDestinati
* @see org.apache.activemq.thread.Task#iterate()
*/
public boolean iterate() {
- boolean pageInMoreMessages = false;
+ boolean pageInMoreMessages = false;
synchronized (iteratingMutex) {
// do early to allow dispatch of these waiting messages
@@ -1202,7 +1257,8 @@ public class Queue extends BaseDestinati
firstConsumer = false;
try {
if (consumersBeforeDispatchStarts > 0) {
- int timeout = 1000; // wait one second by default if consumer count isn't reached
+ int timeout = 1000; // wait one second by default if
+ // consumer count isn't reached
if (timeBeforeDispatchStarts > 0) {
timeout = timeBeforeDispatchStarts;
}
@@ -1212,7 +1268,8 @@ public class Queue extends BaseDestinati
}
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug(timeout + " ms elapsed and " + consumers.size() + " consumers subscribed. Starting dispatch.");
+ LOG.debug(timeout + " ms elapsed and " + consumers.size()
+ + " consumers subscribed. Starting dispatch.");
}
}
}
@@ -1226,21 +1283,24 @@ public class Queue extends BaseDestinati
LOG.error(e);
}
}
-
+
BrowserDispatch pendingBrowserDispatch = getNextBrowserDispatch();
synchronized (messages) {
pageInMoreMessages |= !messages.isEmpty();
}
- // Kinda ugly.. but I think dispatchLock is the only mutex protecting the
- // pagedInPendingDispatch variable.
+ // Kinda ugly.. but I think dispatchLock is the only mutex
+ // protecting the
+ // pagedInPendingDispatch variable.
synchronized (dispatchMutex) {
pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
}
- // Perhaps we should page always into the pagedInPendingDispatch list if
- // !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty()
+ // Perhaps we should page always into the pagedInPendingDispatch
+ // list if
+ // !messages.isEmpty(), and then if
+ // !pagedInPendingDispatch.isEmpty()
// then we do a dispatch.
if (pageInMoreMessages || pendingBrowserDispatch != null || !redeliveredWaitingDispatch.isEmpty()) {
try {
@@ -1250,7 +1310,7 @@ public class Queue extends BaseDestinati
LOG.error("Failed to page in more queue messages ", e);
}
}
-
+
if (pendingBrowserDispatch != null) {
ArrayList<QueueMessageReference> alreadyDispatchedMessages = null;
synchronized (pagedInMessages) {
@@ -1264,7 +1324,7 @@ public class Queue extends BaseDestinati
try {
MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
msgContext.setDestination(destination);
-
+
QueueBrowserSubscription browser = pendingBrowserDispatch.getBrowser();
for (QueueMessageReference node : alreadyDispatchedMessages) {
if (!node.isAcked()) {
@@ -1278,10 +1338,10 @@ public class Queue extends BaseDestinati
} catch (Exception e) {
LOG.warn("exception on dispatch to browser: " + pendingBrowserDispatch.getBrowser(), e);
}
-
+
} while ((pendingBrowserDispatch = getNextBrowserDispatch()) != null);
}
-
+
if (pendingWakeups.get() > 0) {
pendingWakeups.decrementAndGet();
}
@@ -1336,7 +1396,8 @@ public class Queue extends BaseDestinati
removeMessage(c, subs, r, ack);
}
- protected void removeMessage(ConnectionContext context, Subscription sub, final QueueMessageReference reference, MessageAck ack) throws IOException {
+ protected void removeMessage(ConnectionContext context, Subscription sub, final QueueMessageReference reference,
+ MessageAck ack) throws IOException {
reference.setAcked(true);
// This sends the ack the the journal..
if (!ack.isInTransaction()) {
@@ -1408,7 +1469,8 @@ public class Queue extends BaseDestinati
final void sendMessage(final ConnectionContext context, Message msg) throws Exception {
if (!msg.isPersistent() && messages.getSystemUsage() != null) {
if (systemUsage.getTempUsage().isFull()) {
- final String logMessage = "Usage Manager Temp Store is Full. Stopping producer (" + msg.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+ final String logMessage = "Usage Manager Temp Store is Full. Stopping producer (" + msg.getProducerId()
+ + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
if (systemUsage.isSendFailIfNoSpace()) {
throw new ResourceAllocationException(logMessage);
@@ -1460,12 +1522,14 @@ public class Queue extends BaseDestinati
synchronized (dispatchMutex) {
int toPageIn = Math.min(getMaxPageSize(), messages.size());
if (LOG.isDebugEnabled()) {
- LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: " + destinationStatistics.getInflight().getCount() + ", pagedInMessages.size "
+ LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: "
+ + destinationStatistics.getInflight().getCount() + ", pagedInMessages.size "
+ pagedInMessages.size() + ", enqueueSize: " + destinationStatistics.getEnqueues().getCount());
}
if (isLazyDispatch() && !force) {
- // Only page in the minimum number of messages which can be dispatched immediately.
+ // Only page in the minimum number of messages which can be
+ // dispatched immediately.
toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
}
if (toPageIn > 0 && (force || (!consumers.isEmpty() && pagedInPendingDispatch.size() < getMaxPageSize()))) {
@@ -1478,6 +1542,7 @@ public class Queue extends BaseDestinati
while (messages.hasNext() && count < toPageIn) {
MessageReference node = messages.next();
messages.remove();
+
QueueMessageReference ref = createMessageReference(node.getMessage());
if (ref.isExpired()) {
if (broker.isExpired(ref)) {
@@ -1494,7 +1559,8 @@ public class Queue extends BaseDestinati
messages.release();
}
}
- // Only add new messages, not already pagedIn to avoid multiple dispatch attempts
+ // Only add new messages, not already pagedIn to avoid multiple
+ // dispatch attempts
synchronized (pagedInMessages) {
resultList = new ArrayList<QueueMessageReference>(result.size());
for (QueueMessageReference ref : result) {
@@ -1520,7 +1586,8 @@ public class Queue extends BaseDestinati
synchronized (pagedInPendingDispatch) {
if (!redeliveredWaitingDispatch.isEmpty()) {
- // Try first to dispatch redelivered messages to keep an proper order
+ // Try first to dispatch redelivered messages to keep an
+ // proper order
redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch);
}
if (!pagedInPendingDispatch.isEmpty()) {
@@ -1528,7 +1595,8 @@ public class Queue extends BaseDestinati
// dispatched before.
pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
}
- // and now see if we can dispatch the new stuff.. and append to the pending
+ // and now see if we can dispatch the new stuff.. and append to
+ // the pending
// list anything that does not actually get dispatched.
if (list != null && !list.isEmpty()) {
if (pagedInPendingDispatch.isEmpty()) {
@@ -1581,29 +1649,34 @@ public class Queue extends BaseDestinati
if (!s.isFull()) {
// Dispatch it.
s.add(node);
- target = s;
+ target = s;
break;
} else {
- // no further dispatch of list to a full consumer to avoid out of order message receipt
+ // no further dispatch of list to a full consumer to
+ // avoid out of order message receipt
fullConsumers.add(s);
}
}
interestCount++;
} else {
// makes sure it gets dispatched again
- if (!node.isDropped() && !((QueueMessageReference) node).isAcked() && (!node.isDropped() || s.getConsumerInfo().isBrowser())) {
+ if (!node.isDropped() && !((QueueMessageReference) node).isAcked()
+ && (!node.isDropped() || s.getConsumerInfo().isBrowser())) {
interestCount++;
}
}
}
if ((target == null && interestCount > 0) || consumers.size() == 0) {
- // This means all subs were full or that there are no consumers...
+ // This means all subs were full or that there are no
+ // consumers...
rc.add((QueueMessageReference) node);
}
- // If it got dispatched, rotate the consumer list to get round robin distribution.
- if (target != null && !strictOrderDispatch && consumers.size() > 1 && !dispatchSelector.isExclusiveConsumer(target)) {
+ // If it got dispatched, rotate the consumer list to get round robin
+ // distribution.
+ if (target != null && !strictOrderDispatch && consumers.size() > 1
+ && !dispatchSelector.isExclusiveConsumer(target)) {
synchronized (this.consumers) {
if (removeFromConsumerList(target)) {
addToConsumerList(target);
@@ -1654,7 +1727,6 @@ public class Queue extends BaseDestinati
* dispatch process is non deterministic between master and slave. On a
* notification, the actual dispatch to the subscription (as chosen by the
* master) is completed. (non-Javadoc)
- *
* @see
* org.apache.activemq.broker.region.BaseDestination#processDispatchNotification
* (org.apache.activemq.command.MessageDispatchNotification)
@@ -1670,7 +1742,8 @@ public class Queue extends BaseDestinati
}
}
- private QueueMessageReference getMatchingMessage(MessageDispatchNotification messageDispatchNotification) throws Exception {
+ private QueueMessageReference getMatchingMessage(MessageDispatchNotification messageDispatchNotification)
+ throws Exception {
QueueMessageReference message = null;
MessageId messageId = messageDispatchNotification.getMessageId();
@@ -1719,8 +1792,11 @@ public class Queue extends BaseDestinati
}
if (message == null) {
- throw new JMSException("Slave broker out of sync with master - Message: " + messageDispatchNotification.getMessageId() + " on " + messageDispatchNotification.getDestination()
- + " does not exist among pending(" + pagedInPendingDispatch.size() + ") for subscription: " + messageDispatchNotification.getConsumerId());
+ throw new JMSException("Slave broker out of sync with master - Message: "
+ + messageDispatchNotification.getMessageId() + " on "
+ + messageDispatchNotification.getDestination() + " does not exist among pending("
+ + pagedInPendingDispatch.size() + ") for subscription: "
+ + messageDispatchNotification.getConsumerId());
}
return message;
}
@@ -1732,7 +1808,8 @@ public class Queue extends BaseDestinati
* @return sub or null if the subscription has been removed before dispatch
* @throws JMSException
*/
- private Subscription getMatchingSubscription(MessageDispatchNotification messageDispatchNotification) throws JMSException {
+ private Subscription getMatchingSubscription(MessageDispatchNotification messageDispatchNotification)
+ throws JMSException {
Subscription sub = null;
synchronized (consumers) {
for (Subscription s : consumers) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Tue Jun 15 20:33:41 2010
@@ -163,10 +163,10 @@ public class Topic extends BaseDestinati
DurableTopicSubscription removed = durableSubcribers.remove(key);
if (removed != null) {
destinationStatistics.getConsumers().decrement();
- }
- // deactivate and remove
- removed.deactivate(false);
- consumers.remove(removed);
+ // deactivate and remove
+ removed.deactivate(false);
+ consumers.remove(removed);
+ }
}
}
@@ -418,12 +418,8 @@ public class Topic extends BaseDestinati
}
waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
- }
- if (context.isInTransaction()) {
- topicStore.addMessage(context, message);
- }else {
- result = topicStore.asyncAddTopicMessage(context, message);
}
+ topicStore.asyncAddTopicMessage(context, message);
}
message.incrementReferenceCount();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Tue Jun 15 20:33:41 2010
@@ -602,6 +602,8 @@ public abstract class Message extends Ba
if (rc == 1 && getMemoryUsage() != null) {
getMemoryUsage().increaseUsage(size);
+ //System.err.println("INCREASE USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
+
}
//System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
@@ -618,7 +620,10 @@ public abstract class Message extends Ba
if (rc == 0 && getMemoryUsage() != null) {
getMemoryUsage().decreaseUsage(size);
+ //Thread.dumpStack();
+ //System.err.println("DECREADED USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
}
+
//System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
return rc;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TransactionStore.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TransactionStore.java Tue Jun 15 20:33:41 2010
@@ -17,8 +17,6 @@
package org.apache.activemq.store;
import java.io.IOException;
-import java.util.concurrent.FutureTask;
-
import org.apache.activemq.Service;
import org.apache.activemq.command.TransactionId;
@@ -32,10 +30,9 @@ public interface TransactionStore extend
void prepare(TransactionId txid) throws IOException;
- void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException;
+ void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException;
void rollback(TransactionId txid) throws IOException;
void recover(TransactionRecoveryListener listener) throws IOException;
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java Tue Jun 15 20:33:41 2010
@@ -21,9 +21,7 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
-
import javax.transaction.xa.XAException;
-
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
@@ -99,7 +97,10 @@ public class AMQTransactionStore impleme
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
- public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
+ public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
+ if (preCommit != null) {
+ preCommit.run();
+ }
AMQTx tx;
if (wasPrepared) {
synchronized (preparedTransactions) {
@@ -111,7 +112,9 @@ public class AMQTransactionStore impleme
}
}
if (tx == null) {
- done.run();
+ if (postCommit != null) {
+ postCommit.run();
+ }
return;
}
if (txid.isXATransaction()) {
@@ -119,7 +122,9 @@ public class AMQTransactionStore impleme
} else {
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid, wasPrepared), true,true);
}
- done.run();
+ if (postCommit != null) {
+ postCommit.run();
+ }
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java Tue Jun 15 20:33:41 2010
@@ -22,9 +22,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
-
import javax.transaction.xa.XAException;
-
import org.apache.activeio.journal.RecordLocation;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTransaction;
@@ -40,8 +38,8 @@ import org.apache.activemq.store.Transac
public class JournalTransactionStore implements TransactionStore {
private final JournalPersistenceAdapter peristenceAdapter;
- private Map<Object, Tx> inflightTransactions = new LinkedHashMap<Object, Tx>();
- private Map<TransactionId, Tx> preparedTransactions = new LinkedHashMap<TransactionId, Tx>();
+ private final Map<Object, Tx> inflightTransactions = new LinkedHashMap<Object, Tx>();
+ private final Map<TransactionId, Tx> preparedTransactions = new LinkedHashMap<TransactionId, Tx>();
private boolean doingRecover;
public static class TxOperation {
@@ -70,7 +68,7 @@ public class JournalTransactionStore imp
public static class Tx {
private final RecordLocation location;
- private ArrayList<TxOperation> operations = new ArrayList<TxOperation>();
+ private final ArrayList<TxOperation> operations = new ArrayList<TxOperation>();
public Tx(RecordLocation location) {
this.location = location;
@@ -176,8 +174,11 @@ public class JournalTransactionStore imp
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
- public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
+ public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
Tx tx;
+ if (preCommit != null) {
+ preCommit.run();
+ }
if (wasPrepared) {
synchronized (preparedTransactions) {
tx = preparedTransactions.remove(txid);
@@ -188,7 +189,9 @@ public class JournalTransactionStore imp
}
}
if (tx == null) {
- done.run();
+ if (postCommit != null) {
+ postCommit.run();
+ }
return;
}
if (txid.isXATransaction()) {
@@ -198,7 +201,9 @@ public class JournalTransactionStore imp
peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid,
wasPrepared), true);
}
- done.run();
+ if (postCommit != null) {
+ postCommit.run();
+ }
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java Tue Jun 15 20:33:41 2010
@@ -21,9 +21,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
-
import javax.transaction.xa.XAException;
-
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
@@ -38,7 +36,6 @@ import org.apache.activemq.store.ProxyTo
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
-import org.apache.activemq.store.journal.JournalPersistenceAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -51,9 +48,9 @@ import org.apache.commons.logging.LogFac
public class KahaTransactionStore implements TransactionStore, BrokerServiceAware {
private static final Log LOG = LogFactory.getLog(KahaTransactionStore.class);
- private Map transactions = new ConcurrentHashMap();
- private Map prepared;
- private KahaPersistenceAdapter adaptor;
+ private final Map transactions = new ConcurrentHashMap();
+ private final Map prepared;
+ private final KahaPersistenceAdapter adaptor;
private BrokerService brokerService;
@@ -64,10 +61,12 @@ public class KahaTransactionStore implem
public MessageStore proxy(MessageStore messageStore) {
return new ProxyMessageStore(messageStore) {
+ @Override
public void addMessage(ConnectionContext context, final Message send) throws IOException {
KahaTransactionStore.this.addMessage(getDelegate(), send);
}
+ @Override
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
KahaTransactionStore.this.removeMessage(getDelegate(), ack);
}
@@ -76,10 +75,12 @@ public class KahaTransactionStore implem
public TopicMessageStore proxy(TopicMessageStore messageStore) {
return new ProxyTopicMessageStore(messageStore) {
+ @Override
public void addMessage(ConnectionContext context, final Message send) throws IOException {
KahaTransactionStore.this.addMessage(getDelegate(), send);
}
+ @Override
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
KahaTransactionStore.this.removeMessage(getDelegate(), ack);
}
@@ -101,13 +102,18 @@ public class KahaTransactionStore implem
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
- public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
+ public void commit(TransactionId txid, boolean wasPrepared, Runnable before,Runnable after) throws IOException {
+ if(before != null) {
+ before.run();
+ }
KahaTransaction tx = getTx(txid);
if (tx != null) {
tx.commit(this);
removeTx(txid);
}
- done.run();
+ if (after != null) {
+ after.run();
+ }
}
/**