You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/06/28 21:37:07 UTC
svn commit: r789143 [1/2] - in /activemq/sandbox/activemq-flow:
activemq-broker/
activemq-broker/src/main/java/org/apache/activemq/apollo/broker/
activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/
activemq-openwire/src/main/java/org/a...
Author: cmacnaug
Date: Sun Jun 28 19:37:06 2009
New Revision: 789143
URL: http://svn.apache.org/viewvc?rev=789143&view=rev
Log:
Adding the beginnings of Transaction support for the broker.
Also added support for Maps in avtivemq-kaha store implementation.
Not yet functional or tested. This commit adds in the TransactionManager and related transaction objects, and the backing queue/store logic. Still need to test this and implement actual commit and rollback.
Added:
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java (with props)
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java (with props)
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java (with props)
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java (with props)
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java (with props)
Modified:
activemq/sandbox/activemq-flow/activemq-broker/pom.xml
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java
activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/QueueStore.java
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SaveableQueueElement.java
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java
activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java
activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java
activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
activemq/sandbox/activemq-flow/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java
Modified: activemq/sandbox/activemq-flow/activemq-broker/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/pom.xml?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/pom.xml (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/pom.xml Sun Jun 28 19:37:06 2009
@@ -226,7 +226,6 @@
</dependency>
</dependencies>
</profile>
- <!--
<profile>
<id>default-tools.jar</id>
<activation>
@@ -246,7 +245,6 @@
</dependency>
</dependencies>
</profile>
- -->
</profiles>
</project>
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java Sun Jun 28 19:37:06 2009
@@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
+import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -45,6 +46,8 @@
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.flow.SizeLimiter;
import org.apache.activemq.flow.ISinkController.FlowControllable;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.queue.QueueDescriptor;
import org.apache.activemq.queue.RestoreListener;
import org.apache.activemq.queue.RestoredElement;
@@ -67,8 +70,6 @@
private AtomicBoolean running = new AtomicBoolean(false);
private DatabaseListener listener;
- private HashMap<String, ProtocolHandler> protocolHandlers = new HashMap<String, ProtocolHandler>();
-
private final LinkedNodeList<OperationBase> opQueue;
private AtomicBoolean notify = new AtomicBoolean(false);
private Semaphore opsReady = new Semaphore(0);
@@ -92,6 +93,23 @@
public void onDatabaseException(IOException ioe);
}
+ public static interface MessageRecordMarshaller<V> {
+ MessageRecord marshal(V element);
+
+ /**
+ * Called when a queue element is recovered from the store for a
+ * particular queue.
+ *
+ * @param mRecord
+ * The message record
+ * @param queue
+ * The queue that the element is being restored to (or null
+ * if not being restored for a queue)
+ * @return
+ */
+ V unMarshall(MessageRecord mRecord, QueueDescriptor queue);
+ }
+
public BrokerDatabase(Store store) {
this.store = store;
this.opQueue = new LinkedNodeList<OperationBase>();
@@ -169,6 +187,16 @@
}
}
+ /**
+ * A blocking operation that lists all queues of a given type:
+ *
+ * @param type
+ * The queue type
+ * @return A list of queues.
+ *
+ * @throws Exception
+ * If there was an error listing the queues.
+ */
public Iterator<QueueQueryResult> listQueues(final short type) throws Exception {
return store.execute(new Callback<Iterator<QueueQueryResult>, Exception>() {
@@ -180,6 +208,42 @@
}
/**
+ * A blocking operation that lists all entries in the specified map
+ *
+ * @param map
+ * The map to list
+ * @return A list of map entries
+ *
+ * @throws Exception
+ * If there was an error listing the queues.
+ */
+ public Map<AsciiBuffer, Buffer> listMapEntries(final AsciiBuffer map) throws Exception {
+ return store.execute(new Callback<Map<AsciiBuffer, Buffer>, Exception>() {
+
+ public Map<AsciiBuffer, Buffer> execute(Session session) throws Exception {
+ HashMap<AsciiBuffer, Buffer> ret = new HashMap<AsciiBuffer, Buffer>();
+ Iterator<AsciiBuffer> keys = session.mapEntryListKeys(map, null, -1);
+ while (keys.hasNext()) {
+ AsciiBuffer key = keys.next();
+ ret.put(key, session.mapEntryGet(map, key));
+ }
+
+ return ret;
+ }
+
+ }, null);
+ }
+
+ /**
+ * @param map The name of the map to update.
+ * @param key The key in the map to update.
+ * @param value The value to insert.
+ */
+ public OperationContext updateMapEntry(AsciiBuffer map, AsciiBuffer key, Buffer value) {
+ return add(new MapUpdateOperation(map, key, value), null, false) ;
+ }
+
+ /**
* Executes user supplied {@link Operation}. If the {@link Operation} does
* not throw any Exceptions, all updates to the store are committed,
* otherwise they are rolled back. Any exceptions thrown by the
@@ -481,20 +545,20 @@
* @return The {@link OperationContext} associated with the operation
*/
public OperationContext saveMessage(SaveableQueueElement<MessageDelivery> queueElement, ISourceController<?> source, boolean delayable) {
- return add(new AddMessageOperation(queueElement), source, delayable);
+ return add(new AddMessageOperation(queueElement), source, !delayable);
}
/**
* Deletes the given message from the store for the given queue.
*
- * @param delivery
- * The delivery.
+ * @param storeTracking
+ * The tracking number of the element being deleted
* @param queue
* The queue.
* @return The {@link OperationContext} associated with the operation
*/
- public OperationContext deleteMessage(MessageDelivery delivery, QueueDescriptor queue) {
- return add(new DeleteMessageOperation(delivery.getStoreTracking(), queue), null, false);
+ public OperationContext deleteQueueElement(long storeTracking, QueueDescriptor queue) {
+ return add(new DeleteOperation(storeTracking, queue), null, false);
}
/**
@@ -519,8 +583,9 @@
* The listener to which messags should be passed.
* @return The {@link OperationContext} associated with the operation
*/
- public OperationContext restoreMessages(QueueDescriptor queue, boolean recordsOnly, long first, long maxSequence, int maxCount, RestoreListener<MessageDelivery> listener) {
- return add(new RestoreMessageOperation(queue, recordsOnly, first, maxCount, maxSequence, listener), null, true);
+ public <T> OperationContext restoreQueueElements(QueueDescriptor queue, boolean recordsOnly, long first, long maxSequence, int maxCount, RestoreListener<T> listener,
+ MessageRecordMarshaller<T> marshaller) {
+ return add(new RestoreElementsOperation<T>(queue, recordsOnly, first, maxCount, maxSequence, listener, marshaller), null, true);
}
private void onDatabaseException(IOException ioe) {
@@ -783,18 +848,17 @@
}
}
- private class DeleteMessageOperation extends OperationBase {
+ private class DeleteOperation extends OperationBase {
private final long storeTracking;
private QueueDescriptor queue;
- public DeleteMessageOperation(long tracking, QueueDescriptor queue) {
+ public DeleteOperation(long tracking, QueueDescriptor queue) {
this.storeTracking = tracking;
this.queue = queue;
}
@Override
public int getLimiterSize() {
- // Might consider bumping this up to avoid too much accumulation?
return BASE_MEM_SIZE + 8;
}
@@ -821,22 +885,24 @@
}
}
- private class RestoreMessageOperation extends OperationBase {
+ private class RestoreElementsOperation<V> extends OperationBase {
private QueueDescriptor queue;
private long firstKey;
private int maxRecords;
private long maxSequence;
private boolean recordsOnly;
- private RestoreListener<MessageDelivery> listener;
- private Collection<RestoredElement<MessageDelivery>> msgs = null;
+ private RestoreListener<V> listener;
+ private Collection<RestoredElement<V>> msgs = null;
+ private MessageRecordMarshaller<V> marshaller;
- RestoreMessageOperation(QueueDescriptor queue, boolean recordsOnly, long firstKey, int maxRecords, long maxSequence, RestoreListener<MessageDelivery> listener) {
+ RestoreElementsOperation(QueueDescriptor queue, boolean recordsOnly, long firstKey, int maxRecords, long maxSequence, RestoreListener<V> listener, MessageRecordMarshaller<V> marshaller) {
this.queue = queue;
this.recordsOnly = recordsOnly;
this.firstKey = firstKey;
this.maxRecords = maxRecords;
this.maxSequence = maxSequence;
this.listener = listener;
+ this.marshaller = marshaller;
}
@Override
@@ -850,9 +916,9 @@
Iterator<QueueRecord> records = null;
try {
records = session.queueListMessagesQueue(queue, firstKey, maxSequence, maxRecords);
- msgs = new LinkedList<RestoredElement<MessageDelivery>>();
+ msgs = new LinkedList<RestoredElement<V>>();
} catch (KeyNotFoundException e) {
- msgs = new ArrayList<RestoredElement<MessageDelivery>>(0);
+ msgs = new ArrayList<RestoredElement<V>>(0);
return;
}
@@ -863,9 +929,10 @@
}
while (qRecord != null) {
- RestoredMessageImpl rm = new RestoredMessageImpl();
+ RestoredElementImpl<V> rm = new RestoredElementImpl<V>();
// TODO should update jms redelivery here.
rm.qRecord = qRecord;
+ rm.queue = queue;
count++;
// Set the next sequence number:
@@ -890,15 +957,7 @@
if (!recordsOnly) {
try {
rm.mRecord = session.messageGetRecord(rm.qRecord.getMessageKey());
- rm.handler = protocolHandlers.get(rm.mRecord.getEncoding().toString());
- if (rm.handler == null) {
- try {
- rm.handler = ProtocolHandlerFactory.createProtocolHandler(rm.mRecord.getEncoding().toString());
- protocolHandlers.put(rm.mRecord.getEncoding().toString(), rm.handler);
- } catch (Throwable thrown) {
- throw new RuntimeException("Unknown message format" + rm.mRecord.getEncoding().toString(), thrown);
- }
- }
+ rm.marshaller = marshaller;
msgs.add(rm);
} catch (KeyNotFoundException shouldNotHappen) {
shouldNotHappen.printStackTrace();
@@ -1037,20 +1096,51 @@
}
}
- private class RestoredMessageImpl implements RestoredElement<MessageDelivery> {
+ private class MapUpdateOperation extends OperationBase {
+ final AsciiBuffer map;
+ final AsciiBuffer key;
+ final Buffer value;
+
+ MapUpdateOperation(AsciiBuffer mapName, AsciiBuffer key, Buffer value) {
+ this.map = mapName;
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public int getLimiterSize() {
+ return BASE_MEM_SIZE + map.length + key.length + value.length;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.apollo.broker.BrokerDatabase.OperationBase#doExcecute
+ * (org.apache.activemq.broker.store.Store.Session)
+ */
+ @Override
+ protected void doExcecute(Session session) {
+ try {
+ session.mapEntryPut(map, key, value);
+ } catch (KeyNotFoundException e) {
+ throw new Store.FatalStoreException(e);
+ }
+ }
+ }
+
+ private class RestoredElementImpl<T> implements RestoredElement<T> {
QueueRecord qRecord;
+ QueueDescriptor queue;
MessageRecord mRecord;
- ProtocolHandler handler;
+ MessageRecordMarshaller<T> marshaller;
long nextSequence;
- public MessageDelivery getElement() throws IOException {
+ public T getElement() throws IOException {
if (mRecord == null) {
return null;
}
-
- BrokerMessageDelivery delivery = handler.createMessageDelivery(mRecord);
- delivery.setFromDatabase(BrokerDatabase.this, mRecord);
- return delivery;
+ return marshaller.unMarshall(mRecord, queue);
}
/*
@@ -1092,7 +1182,6 @@
* org.apache.activemq.queue.QueueStore.RestoredElement#getElementSize()
*/
public int getElementSize() {
- // TODO Auto-generated method stub
return qRecord.getSize();
}
@@ -1105,11 +1194,9 @@
public long getExpiration() {
return qRecord.getTte();
}
-
}
public long allocateStoreTracking() {
- // TODO Auto-generated method stub
return store.allocateStoreTracking();
}
@@ -1121,8 +1208,72 @@
this.dispatcher = dispatcher;
}
- public Store getStore() {
- return store;
- }
+ public Store getStore() {
+ return store;
+ }
+
+ /**
+ * @param sqe
+ * @param source
+ * @param delayable
+ */
+ public <T> OperationContext saveQeueuElement(SaveableQueueElement<T> sqe, ISourceController<?> source, boolean delayable, MessageRecordMarshaller<T> marshaller) {
+ return add(new AddElementOpOperation<T>(sqe, delayable, marshaller), source, !delayable);
+ }
+
+ private class AddElementOpOperation<T> extends OperationBase {
+
+ private final SaveableQueueElement<T> op;
+ private MessageRecord record;
+ private boolean delayable;
+ private final MessageRecordMarshaller<T> marshaller;
+
+ public AddElementOpOperation(SaveableQueueElement<T> op, boolean delayable, MessageRecordMarshaller<T> marshaller) {
+ this.op = op;
+ this.delayable = delayable;
+ if (!delayable) {
+ record = marshaller.marshal(op.getElement());
+ this.marshaller = null;
+ } else {
+ this.marshaller = marshaller;
+ }
+ }
+
+ public boolean isDelayable() {
+ return delayable;
+ }
+
+ @Override
+ public int getLimiterSize() {
+ return record.getSize() + BASE_MEM_SIZE + 40;
+ }
+
+ @Override
+ protected void doExcecute(Session session) {
+
+ if (record == null) {
+ record = marshaller.marshal(op.getElement());
+ }
+ session.messageAdd(record);
+ try {
+ QueueRecord queueRecord = new QueueRecord();
+ queueRecord.setAttachment(null);
+ queueRecord.setMessageKey(record.getKey());
+ queueRecord.setSize(record.getSize());
+ queueRecord.setQueueKey(op.getSequenceNumber());
+ session.queueAddMessage(op.getQueueDescriptor(), queueRecord);
+ } catch (KeyNotFoundException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void onCommit() {
+ }
+
+ public String toString() {
+ return "AddTxOpOperation " + record.getKey() + super.toString();
+ }
+ }
}
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java Sun Jun 28 19:37:06 2009
@@ -127,7 +127,7 @@
}
if (!deleted) {
- store.deleteMessage(this, queue);
+ store.deleteQueueElement(getStoreTracking(), queue);
}
if (firePersistListener) {
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java Sun Jun 28 19:37:06 2009
@@ -21,13 +21,14 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicLong;
+import org.apache.activemq.broker.store.Store.MessageRecord;
import org.apache.activemq.broker.store.Store.QueueQueryResult;
import org.apache.activemq.dispatch.IDispatcher;
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.flow.PrioritySizeLimiter;
import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.queue.ExclusivePersistentQueue;
import org.apache.activemq.queue.IPartitionedQueue;
import org.apache.activemq.queue.IQueue;
@@ -53,6 +54,48 @@
private BrokerDatabase database;
private IDispatcher dispatcher;
+ private static HashMap<String, ProtocolHandler> protocolHandlers = new HashMap<String, ProtocolHandler>();
+ private static final BrokerDatabase.MessageRecordMarshaller<MessageDelivery> MESSAGE_MARSHALLER = new BrokerDatabase.MessageRecordMarshaller<MessageDelivery>() {
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.apollo.broker.BrokerDatabase.MessageRecordMarshaller
+ * #marshal(java.lang.Object)
+ */
+ public MessageRecord marshal(MessageDelivery element) {
+ return element.createMessageRecord();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.apollo.broker.BrokerDatabase.MessageRecordMarshaller
+ * #unMarshall(org.apache.activemq.broker.store.Store.MessageRecord)
+ */
+ public MessageDelivery unMarshall(MessageRecord record, QueueDescriptor queue) {
+ ProtocolHandler handler = protocolHandlers.get(record.getEncoding().toString());
+ if (handler == null) {
+ try {
+ handler = ProtocolHandlerFactory.createProtocolHandler(record.getEncoding().toString());
+ protocolHandlers.put(record.getEncoding().toString(), handler);
+ } catch (Throwable thrown) {
+ throw new RuntimeException("Unknown message format" + record.getEncoding().toString(), thrown);
+ }
+ }
+ try {
+ return handler.createMessageDelivery(record);
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+ };
+
+ final BrokerDatabase.MessageRecordMarshaller<MessageDelivery> getMessageMarshaller() {
+ return MESSAGE_MARSHALLER;
+ }
+
private static final Mapper<Long, MessageDelivery> EXPIRATION_MAPPER = new Mapper<Long, MessageDelivery>() {
public Long map(MessageDelivery element) {
return element.getExpiration();
@@ -63,14 +106,35 @@
public Integer map(MessageDelivery element) {
return element.getFlowLimiterSize();
}
+ };
+
+ public static final Mapper<Integer, MessageDelivery> PRIORITY_MAPPER = new Mapper<Integer, MessageDelivery>() {
+ public Integer map(MessageDelivery element) {
+ return element.getPriority();
+ }
};
- private final short PARTITION_TYPE = 0;
- private final short SHARED_QUEUE_TYPE = 1;
- private final short DURABLE_QUEUE_TYPE = 2;
+ static public final Mapper<Long, MessageDelivery> KEY_MAPPER = new Mapper<Long, MessageDelivery>() {
+ public Long map(MessageDelivery element) {
+ return element.getStoreTracking();
+ }
+ };
+
+ static public final Mapper<Integer, MessageDelivery> PARTITION_MAPPER = new Mapper<Integer, MessageDelivery>() {
+ public Integer map(MessageDelivery element) {
+ // we modulo 10 to have at most 10 partitions which the producers
+ // gets split across.
+ return (int) (element.getProducerId().hashCode() % 10);
+ }
+ };
+
+ public static final short SUBPARTITION_TYPE = 0;
+ public static final short SHARED_QUEUE_TYPE = 1;
+ public static final short DURABLE_QUEUE_TYPE = 2;
+ public static short TRANSACTION_QUEUE_TYPE = 3;
private final HashMap<String, IQueue<Long, MessageDelivery>> sharedQueues = new HashMap<String, IQueue<Long, MessageDelivery>>();
- private final HashMap<String, ExclusivePersistentQueue<Long, MessageDelivery>> durableQueues = new HashMap<String, ExclusivePersistentQueue<Long, MessageDelivery>>();
+ private final HashMap<String, IQueue<Long, MessageDelivery>> durableQueues = new HashMap<String, IQueue<Long, MessageDelivery>>();
private Mapper<Integer, MessageDelivery> partitionMapper;
@@ -79,7 +143,7 @@
// Be default we don't page out elements to disk.
private static final int DEFAULT_SHARED_QUEUE_SIZE = DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
//private static final int DEFAULT_SHARED_QUEUE_SIZE = 1024 * 1024 * 10;
-
+
private static long dynamicQueueCounter = 0;
private static final PersistencePolicy<MessageDelivery> SHARED_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<MessageDelivery>() {
@@ -122,7 +186,7 @@
// Be default we don't page out elements to disk.
//private static final int DEFAULT_DURABLE_QUEUE_SIZE = DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
private static final int DEFAULT_DURABLE_QUEUE_SIZE = 1024 * 1024 * 10;
-
+
private static final PersistencePolicy<MessageDelivery> DURABLE_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<MessageDelivery>() {
private static final boolean PAGING_ENABLED = DEFAULT_DURABLE_QUEUE_SIZE > DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
@@ -158,26 +222,6 @@
}
};
- public static final Mapper<Integer, MessageDelivery> PRIORITY_MAPPER = new Mapper<Integer, MessageDelivery>() {
- public Integer map(MessageDelivery element) {
- return element.getPriority();
- }
- };
-
- static public final Mapper<Long, MessageDelivery> KEY_MAPPER = new Mapper<Long, MessageDelivery>() {
- public Long map(MessageDelivery element) {
- return element.getStoreTracking();
- }
- };
-
- static public final Mapper<Integer, MessageDelivery> PARTITION_MAPPER = new Mapper<Integer, MessageDelivery>() {
- public Integer map(MessageDelivery element) {
- // we modulo 10 to have at most 10 partitions which the producers
- // gets split across.
- return (int) (element.getProducerId().hashCode() % 10);
- }
- };
-
public void setDatabase(BrokerDatabase database) {
this.database = database;
}
@@ -201,13 +245,13 @@
results = database.listQueues(DURABLE_QUEUE_TYPE);
while (results.hasNext()) {
QueueQueryResult loaded = results.next();
- ExclusivePersistentQueue<Long, MessageDelivery> queue = createRestoredDurableQueue(loaded);
+ IQueue<Long, MessageDelivery> queue = createRestoredDurableQueue(loaded);
durableQueues.put(queue.getDescriptor().getQueueName().toString(), queue);
LOG.info("Loaded Durable " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize());
}
}
-
+
private IQueue<Long, MessageDelivery> createRestoredQueue(IPartitionedQueue<Long, MessageDelivery> parent, QueueQueryResult loaded) throws IOException {
IQueue<Long, MessageDelivery> queue;
@@ -237,7 +281,7 @@
}
- private ExclusivePersistentQueue<Long, MessageDelivery> createRestoredDurableQueue(QueueQueryResult loaded) throws IOException {
+ private IQueue<Long, MessageDelivery> createRestoredDurableQueue(QueueQueryResult loaded) throws IOException {
ExclusivePersistentQueue<Long, MessageDelivery> queue = createDurableQueueInternal(loaded.getDescriptor().getQueueName().toString(), loaded.getDescriptor().getQueueType());
queue.initialize(loaded.getFirstSequence(), loaded.getLastSequence(), loaded.getCount(), loaded.getSize());
@@ -258,6 +302,11 @@
}
+ public IQueue<Long, MessageDelivery> getQueue(AsciiBuffer queueName) {
+ //TODO
+ return null;
+ }
+
public Collection<IQueue<Long, MessageDelivery>> getSharedQueues() {
synchronized (this) {
Collection<IQueue<Long, MessageDelivery>> c = sharedQueues.values();
@@ -267,8 +316,8 @@
}
}
- public ExclusivePersistentQueue<Long, MessageDelivery> createDurableQueue(String name) {
- ExclusivePersistentQueue<Long, MessageDelivery> queue = null;
+ public IQueue<Long, MessageDelivery> createDurableQueue(String name) {
+ IQueue<Long, MessageDelivery> queue = null;
synchronized (this) {
queue = durableQueues.get(name);
if (queue == null) {
@@ -283,12 +332,10 @@
return queue;
}
-
-
public ExclusivePersistentQueue<Long, MessageDelivery> createExclusivePersistentQueue() {
ExclusivePersistentQueue<Long, MessageDelivery> queue = null;
synchronized (this) {
- String name = "temp:"+(dynamicQueueCounter++);
+ String name = "temp:" + (dynamicQueueCounter++);
queue = createDurableQueueInternal(name, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
queue.getDescriptor().setApplicationType(DURABLE_QUEUE_TYPE);
queue.initialize(0, 0, 0, 0);
@@ -296,12 +343,11 @@
}
return queue;
}
-
-
- public Collection<ExclusivePersistentQueue<Long, MessageDelivery>> getDurableQueues() {
+
+ public Collection<IQueue<Long, MessageDelivery>> getDurableQueues() {
synchronized (this) {
- Collection<ExclusivePersistentQueue<Long, MessageDelivery>> c = durableQueues.values();
- ArrayList<ExclusivePersistentQueue<Long, MessageDelivery>> ret = new ArrayList<ExclusivePersistentQueue<Long, MessageDelivery>>(c.size());
+ Collection<IQueue<Long, MessageDelivery>> c = durableQueues.values();
+ ArrayList<IQueue<Long, MessageDelivery>> ret = new ArrayList<IQueue<Long, MessageDelivery>>(c.size());
ret.addAll(c);
return ret;
}
@@ -397,7 +443,7 @@
throw new IllegalArgumentException("Unknown queue type" + type);
}
}
- ret.getDescriptor().setApplicationType(PARTITION_TYPE);
+ ret.getDescriptor().setApplicationType(SUBPARTITION_TYPE);
ret.setDispatcher(dispatcher);
ret.setStore(this);
ret.setPersistencePolicy(SHARED_QUEUE_PERSISTENCE_POLICY);
@@ -419,8 +465,8 @@
}
public final void restoreQueueElements(QueueDescriptor queue, boolean recordsOnly, long firstSequence, long maxSequence, int maxCount,
- org.apache.activemq.queue.RestoreListener<MessageDelivery> listener) {
- database.restoreMessages(queue, recordsOnly, firstSequence, maxSequence, maxCount, listener);
+ RestoreListener<MessageDelivery> listener) {
+ database.restoreQueueElements(queue, recordsOnly, firstSequence, maxSequence, maxCount, listener, MESSAGE_MARSHALLER);
}
public final void addQueue(QueueDescriptor queue) {
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java Sun Jun 28 19:37:06 2009
@@ -22,19 +22,19 @@
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.flow.IFlowSink;
import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.queue.ExclusivePersistentQueue;
+import org.apache.activemq.queue.IQueue;
import org.apache.activemq.queue.Subscription;
public class DurableSubscription implements BrokerSubscription, DeliveryTarget {
- private final ExclusivePersistentQueue<Long, MessageDelivery> queue;
+ private final IQueue<Long, MessageDelivery> queue;
private final VirtualHost host;
private final Destination destination;
private Subscription<MessageDelivery> connectedSub;
boolean started = false;
BooleanExpression selector;
- DurableSubscription(VirtualHost host, Destination destination, BooleanExpression selector, ExclusivePersistentQueue<Long, MessageDelivery> queue) {
+ DurableSubscription(VirtualHost host, Destination destination, BooleanExpression selector, IQueue<Long, MessageDelivery> queue) {
this.host = host;
this.queue = queue;
this.destination = destination;
Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java?rev=789143&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java Sun Jun 28 19:37:06 2009
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker;
+
+import java.io.IOException;
+
+import javax.transaction.xa.XAException;
+
+import org.apache.activemq.queue.IQueue;
+
+/**
+ * LocalTransaction
+ * <p>
+ * Description:
+ * </p>
+ *
+ * @author cmacnaug
+ * @version 1.0
+ */
+public class LocalTransaction extends Transaction {
+
+ LocalTransaction(TransactionManager manager, long tid, IQueue<Long, TxOp> opQueue) {
+ super(manager, tid, opQueue);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.apollo.broker.Transaction#commit(boolean)
+ */
+ @Override
+ public void commit(boolean onePhase) throws XAException, IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.apollo.broker.Transaction#prepare()
+ */
+ @Override
+ public int prepare() throws XAException, IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.apollo.broker.Transaction#rollback()
+ */
+ @Override
+ public void rollback() throws XAException, IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.apollo.broker.Transaction#getType()
+ */
+ @Override
+ public byte getType() {
+ return TYPE_LOCAL;
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java Sun Jun 28 19:37:06 2009
@@ -124,4 +124,5 @@
* @return
*/
public MessageEvaluationContext createMessageEvaluationContext();
+
}
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java Sun Jun 28 19:37:06 2009
@@ -26,7 +26,7 @@
/**
* @author cmacnaug
- *
+ *
*/
public class MessageDeliveryWrapper implements MessageDelivery {
@@ -151,9 +151,10 @@
public boolean isPersistent() {
return delegate.isPersistent();
}
-
-
- /** (non-Javadoc)
+
+ /**
+ * (non-Javadoc)
+ *
* @see org.apache.activemq.apollo.broker.MessageDelivery#getExpiration()
*/
public long getExpiration() {
@@ -203,5 +204,4 @@
MessageDeliveryWrapper(MessageDelivery delivery) {
delegate = delivery;
}
-
}
Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java?rev=789143&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java Sun Jun 28 19:37:06 2009
@@ -0,0 +1,398 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker;
+
+import java.io.IOException;
+
+import javax.transaction.xa.XAException;
+
+import org.apache.activemq.broker.store.Store.MessageRecord;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.queue.IQueue;
+import org.apache.activemq.queue.Subscription.SubscriptionDelivery;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+
+/**
+ * Keeps track of all the actions the need to be done when a transaction does a
+ * commit or rollback.
+ *
+ * @version $Revision: 1.5 $
+ */
+public abstract class Transaction {
+
+ public static final byte START_STATE = 0; // can go to: 1,2,3
+ public static final byte IN_USE_STATE = 1; // can go to: 2,3
+ public static final byte PREPARED_STATE = 2; // can go to: 3
+ public static final byte FINISHED_STATE = 3;
+
+ static final byte TYPE_LOCAL = 0;
+ static final byte TYPE_XA = 1;
+
+ private byte state = START_STATE;
+ private final TransactionManager manager;
+ private final long tid;
+ private final IQueue<Long, TxOp> opQueue;
+
+ Transaction(TransactionManager manager, long tid, IQueue<Long, TxOp> opQueue) {
+ this.manager = manager;
+ this.opQueue = opQueue;
+ this.tid = tid;
+ }
+
+ /**
+ * @return the unique identifier used by the {@link TransactionManager} to
+ * identify this {@link Transaction}
+ *
+ */
+ public long getTid() {
+ return tid;
+ }
+
+ public AsciiBuffer getBackingQueueName() {
+ return opQueue.getDescriptor().getQueueName();
+ }
+
+ /**
+ * @return The transaction type e.g. {@link Transaction#TYPE_LOCAL}
+ */
+ public abstract byte getType();
+
+ public void addMessage(BrokerMessageDelivery m, ISourceController<?> source) {
+
+ synchronized (this) {
+ switch (state) {
+ case START_STATE:
+ case IN_USE_STATE:
+ opQueue.add(new TxMessage(m, this), source);
+ break;
+ default: {
+ throw new IllegalStateException("Can't add message to finished or prepared transaction");
+ }
+ }
+ }
+ }
+
+ public void addAck(SubscriptionDelivery<MessageDelivery> toAck) {
+ synchronized (this) {
+ switch (state) {
+ case START_STATE:
+ case IN_USE_STATE:
+ IQueue<Long, MessageDelivery> target = manager.getVirtualHost().getQueueStore().getQueue(toAck.getQueueDescriptor().getQueueName());
+ //Queue could be null if it was just deleted:
+ if (target != null) {
+ opQueue.add(new TxAck(target, toAck.getSourceQueueRemovalKey(), this), null);
+ }
+ break;
+ default: {
+ throw new IllegalStateException("Can't add message to finished or prepared transaction");
+ }
+ }
+ }
+ }
+
+ public byte getState() {
+ return state;
+ }
+
+ public void setState(byte state) {
+ this.state = state;
+ }
+
+ public void prePrepare() throws Exception {
+
+ // Is it ok to call prepare now given the state of the
+ // transaction?
+ switch (state) {
+ case START_STATE:
+ case IN_USE_STATE:
+ break;
+ default:
+ XAException xae = new XAException("Prepare cannot be called now.");
+ xae.errorCode = XAException.XAER_PROTO;
+ throw xae;
+ }
+
+ //TODO:
+ }
+
+ protected void fireAfterCommit() throws Exception {
+
+ //TODO
+ }
+
+ public void fireAfterRollback() throws Exception {
+ //TODO
+ }
+
+ public String toString() {
+ return super.toString() + "[queue=" + opQueue + "]";
+ }
+
+ public abstract void commit(boolean onePhase) throws XAException, IOException;
+
+ public abstract void rollback() throws XAException, IOException;
+
+ public abstract int prepare() throws XAException, IOException;
+
+ public boolean isPrepared() {
+ return getState() == PREPARED_STATE;
+ }
+
+ public long size() {
+ return opQueue.getEnqueuedCount();
+ }
+
+ interface TxOp {
+ public static final short TYPE_MESSAGE = 0;
+ public static final short TYPE_ACK = 1;
+
+ public short getType();
+
+ public <T> T asType(Class<T> type);
+
+ public void onRollback();
+
+ public void onCommit();
+
+ public int getLimiterSize();
+
+ public boolean isFromStore();
+
+ public long getStoreTracking();
+
+ public MessageRecord createMessageRecord();
+
+ /**
+ * @return
+ */
+ public boolean isPersistent();
+
+ /**
+ * @return
+ */
+ public Long getExpiration();
+
+ public int getPriority();
+ }
+
+ static class TxMessage implements TxOp {
+ MessageDelivery message;
+ Transaction tx;
+ private boolean fromStore;
+
+ /**
+ * @param m
+ * @param transaction
+ */
+ public TxMessage(MessageDelivery m, Transaction tx) {
+ message = m;
+ this.tx = tx;
+ }
+
+ public <T> T asType(Class<T> type) {
+ if (type == TxMessage.class) {
+ return type.cast(this);
+ } else {
+ return null;
+ }
+ }
+
+ public final short getType() {
+ return TYPE_MESSAGE;
+ }
+
+ public final int getLimiterSize() {
+ // TODO Auto-generated method stub
+ return message.getFlowLimiterSize();
+ }
+
+ public final void onCommit() {
+
+ }
+
+ public final void onRollback() {
+ // TODO Auto-generated method stub
+
+ }
+
+ public final boolean isFromStore() {
+ return fromStore;
+ }
+
+ public final MessageRecord createMessageRecord() {
+ return message.createMessageRecord();
+ }
+
+ public final long getStoreTracking() {
+ return message.getStoreTracking();
+ }
+
+ public final boolean isPersistent() {
+ return message.isPersistent();
+ }
+
+ public final Long getExpiration() {
+ return message.getExpiration();
+ }
+
+ public final int getPriority() {
+ return message.getPriority();
+ }
+ }
+
+ static class TxAck implements TxOp {
+ public static AsciiBuffer ENCODING = new AsciiBuffer("txack");
+ Transaction tx;
+ IQueue<Long, ?> queue; //Desriptor of the queue on which to delete.
+ long queueSequence; //Sequence number of the element on the queue from which to delete.
+ long storeTracking; //Store tracking of this delete op.
+ private boolean fromStore;
+ private static final int MEM_SIZE = 8 + 8 + 8 + 8 + 1;
+
+ TxAck(IQueue<Long, ?> queue, long storeTracking, Transaction tx) {
+ this.queue = queue;
+ this.storeTracking = storeTracking;
+ this.tx = tx;
+ }
+
+ public final short getType() {
+ return TYPE_ACK;
+ }
+
+ public <T> T asType(Class<T> type) {
+ if (type == TxAck.class) {
+ return type.cast(this);
+ } else {
+ return null;
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.apollo.broker.Transaction.TxOp#onCommit()
+ */
+ public final void onCommit() {
+ //TODO
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.apollo.broker.Transaction.TxOp#getLimiterSize()
+ */
+ public final int getLimiterSize() {
+ return MEM_SIZE;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.apollo.broker.Transaction.TxOp#onRollback()
+ */
+ public final void onRollback() {
+ // TODO unaquire the element.
+ }
+
+ public final boolean isFromStore() {
+ return fromStore;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.apollo.broker.Transaction.TxOp#getStoreTracking()
+ */
+ public final long getStoreTracking() {
+ return storeTracking;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.apollo.broker.Transaction.TxOp#createMessageRecord
+ * ()
+ */
+ public final MessageRecord createMessageRecord() {
+ MessageRecord ret = new MessageRecord();
+ ret.setEncoding(TxAck.ENCODING);
+ ret.setKey(storeTracking);
+ ret.setSize(MEM_SIZE);
+ ret.setBuffer(new Buffer(toBytes().getData()));
+ return null;
+ }
+
+ private final ByteSequence toBytes() {
+ AsciiBuffer queueName = queue.getDescriptor().getQueueName();
+ DataByteArrayOutputStream baos = new DataByteArrayOutputStream(2 + queueName.length + 8);
+ baos.writeShort(queueName.length);
+ baos.write(queueName.data, queueName.offset, queueName.length);
+ baos.writeLong(queueSequence);
+ return baos.toByteSequence();
+ }
+
+ private final void fromBytes(byte[] bytes) {
+ DataByteArrayInputStream baos = new DataByteArrayInputStream(bytes);
+ byte[] queueBytes = new byte[baos.readShort()];
+ baos.readFully(queueBytes);
+ AsciiBuffer queueName = new AsciiBuffer(queueBytes);
+ queue = tx.manager.getVirtualHost().getQueueStore().getQueue(queueName);
+
+ }
+
+ public final static TxAck createFromMessageRecord(MessageRecord record, Transaction tx) {
+ TxAck ret = new TxAck(null, record.getKey(), tx);
+ ret.fromBytes(record.getBuffer().getData());
+ return ret;
+ }
+
+ public final boolean isPersistent() {
+ //TODO This could probably be relaxed when the ack is for non persistent
+ //elements
+ return true;
+ }
+
+ public final Long getExpiration() {
+ return -1L;
+ }
+
+ public final int getPriority() {
+ return 0;
+ }
+ }
+
+ /**
+ * @param record
+ * @return
+ */
+ public static TxOp createTxOp(MessageRecord record, Transaction tx) {
+ if (record.getEncoding().equals(TxAck.ENCODING)) {
+ return TxAck.createFromMessageRecord(record, tx);
+ } else {
+ MessageDelivery delivery = tx.manager.getVirtualHost().getQueueStore().getMessageMarshaller().unMarshall(record, tx.opQueue.getDescriptor());
+ return new TxMessage(delivery, tx);
+ }
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java?rev=789143&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java Sun Jun 28 19:37:06 2009
@@ -0,0 +1,381 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq.apollo.broker.BrokerDatabase.OperationContext;
+import org.apache.activemq.apollo.broker.Transaction.TxOp;
+import org.apache.activemq.broker.store.Store.MessageRecord;
+import org.apache.activemq.broker.store.Store.QueueQueryResult;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.queue.ExclusivePersistentQueue;
+import org.apache.activemq.queue.IQueue;
+import org.apache.activemq.queue.PersistencePolicy;
+import org.apache.activemq.queue.QueueDescriptor;
+import org.apache.activemq.queue.QueueStore;
+import org.apache.activemq.queue.RestoreListener;
+import org.apache.activemq.queue.SaveableQueueElement;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+import org.apache.activemq.util.Mapper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * TransactionManager
+ * <p>
+ * Description:
+ * </p>
+ *
+ * @version 1.0
+ */
+public class TransactionManager {
+ private static final Log LOG = LogFactory.getLog(TransactionManager.class);
+ private static final String TX_QUEUE_PREFIX = "TX-";
+ private static final AsciiBuffer TXN_MAP = new AsciiBuffer("TXMAP");
+
+ private final HashMap<Long, Transaction> transactions = new HashMap<Long, Transaction>();
+ private final HashMap<AsciiBuffer, Transaction> transactionsByQueue = new HashMap<AsciiBuffer, Transaction>();
+
+ private final VirtualHost host;
+ private final BrokerDatabase database;
+
+ private final AtomicLong tidGen = new AtomicLong(0);
+ private final TransactionStore txStore;
+
+ private static final int DEFAULT_TX_QUEUE_PAGING_THRESHOLD = 1024 * 64;
+ private static final int DEFAULT_TX_QUEUE_RESUME_THRESHOLD = 1;
+ // Be default we don't page out elements to disk.
+ //private static final int DEFAULT_DURABLE_QUEUE_SIZE = DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
+ private static final int DEFAULT_TX_QUEUE_SIZE = Integer.MAX_VALUE;
+
+ private static final PersistencePolicy<TxOp> DEFAULT_TX_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<TxOp>() {
+
+ private static final boolean PAGING_ENABLED = DEFAULT_TX_QUEUE_SIZE > DEFAULT_TX_QUEUE_PAGING_THRESHOLD;
+
+ public boolean isPersistent(TxOp elem) {
+ return elem.isPersistent();
+ }
+
+ public boolean isPageOutPlaceHolders() {
+ return false;
+ }
+
+ public boolean isPagingEnabled() {
+ return PAGING_ENABLED;
+ }
+
+ public int getPagingInMemorySize() {
+ return DEFAULT_TX_QUEUE_PAGING_THRESHOLD;
+ }
+
+ public boolean isThrottleSourcesToMemoryLimit() {
+ // Keep the queue in memory.
+ return true;
+ }
+
+ public int getDisconnectedThrottleRate() {
+ // By default don't throttle consumers when disconnected.
+ return 0;
+ }
+
+ public int getRecoveryBias() {
+ return 8;
+ }
+ };
+
+ private static final Mapper<Long, TxOp> EXPIRATION_MAPPER = new Mapper<Long, TxOp>() {
+ public Long map(TxOp element) {
+ return element.getExpiration();
+ }
+ };
+
+ private static final Mapper<Integer, TxOp> SIZE_MAPPER = new Mapper<Integer, TxOp>() {
+ public Integer map(TxOp element) {
+ return element.getLimiterSize();
+ }
+ };
+
+ private static final Mapper<Integer, TxOp> PRIORITY_MAPPER = new Mapper<Integer, TxOp>() {
+ public Integer map(TxOp element) {
+ return element.getPriority();
+ }
+ };
+
+ private static final Mapper<Long, TxOp> KEY_MAPPER = new Mapper<Long, TxOp>() {
+ public Long map(TxOp element) {
+ return element.getStoreTracking();
+ }
+ };
+
+ private static final Mapper<Integer, TxOp> PARTITION_MAPPER = new Mapper<Integer, TxOp>() {
+ public Integer map(TxOp element) {
+ return 1;
+ }
+ };
+
+ TransactionManager(VirtualHost host) {
+ this.host = host;
+ txStore = new TransactionStore(host.getDatabase());
+ database = host.getDatabase();
+ }
+
+ /**
+ * @return The TM's virtual host
+ */
+ public final VirtualHost getVirtualHost() {
+ return host;
+ }
+
+ /**
+ * Creates a transaction.
+ *
+ * @param xid
+ * @return
+ */
+ public final Transaction createTransaction(Xid xid) {
+ Transaction ret;
+
+ long tid = tidGen.incrementAndGet();
+ IQueue<Long, TxOp> opQueue = createTranscationQueue(tid);
+
+ if (xid == null) {
+ ret = new LocalTransaction(this, tid, opQueue);
+ } else {
+ ret = new XATransaction(this, tid, xid, opQueue);
+ }
+
+ transactionsByQueue.put(opQueue.getDescriptor().getQueueName(), ret);
+ transactions.put(ret.getTid(), ret);
+
+ return ret;
+ }
+
+ /**
+ *
+ * @throws Exception
+ */
+ public void loadTransactions() throws Exception {
+
+ tidGen.set(database.allocateStoreTracking());
+
+ Map<AsciiBuffer, Buffer> txns = database.listMapEntries(TXN_MAP);
+
+ // Load shared queues
+ Iterator<QueueQueryResult> results = database.listQueues(BrokerQueueStore.TRANSACTION_QUEUE_TYPE);
+ while (results.hasNext()) {
+ QueueQueryResult loaded = results.next();
+
+ Buffer b = txns.remove(loaded.getDescriptor().getQueueName());
+ if (b == null) {
+ LOG.warn("Recovered orphaned transaction queue: " + loaded.getDescriptor() + " elements: " + loaded.getCount());
+ database.deleteQueue(loaded.getDescriptor());
+ }
+
+ IQueue<Long, TxOp> queue = createRestoredTxQueue(loaded);
+ Transaction tx = loadTransaction(b, queue);
+
+ //TODO if we recover a tx that isn't committed then, we should discard it.
+ if (tx.getState() < Transaction.FINISHED_STATE) {
+ LOG.warn("Recovered unfinished transaction: " + tx);
+ }
+ transactions.put(tx.getTid(), tx);
+
+ LOG.info("Loaded Queue " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize());
+ }
+
+ if (txns.isEmpty()) {
+ //TODO Based on transaction state this is generally ok, anyway the orphaned entries should be
+ //deleted:
+ LOG.warn("Recovered transactions without backing queues: " + txns.keySet());
+
+ }
+ }
+
+ private Transaction loadTransaction(Buffer b, IQueue<Long, TxOp> queue) throws IOException
+ {
+ //TODO move the serialization into the transaction itself:
+ DataByteArrayInputStream bais = new DataByteArrayInputStream(b.getData());
+ byte type = bais.readByte();
+ byte state = bais.readByte();
+ long tid = bais.readLong();
+
+ Transaction tx = null;
+ switch (type) {
+ case Transaction.TYPE_LOCAL:
+ tx = new LocalTransaction(this, tid, queue);
+ break;
+ case Transaction.TYPE_XA:
+ XidImpl xid = new XidImpl();
+ xid.readbody(bais);
+ tx = new XATransaction(this, tid, xid, queue);
+ break;
+ default:
+ throw new IOException("Invalid transaction type: " + type);
+
+ }
+ tx.setState(state);
+ return tx;
+
+ }
+
+ public OperationContext persistTransaction(Transaction tx) throws IOException {
+
+ //TODO move the serialization into the transaction itself:
+ DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
+ baos.writeByte(tx.getType());
+ baos.writeByte(tx.getState());
+ baos.writeLong(tx.getTid());
+ if(tx.getType() == Transaction.TYPE_XA)
+ {
+ ((XATransaction)tx).getXid().writebody(baos);
+ }
+
+ return database.updateMapEntry(TXN_MAP, tx.getBackingQueueName(), new Buffer(baos.getData(), 0, baos.size()));
+ }
+
+ private IQueue<Long, TxOp> createRestoredTxQueue(QueueQueryResult loaded) throws IOException {
+
+ IQueue<Long, TxOp> queue = createTxQueueInternal(loaded.getDescriptor().getQueueName().toString(), loaded.getDescriptor().getQueueType());
+ queue.initialize(loaded.getFirstSequence(), loaded.getLastSequence(), loaded.getCount(), loaded.getSize());
+ return queue;
+ }
+
+ private final IQueue<Long, TxOp> createTranscationQueue(long tid) {
+ return createTxQueueInternal(TX_QUEUE_PREFIX + tid, BrokerQueueStore.TRANSACTION_QUEUE_TYPE);
+ }
+
+ private IQueue<Long, TxOp> createTxQueueInternal(final String name, short type) {
+ ExclusivePersistentQueue<Long, TxOp> queue;
+
+ SizeLimiter<TxOp> limiter = new SizeLimiter<TxOp>(DEFAULT_TX_QUEUE_SIZE, DEFAULT_TX_QUEUE_RESUME_THRESHOLD) {
+ @Override
+ public int getElementSize(TxOp elem) {
+ return elem.getLimiterSize();
+ }
+ };
+ queue = new ExclusivePersistentQueue<Long, TxOp>(name, limiter);
+ queue.setDispatcher(host.getBroker().getDispatcher());
+ queue.setStore(txStore);
+ queue.setPersistencePolicy(DEFAULT_TX_QUEUE_PERSISTENCE_POLICY);
+ queue.setExpirationMapper(EXPIRATION_MAPPER);
+ queue.getDescriptor().setApplicationType(type);
+ return queue;
+ }
+
+ final QueueStore<Long, Transaction.TxOp> getTxnStore() {
+ return txStore;
+ }
+
+ private class TransactionStore implements QueueStore<Long, Transaction.TxOp> {
+ private final BrokerDatabase database;
+
+ private final BrokerDatabase.MessageRecordMarshaller<TxOp> TX_OP_MARSHALLER = new BrokerDatabase.MessageRecordMarshaller<TxOp>() {
+ public MessageRecord marshal(TxOp element) {
+ return element.createMessageRecord();
+ }
+
+ public TxOp unMarshall(MessageRecord record, QueueDescriptor queue) {
+ Transaction t = transactionsByQueue.get(queue.getQueueName());
+ return Transaction.createTxOp(record, t);
+ }
+ };
+
+ TransactionStore(BrokerDatabase database) {
+ this.database = database;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.queue.QueueStore#addQueue(org.apache.activemq
+ * .queue.QueueDescriptor)
+ */
+ public void addQueue(QueueDescriptor queue) {
+ database.addQueue(queue);
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.queue.QueueStore#deleteQueue(org.apache.activemq
+ * .queue.QueueDescriptor)
+ */
+ public void deleteQueue(QueueDescriptor queue) {
+ database.deleteQueue(queue);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.queue.QueueStore#deleteQueueElement(org.apache
+ * .activemq.queue.QueueDescriptor, java.lang.Object)
+ */
+ public void deleteQueueElement(QueueDescriptor queue, TxOp element) {
+ database.deleteQueueElement(element.getStoreTracking(), queue);
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.queue.QueueStore#isFromStore(java.lang.Object)
+ */
+ public boolean isFromStore(TxOp elem) {
+ return elem.isFromStore();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.queue.QueueStore#persistQueueElement(org.apache
+ * .activemq.queue.SaveableQueueElement,
+ * org.apache.activemq.flow.ISourceController, boolean)
+ */
+ public void persistQueueElement(SaveableQueueElement<TxOp> sqe, ISourceController<?> source, boolean delayable) {
+ database.saveQeueuElement(sqe, source, delayable, TX_OP_MARSHALLER);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.queue.QueueStore#restoreQueueElements(org.apache
+ * .activemq.queue.QueueDescriptor, boolean, long, long, int,
+ * org.apache.activemq.queue.RestoreListener)
+ */
+ public void restoreQueueElements(QueueDescriptor queue, boolean recordOnly, long firstSequence, long maxSequence, int maxCount, RestoreListener<TxOp> listener) {
+ database.restoreQueueElements(queue, recordOnly, firstSequence, maxSequence, maxCount, listener, TX_OP_MARSHALLER);
+ }
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java Sun Jun 28 19:37:06 2009
@@ -28,8 +28,6 @@
import org.apache.activemq.broker.store.Store;
import org.apache.activemq.broker.store.StoreFactory;
import org.apache.activemq.protobuf.AsciiBuffer;
-import org.apache.activemq.queue.AbstractFlowQueue;
-import org.apache.activemq.queue.ExclusivePersistentQueue;
import org.apache.activemq.queue.IQueue;
import org.apache.activemq.util.IOHelper;
@@ -143,7 +141,7 @@
queue.shutdown(true);
}
- for (AbstractFlowQueue<MessageDelivery> queue : queueStore.getDurableQueues()) {
+ for (IQueue<Long, MessageDelivery> queue : queueStore.getDurableQueues()) {
queue.shutdown(true);
}
@@ -202,7 +200,7 @@
if (consumer.isDurable()) {
DurableSubscription dsub = durableSubs.get(consumer.getSubscriptionName());
if (dsub == null) {
- ExclusivePersistentQueue<Long, MessageDelivery> queue = queueStore.createDurableQueue(consumer.getSubscriptionName());
+ IQueue<Long, MessageDelivery> queue = queueStore.createDurableQueue(consumer.getSubscriptionName());
queue.start();
dsub = new DurableSubscription(this, destination, consumer.getSelectorExpression(), queue);
durableSubs.put(consumer.getSubscriptionName(), dsub);
Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java?rev=789143&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java Sun Jun 28 19:37:06 2009
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker;
+
+import java.io.IOException;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq.queue.IQueue;
+import org.apache.activemq.util.DataByteArrayInputStream;
+
+/**
+ * XATransaction
+ * <p>
+ * Description:
+ * </p>
+ *
+ * @author cmacnaug
+ * @version 1.0
+ */
+public class XATransaction extends Transaction {
+
+ private final XidImpl xid;
+
+ XATransaction(TransactionManager manager, long tid, Xid xid, IQueue<Long, TxOp> opQueue) {
+ super(manager, tid, opQueue);
+ if (xid instanceof XidImpl) {
+ this.xid = XidImpl.class.cast(xid);
+ } else {
+ this.xid = new XidImpl(xid);
+ }
+ }
+
+ public XidImpl getXid() {
+ return xid;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.apollo.broker.Transaction#commit(boolean)
+ */
+ @Override
+ public void commit(boolean onePhase) throws XAException, IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.apollo.broker.Transaction#prepare()
+ */
+ @Override
+ public int prepare() throws XAException, IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.apollo.broker.Transaction#rollback()
+ */
+ @Override
+ public void rollback() throws XAException, IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.apollo.broker.Transaction#getType()
+ */
+ @Override
+ public byte getType() {
+ return TYPE_XA;
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java?rev=789143&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java Sun Jun 28 19:37:06 2009
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * An implementation of JTA transaction idenifier (javax.transaction.xa.Xid).
+ * This is SonicMQ internal Xid. Any external Xid object will be converted to
+ * this class.
+ */
+public class XidImpl implements Xid, Cloneable, java.io.Serializable {
+
+ //fix bug #8334
+ static final long serialVersionUID = -5363901495878210611L;
+
+ // The format identifier for the XID. A value of -1 indicates the NULLXID
+ private int m_formatID = -1; // default format
+
+ private byte m_gtrid[];
+ // The number of bytes in the global transaction identfier
+ private int m_gtridLength; // Value from 1 through MAXGTRIDSIZE
+
+ private byte m_bqual[];
+ // The number of bytes in the branch qualifier
+ private int m_bqualLength; // Value from 1 through MAXBQUALSIZE
+
+ /////////////////////////////// Constructors /////////////////////////////
+ /**
+ * Constructs a new null XID.
+ * <p>
+ * After construction the data within the XID should be initialized.
+ */
+ public XidImpl() {
+ this(-1, null, null);
+ }
+
+ public XidImpl(int formatID, byte[] globalTxnID, byte[] branchID) {
+ m_formatID = formatID;
+ setGlobalTransactionId(globalTxnID);
+ setBranchQualifier(branchID);
+ }
+
+ /**
+ * Initialize an XID using another XID as the source of data.
+ *
+ * @param from
+ * the XID to initialize this XID from
+ */
+ public XidImpl(Xid from) {
+
+ if ((from == null) || (from.getFormatId() == -1)) {
+ m_formatID = -1;
+ setGlobalTransactionId(null);
+ setBranchQualifier(null);
+ } else {
+ m_formatID = from.getFormatId();
+ setGlobalTransactionId(from.getGlobalTransactionId());
+ setBranchQualifier(from.getBranchQualifier());
+ }
+
+ }
+
+ // used for test purpose
+ public XidImpl(String globalTxnId, String branchId) {
+
+ this(99, globalTxnId.getBytes(), branchId.getBytes());
+
+ }
+
+ //////////// Public Methods //////////////
+
+ /**
+ * Determine whether or not two objects of this type are equal.
+ *
+ * @param o
+ * the other XID object to be compared with this XID.
+ *
+ * @return Returns true of the supplied object represents the same global
+ * transaction as this, otherwise returns false.
+ */
+ public boolean equals(Object o) {
+ Xid other;
+
+ if (!(o instanceof Xid))
+ return false;
+
+ other = (Xid) o;
+
+ if (m_formatID == -1 && other.getFormatId() == -1)
+ return true;
+
+ if (m_formatID != other.getFormatId() || m_gtridLength != other.getGlobalTransactionId().length || m_bqualLength != other.getBranchQualifier().length) {
+ return false;
+ }
+
+ return isEqualGtrid(other) && isEqualBranchQualifier(other.getBranchQualifier());
+
+ }
+
+ /**
+ * Compute the hash code.
+ *
+ * @return the computed hashcode
+ */
+
+ public int hashCode() {
+
+ if (m_formatID == -1)
+ return (-1);
+
+ return m_formatID + m_gtridLength - m_bqualLength;
+
+ }
+
+ /**
+ * Return a string representing this XID.
+ * <p>
+ * This is normally used to display the XID when debugging.
+ *
+ * @return the string representation of this XID
+ */
+
+ public String toString() {
+ String gtString = new String(getGlobalTransactionId());
+ String brString = new String(getBranchQualifier());
+ return new String("{Xid: " + "formatID=" + m_formatID + ", " + "gtrid[" + m_gtridLength + "]=" + gtString + ", " + "brid[" + m_bqualLength + "]=" + brString + "}");
+
+ }
+
+ /**
+ * Obtain the format identifier part of the XID.
+ *
+ * @return Format identifier. -1 indicates a null XID
+ */
+ public int getFormatId() {
+ return m_formatID;
+ }
+
+ /**
+ * Returns the global transaction identifier for this XID.
+ *
+ * @return the global transaction identifier
+ */
+ public byte[] getGlobalTransactionId() {
+ return m_gtrid;
+ }
+
+ /**
+ * Returns the branch qualifier for this XID.
+ *
+ * @return the branch qualifier
+ */
+ public byte[] getBranchQualifier() {
+ return m_bqual;
+ }
+
+ ///////////////////////// private methods ////////////////////////////////
+
+ /**
+ * Set the branch qualifier for this XID. Note that the branch qualifier has
+ * a maximum size.
+ *
+ * @param qual
+ * a Byte array containing the branch qualifier to be set. If the
+ * size of the array exceeds MAXBQUALSIZE, only the first
+ * MAXBQUALSIZE elements of qual will be used.
+ */
+ private void setBranchQualifier(byte[] branchID) {
+ if (branchID == null) {
+ m_bqualLength = 0;
+ m_bqual = new byte[m_bqualLength];
+ } else {
+ m_bqualLength = branchID.length > MAXBQUALSIZE ? MAXBQUALSIZE : branchID.length;
+ m_bqual = new byte[m_bqualLength];
+ System.arraycopy(branchID, 0, m_bqual, 0, m_bqualLength);
+ }
+ }
+
+ private void setGlobalTransactionId(byte[] globalTxnID) {
+ if (globalTxnID == null) {
+ m_gtridLength = 0;
+ m_gtrid = new byte[m_gtridLength];
+ } else {
+ m_gtridLength = globalTxnID.length > MAXGTRIDSIZE ? MAXGTRIDSIZE : globalTxnID.length;
+ m_gtrid = new byte[m_gtridLength];
+ System.arraycopy(globalTxnID, 0, m_gtrid, 0, m_gtridLength);
+ }
+ }
+
+ /**
+ * Return whether the Gtrid of this is equal to the Gtrid of xid
+ */
+ private boolean isEqualGtrid(Xid xid) {
+ byte[] xidGtrid = xid.getGlobalTransactionId();
+
+ if (getGlobalTransactionId() == null && xidGtrid == null)
+ return true;
+ if (getGlobalTransactionId() == null)
+ return false;
+ if (xidGtrid == null)
+ return false;
+
+ if (m_gtridLength != xidGtrid.length) {
+ return false;
+ }
+
+ for (int i = 0; i < m_gtridLength; i++) {
+ if (m_gtrid[i] != xidGtrid[i])
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Determine if an array of bytes equals the branch qualifier
+ *
+ * @return true if equal
+ */
+ private boolean isEqualBranchQualifier(byte[] data) {
+
+ int L = data.length > MAXBQUALSIZE ? MAXBQUALSIZE : data.length;
+
+ if (L != m_bqualLength)
+ return false;
+
+ for (int i = 0; i < m_bqualLength; i++) {
+ if (data[i] != m_bqual[i])
+ return false;
+ }
+
+ return true;
+ }
+
+ public int getMemorySize() {
+ return 4 // formatId
+ + 4 // length of globalTxnId
+ + m_gtridLength // globalTxnId
+ + 4 // length of branchId
+ + m_bqualLength; // branchId
+ }
+
+ /**
+ * Writes this XidImpl's data to the DataOutput destination
+ *
+ * @param out
+ * The DataOutput destination
+ * @param maxbytes
+ * Maximum number of bytes that may be written to the destination
+ *
+ * @exception ELogEventTooLong
+ * The data could not be written without exceeding the
+ * maxbytes parameter. The data may have been partially
+ * written.
+ */
+ public void writebody(DataOutput out) throws IOException {
+ out.writeInt(m_formatID); // format ID
+
+ out.writeInt(m_gtridLength); // length of global Txn ID
+ out.write(getGlobalTransactionId(), 0, m_gtridLength); // global transaction ID
+ out.writeInt(m_bqualLength); // length of branch ID
+ out.write(getBranchQualifier(), 0, m_bqualLength); // branch ID
+ }
+
+ /**
+ * read xid from an Array and set each fields.
+ *
+ * @param in
+ * the data input array
+ * @throws IOException
+ */
+ public void readbody(DataInput in) throws IOException {
+ m_formatID = in.readInt();
+ int gtidLen = in.readInt();
+ byte[] globalTxnId = new byte[gtidLen];
+ in.readFully(globalTxnId, 0, gtidLen);
+
+ int brlen = in.readInt();
+ byte[] branchId = new byte[brlen];
+ in.readFully(branchId, 0, brlen);
+
+ setGlobalTransactionId(globalTxnId);
+ setBranchQualifier(branchId);
+ }
+} // class XidImpl
Propchange: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java
------------------------------------------------------------------------------
svn:eol-style = native