You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/06/30 19:11:17 UTC
svn commit: r789831 - in /activemq/sandbox/activemq-flow:
activemq-broker/src/main/java/org/apache/activemq/apollo/broker/
activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/
activemq-kaha/src/main/proto/ activemq-openwire/src/main/jav...
Author: cmacnaug
Date: Tue Jun 30 17:11:16 2009
New Revision: 789831
URL: http://svn.apache.org/viewvc?rev=789831&view=rev
Log:
Refactoring in support of Transactions.
-Changed Store interface delete to remove by queue key as opposed to message key.
-Updates to Transaction/TransactionMgr
Modified:
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java
activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
activemq/sandbox/activemq-flow/activemq-kaha/src/main/proto/kahadb-data.proto
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/QueueStore.java
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueueOld.java
activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java
activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java
activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/SortedLinkedList.java
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java Tue Jun 30 17:11:16 2009
@@ -560,8 +560,8 @@
* The queue.
* @return The {@link OperationContext} associated with the operation
*/
- public OperationContext deleteQueueElement(long storeTracking, QueueDescriptor queue) {
- return add(new DeleteOperation(storeTracking, queue), null, false);
+ public OperationContext deleteQueueElement(SaveableQueueElement<?> queueElement) {
+ return add(new DeleteOperation(queueElement.getSequenceNumber(), queueElement.getQueueDescriptor()), null, false);
}
/**
@@ -854,11 +854,11 @@
}
private class DeleteOperation extends OperationBase {
- private final long storeTracking;
+ private final long queueKey;
private QueueDescriptor queue;
- public DeleteOperation(long tracking, QueueDescriptor queue) {
- this.storeTracking = tracking;
+ public DeleteOperation(long queueKey, QueueDescriptor queue) {
+ this.queueKey = queueKey;
this.queue = queue;
}
@@ -870,7 +870,7 @@
@Override
protected void doExcecute(Session session) {
try {
- session.queueRemoveMessage(queue, storeTracking);
+ session.queueRemoveMessage(queue, queueKey);
} catch (KeyNotFoundException e) {
// TODO Probably doesn't always mean an error, it is possible
// that
@@ -886,7 +886,7 @@
}
public String toString() {
- return "MessageDelete: " + queue.getQueueName().toString() + " tracking: " + storeTracking + " " + super.toString();
+ return "MessageDelete: " + queue.getQueueName().toString() + " tracking: " + queueKey + " " + super.toString();
}
}
@@ -1223,17 +1223,17 @@
* @param delayable
*/
public <T> OperationContext saveQeueuElement(SaveableQueueElement<T> sqe, ISourceController<?> source, boolean delayable, MessageRecordMarshaller<T> marshaller) {
- return add(new AddElementOpOperation<T>(sqe, delayable, marshaller), source, !delayable);
+ return add(new AddElementOperation<T>(sqe, delayable, marshaller), source, !delayable);
}
- private class AddElementOpOperation<T> extends OperationBase {
+ private class AddElementOperation<T> extends OperationBase {
private final SaveableQueueElement<T> op;
private MessageRecord record;
private boolean delayable;
private final MessageRecordMarshaller<T> marshaller;
- public AddElementOpOperation(SaveableQueueElement<T> op, boolean delayable, MessageRecordMarshaller<T> marshaller) {
+ public AddElementOperation(SaveableQueueElement<T> op, boolean delayable, MessageRecordMarshaller<T> marshaller) {
this.op = op;
this.delayable = delayable;
if (!delayable) {
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java Tue Jun 30 17:11:16 2009
@@ -73,7 +73,7 @@
return fromStore;
}
- public final void persist(SaveableQueueElement<MessageDelivery> elem, ISourceController<?> controller, boolean delayable) {
+ public final void persist(SaveableQueueElement<MessageDelivery> sqe, ISourceController<?> controller, boolean delayable) {
synchronized (this) {
// Can flush of this message to the store be delayed?
if (enableFlushDelay && !delayable) {
@@ -83,13 +83,13 @@
// list of queues for which to save the message when dispatch is
// finished:
if (dispatching) {
- addPersistentTarget(elem);
+ addPersistentTarget(sqe);
return;
}
// Otherwise, if it is still in the saver queue, we can add this
// queue to the queue list:
else if (pendingSave != null) {
- addPersistentTarget(elem);
+ addPersistentTarget(sqe);
if (!delayable) {
pendingSave.requestFlush();
}
@@ -97,10 +97,10 @@
}
}
- store.saveMessage(elem, controller, delayable);
+ store.saveMessage(sqe, controller, delayable);
}
- public final void acknowledge(QueueDescriptor queue) {
+ public final void acknowledge(SaveableQueueElement<MessageDelivery> sqe) {
boolean firePersistListener = false;
boolean deleted = false;
synchronized (this) {
@@ -110,7 +110,7 @@
deleted = true;
- removePersistentTarget(queue);
+ removePersistentTarget(sqe.getQueueDescriptor());
// We get a save context when we place the message in the
// database queue. If it has been added to the queue,
// and we've removed the last queue, see if we can cancel
@@ -127,7 +127,7 @@
}
if (!deleted) {
- store.deleteQueueElement(getStoreTracking(), queue);
+ store.deleteQueueElement(sqe);
}
if (firePersistListener) {
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java Tue Jun 30 17:11:16 2009
@@ -106,7 +106,7 @@
public Integer map(MessageDelivery element) {
return element.getFlowLimiterSize();
}
- };
+ };
public static final Mapper<Integer, MessageDelivery> PRIORITY_MAPPER = new Mapper<Integer, MessageDelivery>() {
public Integer map(MessageDelivery element) {
@@ -127,7 +127,7 @@
return (int) (element.getProducerId().hashCode() % 10);
}
};
-
+
public static final short SUBPARTITION_TYPE = 0;
public static final short SHARED_QUEUE_TYPE = 1;
public static final short DURABLE_QUEUE_TYPE = 2;
@@ -251,7 +251,7 @@
}
}
-
+
private IQueue<Long, MessageDelivery> createRestoredQueue(IPartitionedQueue<Long, MessageDelivery> parent, QueueQueryResult loaded) throws IOException {
IQueue<Long, MessageDelivery> queue;
@@ -452,8 +452,16 @@
return ret;
}
- public final void deleteQueueElement(QueueDescriptor descriptor, MessageDelivery elem) {
- elem.acknowledge(descriptor);
+ public final void deleteQueueElement(SaveableQueueElement<MessageDelivery> sqe) {
+ MessageDelivery md = sqe.getElement();
+ //If the message delivery isn't null, funnel through it
+ //since the message may not yet be in the store:
+ if (md != null) {
+ md.acknowledge(sqe);
+ } else {
+ database.deleteQueueElement(sqe);
+ }
+
}
public final boolean isFromStore(MessageDelivery elem) {
@@ -464,8 +472,7 @@
elem.getElement().persist(elem, controller, delayable);
}
- public final void restoreQueueElements(QueueDescriptor queue, boolean recordsOnly, long firstSequence, long maxSequence, int maxCount,
- RestoreListener<MessageDelivery> listener) {
+ public final void restoreQueueElements(QueueDescriptor queue, boolean recordsOnly, long firstSequence, long maxSequence, int maxCount, RestoreListener<MessageDelivery> listener) {
database.restoreQueueElements(queue, recordsOnly, firstSequence, maxSequence, maxCount, listener, MESSAGE_MARSHALLER);
}
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java Tue Jun 30 17:11:16 2009
@@ -43,7 +43,7 @@
* @see org.apache.activemq.apollo.broker.Transaction#commit(boolean)
*/
@Override
- public void commit(boolean onePhase) throws XAException, IOException {
+ public void commit(boolean onePhase, TransactionListener listener) throws XAException, IOException {
// TODO Auto-generated method stub
}
@@ -54,7 +54,7 @@
* @see org.apache.activemq.apollo.broker.Transaction#prepare()
*/
@Override
- public int prepare() throws XAException, IOException {
+ public int prepare(TransactionListener listener) throws XAException, IOException {
// TODO Auto-generated method stub
return 0;
}
@@ -65,7 +65,7 @@
* @see org.apache.activemq.apollo.broker.Transaction#rollback()
*/
@Override
- public void rollback() throws XAException, IOException {
+ public void rollback(TransactionListener listener) throws XAException, IOException {
// TODO Auto-generated method stub
}
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java Tue Jun 30 17:11:16 2009
@@ -22,7 +22,6 @@
import org.apache.activemq.queue.QueueDescriptor;
import org.apache.activemq.queue.SaveableQueueElement;
import org.apache.activemq.util.buffer.AsciiBuffer;
-import org.apache.activemq.util.buffer.Buffer;
public interface MessageDelivery {
@@ -76,9 +75,11 @@
/**
* @return if the message is part of a transaction this returns the
- * transaction id.
+ * transaction id returned by {@link Transaction#getTid()} otherwise
+ * a value of -1 indicates that this delivery is not part of a
+ * transaction
*/
- public Buffer getTransactionId();
+ public long getTransactionId();
/**
* Called by a queue to request that the element be persisted. The save is
@@ -92,7 +93,7 @@
* {@link SaveableQueueElement#requestSaveNotify()} method before attempting
* to acces the store directly.
*
- * @param elem
+ * @param sqe
* The element to save
* @param controller
* A flow controller to use in the event that there isn't room in
@@ -100,16 +101,16 @@
* @param delayable
* Whether or not the save operation can be delayed.
*/
- public void persist(SaveableQueueElement<MessageDelivery> elem, ISourceController<?> controller, boolean delayable);
+ public void persist(SaveableQueueElement<MessageDelivery> sqe, ISourceController<?> controller, boolean delayable);
/**
* Acknowledges the message for a particular queue. This will cause it to be
* deleted from the message store.
*
- * @param queue
- * The queue for which to acknowledge the message.
+ * @param sqe
+ * The queue element to delete
*/
- public void acknowledge(QueueDescriptor queue);
+ public void acknowledge(SaveableQueueElement<MessageDelivery> sqe);
/**
* Gets the tracking number used to identify this message in the message
@@ -118,9 +119,10 @@
* @return The store tracking or -1 if not set.
*/
public long getStoreTracking();
-
+
/**
* Used to apply selectors against the message.
+ *
* @return
*/
public MessageEvaluationContext createMessageEvaluationContext();
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java Tue Jun 30 17:11:16 2009
@@ -22,7 +22,6 @@
import org.apache.activemq.queue.QueueDescriptor;
import org.apache.activemq.queue.SaveableQueueElement;
import org.apache.activemq.util.buffer.AsciiBuffer;
-import org.apache.activemq.util.buffer.Buffer;
/**
* @author cmacnaug
@@ -32,14 +31,9 @@
private final MessageDelivery delegate;
- /**
- * (non-Javadoc)
- *
- * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
- * org.apache.activemq.flow.ISourceController, boolean)
- */
- public void acknowledge(QueueDescriptor queue) {
- delegate.acknowledge(queue);
+
+ public void acknowledge(SaveableQueueElement<MessageDelivery> sqe) {
+ delegate.acknowledge(sqe);
}
/**
@@ -128,7 +122,7 @@
* @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
* org.apache.activemq.flow.ISourceController, boolean)
*/
- public Buffer getTransactionId() {
+ public long getTransactionId() {
return delegate.getTransactionId();
}
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java Tue Jun 30 17:11:16 2009
@@ -17,6 +17,7 @@
package org.apache.activemq.apollo.broker;
import java.io.IOException;
+import java.util.HashSet;
import javax.transaction.xa.XAException;
@@ -32,8 +33,6 @@
/**
* Keeps track of all the actions the need to be done when a transaction does a
* commit or rollback.
- *
- * @version $Revision: 1.5 $
*/
public abstract class Transaction {
@@ -49,6 +48,7 @@
private final TransactionManager manager;
private final long tid;
private final IQueue<Long, TxOp> opQueue;
+ private HashSet<TransactionListener> listeners;
Transaction(TransactionManager manager, long tid, IQueue<Long, TxOp> opQueue) {
this.manager = manager;
@@ -134,22 +134,26 @@
protected void fireAfterCommit() throws Exception {
- //TODO
+ synchronized (this) {
+
+ }
}
public void fireAfterRollback() throws Exception {
- //TODO
+ synchronized (this) {
+
+ }
}
public String toString() {
return super.toString() + "[queue=" + opQueue + "]";
}
- public abstract void commit(boolean onePhase) throws XAException, IOException;
+ public abstract void commit(boolean onePhase, TransactionListener listener) throws XAException, IOException;
- public abstract void rollback() throws XAException, IOException;
+ public abstract void rollback(TransactionListener listener) throws XAException, IOException;
- public abstract int prepare() throws XAException, IOException;
+ public abstract int prepare(TransactionListener listener) throws XAException, IOException;
public boolean isPrepared() {
return getState() == PREPARED_STATE;
@@ -159,6 +163,20 @@
return opQueue.getEnqueuedCount();
}
+ public static abstract class TransactionListener {
+ public void onRollback(Transaction t) {
+
+ }
+
+ public void onCommit(Transaction t) {
+
+ }
+
+ public void onPrepared(Transaction t) {
+
+ }
+ }
+
interface TxOp {
public static final short TYPE_MESSAGE = 0;
public static final short TYPE_ACK = 1;
@@ -266,9 +284,9 @@
private boolean fromStore;
private static final int MEM_SIZE = 8 + 8 + 8 + 8 + 1;
- TxAck(IQueue<Long, ?> queue, long storeTracking, Transaction tx) {
+ TxAck(IQueue<Long, ?> queue, long removalKey, Transaction tx) {
this.queue = queue;
- this.storeTracking = storeTracking;
+ this.queueSequence = removalKey;
this.tx = tx;
}
@@ -290,7 +308,7 @@
* @see org.apache.activemq.apollo.broker.Transaction.TxOp#onCommit()
*/
public final void onCommit() {
- //TODO
+ queue.remove(queueSequence);
}
@@ -347,12 +365,12 @@
AsciiBuffer queueName = queue.getDescriptor().getQueueName();
DataByteArrayOutputStream baos = new DataByteArrayOutputStream(2 + queueName.length + 8);
try {
- baos.writeShort(queueName.length);
- baos.write(queueName.data, queueName.offset, queueName.length);
- baos.writeLong(queueSequence);
- } catch (IOException shouldNotHappen) {
- throw new RuntimeException(shouldNotHappen);
- }
+ baos.writeShort(queueName.length);
+ baos.write(queueName.data, queueName.offset, queueName.length);
+ baos.writeLong(queueSequence);
+ } catch (IOException shouldNotHappen) {
+ throw new RuntimeException(shouldNotHappen);
+ }
return baos.toBuffer();
}
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java Tue Jun 30 17:11:16 2009
@@ -58,9 +58,10 @@
private final HashMap<Long, Transaction> transactions = new HashMap<Long, Transaction>();
private final HashMap<AsciiBuffer, Transaction> transactionsByQueue = new HashMap<AsciiBuffer, Transaction>();
+ private final HashMap<Buffer, XATransaction> xaTransactions = new HashMap<Buffer, XATransaction>();
private final VirtualHost host;
- private final BrokerDatabase database;
+ private BrokerDatabase database;
private final AtomicLong tidGen = new AtomicLong(0);
private final TransactionStore txStore;
@@ -155,7 +156,7 @@
* @param xid
* @return
*/
- public final Transaction createTransaction(Buffer xid) {
+ public synchronized final Transaction createTransaction(Buffer xid) {
Transaction ret;
long tid = tidGen.incrementAndGet();
@@ -164,7 +165,9 @@
if (xid == null) {
ret = new LocalTransaction(this, tid, opQueue);
} else {
- ret = new XATransaction(this, tid, xid, opQueue);
+ XATransaction xat = new XATransaction(this, tid, xid, opQueue);
+ ret = xat;
+ xaTransactions.put(xid, xat);
}
transactionsByQueue.put(opQueue.getDescriptor().getQueueName(), ret);
@@ -174,10 +177,18 @@
}
/**
+ * @param buffer
+ * @return
+ */
+ public synchronized Transaction getXATransaction(Buffer buffer) {
+ return xaTransactions.get(buffer);
+ }
+
+ /**
*
* @throws Exception
*/
- public void loadTransactions() throws Exception {
+ public synchronized void loadTransactions() throws Exception {
tidGen.set(database.allocateStoreTracking());
@@ -196,26 +207,28 @@
IQueue<Long, TxOp> queue = createRestoredTxQueue(loaded);
Transaction tx = loadTransaction(b, queue);
-
+
//TODO if we recover a tx that isn't committed then, we should discard it.
if (tx.getState() < Transaction.FINISHED_STATE) {
LOG.warn("Recovered unfinished transaction: " + tx);
}
transactions.put(tx.getTid(), tx);
+ if (tx instanceof XATransaction) {
+ XATransaction xat = XATransaction.class.cast(tx);
+ xaTransactions.put(xat.getXid(), xat);
+ }
LOG.info("Loaded Queue " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize());
}
- if (txns.isEmpty()) {
+ if (!txns.isEmpty()) {
//TODO Based on transaction state this is generally ok, anyway the orphaned entries should be
//deleted:
LOG.warn("Recovered transactions without backing queues: " + txns.keySet());
-
}
}
- private Transaction loadTransaction(Buffer b, IQueue<Long, TxOp> queue) throws IOException
- {
+ private Transaction loadTransaction(Buffer b, IQueue<Long, TxOp> queue) throws IOException {
//TODO move the serialization into the transaction itself:
DataByteArrayInputStream bais = new DataByteArrayInputStream(b.getData());
byte type = bais.readByte();
@@ -228,9 +241,9 @@
tx = new LocalTransaction(this, tid, queue);
break;
case Transaction.TYPE_XA:
- int length = bais.readByte() & 0xFF;
- Buffer xid = new Buffer(new byte[length]);
- bais.readFully(xid.data);
+ int length = bais.readByte() & 0xFF;
+ Buffer xid = new Buffer(new byte[length]);
+ bais.readFully(xid.data);
tx = new XATransaction(this, tid, xid, queue);
break;
default:
@@ -241,22 +254,21 @@
return tx;
}
-
+
public OperationContext persistTransaction(Transaction tx) throws IOException {
-
+
//TODO move the serialization into the transaction itself:
DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
baos.writeByte(tx.getType());
baos.writeByte(tx.getState());
baos.writeLong(tx.getTid());
- if(tx.getType() == Transaction.TYPE_XA)
- {
- Buffer xid = ((XATransaction)tx).getXid();
- // An XID max size is around 140 bytes, byte SHOULD be big enough to frame it.
- baos.writeByte( xid.length & 0xFF );
- baos.write(xid.data, xid.offset, xid.length);
+ if (tx.getType() == Transaction.TYPE_XA) {
+ Buffer xid = ((XATransaction) tx).getXid();
+ // An XID max size is around 140 bytes, byte SHOULD be big enough to frame it.
+ baos.writeByte(xid.length & 0xFF);
+ baos.write(xid.data, xid.offset, xid.length);
}
-
+
return database.updateMapEntry(TXN_MAP, tx.getBackingQueueName(), new Buffer(baos.getData(), 0, baos.size()));
}
@@ -320,7 +332,6 @@
*/
public void addQueue(QueueDescriptor queue) {
database.addQueue(queue);
-
}
/*
@@ -341,9 +352,8 @@
* org.apache.activemq.queue.QueueStore#deleteQueueElement(org.apache
* .activemq.queue.QueueDescriptor, java.lang.Object)
*/
- public void deleteQueueElement(QueueDescriptor queue, TxOp element) {
- database.deleteQueueElement(element.getStoreTracking(), queue);
-
+ public void deleteQueueElement(SaveableQueueElement<TxOp> sqe) {
+ database.deleteQueueElement(sqe);
}
/*
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java Tue Jun 30 17:11:16 2009
@@ -40,24 +40,25 @@
final private HashMap<AsciiBuffer, Queue> queues = new HashMap<AsciiBuffer, Queue>();
final private HashMap<String, DurableSubscription> durableSubs = new HashMap<String, DurableSubscription>();
final private Router router = new Router();
-
+
private ArrayList<AsciiBuffer> hostNames = new ArrayList<AsciiBuffer>();
private Broker broker;
private boolean started;
private BrokerDatabase database;
+ private TransactionManager txnManager;
public VirtualHost() {
this.router.setVirtualHost(this);
}
public VirtualHost(String name) {
- this();
- addHostName(new AsciiBuffer(name));
- }
+ this();
+ addHostName(new AsciiBuffer(name));
+ }
- public AsciiBuffer getHostName() {
+ public AsciiBuffer getHostName() {
if (hostNames.size() > 0) {
- return hostNames.get(0);
+ return hostNames.get(0);
}
return null;
}
@@ -65,12 +66,15 @@
public List<AsciiBuffer> getHostNames() {
return hostNames;
}
+
public void setHostNames(List<AsciiBuffer> hostNames) {
this.hostNames = new ArrayList<AsciiBuffer>(hostNames);
}
+
public void addHostName(AsciiBuffer hostName) {
this.hostNames.add(hostName);
}
+
public void removeHostName(AsciiBuffer hostName) {
this.hostNames.remove(hostName);
}
@@ -78,13 +82,15 @@
public Router getRouter() {
return router;
}
-
+
public BrokerDatabase getDatabase() {
return database;
}
+
public void setDatabase(BrokerDatabase database) {
this.database = database;
}
+
public void setStore(Store store) {
database = new BrokerDatabase(store);
}
@@ -95,18 +101,21 @@
return;
}
- if ( database == null ) {
- Store store = createDefaultStore();
- database = new BrokerDatabase(store);
- }
-
- database.setDispatcher(broker.getDispatcher());
- database.start();
+ if (database == null) {
+ Store store = createDefaultStore();
+ database = new BrokerDatabase(store);
+ }
+
+ database.setDispatcher(broker.getDispatcher());
+ database.start();
router.setDatabase(database);
+
+ //Recover queues:
queueStore.setDatabase(database);
queueStore.setDispatcher(broker.getDispatcher());
queueStore.loadQueues();
+
// Create Queue instances
for (IQueue<Long, MessageDelivery> iQueue : queueStore.getSharedQueues()) {
Queue queue = new Queue(iQueue);
@@ -119,19 +128,24 @@
for (Queue queue : queues.values()) {
queue.start();
}
+
+ //Recover transactions:
+ txnManager = new TransactionManager(this);
+ txnManager.loadTransactions();
+
started = true;
}
- public Store createDefaultStore() throws Exception {
- Store store = StoreFactory.createStore("kaha-db");
- if( store.getStoreDirectory() == null ) {
- File baseDir = broker.getDataDirectory();
- String hostName = getHostName().toString();
- String subDir = IOHelper.toFileSystemDirectorySafeName(hostName);
- store.setStoreDirectory( new File(baseDir, subDir ) );
- }
- return store;
- }
+ public Store createDefaultStore() throws Exception {
+ Store store = StoreFactory.createStore("kaha-db");
+ if (store.getStoreDirectory() == null) {
+ File baseDir = broker.getDataDirectory();
+ String hostName = getHostName().toString();
+ String subDir = IOHelper.toFileSystemDirectorySafeName(hostName);
+ store.setStoreDirectory(new File(baseDir, subDir));
+ }
+ return store;
+ }
public synchronized void stop() throws Exception {
if (!started) {
@@ -144,7 +158,7 @@
for (IQueue<Long, MessageDelivery> queue : queueStore.getDurableQueues()) {
queue.shutdown(true);
}
-
+
database.stop();
started = false;
}
@@ -164,7 +178,7 @@
Domain domain = router.getDomain(dest.getDomain());
domain.bind(dest.getName(), queue);
queues.put(dest.getName(), queue);
-
+
for (QueueLifecyleListener l : queueLifecyleListeners) {
l.onCreate(queue);
}
@@ -173,6 +187,10 @@
return queue;
}
+ public TransactionManager getTransactionManager() {
+ return txnManager;
+ }
+
public BrokerQueueStore getQueueStore() {
return queueStore;
}
@@ -182,20 +200,20 @@
}
public BrokerSubscription createSubscription(ConsumerContext consumer, Destination destination) throws Exception {
-
+
// First handle composite destinations..
Collection<Destination> destinations = destination.getDestinations();
- if(destinations != null) {
+ if (destinations != null) {
ArrayList<BrokerSubscription> subs = new ArrayList<BrokerSubscription>(destinations.size());
for (Destination childDest : destinations) {
subs.add(createSubscription(consumer, childDest));
}
return new CompositeSubscription(destination, subs);
}
-
+
// If it's a Topic...
- if (destination.getDomain().equals(Router.TOPIC_DOMAIN) || destination.getDomain().equals(Router.TEMP_TOPIC_DOMAIN) ) {
-
+ if (destination.getDomain().equals(Router.TOPIC_DOMAIN) || destination.getDomain().equals(Router.TEMP_TOPIC_DOMAIN)) {
+
// It might be a durable subscription on the topic
if (consumer.isDurable()) {
DurableSubscription dsub = durableSubs.get(consumer.getSubscriptionName());
@@ -211,51 +229,53 @@
// return a standard subscription
return new TopicSubscription(this, destination, consumer.getSelectorExpression());
}
-
+
// It looks like a wild card subscription on a queue..
- if( PathFilter.containsWildCards(destination.getName()) ){
+ if (PathFilter.containsWildCards(destination.getName())) {
return new WildcardQueueSubscription(this, destination, consumer);
}
// It has to be a Queue subscription then..
Queue queue = queues.get(destination.getName());
- if( queue == null ) {
- if( consumer.autoCreateDestination() ) {
+ if (queue == null) {
+ if (consumer.autoCreateDestination()) {
queue = createQueue(destination);
} else {
- throw new IllegalStateException("The queue does not exist: "+destination.getName());
+ throw new IllegalStateException("The queue does not exist: " + destination.getName());
}
}
return new Queue.QueueSubscription(queue);
}
- public Broker getBroker() {
- return broker;
- }
-
- public void setBroker(Broker broker) {
- this.broker = broker;
- }
-
- interface QueueLifecyleListener {
-
- /**
- * A destination has bean created
- * @param destination
- */
+ public Broker getBroker() {
+ return broker;
+ }
+
+ public void setBroker(Broker broker) {
+ this.broker = broker;
+ }
+
+ interface QueueLifecyleListener {
+
+ /**
+ * A destination has bean created
+ *
+ * @param destination
+ */
public void onCreate(Queue queue);
-
+
/**
- * A destination has bean destroyed
+ * A destination has bean destroyed
+ *
* @param destination
*/
public void onDestroy(Queue queue);
-
- }
-
- ArrayList<QueueLifecyleListener> queueLifecyleListeners = new ArrayList<QueueLifecyleListener>();
- synchronized public void addDestinationLifecyleListener(QueueLifecyleListener listener) {
+ }
+
+ ArrayList<QueueLifecyleListener> queueLifecyleListeners = new ArrayList<QueueLifecyleListener>();
+
+ synchronized public void addDestinationLifecyleListener(QueueLifecyleListener listener) {
queueLifecyleListeners.add(listener);
}
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java Tue Jun 30 17:11:16 2009
@@ -20,6 +20,7 @@
import javax.transaction.xa.XAException;
+import org.apache.activemq.apollo.broker.Transaction.TransactionListener;
import org.apache.activemq.queue.IQueue;
import org.apache.activemq.util.buffer.Buffer;
@@ -51,7 +52,7 @@
* @see org.apache.activemq.apollo.broker.Transaction#commit(boolean)
*/
@Override
- public void commit(boolean onePhase) throws XAException, IOException {
+ public void commit(boolean onePhase, TransactionListener listener) throws XAException, IOException {
// TODO Auto-generated method stub
}
@@ -62,7 +63,7 @@
* @see org.apache.activemq.apollo.broker.Transaction#prepare()
*/
@Override
- public int prepare() throws XAException, IOException {
+ public int prepare(TransactionListener listener) throws XAException, IOException {
// TODO Auto-generated method stub
return 0;
}
@@ -73,7 +74,7 @@
* @see org.apache.activemq.apollo.broker.Transaction#rollback()
*/
@Override
- public void rollback() throws XAException, IOException {
+ public void rollback(TransactionListener listener) throws XAException, IOException {
// TODO Auto-generated method stub
}
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java Tue Jun 30 17:11:16 2009
@@ -23,6 +23,7 @@
import javax.transaction.xa.Xid;
import org.apache.activemq.util.buffer.Buffer;
+import org.apache.activemq.util.buffer.DataByteArrayOutputStream;
/**
* An implementation of JTA transaction identifier (javax.transaction.xa.Xid).
@@ -244,4 +245,21 @@
in.readFully(data);
setBranchQualifier(data);
}
+
+ /**
+ * @param tid
+ * @return
+ */
+ public static Buffer toBuffer(Xid xid) {
+ XidImpl x = new XidImpl(xid);
+ DataByteArrayOutputStream baos = new DataByteArrayOutputStream(x.getMemorySize());
+ try {
+ x.writebody(baos);
+ } catch (IOException e) {
+ //Shouldn't happen:
+ throw new RuntimeException(e);
+ }
+ return baos.toBuffer();
+
+ }
} // class XidImpl
Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java Tue Jun 30 17:11:16 2009
@@ -229,15 +229,23 @@
}
}
- public boolean remove(Transaction tx, long msgKey) throws IOException {
- Long queueKey = trackingIndex.remove(tx, msgKey);
- if (queueKey != null) {
- QueueRecord qr = queueIndex.remove(tx, queueKey);
+ /**
+ * Removes a queue record returning the corresponding element tracking number.
+ * @param tx The transaction under which to do the removal
+ * @param queueKey The queue key
+ * @return The store tracking.
+ * @throws IOException
+ */
+ public long remove(Transaction tx, long queueKey) throws IOException {
+ QueueRecord qr = queueIndex.remove(tx, queueKey);
+ if(qr != null)
+ {
+ trackingIndex.remove(tx, qr.getMessageKey());
getMetaData(tx).update(-1, -qr.getSize());
tx.store(metaData, META_DATA_MARSHALLER, true);
- return true;
+ return qr.getMessageKey();
}
- return false;
+ return -1;
}
public Iterator<QueueRecord> listMessages(Transaction tx, Long firstQueueKey, Long maxQueueKey, final int max) throws IOException {
Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Tue Jun 30 17:11:16 2009
@@ -335,11 +335,13 @@
if (data.length == 1 && data.data[0] == BEGIN_UNIT_OF_WORK) {
uow = pageFile.tx();
} else if (data.length == 1 && data.data[0] == END_UNIT_OF_WORK) {
- rootEntity.setLastUpdate(recoveryPosition);
- uow.commit();
- redoCounter += uowCounter;
- uowCounter = 0;
- uow = null;
+ if (uow != null) {
+ rootEntity.setLastUpdate(recoveryPosition);
+ uow.commit();
+ redoCounter += uowCounter;
+ uowCounter = 0;
+ uow = null;
+ }
} else if (data.length == 1 && data.data[0] == CANCEL_UNIT_OF_WORK) {
uow.rollback();
uow = null;
@@ -718,8 +720,9 @@
qd.setQueueName(command.getQueueName());
DestinationEntity destination = rootEntity.getDestination(qd);
if (destination != null) {
- if (destination.remove(tx, command.getMessageKey())) {
- rootEntity.removeMessageRef(tx, command.getQueueName(), command.getMessageKey());
+ long messageKey = destination.remove(tx, command.getQueueKey());
+ if (messageKey >= 0) {
+ rootEntity.removeMessageRef(tx, command.getQueueName(), command.getQueueKey());
}
}
}
@@ -940,9 +943,9 @@
addUpdate(bean);
}
- public void queueRemoveMessage(QueueDescriptor queue, Long messageKey) throws KeyNotFoundException {
+ public void queueRemoveMessage(QueueDescriptor queue, Long queueKey) throws KeyNotFoundException {
QueueRemoveMessageBean bean = new QueueRemoveMessageBean();
- bean.setMessageKey(messageKey);
+ bean.setQueueKey(queueKey);
bean.setQueueName(queue.getQueueName());
addUpdate(bean);
}
Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/proto/kahadb-data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/proto/kahadb-data.proto?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/proto/kahadb-data.proto (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/proto/kahadb-data.proto Tue Jun 30 17:11:16 2009
@@ -85,7 +85,7 @@
}
message QueueRemoveMessage {
optional bytes queueName = 1 [java_override_type = "AsciiBuffer"];
- optional int64 messageKey=2;
+ optional int64 queueKey=2;
}
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Tue Jun 30 17:11:16 2009
@@ -37,6 +37,8 @@
private PersistListener persistListener = null;
private final int size;
+ private long tid = -1;
+
public interface PersistListener {
public void onMessagePersisted(OpenWireMessageDelivery delivery);
}
@@ -120,9 +122,15 @@
return record;
}
- public Buffer getTransactionId() {
- // TODO Auto-generated method stub
- return null;
+ public long getTransactionId() {
+ return tid;
+ }
+
+ /**
+ * @param tid
+ */
+ public void setTransactionId(long tid) {
+ this.tid = tid;
}
public void setStoreWireFormat(OpenWireFormat wireFormat) {
@@ -145,4 +153,6 @@
public String toString() {
return message.getMessageId().toString();
}
+
+
}
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Tue Jun 30 17:11:16 2009
@@ -20,6 +20,11 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.JMSException;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.Xid;
import org.apache.activemq.apollo.WindowLimiter;
import org.apache.activemq.apollo.broker.Broker;
@@ -30,7 +35,9 @@
import org.apache.activemq.apollo.broker.MessageDelivery;
import org.apache.activemq.apollo.broker.ProtocolHandler;
import org.apache.activemq.apollo.broker.Router;
+import org.apache.activemq.apollo.broker.Transaction;
import org.apache.activemq.apollo.broker.VirtualHost;
+import org.apache.activemq.apollo.broker.XidImpl;
import org.apache.activemq.broker.openwire.OpenWireMessageDelivery.PersistListener;
import org.apache.activemq.broker.store.Store.MessageRecord;
import org.apache.activemq.command.ActiveMQDestination;
@@ -64,6 +71,7 @@
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.filter.BooleanExpression;
@@ -93,6 +101,8 @@
protected final HashMap<ProducerId, ProducerContext> producers = new HashMap<ProducerId, ProducerContext>();
protected final HashMap<ConsumerId, ConsumerContext> consumers = new HashMap<ConsumerId, ConsumerContext>();
+ protected final ConcurrentHashMap<TransactionId, Transaction> transactions = new ConcurrentHashMap<TransactionId, Transaction>();
+
protected BrokerConnection connection;
private OpenWireFormat wireFormat;
private OpenWireFormat storeWireFormat;
@@ -122,7 +132,7 @@
}
};
connections.put(info.getConnectionId(), connection);
-
+
// Connections have an implicitly created "default" session identified by session id = -1
SessionId sessionId = new SessionId(info.getConnectionId(), -1);
addSession(sessionId, connection);
@@ -228,7 +238,13 @@
OpenWireMessageDelivery md = new OpenWireMessageDelivery(info);
md.setStoreWireFormat(storeWireFormat);
- md.setPersistListener(OpenwireProtocolHandler.this);
+ TransactionId tid = info.getTransactionId();
+ if (tid != null) {
+ Transaction t = locateTransaction(tid);
+ md.setTransactionId(t.getTid());
+ } else {
+ md.setPersistListener(OpenwireProtocolHandler.this);
+ }
// Only producers that are not using a window will block,
// and if it blocks.
@@ -242,7 +258,12 @@
while (!producerContext.controller.offer(md, null)) {
producerContext.controller.waitForFlowUnblock();
}
- return null;
+
+ if (tid != null) {
+ return ack(info);
+ } else {
+ return null;
+ }
}
public Response processMessageAck(MessageAck info) throws Exception {
@@ -298,6 +319,11 @@
}
public Response processConnectionControl(ConnectionControl info) throws Exception {
+ if (info != null) {
+ if (info.isFaultTolerant()) {
+ throw new UnsupportedOperationException("Fault Tolerance");
+ }
+ }
return ack(info);
}
@@ -339,34 +365,71 @@
// Methods for transaction management
// /////////////////////////////////////////////////////////////////
public Response processBeginTransaction(TransactionInfo info) throws Exception {
- throw new UnsupportedOperationException();
+ TransactionId tid = info.getTransactionId();
+
+ Transaction t = locateTransaction(tid);
+ if (t == null) {
+
+ Buffer xid = null;
+ if (tid.isXATransaction()) {
+ xid = XidImpl.toBuffer((Xid) tid);
+ }
+ t = host.getTransactionManager().createTransaction(xid);
+ transactions.put(tid, t);
+ }
+
+ return ack(info);
}
- public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
+ public Response processCommitTransactionOnePhase(final TransactionInfo info) throws Exception {
+ TransactionId tid = info.getTransactionId();
+ Transaction t = locateTransaction(tid);
+ t.commit(true, new Transaction.TransactionListener()
+ {
+
+ });
+ transactions.remove(tid);
throw new UnsupportedOperationException();
}
public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
+ TransactionId tid = info.getTransactionId();
+ Transaction t = locateTransaction(tid);
+ t.commit(false, null);
+ transactions.remove(tid);
throw new UnsupportedOperationException();
}
public Response processEndTransaction(TransactionInfo info) throws Exception {
+ //Shouldn't actually do anything, send by client to ensure that it is
+ //in sync with broker transaction state.
+ //TODO need to investigate whether this should wait for prior transaction
+ //state to flush out?
throw new UnsupportedOperationException();
}
public Response processForgetTransaction(TransactionInfo info) throws Exception {
- throw new UnsupportedOperationException();
+ return processRollbackTransaction(info);
}
public Response processPrepareTransaction(TransactionInfo info) throws Exception {
+ TransactionId tid = info.getTransactionId();
+ Transaction t = locateTransaction(tid);
+ t.prepare(null);
throw new UnsupportedOperationException();
}
public Response processRecoverTransactions(TransactionInfo info) throws Exception {
+ //TODO
throw new UnsupportedOperationException();
}
public Response processRollbackTransaction(TransactionInfo info) throws Exception {
+ TransactionId tid = info.getTransactionId();
+ Transaction t = locateTransaction(tid);
+ t.rollback(null);
+ transactions.remove(tid);
+ //TODO need to respond to this when the rollback completes
throw new UnsupportedOperationException();
}
@@ -393,6 +456,27 @@
};
}
+ private Transaction locateTransaction(TransactionId tid) throws XAException, JMSException {
+ Transaction t;
+
+ if (tid.isLocalTransaction()) {
+ t = transactions.get(tid);
+ } else {
+ t = host.getTransactionManager().getXATransaction(XidImpl.toBuffer((Xid) tid));
+ }
+
+ if (t == null) {
+ if (tid.isXATransaction()) {
+ XAException e = new XAException("Transaction '" + tid + "' has not been started.");
+ e.errorCode = XAException.XAER_NOTA;
+ throw e;
+ } else {
+ throw new JMSException("Transaction '" + tid + "' has not been started.");
+ }
+ }
+ return t;
+ }
+
public void start() throws Exception {
}
@@ -594,17 +678,23 @@
}
}
- public void ack(MessageAck info) {
+ public void ack(MessageAck info) throws XAException, JMSException {
// TODO: The pending message queue could probably be optimized to
// avoid having to create a new list here.
int flowCredit = info.getMessageCount();
- if( info.isDeliveredAck() ) {
+ if (info.isDeliveredAck()) {
// This ack is just trying to expand the flow control window size without actually
// acking the message. Keep track of how many limiter credits we borrow since they need
// to get paid back with real acks later.
borrowedLimterCredits += flowCredit;
limiter.onProtocolCredit(flowCredit);
- } else if(info.isStandardAck()) {
+ } else if (info.isStandardAck()) {
+ TransactionId tid = info.getTransactionId();
+ Transaction transaction = null;
+ if (tid != null) {
+ transaction = locateTransaction(tid);
+ }
+
LinkedList<SubscriptionDelivery<MessageDelivery>> acked = new LinkedList<SubscriptionDelivery<MessageDelivery>>();
synchronized (this) {
MessageId id = info.getLastMessageId();
@@ -619,27 +709,35 @@
}
}
}
-
+
// Did we have DeliveredAcks previously sent? Then the
// the flow window has already been credited. We need to
// pay back the borrowed limiter credits before giving
// credits directly to the limiter.
- if(borrowedLimterCredits>0) {
- if( flowCredit > borrowedLimterCredits ) {
+ if (borrowedLimterCredits > 0) {
+ if (flowCredit > borrowedLimterCredits) {
flowCredit -= borrowedLimterCredits;
- borrowedLimterCredits=0;
+ borrowedLimterCredits = 0;
} else {
borrowedLimterCredits -= flowCredit;
- flowCredit=0;
+ flowCredit = 0;
}
}
limiter.onProtocolCredit(flowCredit);
}
- // Delete outside of synchronization on queue to avoid contention
- // with enqueueing threads.
- for (SubscriptionDelivery<MessageDelivery> callback : acked) {
- callback.acknowledge();
+ if (transaction == null) {
+ // Delete outside of synchronization on queue to avoid contention
+ // with enqueueing threads.
+ for (SubscriptionDelivery<MessageDelivery> callback : acked) {
+ callback.acknowledge();
+ }
+ } else {
+ // Delete outside of synchronization on queue to avoid contention
+ // with enqueueing threads.
+ for (SubscriptionDelivery<MessageDelivery> callback : acked) {
+ transaction.addAck(callback);
+ }
}
}
}
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java Tue Jun 30 17:11:16 2009
@@ -70,7 +70,7 @@
BrokerDatabase database;
BrokerQueueStore queueStore;
private static final boolean USE_KAHA_DB = true;
- private static final boolean PERSISTENT = false;
+ private static final boolean PERSISTENT = true;
private static final boolean PURGE_STORE = true;
// Producers send sync and operations are never canceled.
private static final boolean TEST_MAX_STORE_LATENCY = false;
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java Tue Jun 30 17:11:16 2009
@@ -124,6 +124,22 @@
expirator.elementAdded(qe);
}
+ public void remove(long sequence) {
+ QueueElement<V> qe = queue.lower(sequence, true);
+ if (qe == null) {
+ return;
+ } else if (qe.getSequence() == sequence) {
+ qe.acknowledge();
+ }
+ //Otherwise if the element is paged out, create a new
+ //holder and mark it for deletion
+ else if (persistencePolicy.isPageOutPlaceHolders()) {
+ //FIXME, need to track this delete otherwise an in flight message restore
+ //might load this element back in.
+ getQueueStore().deleteQueueElement(new QueueElement<V>(null, sequence, this));
+ }
+ }
+
/**
* @return True if the queue needs dispatching.
*/
@@ -865,9 +881,11 @@
}
if (saved) {
- queue.getQueueStore().deleteQueueElement(queue.getDescriptor(), elem);
+ queue.getQueueStore().deleteQueueElement(this);
}
+ //FIXME need to track deletions when paging is enabled
+ //otherwise an in process restore might reload the element
elem = null;
unload(null);
@@ -1157,8 +1175,11 @@
return "QueueElement " + sequence + " loaded: " + loaded + " elem loaded: " + !isPagedOut() + " owner: " + owner;
}
- /* (non-Javadoc)
- * @see org.apache.activemq.queue.Subscription.SubscriptionDelivery#getSourceQueueRemovalKey()
+ /*
+ * (non-Javadoc)
+ *
+ * @seeorg.apache.activemq.queue.Subscription.SubscriptionDelivery#
+ * getSourceQueueRemovalKey()
*/
public long getSourceQueueRemovalKey() {
return sequence;
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java Tue Jun 30 17:11:16 2009
@@ -96,6 +96,10 @@
};
}
+ public synchronized void remove(long key) {
+ queue.remove(key);
+ }
+
/*
* (non-Javadoc)
*
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java Tue Jun 30 17:11:16 2009
@@ -93,5 +93,11 @@
* queue shutdown will proceed asynchronously.
*/
public void shutdown(boolean sync);
+
+ /**
+ * Removes the element with the given sequence from this queue
+ * @param key The sequence key.
+ */
+ public void remove(long sequence);
}
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java Tue Jun 30 17:11:16 2009
@@ -71,10 +71,9 @@
return rc;
}
-
-
+
abstract public IQueue<K, V> createPartition(int partitionKey);
-
+
/*
* (non-Javadoc)
*
@@ -92,7 +91,7 @@
}
}
}
-
+
public int getEnqueuedCount() {
checkShutdown();
synchronized (this) {
@@ -118,7 +117,6 @@
}
}
-
public void setStore(QueueStore<K, V> store) {
this.store = store;
}
@@ -149,7 +147,6 @@
persistencePolicy = new PersistencePolicy.NON_PERSISTENT_POLICY<V>();
}
}
-
public synchronized void start() {
if (!started) {
@@ -172,9 +169,8 @@
}
}
-
public void shutdown(boolean sync) {
- Collection <IQueue<K, V>> partitions = null;
+ Collection<IQueue<K, V>> partitions = null;
synchronized (this) {
if (!shutdown) {
shutdown = true;
@@ -225,7 +221,6 @@
return partitionMapper;
}
-
public void add(V value, ISourceController<?> source) {
int partitionKey = partitionMapper.map(value);
getPartition(partitionKey).add(value, source);
@@ -235,7 +230,7 @@
int partitionKey = partitionMapper.map(value);
return getPartition(partitionKey).offer(value, source);
}
-
+
public void setKeyMapper(Mapper<K, V> keyMapper) {
this.keyMapper = keyMapper;
}
@@ -254,9 +249,8 @@
}
}
}
-
- protected Collection<IQueue<K, V>> getPartitions()
- {
+
+ protected Collection<IQueue<K, V>> getPartitions() {
return partitions.values();
}
@@ -265,8 +259,10 @@
throw new IllegalStateException(this + " is shutdown");
}
}
-
- /* (non-Javadoc)
+
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.activemq.queue.IPollableFlowSource#isDispatchReady()
*/
public boolean isDispatchReady() {
@@ -274,7 +270,9 @@
throw new UnsupportedOperationException();
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.activemq.queue.IPollableFlowSource#poll()
*/
public V poll() {
@@ -282,7 +280,9 @@
throw new UnsupportedOperationException();
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.activemq.queue.IPollableFlowSource#pollingDispatch()
*/
public boolean pollingDispatch() {
@@ -290,12 +290,19 @@
throw new UnsupportedOperationException();
}
- /* (non-Javadoc)
- * @see org.apache.activemq.flow.ISinkController.FlowControllable#flowElemAccepted(org.apache.activemq.flow.ISourceController, java.lang.Object)
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.flow.ISinkController.FlowControllable#flowElemAccepted
+ * (org.apache.activemq.flow.ISourceController, java.lang.Object)
*/
public void flowElemAccepted(ISourceController<V> source, V elem) {
// TODO Remove
throw new UnsupportedOperationException();
-
+ }
+
+ public void remove(long key) {
+ throw new UnsupportedOperationException();
}
}
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/QueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/QueueStore.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/QueueStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/QueueStore.java Tue Jun 30 17:11:16 2009
@@ -95,12 +95,10 @@
/**
* Asynchronously deletes an element from the store.
*
- * @param descriptor
- * The queue descriptor
* @param element
* The element to delete.
*/
- public void deleteQueueElement(QueueDescriptor descriptor, V element);
+ public void deleteQueueElement(SaveableQueueElement<V> elem);
/**
* Asynchronously saves the given element to the store
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java Tue Jun 30 17:11:16 2009
@@ -351,6 +351,12 @@
}
}
+ public void remove(long key) {
+ synchronized (mutex) {
+ queue.remove(key);
+ }
+ }
+
public boolean offer(V elem, ISourceController<?> source) {
synchronized (mutex) {
@@ -542,11 +548,10 @@
} else {
cursor.reset(queue.getFirstSequence());
}
-
+
if (DEBUG)
System.out.println("Starting " + this + " at " + cursor);
-
updateDispatchList();
}
}
@@ -701,17 +706,16 @@
SubscriptionDelivery<V> callback = sub.isRemoveOnDispatch(qe.elem) ? null : qe;
// If the sub is a browser don't pass it a callback since it does not need to
// delete messages
- if( sub.isBrowser() ) {
+ if (sub.isBrowser()) {
callback = null;
}
-
+
// See if the sink has room:
qe.setAcquired(sub);
if (sub.offer(qe.elem, this, callback)) {
if (DEBUG)
System.out.println("Dispatched " + qe.getElement() + " to " + this);
-
if (!sub.isBrowser()) {
// If remove on dispatch acknowledge now:
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueueOld.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueueOld.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueueOld.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueueOld.java Tue Jun 30 17:11:16 2009
@@ -172,6 +172,11 @@
return sinkController.offer(elem, source);
}
+
+ public void remove(long key) {
+ throw new UnsupportedOperationException();
+ }
+
/**
* Called when the controller accepts a message for this queue.
*/
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java Tue Jun 30 17:11:16 2009
@@ -208,7 +208,7 @@
}
- public final void deleteQueueElement(QueueDescriptor descriptor, Message elem) {
+ public final void deleteQueueElement(SaveableQueueElement<Message> elem) {
}
Modified: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java Tue Jun 30 17:11:16 2009
@@ -23,7 +23,6 @@
import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.transport.stomp.StompFrame;
import org.apache.activemq.util.buffer.AsciiBuffer;
-import org.apache.activemq.util.buffer.Buffer;
public class StompMessageDelivery extends BrokerMessageDelivery {
@@ -38,6 +37,8 @@
private PersistListener persistListener = null;
private long tte = Long.MIN_VALUE;
+ private long tid = -1;
+
public interface PersistListener {
public void onMessagePersisted(StompMessageDelivery delivery);
}
@@ -152,9 +153,12 @@
return record;
}
- public Buffer getTransactionId() {
- // TODO Auto-generated method stub
- return null;
+ public long getTransactionId() {
+ return tid;
+ }
+
+ public void setTransactionId(long tid) {
+ this.tid = tid;
}
public MessageEvaluationContext createMessageEvaluationContext() {
Modified: activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java (original)
+++ activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java Tue Jun 30 17:11:16 2009
@@ -639,7 +639,13 @@
*/
public void queueAddMessage(QueueDescriptor queue, QueueRecord record) throws KeyNotFoundException;
- public void queueRemoveMessage(QueueDescriptor queue, Long messageKey) throws KeyNotFoundException;
+ /**
+ * Deletes an element from a queue.
+ * @param queue The queue
+ * @param queueKey The element's queue key as given by {@link QueueRecord#setQueueKey(Long)}
+ * @throws KeyNotFoundException
+ */
+ public void queueRemoveMessage(QueueDescriptor queue, Long queueKey) throws KeyNotFoundException;
public Iterator<QueueRecord> queueListMessagesQueue(QueueDescriptor queue, Long firstQueueKey, Long maxSequence, int max) throws KeyNotFoundException;
Modified: activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java Tue Jun 30 17:11:16 2009
@@ -98,8 +98,6 @@
QueueDescriptor descriptor;
TreeMap<Long, QueueRecord> records = new TreeMap<Long, QueueRecord>(Comparators.LONG_COMPARATOR);
- // Maps tracking to sequence number:
- HashMap<Long, Long> trackingMap = new HashMap<Long, Long>();
int count = 0;
long size = 0;
HashMap<QueueDescriptor, StoredQueue> partitions;
@@ -111,20 +109,18 @@
public void add(QueueRecord record) {
records.put(record.getQueueKey(), record);
- trackingMap.put(record.getMessageKey(), record.getQueueKey());
count++;
size += record.getSize();
}
- public boolean remove(Long msgKey) {
- Long sequenceKey = trackingMap.remove(msgKey);
- if (sequenceKey != null) {
- QueueRecord record = records.remove(sequenceKey);
+ public long remove(Long queueKey) {
+ QueueRecord record = records.remove(queueKey);
+ if (record != null) {
count--;
size -= record.getSize();
- return true;
+ return record.getMessageKey();
}
- return false;
+ return -1;
}
public Iterator<QueueRecord> list(Long firstQueueKey, long maxSequence, int max) {
@@ -230,11 +226,11 @@
static private class RemoveOp {
QueueDescriptor queue;
- Long messageKey;
+ Long queueKey;
- public RemoveOp(QueueDescriptor queue, Long messageKey) {
+ public RemoveOp(QueueDescriptor queue, Long queueKey) {
this.queue = queue;
- this.messageKey = messageKey;
+ this.queueKey = queueKey;
}
}
@@ -244,7 +240,7 @@
public void commit(MemorySession session) throws KeyNotFoundException {
for (RemoveOp op : removes) {
- session.queueRemoveMessage(op.queue, op.messageKey);
+ session.queueRemoveMessage(op.queue, op.queueKey);
}
}
@@ -258,8 +254,8 @@
adds.add(messageKey);
}
- public void removeMessage(QueueDescriptor queue, Long messageKey) {
- removes.add(new RemoveOp(queue, messageKey));
+ public void removeMessage(QueueDescriptor queue, Long queueKey) {
+ removes.add(new RemoveOp(queue, queueKey));
}
}
@@ -517,8 +513,10 @@
}
}
- public void queueRemoveMessage(QueueDescriptor queue, Long msgKey) throws KeyNotFoundException {
- if (get(queues, queue.getQueueName()).remove(msgKey)) {
+ public void queueRemoveMessage(QueueDescriptor queue, Long queueKey) throws KeyNotFoundException {
+ long msgKey = get(queues, queue.getQueueName()).remove(queueKey);
+ if(msgKey >= 0)
+ {
deleteMessageReference(msgKey);
}
}
Modified: activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java Tue Jun 30 17:11:16 2009
@@ -324,7 +324,7 @@
for (Iterator<QueueRecord> iterator = queueRecords; iterator.hasNext();) {
QueueRecord r = iterator.next();
records.add(session.messageGetRecord(r.getMessageKey()));
- session.queueRemoveMessage(queueId, r.messageKey);
+ session.queueRemoveMessage(queueId, r.queueKey);
}
}
};
Modified: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/SortedLinkedList.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/SortedLinkedList.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/SortedLinkedList.java (original)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/SortedLinkedList.java Tue Jun 30 17:11:16 2009
@@ -57,6 +57,13 @@
prev.linkAfter(node);
}
}
+
+ /**
+ * @param sequence The sequence number of the element to get.
+ */
+ public T get(long sequence) {
+ return index.get(sequence);
+ }
public T lower(long sequence, boolean inclusive) {
Entry<Long, T> lower = index.floorEntry(sequence);
@@ -154,4 +161,6 @@
}
return rc;
}
+
+
}