You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:39:04 UTC
svn commit: r961067 [4/5] - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/java/org/apache/activemq/apollo/
activemq-broker/src/main/java/org/apache/activemq/apollo/broker/
activemq-broker/src/test/java/org/apache/activemq/broker/...
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java?rev=961067&r1=961066&r2=961067&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java Wed Jul 7 03:39:03 2010
@@ -17,120 +17,137 @@
package org.apache.activemq.apollo.broker;
import org.apache.activemq.apollo.broker.ProtocolHandler.ConsumerContext;
-import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.queue.IQueue;
-import org.apache.activemq.queue.Subscription;
+import org.fusesource.hawtdispatch.internal.util.RunnableCountDownLatch;
public class Queue implements DeliveryTarget {
-
private Destination destination;
- private final IQueue<Long, MessageDelivery> queue;
- private VirtualHost virtualHost;
-
- Queue(IQueue<Long, MessageDelivery> queue) {
- this.queue = queue;
- }
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq
- * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
- */
- public void deliver(MessageDelivery message, ISourceController<?> source) {
- queue.add(message, source);
- }
-
- public final void addSubscription(final Subscription<MessageDelivery> sub) {
- queue.addSubscription(sub);
- }
+// TODO:
+// private Destination destination;
+// private VirtualHost virtualHost;
+//
+// Queue() {
+// this.queue = queue;
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq
+// * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
+// */
+// public void deliver(MessageDelivery message, ISourceController<?> source) {
+// queue.add(message, source);
+// }
+//
+// public final void addSubscription(final Subscription<MessageDelivery> sub) {
+// queue.addSubscription(sub);
+// }
+//
+// public boolean removeSubscription(final Subscription<MessageDelivery> sub) {
+// return queue.removeSubscription(sub);
+// }
+//
+// public void start() throws Exception {
+// queue.start();
+// }
+//
+// public void stop() throws Exception {
+// if (queue != null) {
+// queue.stop();
+// }
+// }
+//
+// public void shutdown(Runnable onShutdown) throws Exception {
+// if (queue != null) {
+// queue.shutdown(onShutdown);
+// }
+// }
+//
+// public boolean hasSelector() {
+// return false;
+// }
+//
+// public boolean matches(MessageDelivery message) {
+// return true;
+// }
+//
+// public VirtualHost getBroker() {
+// return virtualHost;
+// }
+//
+// public void setVirtualHost(VirtualHost virtualHost) {
+// this.virtualHost = virtualHost;
+// }
+//
+// public void setDestination(Destination destination) {
+// this.destination = destination;
+// }
+//
+// public final Destination getDestination() {
+// return destination;
+// }
+//
+// public boolean isDurable() {
+// return true;
+// }
+//
+// public static class QueueSubscription implements BrokerSubscription {
+// Subscription<MessageDelivery> subscription;
+// final Queue queue;
+//
+// public QueueSubscription(Queue queue) {
+// this.queue = queue;
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.broker.BrokerSubscription#connect(org.apache.
+// * activemq.broker.protocol.ProtocolHandler.ConsumerContext)
+// */
+// public void connect(ConsumerContext subscription) throws UserAlreadyConnectedException {
+// this.subscription = subscription;
+// queue.addSubscription(subscription);
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache
+// * .activemq.broker.protocol.ProtocolHandler.ConsumerContext)
+// */
+// public void disconnect(ConsumerContext context) {
+// queue.removeSubscription(subscription);
+// }
+//
+// /* (non-Javadoc)
+// * @see org.apache.activemq.broker.BrokerSubscription#getDestination()
+// */
+// public Destination getDestination() {
+// return queue.getDestination();
+// }
+// }
- public boolean removeSubscription(final Subscription<MessageDelivery> sub) {
- return queue.removeSubscription(sub);
- }
-
- public void start() throws Exception {
- queue.start();
- }
-
- public void stop() throws Exception {
- if (queue != null) {
- queue.stop();
- }
- }
-
- public void shutdown(Runnable onShutdown) throws Exception {
- if (queue != null) {
- queue.shutdown(onShutdown);
- }
+ public void deliver(MessageDelivery message) {
+ //To change body of implemented methods use File | Settings | File Templates.
}
public boolean hasSelector() {
- return false;
+ return false; //To change body of implemented methods use File | Settings | File Templates.
}
public boolean matches(MessageDelivery message) {
- return true;
- }
-
- public VirtualHost getBroker() {
- return virtualHost;
- }
-
- public void setVirtualHost(VirtualHost virtualHost) {
- this.virtualHost = virtualHost;
+ return false; //To change body of implemented methods use File | Settings | File Templates.
}
- public void setDestination(Destination destination) {
- this.destination = destination;
+ public void shutdown(RunnableCountDownLatch done) {
}
- public final Destination getDestination() {
+ public Destination getDestination() {
return destination;
}
-
- public boolean isDurable() {
- return true;
- }
-
- public static class QueueSubscription implements BrokerSubscription {
- Subscription<MessageDelivery> subscription;
- final Queue queue;
-
- public QueueSubscription(Queue queue) {
- this.queue = queue;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.activemq.broker.BrokerSubscription#connect(org.apache.
- * activemq.broker.protocol.ProtocolHandler.ConsumerContext)
- */
- public void connect(ConsumerContext subscription) throws UserAlreadyConnectedException {
- this.subscription = subscription;
- queue.addSubscription(subscription);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache
- * .activemq.broker.protocol.ProtocolHandler.ConsumerContext)
- */
- public void disconnect(ConsumerContext context) {
- queue.removeSubscription(subscription);
- }
-
- /* (non-Javadoc)
- * @see org.apache.activemq.broker.BrokerSubscription#getDestination()
- */
- public Destination getDestination() {
- return queue.getDestination();
- }
- }
-
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java?rev=961067&r1=961066&r2=961067&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java Wed Jul 7 03:39:03 2010
@@ -22,7 +22,6 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
-import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.util.buffer.AsciiBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -87,43 +86,43 @@ final public class Router {
}
}
- public void route(final MessageDelivery msg, ISourceController<?> controller, boolean autoCreate) {
-
- //If the message is part of transaction send it to the transaction manager
- if(msg.getTransactionId() >= 0)
- {
- virtualHost.getTransactionManager().newMessage(msg, controller);
- return;
- }
-
- Collection<DeliveryTarget> targets = route(msg.getDestination(), msg, autoCreate);
-
- //Set up the delivery for persistence:
- msg.beginDispatch(database);
-
- try {
- // TODO:
- // Consider doing some caching of this sub list. Most producers
- // always send to the same destination.
- if (targets != null) {
- // The sinks will request persistence via MessageDelivery.persist()
- // if they require persistence:
- for (DeliveryTarget target : targets) {
- target.deliver(msg, controller);
- }
- }
- } finally {
- try {
- msg.finishDispatch(controller);
- } catch (IOException ioe) {
- //TODO: Error serializing the message, this should trigger an error
- //This is a pretty severe error as we've already delivered
- //the message to the recipients. If we send an error response
- //back it could result in a duplicate. Does this mean that we
- //should persist the message prior to sending to the recips?
- ioe.printStackTrace();
- }
- }
+ public void route(final MessageDelivery msg, boolean autoCreate) {
+// TODO:
+// //If the message is part of transaction send it to the transaction manager
+// if(msg.getTransactionId() >= 0)
+// {
+// virtualHost.getTransactionManager().newMessage(msg, controller);
+// return;
+// }
+//
+// Collection<DeliveryTarget> targets = route(msg.getDestination(), msg, autoCreate);
+//
+// //Set up the delivery for persistence:
+// msg.beginDispatch(database);
+//
+// try {
+// // TODO:
+// // Consider doing some caching of this sub list. Most producers
+// // always send to the same destination.
+// if (targets != null) {
+// // The sinks will request persistence via MessageDelivery.persist()
+// // if they require persistence:
+// for (DeliveryTarget target : targets) {
+// target.deliver(msg, controller);
+// }
+// }
+// } finally {
+// try {
+// msg.finishDispatch(controller);
+// } catch (IOException ioe) {
+// //TODO: Error serializing the message, this should trigger an error
+// //This is a pretty severe error as we've already delivered
+// //the message to the recipients. If we send an error response
+// //back it could result in a duplicate. Does this mean that we
+// //should persist the message prior to sending to the recips?
+// ioe.printStackTrace();
+// }
+// }
}
private Collection<DeliveryTarget> route(Destination destination, MessageDelivery msg, boolean autoCreate) {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java?rev=961067&r1=961066&r2=961067&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java Wed Jul 7 03:39:03 2010
@@ -20,140 +20,155 @@ import org.apache.activemq.apollo.broker
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.FilterException;
import org.apache.activemq.filter.MessageEvaluationContext;
-import org.apache.activemq.flow.Flow;
-import org.apache.activemq.flow.IFlowLimiter;
-import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.flow.SizeLimiter;
-import org.apache.activemq.queue.ExclusivePersistentQueue;
-import org.apache.activemq.queue.ExclusiveQueue;
-import org.apache.activemq.queue.IFlowQueue;
-import org.apache.activemq.queue.QueueDispatchTarget;
-import org.apache.activemq.queue.Subscription;
import org.apache.activemq.util.IntrospectionSupport;
class TopicSubscription implements BrokerSubscription, DeliveryTarget {
- static final boolean USE_PERSISTENT_QUEUES = true;
-
- protected final BooleanExpression selector;
- protected final Destination destination;
- protected Subscription<MessageDelivery> connectedSub;
- private final VirtualHost host;
-
- //TODO: replace this with a base interface for queue which also support non persistent use case.
- private IFlowQueue<MessageDelivery> queue;
-
- TopicSubscription(VirtualHost host, Destination destination, BooleanExpression selector) {
- this.host = host;
- this.selector = selector;
- this.destination = destination;
- }
-
- @Override
- public String toString() {
- return IntrospectionSupport.toString(this);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq
- * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
- */
- public final void deliver(MessageDelivery message, ISourceController<?> source) {
- if (matches(message)) {
- queue.add(message, source);
- }
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.broker.DeliveryTarget#hasSelector()
- */
- public boolean hasSelector() {
- return selector != null;
+// static final boolean USE_PERSISTENT_QUEUES = true;
+//
+// protected final BooleanExpression selector;
+// protected final Destination destination;
+// protected Subscription<MessageDelivery> connectedSub;
+// private final VirtualHost host;
+//
+// //TODO: replace this with a base interface for queue which also support non persistent use case.
+// private IFlowQueue<MessageDelivery> queue;
+//
+// TopicSubscription(VirtualHost host, Destination destination, BooleanExpression selector) {
+// this.host = host;
+// this.selector = selector;
+// this.destination = destination;
+// }
+//
+// @Override
+// public String toString() {
+// return IntrospectionSupport.toString(this);
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq
+// * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
+// */
+// public final void deliver(MessageDelivery message, ISourceController<?> source) {
+// if (matches(message)) {
+// queue.add(message, source);
+// }
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see org.apache.activemq.broker.DeliveryTarget#hasSelector()
+// */
+// public boolean hasSelector() {
+// return selector != null;
+// }
+//
+// public synchronized void connect(final ConsumerContext subscription) throws UserAlreadyConnectedException {
+// if (this.connectedSub == null) {
+// if( subscription.isPersistent() ) {
+// queue = createPersistentQueue(subscription);
+// } else {
+// queue = createNonPersistentQueue(subscription);
+// }
+// queue.start();
+//
+// this.connectedSub = subscription;
+// this.queue.addSubscription(connectedSub);
+// this.host.getRouter().bind(destination, this);
+// } else if (connectedSub != subscription) {
+// throw new UserAlreadyConnectedException();
+// }
+// }
+//
+// private IFlowQueue<MessageDelivery> createNonPersistentQueue(final ConsumerContext subscription) {
+// Flow flow = new Flow(subscription.getResourceName(), false);
+// String name = subscription.getResourceName();
+// IFlowLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(100, 50);
+// ExclusiveQueue<MessageDelivery> queue = new ExclusiveQueue<MessageDelivery>(flow, name, limiter);
+// queue.setDrain( new QueueDispatchTarget<MessageDelivery>() {
+// public void drain(MessageDelivery elem, ISourceController<MessageDelivery> controller) {
+// subscription.add(elem, controller);
+// }
+// });
+// return queue;
+// }
+//
+// private IFlowQueue<MessageDelivery> createPersistentQueue(ConsumerContext subscription) {
+// ExclusivePersistentQueue<Long, MessageDelivery> queue = host.getQueueStore().createExclusivePersistentQueue();
+// return queue;
+// }
+//
+// @SuppressWarnings("unchecked")
+// private void destroyPersistentQueue(IFlowQueue<MessageDelivery> queue) {
+// ExclusivePersistentQueue<Long, MessageDelivery> pq = (ExclusivePersistentQueue<Long, MessageDelivery>) queue;
+// host.getQueueStore().deleteQueue(pq.getDescriptor());
+// }
+//
+// public synchronized void disconnect(final ConsumerContext subscription) {
+// if (connectedSub != null && connectedSub == subscription) {
+// this.host.getRouter().unbind(destination, this);
+// this.queue.removeSubscription(connectedSub);
+// this.connectedSub = null;
+//
+// queue.stop();
+// if( USE_PERSISTENT_QUEUES ) {
+// destroyPersistentQueue(queue);
+// }
+// queue=null;
+// }
+// }
+//
+//
+//
+// public boolean matches(MessageDelivery message) {
+// if (selector == null) {
+// return true;
+// }
+//
+// MessageEvaluationContext selectorContext = message.createMessageEvaluationContext();
+// selectorContext.setDestination(destination);
+// try {
+// return (selector.matches(selectorContext));
+// } catch (FilterException e) {
+// e.printStackTrace();
+// return false;
+// }
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see org.apache.activemq.broker.BrokerSubscription#getDestination()
+// */
+// public Destination getDestination() {
+// return destination;
+// }
+
+ public void connect(ConsumerContext subscription) throws Exception {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void disconnect(ConsumerContext subscription) {
+ //To change body of implemented methods use File | Settings | File Templates.
}
- public synchronized void connect(final ConsumerContext subscription) throws UserAlreadyConnectedException {
- if (this.connectedSub == null) {
- if( subscription.isPersistent() ) {
- queue = createPersistentQueue(subscription);
- } else {
- queue = createNonPersistentQueue(subscription);
- }
- queue.start();
-
- this.connectedSub = subscription;
- this.queue.addSubscription(connectedSub);
- this.host.getRouter().bind(destination, this);
- } else if (connectedSub != subscription) {
- throw new UserAlreadyConnectedException();
- }
- }
-
- private IFlowQueue<MessageDelivery> createNonPersistentQueue(final ConsumerContext subscription) {
- Flow flow = new Flow(subscription.getResourceName(), false);
- String name = subscription.getResourceName();
- IFlowLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(100, 50);
- ExclusiveQueue<MessageDelivery> queue = new ExclusiveQueue<MessageDelivery>(flow, name, limiter);
- queue.setDrain( new QueueDispatchTarget<MessageDelivery>() {
- public void drain(MessageDelivery elem, ISourceController<MessageDelivery> controller) {
- subscription.add(elem, controller);
- }
- });
- return queue;
- }
-
- private IFlowQueue<MessageDelivery> createPersistentQueue(ConsumerContext subscription) {
- ExclusivePersistentQueue<Long, MessageDelivery> queue = host.getQueueStore().createExclusivePersistentQueue();
- return queue;
- }
-
- @SuppressWarnings("unchecked")
- private void destroyPersistentQueue(IFlowQueue<MessageDelivery> queue) {
- ExclusivePersistentQueue<Long, MessageDelivery> pq = (ExclusivePersistentQueue<Long, MessageDelivery>) queue;
- host.getQueueStore().deleteQueue(pq.getDescriptor());
- }
-
- public synchronized void disconnect(final ConsumerContext subscription) {
- if (connectedSub != null && connectedSub == subscription) {
- this.host.getRouter().unbind(destination, this);
- this.queue.removeSubscription(connectedSub);
- this.connectedSub = null;
-
- queue.stop();
- if( USE_PERSISTENT_QUEUES ) {
- destroyPersistentQueue(queue);
- }
- queue=null;
- }
- }
-
-
-
- public boolean matches(MessageDelivery message) {
- if (selector == null) {
- return true;
- }
-
- MessageEvaluationContext selectorContext = message.createMessageEvaluationContext();
- selectorContext.setDestination(destination);
- try {
- return (selector.matches(selectorContext));
- } catch (FilterException e) {
- e.printStackTrace();
- return false;
- }
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.broker.BrokerSubscription#getDestination()
- */
public Destination getDestination() {
- return destination;
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void deliver(MessageDelivery message) {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean hasSelector() {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean matches(MessageDelivery message) {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java?rev=961067&r1=961066&r2=961067&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java Wed Jul 7 03:39:03 2010
@@ -22,10 +22,6 @@ import java.util.HashSet;
import javax.transaction.xa.XAException;
import org.apache.activemq.broker.store.Store.MessageRecord;
-import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.queue.IQueue;
-import org.apache.activemq.queue.Subscription;
-import org.apache.activemq.queue.Subscription.SubscriptionDelivery;
import org.apache.activemq.util.FutureListener;
import org.apache.activemq.util.ListenableFuture;
import org.apache.activemq.util.buffer.AsciiBuffer;
@@ -41,526 +37,527 @@ import org.apache.commons.logging.LogFac
*/
public abstract class Transaction {
- private static final Log LOG = LogFactory.getLog(Transaction.class);
-
- public static final byte START_STATE = 0; // can go to: 1,2,3
- public static final byte IN_USE_STATE = 1; // can go to: 2,3, 4
- public static final byte PREPARED_STATE = 2; // can go to: 3, 4
- public static final byte COMMITED_STATE = 3;
- public static final byte ROLLBACK_STATE = 4;
-
- static final byte TYPE_LOCAL = 0;
- static final byte TYPE_XA = 1;
-
- private byte state = START_STATE;
- private final TransactionManager manager;
- private final long tid;
- private final IQueue<Long, TxOp> opQueue;
- protected HashSet<TransactionListener> listeners;
-
- private TxProcessor processor;
-
- Transaction(TransactionManager manager, long tid, IQueue<Long, TxOp> opQueue) {
- this.manager = manager;
- this.opQueue = opQueue;
- opQueue.start();
- this.tid = tid;
- }
-
- /**
- * @return the unique identifier used by the {@link TransactionManager} to
- * identify this {@link Transaction}
- *
- */
- public long getTid() {
- return tid;
- }
-
- public AsciiBuffer getBackingQueueName() {
- return opQueue.getDescriptor().getQueueName();
- }
-
- /**
- * @return The transaction type e.g. {@link Transaction#TYPE_LOCAL}
- */
- public abstract byte getType();
-
- public void addMessage(MessageDelivery m, ISourceController<?> source) {
-
- synchronized (this) {
- switch (state) {
- case START_STATE:
- case IN_USE_STATE:
- opQueue.add(new TxMessage(m, this), source);
- break;
- default: {
- throw new IllegalStateException("Can't add message to finished or prepared transaction");
- }
- }
- }
- }
-
- public void addAck(SubscriptionDelivery<MessageDelivery> toAck) {
- synchronized (this) {
- switch (state) {
- case START_STATE:
- case IN_USE_STATE:
- IQueue<Long, MessageDelivery> target = manager.getVirtualHost().getQueueStore().getQueue(toAck.getQueueDescriptor().getQueueName());
- //Queue could be null if it was just deleted:
- if (target != null) {
- long tracking = manager.getVirtualHost().getDatabase().allocateStoreTracking();
- opQueue.add(new TxAck(target, toAck.getSourceQueueRemovalKey(), tracking, this), null);
- }
- break;
- default: {
- throw new IllegalStateException("Can't add message to finished or prepared transaction");
- }
- }
- }
- }
-
- public byte getState() {
- return state;
- }
-
- public void setState(byte state, FutureListener<? super Object> listener) {
- this.state = state;
- ListenableFuture<?> future = manager.persistTransaction(this);
- future.setFutureListener(listener);
- }
-
- public void prePrepare() throws Exception {
-
- // Is it ok to call prepare now given the state of the
- // transaction?
- switch (state) {
- case START_STATE:
- case IN_USE_STATE:
- break;
- default:
- XAException xae = new XAException("Prepare cannot be called now.");
- xae.errorCode = XAException.XAER_PROTO;
- throw xae;
- }
- }
-
- protected void fireAfterCommit() throws Exception {
-
- synchronized (this) {
- for (TransactionListener listener : listeners) {
- listener.onCommit(this);
- }
- }
- }
-
- public void fireAfterRollback() throws Exception {
- synchronized (this) {
- for (TransactionListener listener : listeners) {
- listener.onRollback(this);
- }
- }
- }
-
- public void fireAfterPrepare() throws Exception {
- synchronized (this) {
- for (TransactionListener listener : listeners) {
- listener.onPrepared(this);
- }
- }
- }
-
- public String toString() {
- return super.toString() + "[queue=" + opQueue + "]";
- }
-
- public abstract void commit(boolean onePhase, TransactionListener listener) throws XAException, IOException;
-
- public abstract void rollback(TransactionListener listener) throws XAException, IOException;
-
- public abstract int prepare(TransactionListener listener) throws XAException, IOException;
-
- public boolean isPrepared() {
- return getState() == PREPARED_STATE;
- }
-
- public long size() {
- return opQueue.getEnqueuedCount();
- }
-
- public static abstract class TransactionListener {
- public void onRollback(Transaction t) {
-
- }
-
- public void onCommit(Transaction t) {
-
- }
-
- public void onPrepared(Transaction t) {
-
- }
- }
-
- interface TxOp {
- public static final short TYPE_MESSAGE = 0;
- public static final short TYPE_ACK = 1;
-
- public short getType();
-
- public <T> T asType(Class<T> type);
-
- public void onRollback(ISourceController<?> controller);
-
- public void onCommit(ISourceController<?> controller);
-
- public int getLimiterSize();
-
- public boolean isFromStore();
-
- public long getStoreTracking();
-
- public MessageRecord createMessageRecord();
-
- /**
- * @return
- */
- public boolean isPersistent();
-
- /**
- * @return
- */
- public Long getExpiration();
-
- public int getPriority();
- }
-
- static class TxMessage implements TxOp {
- MessageDelivery message;
- Transaction tx;
- private boolean fromStore;
-
- /**
- * @param m
- * @param transaction
- */
- public TxMessage(MessageDelivery m, Transaction tx) {
- message = m;
- this.tx = tx;
- }
-
- public <T> T asType(Class<T> type) {
- if (type == TxMessage.class) {
- return type.cast(this);
- } else {
- return null;
- }
- }
-
- public final short getType() {
- return TYPE_MESSAGE;
- }
-
- public final int getLimiterSize() {
- return message.getFlowLimiterSize();
- }
-
- public final void onCommit(ISourceController<?> controller) {
- message.clearTransactionId();
- tx.manager.getVirtualHost().getRouter().route(message, controller, true);
- }
-
- public final void onRollback(ISourceController<?> controller) {
- //Nothing to do here, message just gets dropped:
- return;
- }
-
- public final boolean isFromStore() {
- return fromStore;
- }
-
- public final MessageRecord createMessageRecord() {
- return message.createMessageRecord();
- }
-
- public final long getStoreTracking() {
- return message.getStoreTracking();
- }
-
- public final boolean isPersistent() {
- return message.isPersistent();
- }
-
- public final Long getExpiration() {
- return message.getExpiration();
- }
-
- public final int getPriority() {
- return message.getPriority();
- }
- }
-
- static class TxAck implements TxOp {
- public static AsciiBuffer ENCODING = new AsciiBuffer("txack");
- Transaction tx;
- IQueue<Long, ?> queue; //Desriptor of the queue on which to delete.
- long queueSequence; //Sequence number of the element on the queue from which to delete.
- final long storeTracking; //Store tracking of this delete op.
- private boolean fromStore;
- private static final int MEM_SIZE = 8 + 8 + 8 + 8 + 1;
-
- TxAck(IQueue<Long, ?> queue, long removalKey, long storeTracking, Transaction tx) {
- this.queue = queue;
- this.queueSequence = removalKey;
- this.tx = tx;
- this.storeTracking = storeTracking;
- }
-
- public final short getType() {
- return TYPE_ACK;
- }
-
- public <T> T asType(Class<T> type) {
- if (type == TxAck.class) {
- return type.cast(this);
- } else {
- return null;
- }
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.apollo.broker.Transaction.TxOp#onCommit()
- */
- public final void onCommit(ISourceController<?> controller) {
- queue.remove(queueSequence);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.apollo.broker.Transaction.TxOp#onRollback()
- */
- public final void onRollback(ISourceController<?> controller) {
- //No-Op for now, it is possible that we'd want to unaquire these
- //in the queue if the client weren't to keep these
- //around
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.activemq.apollo.broker.Transaction.TxOp#getLimiterSize()
- */
- public final int getLimiterSize() {
- return MEM_SIZE;
- }
-
- public final boolean isFromStore() {
- return fromStore;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.activemq.apollo.broker.Transaction.TxOp#getStoreTracking()
- */
- public final long getStoreTracking() {
- return storeTracking;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.activemq.apollo.broker.Transaction.TxOp#createMessageRecord
- * ()
- */
- public final MessageRecord createMessageRecord() {
- MessageRecord ret = new MessageRecord();
- ret.setEncoding(TxAck.ENCODING);
- ret.setKey(storeTracking);
- ret.setSize(MEM_SIZE);
- ret.setBuffer(new Buffer(toBytes().getData()));
- return null;
- }
-
- private final Buffer toBytes() {
- AsciiBuffer queueName = queue.getDescriptor().getQueueName();
- DataByteArrayOutputStream baos = new DataByteArrayOutputStream(2 + queueName.length + 8);
- try {
- baos.writeShort(queueName.length);
- baos.write(queueName.data, queueName.offset, queueName.length);
- baos.writeLong(queueSequence);
- } catch (IOException shouldNotHappen) {
- throw new RuntimeException(shouldNotHappen);
- }
- return baos.toBuffer();
- }
-
- private final void fromBytes(byte[] bytes) {
- DataByteArrayInputStream baos = new DataByteArrayInputStream(bytes);
- byte[] queueBytes = new byte[baos.readShort()];
- baos.readFully(queueBytes);
- AsciiBuffer queueName = new AsciiBuffer(queueBytes);
- queue = tx.manager.getVirtualHost().getQueueStore().getQueue(queueName);
- queueSequence = baos.readLong();
-
- }
-
- public final static TxAck createFromMessageRecord(MessageRecord record, Transaction tx) {
- TxAck ret = new TxAck(null, -1, record.getKey(), tx);
- ret.fromBytes(record.getBuffer().getData());
- return ret;
- }
-
- public final boolean isPersistent() {
- //TODO This could probably be relaxed when the ack is for non persistent
- //elements
- return true;
- }
-
- public final Long getExpiration() {
- return -1L;
- }
-
- public final int getPriority() {
- return 0;
- }
- }
-
- /**
- * @param record
- * @return
- */
- public static TxOp createTxOp(MessageRecord record, Transaction tx) {
- if (record.getEncoding().equals(TxAck.ENCODING)) {
- return TxAck.createFromMessageRecord(record, tx);
- } else {
- MessageDelivery delivery = tx.manager.getVirtualHost().getQueueStore().getMessageMarshaller().unMarshall(record, tx.opQueue.getDescriptor());
- return new TxMessage(delivery, tx);
- }
- }
-
- protected void startTransactionProcessor()
- {
- synchronized(this)
- {
- if(processor == null)
- {
- processor = new TxProcessor();
- opQueue.addSubscription(processor);
- }
- }
- }
-
-
- /**
- * TxProcessor
- * <p>
- * Description: The tx processor processes the transaction queue after
- * commit or rollback.
- * </p>
- *
- * @author cmacnaug
- * @version 1.0
- */
- private class TxProcessor implements Subscription<TxOp> {
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.queue.Subscription#add(java.lang.Object,
- * org.apache.activemq.flow.ISourceController,
- * org.apache.activemq.queue.Subscription.SubscriptionDelivery)
- */
- public void add(TxOp element, ISourceController<?> controller, SubscriptionDelivery<TxOp> callback) {
-
- switch (state) {
- case COMMITED_STATE: {
- element.onCommit(controller);
- if (callback != null) {
- callback.acknowledge();
- }
- break;
- }
- case ROLLBACK_STATE: {
- element.onRollback(controller);
- if (callback != null) {
- callback.acknowledge();
- }
- break;
- }
- default: {
- LOG.error("Illegal state for transaction dispatch: " + this + " state: " + state);
- }
- }
-
- //If we've reached the end of the op queue
- if (opQueue.getEnqueuedCount() == 0) {
- opQueue.shutdown(null);
- }
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.queue.Subscription#hasSelector()
- */
- public boolean hasSelector() {
- return false;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.queue.Subscription#isBrowser()
- */
- public boolean isBrowser() {
- return false;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.queue.Subscription#isExclusive()
- */
- public boolean isExclusive() {
- return true;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.activemq.queue.Subscription#isRemoveOnDispatch(java.lang
- * .Object)
- */
- public boolean isRemoveOnDispatch(TxOp elem) {
- return false;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.queue.Subscription#matches(java.lang.Object)
- */
- public boolean matches(TxOp elem) {
- return true;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.queue.Subscription#offer(java.lang.Object,
- * org.apache.activemq.flow.ISourceController,
- * org.apache.activemq.queue.Subscription.SubscriptionDelivery)
- */
- public boolean offer(TxOp element, ISourceController<?> controller, SubscriptionDelivery<TxOp> callback) {
- add(element, controller, callback);
- return true;
- }
-
- }
+// TODO:
+// private static final Log LOG = LogFactory.getLog(Transaction.class);
+//
+// public static final byte START_STATE = 0; // can go to: 1,2,3
+// public static final byte IN_USE_STATE = 1; // can go to: 2,3, 4
+// public static final byte PREPARED_STATE = 2; // can go to: 3, 4
+// public static final byte COMMITED_STATE = 3;
+// public static final byte ROLLBACK_STATE = 4;
+//
+// static final byte TYPE_LOCAL = 0;
+// static final byte TYPE_XA = 1;
+//
+// private byte state = START_STATE;
+// private final TransactionManager manager;
+// private final long tid;
+// private final IQueue<Long, TxOp> opQueue;
+// protected HashSet<TransactionListener> listeners;
+//
+// private TxProcessor processor;
+//
+// Transaction(TransactionManager manager, long tid, IQueue<Long, TxOp> opQueue) {
+// this.manager = manager;
+// this.opQueue = opQueue;
+// opQueue.start();
+// this.tid = tid;
+// }
+//
+// /**
+// * @return the unique identifier used by the {@link TransactionManager} to
+// * identify this {@link Transaction}
+// *
+// */
+// public long getTid() {
+// return tid;
+// }
+//
+// public AsciiBuffer getBackingQueueName() {
+// return opQueue.getDescriptor().getQueueName();
+// }
+//
+// /**
+// * @return The transaction type e.g. {@link Transaction#TYPE_LOCAL}
+// */
+// public abstract byte getType();
+//
+// public void addMessage(MessageDelivery m, ISourceController<?> source) {
+//
+// synchronized (this) {
+// switch (state) {
+// case START_STATE:
+// case IN_USE_STATE:
+// opQueue.add(new TxMessage(m, this), source);
+// break;
+// default: {
+// throw new IllegalStateException("Can't add message to finished or prepared transaction");
+// }
+// }
+// }
+// }
+//
+// public void addAck(SubscriptionDelivery<MessageDelivery> toAck) {
+// synchronized (this) {
+// switch (state) {
+// case START_STATE:
+// case IN_USE_STATE:
+// IQueue<Long, MessageDelivery> target = manager.getVirtualHost().getQueueStore().getQueue(toAck.getQueueDescriptor().getQueueName());
+// //Queue could be null if it was just deleted:
+// if (target != null) {
+// long tracking = manager.getVirtualHost().getDatabase().allocateStoreTracking();
+// opQueue.add(new TxAck(target, toAck.getSourceQueueRemovalKey(), tracking, this), null);
+// }
+// break;
+// default: {
+// throw new IllegalStateException("Can't add message to finished or prepared transaction");
+// }
+// }
+// }
+// }
+//
+// public byte getState() {
+// return state;
+// }
+//
+// public void setState(byte state, FutureListener<? super Object> listener) {
+// this.state = state;
+// ListenableFuture<?> future = manager.persistTransaction(this);
+// future.setFutureListener(listener);
+// }
+//
+// public void prePrepare() throws Exception {
+//
+// // Is it ok to call prepare now given the state of the
+// // transaction?
+// switch (state) {
+// case START_STATE:
+// case IN_USE_STATE:
+// break;
+// default:
+// XAException xae = new XAException("Prepare cannot be called now.");
+// xae.errorCode = XAException.XAER_PROTO;
+// throw xae;
+// }
+// }
+//
+// protected void fireAfterCommit() throws Exception {
+//
+// synchronized (this) {
+// for (TransactionListener listener : listeners) {
+// listener.onCommit(this);
+// }
+// }
+// }
+//
+// public void fireAfterRollback() throws Exception {
+// synchronized (this) {
+// for (TransactionListener listener : listeners) {
+// listener.onRollback(this);
+// }
+// }
+// }
+//
+// public void fireAfterPrepare() throws Exception {
+// synchronized (this) {
+// for (TransactionListener listener : listeners) {
+// listener.onPrepared(this);
+// }
+// }
+// }
+//
+// public String toString() {
+// return super.toString() + "[queue=" + opQueue + "]";
+// }
+//
+// public abstract void commit(boolean onePhase, TransactionListener listener) throws XAException, IOException;
+//
+// public abstract void rollback(TransactionListener listener) throws XAException, IOException;
+//
+// public abstract int prepare(TransactionListener listener) throws XAException, IOException;
+//
+// public boolean isPrepared() {
+// return getState() == PREPARED_STATE;
+// }
+//
+// public long size() {
+// return opQueue.getEnqueuedCount();
+// }
+//
+// public static abstract class TransactionListener {
+// public void onRollback(Transaction t) {
+//
+// }
+//
+// public void onCommit(Transaction t) {
+//
+// }
+//
+// public void onPrepared(Transaction t) {
+//
+// }
+// }
+//
+// interface TxOp {
+// public static final short TYPE_MESSAGE = 0;
+// public static final short TYPE_ACK = 1;
+//
+// public short getType();
+//
+// public <T> T asType(Class<T> type);
+//
+// public void onRollback(ISourceController<?> controller);
+//
+// public void onCommit(ISourceController<?> controller);
+//
+// public int getLimiterSize();
+//
+// public boolean isFromStore();
+//
+// public long getStoreTracking();
+//
+// public MessageRecord createMessageRecord();
+//
+// /**
+// * @return
+// */
+// public boolean isPersistent();
+//
+// /**
+// * @return
+// */
+// public Long getExpiration();
+//
+// public int getPriority();
+// }
+//
+// static class TxMessage implements TxOp {
+// MessageDelivery message;
+// Transaction tx;
+// private boolean fromStore;
+//
+// /**
+// * @param m
+// * @param transaction
+// */
+// public TxMessage(MessageDelivery m, Transaction tx) {
+// message = m;
+// this.tx = tx;
+// }
+//
+// public <T> T asType(Class<T> type) {
+// if (type == TxMessage.class) {
+// return type.cast(this);
+// } else {
+// return null;
+// }
+// }
+//
+// public final short getType() {
+// return TYPE_MESSAGE;
+// }
+//
+// public final int getLimiterSize() {
+// return message.getFlowLimiterSize();
+// }
+//
+// public final void onCommit(ISourceController<?> controller) {
+// message.clearTransactionId();
+// tx.manager.getVirtualHost().getRouter().route(message, controller, true);
+// }
+//
+// public final void onRollback(ISourceController<?> controller) {
+// //Nothing to do here, message just gets dropped:
+// return;
+// }
+//
+// public final boolean isFromStore() {
+// return fromStore;
+// }
+//
+// public final MessageRecord createMessageRecord() {
+// return message.createMessageRecord();
+// }
+//
+// public final long getStoreTracking() {
+// return message.getStoreTracking();
+// }
+//
+// public final boolean isPersistent() {
+// return message.isPersistent();
+// }
+//
+// public final Long getExpiration() {
+// return message.getExpiration();
+// }
+//
+// public final int getPriority() {
+// return message.getPriority();
+// }
+// }
+//
+// static class TxAck implements TxOp {
+// public static AsciiBuffer ENCODING = new AsciiBuffer("txack");
+// Transaction tx;
+// IQueue<Long, ?> queue; //Desriptor of the queue on which to delete.
+// long queueSequence; //Sequence number of the element on the queue from which to delete.
+// final long storeTracking; //Store tracking of this delete op.
+// private boolean fromStore;
+// private static final int MEM_SIZE = 8 + 8 + 8 + 8 + 1;
+//
+// TxAck(IQueue<Long, ?> queue, long removalKey, long storeTracking, Transaction tx) {
+// this.queue = queue;
+// this.queueSequence = removalKey;
+// this.tx = tx;
+// this.storeTracking = storeTracking;
+// }
+//
+// public final short getType() {
+// return TYPE_ACK;
+// }
+//
+// public <T> T asType(Class<T> type) {
+// if (type == TxAck.class) {
+// return type.cast(this);
+// } else {
+// return null;
+// }
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see org.apache.activemq.apollo.broker.Transaction.TxOp#onCommit()
+// */
+// public final void onCommit(ISourceController<?> controller) {
+// queue.remove(queueSequence);
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see org.apache.activemq.apollo.broker.Transaction.TxOp#onRollback()
+// */
+// public final void onRollback(ISourceController<?> controller) {
+// //No-Op for now, it is possible that we'd want to unaquire these
+// //in the queue if the client weren't to keep these
+// //around
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.apollo.broker.Transaction.TxOp#getLimiterSize()
+// */
+// public final int getLimiterSize() {
+// return MEM_SIZE;
+// }
+//
+// public final boolean isFromStore() {
+// return fromStore;
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.apollo.broker.Transaction.TxOp#getStoreTracking()
+// */
+// public final long getStoreTracking() {
+// return storeTracking;
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.apollo.broker.Transaction.TxOp#createMessageRecord
+// * ()
+// */
+// public final MessageRecord createMessageRecord() {
+// MessageRecord ret = new MessageRecord();
+// ret.setEncoding(TxAck.ENCODING);
+// ret.setKey(storeTracking);
+// ret.setSize(MEM_SIZE);
+// ret.setBuffer(new Buffer(toBytes().getData()));
+// return null;
+// }
+//
+// private final Buffer toBytes() {
+// AsciiBuffer queueName = queue.getDescriptor().getQueueName();
+// DataByteArrayOutputStream baos = new DataByteArrayOutputStream(2 + queueName.length + 8);
+// try {
+// baos.writeShort(queueName.length);
+// baos.write(queueName.data, queueName.offset, queueName.length);
+// baos.writeLong(queueSequence);
+// } catch (IOException shouldNotHappen) {
+// throw new RuntimeException(shouldNotHappen);
+// }
+// return baos.toBuffer();
+// }
+//
+// private final void fromBytes(byte[] bytes) {
+// DataByteArrayInputStream baos = new DataByteArrayInputStream(bytes);
+// byte[] queueBytes = new byte[baos.readShort()];
+// baos.readFully(queueBytes);
+// AsciiBuffer queueName = new AsciiBuffer(queueBytes);
+// queue = tx.manager.getVirtualHost().getQueueStore().getQueue(queueName);
+// queueSequence = baos.readLong();
+//
+// }
+//
+// public final static TxAck createFromMessageRecord(MessageRecord record, Transaction tx) {
+// TxAck ret = new TxAck(null, -1, record.getKey(), tx);
+// ret.fromBytes(record.getBuffer().getData());
+// return ret;
+// }
+//
+// public final boolean isPersistent() {
+// //TODO This could probably be relaxed when the ack is for non persistent
+// //elements
+// return true;
+// }
+//
+// public final Long getExpiration() {
+// return -1L;
+// }
+//
+// public final int getPriority() {
+// return 0;
+// }
+// }
+//
+// /**
+// * @param record
+// * @return
+// */
+// public static TxOp createTxOp(MessageRecord record, Transaction tx) {
+// if (record.getEncoding().equals(TxAck.ENCODING)) {
+// return TxAck.createFromMessageRecord(record, tx);
+// } else {
+// MessageDelivery delivery = tx.manager.getVirtualHost().getQueueStore().getMessageMarshaller().unMarshall(record, tx.opQueue.getDescriptor());
+// return new TxMessage(delivery, tx);
+// }
+// }
+//
+// protected void startTransactionProcessor()
+// {
+// synchronized(this)
+// {
+// if(processor == null)
+// {
+// processor = new TxProcessor();
+// opQueue.addSubscription(processor);
+// }
+// }
+// }
+//
+//
+// /**
+// * TxProcessor
+// * <p>
+// * Description: The tx processor processes the transaction queue after
+// * commit or rollback.
+// * </p>
+// *
+// * @author cmacnaug
+// * @version 1.0
+// */
+// private class TxProcessor implements Subscription<TxOp> {
+// /*
+// * (non-Javadoc)
+// *
+// * @see org.apache.activemq.queue.Subscription#add(java.lang.Object,
+// * org.apache.activemq.flow.ISourceController,
+// * org.apache.activemq.queue.Subscription.SubscriptionDelivery)
+// */
+// public void add(TxOp element, ISourceController<?> controller, SubscriptionDelivery<TxOp> callback) {
+//
+// switch (state) {
+// case COMMITED_STATE: {
+// element.onCommit(controller);
+// if (callback != null) {
+// callback.acknowledge();
+// }
+// break;
+// }
+// case ROLLBACK_STATE: {
+// element.onRollback(controller);
+// if (callback != null) {
+// callback.acknowledge();
+// }
+// break;
+// }
+// default: {
+// LOG.error("Illegal state for transaction dispatch: " + this + " state: " + state);
+// }
+// }
+//
+// //If we've reached the end of the op queue
+// if (opQueue.getEnqueuedCount() == 0) {
+// opQueue.shutdown(null);
+// }
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see org.apache.activemq.queue.Subscription#hasSelector()
+// */
+// public boolean hasSelector() {
+// return false;
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see org.apache.activemq.queue.Subscription#isBrowser()
+// */
+// public boolean isBrowser() {
+// return false;
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see org.apache.activemq.queue.Subscription#isExclusive()
+// */
+// public boolean isExclusive() {
+// return true;
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.queue.Subscription#isRemoveOnDispatch(java.lang
+// * .Object)
+// */
+// public boolean isRemoveOnDispatch(TxOp elem) {
+// return false;
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see org.apache.activemq.queue.Subscription#matches(java.lang.Object)
+// */
+// public boolean matches(TxOp elem) {
+// return true;
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see org.apache.activemq.queue.Subscription#offer(java.lang.Object,
+// * org.apache.activemq.flow.ISourceController,
+// * org.apache.activemq.queue.Subscription.SubscriptionDelivery)
+// */
+// public boolean offer(TxOp element, ISourceController<?> controller, SubscriptionDelivery<TxOp> callback) {
+// add(element, controller, callback);
+// return true;
+// }
+//
+// }
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java?rev=961067&r1=961066&r2=961067&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java Wed Jul 7 03:39:03 2010
@@ -22,19 +22,9 @@ import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.activemq.apollo.broker.BrokerDatabase.OperationContext;
-import org.apache.activemq.apollo.broker.Transaction.TxOp;
import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.broker.store.Store.MessageRecord;
import org.apache.activemq.broker.store.Store.QueueQueryResult;
-import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.flow.SizeLimiter;
-import org.apache.activemq.queue.ExclusivePersistentQueue;
-import org.apache.activemq.queue.IQueue;
-import org.apache.activemq.queue.PersistencePolicy;
-import org.apache.activemq.queue.QueueStore;
-import org.apache.activemq.queue.RestoreListener;
-import org.apache.activemq.queue.SaveableQueueElement;
import org.apache.activemq.util.ListenableFuture;
import org.apache.activemq.util.Mapper;
import org.apache.activemq.util.buffer.AsciiBuffer;
@@ -53,362 +43,368 @@ import org.apache.commons.logging.LogFac
* @version 1.0
*/
public class TransactionManager {
- private static final Log LOG = LogFactory.getLog(TransactionManager.class);
- private static final String TX_QUEUE_PREFIX = "TX-";
- private static final AsciiBuffer TXN_MAP = new AsciiBuffer("TXMAP");
+// TODO:
+// private static final Log LOG = LogFactory.getLog(TransactionManager.class);
+// private static final String TX_QUEUE_PREFIX = "TX-";
+// private static final AsciiBuffer TXN_MAP = new AsciiBuffer("TXMAP");
+//
+// private final HashMap<Long, Transaction> transactions = new HashMap<Long, Transaction>();
+// private final HashMap<AsciiBuffer, Transaction> transactionsByQueue = new HashMap<AsciiBuffer, Transaction>();
+// private final HashMap<Buffer, XATransaction> xaTransactions = new HashMap<Buffer, XATransaction>();
+//
+// private final VirtualHost host;
+// private BrokerDatabase database;
+//
+// private final AtomicLong tidGen = new AtomicLong(0);
+// private final TransactionStore txStore;
+//
+// private static final int DEFAULT_TX_QUEUE_PAGING_THRESHOLD = 1024 * 64;
+// private static final int DEFAULT_TX_QUEUE_RESUME_THRESHOLD = 1;
+// // Be default we don't page out elements to disk.
+// private static final int DEFAULT_TX_QUEUE_SIZE = DEFAULT_TX_QUEUE_PAGING_THRESHOLD;
+// //private static final int DEFAULT_TX_QUEUE_SIZE = Integer.MAX_VALUE;
+//
+// private static final PersistencePolicy<TxOp> DEFAULT_TX_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<TxOp>() {
+//
+// private static final boolean PAGING_ENABLED = DEFAULT_TX_QUEUE_SIZE > DEFAULT_TX_QUEUE_PAGING_THRESHOLD;
+//
+// public boolean isPersistent(TxOp elem) {
+// return elem.isPersistent();
+// }
+//
+// public boolean isPageOutPlaceHolders() {
+// return false;
+// }
+//
+// public boolean isPagingEnabled() {
+// return PAGING_ENABLED;
+// }
+//
+// public int getPagingInMemorySize() {
+// return DEFAULT_TX_QUEUE_PAGING_THRESHOLD;
+// }
+//
+// public boolean isThrottleSourcesToMemoryLimit() {
+// // Keep the queue in memory.
+// return true;
+// }
+//
+// public int getDisconnectedThrottleRate() {
+// // By default don't throttle consumers when disconnected.
+// return 0;
+// }
+//
+// public int getRecoveryBias() {
+// return 8;
+// }
+// };
+//
+// private static final Mapper<Long, TxOp> EXPIRATION_MAPPER = new Mapper<Long, TxOp>() {
+// public Long map(TxOp element) {
+// return element.getExpiration();
+// }
+// };
+//
+// private static final Mapper<Integer, TxOp> SIZE_MAPPER = new Mapper<Integer, TxOp>() {
+// public Integer map(TxOp element) {
+// return element.getLimiterSize();
+// }
+// };
+//
+// private static final Mapper<Integer, TxOp> PRIORITY_MAPPER = new Mapper<Integer, TxOp>() {
+// public Integer map(TxOp element) {
+// return element.getPriority();
+// }
+// };
+//
+// private static final Mapper<Long, TxOp> KEY_MAPPER = new Mapper<Long, TxOp>() {
+// public Long map(TxOp element) {
+// return element.getStoreTracking();
+// }
+// };
+//
+// private static final Mapper<Integer, TxOp> PARTITION_MAPPER = new Mapper<Integer, TxOp>() {
+// public Integer map(TxOp element) {
+// return 1;
+// }
+// };
+//
+// TransactionManager(VirtualHost host) {
+// this.host = host;
+// txStore = new TransactionStore(host.getDatabase());
+// database = host.getDatabase();
+// }
+//
+// /**
+// * @return The TM's virtual host
+// */
+// public final VirtualHost getVirtualHost() {
+// return host;
+// }
+//
+// /**
+// * @param msg
+// * @param controller
+// */
+// public void newMessage(MessageDelivery msg, ISourceController<?> controller) {
+// if (msg.getStoreTracking() == -1) {
+// msg.setStoreTracking(host.getDatabase().allocateStoreTracking());
+// }
+// transactions.get(msg.getTransactionId()).addMessage(msg, controller);
+// }
+//
+// /**
+// * Creates a transaction.
+// *
+// * @param xid
+// * @return
+// */
+// public synchronized final Transaction createTransaction(Buffer xid) {
+// Transaction ret;
+//
+// long tid = tidGen.incrementAndGet();
+// IQueue<Long, TxOp> opQueue = createTransactionQueue(tid);
+//
+// if (xid == null) {
+// ret = new LocalTransaction(this, tid, opQueue);
+// } else {
+// XATransaction xat = new XATransaction(this, tid, xid, opQueue);
+// ret = xat;
+// xaTransactions.put(xid, xat);
+// }
+//
+// transactionsByQueue.put(opQueue.getDescriptor().getQueueName(), ret);
+// transactions.put(ret.getTid(), ret);
+//
+// return ret;
+// }
+//
+// /**
+// * @param buffer
+// * @return
+// */
+// public synchronized Transaction getXATransaction(Buffer buffer) {
+// return xaTransactions.get(buffer);
+// }
+//
+// /**
+// *
+// * @throws Exception
+// */
+// public synchronized void loadTransactions() throws Exception {
+//
+// tidGen.set(database.allocateStoreTracking());
+//
+// Map<AsciiBuffer, Buffer> txns = database.listMapEntries(TXN_MAP);
+//
+// // Load shared queues
+// Iterator<QueueQueryResult> results = database.listQueues(BrokerQueueStore.TRANSACTION_QUEUE_TYPE);
+// while (results.hasNext()) {
+// QueueQueryResult loaded = results.next();
+//
+// Buffer b = txns.remove(loaded.getDescriptor().getQueueName());
+// if (b == null) {
+// LOG.warn("Recovered orphaned transaction queue: " + loaded.getDescriptor() + " elements: " + loaded.getCount());
+// database.deleteQueue(loaded.getDescriptor());
+// }
+//
+// IQueue<Long, TxOp> queue = createRestoredTxQueue(loaded);
+// Transaction tx = loadTransaction(b, queue);
+//
+// //TODO if we recover a tx that isn't committed then, we should discard it.
+// if (tx.getState() < Transaction.COMMITED_STATE) {
+// LOG.warn("Recovered unfinished transaction: " + tx);
+// }
+// transactions.put(tx.getTid(), tx);
+// if (tx instanceof XATransaction) {
+// XATransaction xat = XATransaction.class.cast(tx);
+// xaTransactions.put(xat.getXid(), xat);
+// }
+//
+// LOG.info("Loaded Queue " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize());
+// }
+//
+// if (!txns.isEmpty()) {
+// //TODO Based on transaction state this is generally ok, anyway the orphaned entries should be
+// //deleted:
+// LOG.warn("Recovered transactions without backing queues: " + txns.keySet());
+// }
+// }
+//
+// private Transaction loadTransaction(Buffer b, IQueue<Long, TxOp> queue) throws IOException {
+// //TODO move the serialization into the transaction itself:
+// DataByteArrayInputStream bais = new DataByteArrayInputStream(b.getData());
+// byte type = bais.readByte();
+// byte state = bais.readByte();
+// long tid = bais.readLong();
+//
+// Transaction tx = null;
+// switch (type) {
+// case Transaction.TYPE_LOCAL:
+// tx = new LocalTransaction(this, tid, queue);
+// break;
+// case Transaction.TYPE_XA:
+// int length = bais.readByte() & 0xFF;
+// Buffer xid = new Buffer(new byte[length]);
+// bais.readFully(xid.data);
+// tx = new XATransaction(this, tid, xid, queue);
+// break;
+// default:
+// throw new IOException("Invalid transaction type: " + type);
+//
+// }
+// tx.setState(state, null);
+// return tx;
+//
+// }
+//
+// public ListenableFuture<?> persistTransaction(Transaction tx) {
+//
+// //TODO move the serialization into the transaction itself:
+// DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
+// try {
+// baos.writeByte(tx.getType());
+// baos.writeByte(tx.getState());
+// baos.writeLong(tx.getTid());
+// if (tx.getType() == Transaction.TYPE_XA) {
+// Buffer xid = ((XATransaction) tx).getXid();
+// // An XID max size is around 140 bytes, byte SHOULD be big enough to frame it.
+// baos.writeByte(xid.length & 0xFF);
+// baos.write(xid.data, xid.offset, xid.length);
+// }
+// OperationContext<?> ctx = database.updateMapEntry(TXN_MAP, tx.getBackingQueueName(), new Buffer(baos.getData(), 0, baos.size()));
+// ctx.requestFlush();
+// return ctx;
+// } catch (IOException ioe) {
+// //Shouldn't happen
+// throw new RuntimeException(ioe);
+// }
+// }
+//
+// private IQueue<Long, TxOp> createRestoredTxQueue(QueueQueryResult loaded) throws IOException {
+//
+// IQueue<Long, TxOp> queue = createTxQueueInternal(loaded.getDescriptor().getQueueName().toString(), loaded.getDescriptor().getQueueType());
+// queue.initialize(loaded.getFirstSequence(), loaded.getLastSequence(), loaded.getCount(), loaded.getSize());
+// return queue;
+// }
+//
+// private final IQueue<Long, TxOp> createTransactionQueue(long tid) {
+// IQueue<Long, TxOp> queue = createTxQueueInternal(TX_QUEUE_PREFIX + tid, BrokerQueueStore.TRANSACTION_QUEUE_TYPE);
+// queue.initialize(0, 0, 0, 0);
+// txStore.addQueue(queue.getDescriptor());
+// return queue;
+// }
+//
+// private IQueue<Long, TxOp> createTxQueueInternal(final String name, short type) {
+// ExclusivePersistentQueue<Long, TxOp> queue;
+//
+// SizeLimiter<TxOp> limiter = new SizeLimiter<TxOp>(DEFAULT_TX_QUEUE_SIZE, DEFAULT_TX_QUEUE_RESUME_THRESHOLD) {
+// @Override
+// public int getElementSize(TxOp elem) {
+// return elem.getLimiterSize();
+// }
+// };
+// queue = new ExclusivePersistentQueue<Long, TxOp>(name, limiter);
+// queue.setStore(txStore);
+// queue.setPersistencePolicy(DEFAULT_TX_QUEUE_PERSISTENCE_POLICY);
+// queue.setExpirationMapper(EXPIRATION_MAPPER);
+// queue.getDescriptor().setApplicationType(type);
+// return queue;
+// }
+//
+// final QueueStore<Long, Transaction.TxOp> getTxnStore() {
+// return txStore;
+// }
+//
+// private class TransactionStore implements QueueStore<Long, Transaction.TxOp> {
+// private final BrokerDatabase database;
+//
+// private final BrokerDatabase.MessageRecordMarshaller<TxOp> TX_OP_MARSHALLER = new BrokerDatabase.MessageRecordMarshaller<TxOp>() {
+// public MessageRecord marshal(TxOp element) {
+// return element.createMessageRecord();
+// }
+//
+// public TxOp unMarshall(MessageRecord record, QueueDescriptor queue) {
+// Transaction t = transactionsByQueue.get(queue.getQueueName());
+// return Transaction.createTxOp(record, t);
+// }
+// };
+//
+// TransactionStore(BrokerDatabase database) {
+// this.database = database;
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.queue.QueueStore#addQueue(org.apache.activemq
+// * .queue.QueueDescriptor)
+// */
+// public void addQueue(QueueDescriptor queue) {
+// database.addQueue(queue);
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.queue.QueueStore#deleteQueue(org.apache.activemq
+// * .queue.QueueDescriptor)
+// */
+// public void deleteQueue(QueueDescriptor queue) {
+// database.deleteQueue(queue);
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.queue.QueueStore#deleteQueueElement(org.apache
+// * .activemq.queue.QueueDescriptor, java.lang.Object)
+// */
+// public void deleteQueueElement(SaveableQueueElement<TxOp> sqe) {
+// database.deleteQueueElement(sqe);
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.queue.QueueStore#isFromStore(java.lang.Object)
+// */
+// public boolean isFromStore(TxOp elem) {
+// return elem.isFromStore();
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.queue.QueueStore#persistQueueElement(org.apache
+// * .activemq.queue.SaveableQueueElement,
+// * org.apache.activemq.flow.ISourceController, boolean)
+// */
+// public void persistQueueElement(SaveableQueueElement<TxOp> sqe, ISourceController<?> source, boolean delayable) {
+// database.saveQeueuElement(sqe, source, false, TX_OP_MARSHALLER);
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.queue.QueueStore#restoreQueueElements(org.apache
+// * .activemq.queue.QueueDescriptor, boolean, long, long, int,
+// * org.apache.activemq.queue.RestoreListener)
+// */
+// public void restoreQueueElements(QueueDescriptor queue, boolean recordOnly, long firstSequence, long maxSequence, int maxCount, RestoreListener<TxOp> listener) {
+// database.restoreQueueElements(queue, recordOnly, firstSequence, maxSequence, maxCount, listener, TX_OP_MARSHALLER);
+// }
+// }
- private final HashMap<Long, Transaction> transactions = new HashMap<Long, Transaction>();
- private final HashMap<AsciiBuffer, Transaction> transactionsByQueue = new HashMap<AsciiBuffer, Transaction>();
- private final HashMap<Buffer, XATransaction> xaTransactions = new HashMap<Buffer, XATransaction>();
-
- private final VirtualHost host;
- private BrokerDatabase database;
-
- private final AtomicLong tidGen = new AtomicLong(0);
- private final TransactionStore txStore;
-
- private static final int DEFAULT_TX_QUEUE_PAGING_THRESHOLD = 1024 * 64;
- private static final int DEFAULT_TX_QUEUE_RESUME_THRESHOLD = 1;
- // Be default we don't page out elements to disk.
- private static final int DEFAULT_TX_QUEUE_SIZE = DEFAULT_TX_QUEUE_PAGING_THRESHOLD;
- //private static final int DEFAULT_TX_QUEUE_SIZE = Integer.MAX_VALUE;
-
- private static final PersistencePolicy<TxOp> DEFAULT_TX_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<TxOp>() {
-
- private static final boolean PAGING_ENABLED = DEFAULT_TX_QUEUE_SIZE > DEFAULT_TX_QUEUE_PAGING_THRESHOLD;
-
- public boolean isPersistent(TxOp elem) {
- return elem.isPersistent();
- }
-
- public boolean isPageOutPlaceHolders() {
- return false;
- }
-
- public boolean isPagingEnabled() {
- return PAGING_ENABLED;
- }
-
- public int getPagingInMemorySize() {
- return DEFAULT_TX_QUEUE_PAGING_THRESHOLD;
- }
-
- public boolean isThrottleSourcesToMemoryLimit() {
- // Keep the queue in memory.
- return true;
- }
-
- public int getDisconnectedThrottleRate() {
- // By default don't throttle consumers when disconnected.
- return 0;
- }
-
- public int getRecoveryBias() {
- return 8;
- }
- };
-
- private static final Mapper<Long, TxOp> EXPIRATION_MAPPER = new Mapper<Long, TxOp>() {
- public Long map(TxOp element) {
- return element.getExpiration();
- }
- };
-
- private static final Mapper<Integer, TxOp> SIZE_MAPPER = new Mapper<Integer, TxOp>() {
- public Integer map(TxOp element) {
- return element.getLimiterSize();
- }
- };
-
- private static final Mapper<Integer, TxOp> PRIORITY_MAPPER = new Mapper<Integer, TxOp>() {
- public Integer map(TxOp element) {
- return element.getPriority();
- }
- };
-
- private static final Mapper<Long, TxOp> KEY_MAPPER = new Mapper<Long, TxOp>() {
- public Long map(TxOp element) {
- return element.getStoreTracking();
- }
- };
-
- private static final Mapper<Integer, TxOp> PARTITION_MAPPER = new Mapper<Integer, TxOp>() {
- public Integer map(TxOp element) {
- return 1;
- }
- };
-
- TransactionManager(VirtualHost host) {
- this.host = host;
- txStore = new TransactionStore(host.getDatabase());
- database = host.getDatabase();
- }
-
- /**
- * @return The TM's virtual host
- */
- public final VirtualHost getVirtualHost() {
- return host;
- }
-
- /**
- * @param msg
- * @param controller
- */
- public void newMessage(MessageDelivery msg, ISourceController<?> controller) {
- if (msg.getStoreTracking() == -1) {
- msg.setStoreTracking(host.getDatabase().allocateStoreTracking());
- }
- transactions.get(msg.getTransactionId()).addMessage(msg, controller);
- }
-
- /**
- * Creates a transaction.
- *
- * @param xid
- * @return
- */
- public synchronized final Transaction createTransaction(Buffer xid) {
- Transaction ret;
-
- long tid = tidGen.incrementAndGet();
- IQueue<Long, TxOp> opQueue = createTransactionQueue(tid);
-
- if (xid == null) {
- ret = new LocalTransaction(this, tid, opQueue);
- } else {
- XATransaction xat = new XATransaction(this, tid, xid, opQueue);
- ret = xat;
- xaTransactions.put(xid, xat);
- }
-
- transactionsByQueue.put(opQueue.getDescriptor().getQueueName(), ret);
- transactions.put(ret.getTid(), ret);
-
- return ret;
- }
-
- /**
- * @param buffer
- * @return
- */
- public synchronized Transaction getXATransaction(Buffer buffer) {
- return xaTransactions.get(buffer);
- }
-
- /**
- *
- * @throws Exception
- */
- public synchronized void loadTransactions() throws Exception {
-
- tidGen.set(database.allocateStoreTracking());
-
- Map<AsciiBuffer, Buffer> txns = database.listMapEntries(TXN_MAP);
-
- // Load shared queues
- Iterator<QueueQueryResult> results = database.listQueues(BrokerQueueStore.TRANSACTION_QUEUE_TYPE);
- while (results.hasNext()) {
- QueueQueryResult loaded = results.next();
-
- Buffer b = txns.remove(loaded.getDescriptor().getQueueName());
- if (b == null) {
- LOG.warn("Recovered orphaned transaction queue: " + loaded.getDescriptor() + " elements: " + loaded.getCount());
- database.deleteQueue(loaded.getDescriptor());
- }
-
- IQueue<Long, TxOp> queue = createRestoredTxQueue(loaded);
- Transaction tx = loadTransaction(b, queue);
-
- //TODO if we recover a tx that isn't committed then, we should discard it.
- if (tx.getState() < Transaction.COMMITED_STATE) {
- LOG.warn("Recovered unfinished transaction: " + tx);
- }
- transactions.put(tx.getTid(), tx);
- if (tx instanceof XATransaction) {
- XATransaction xat = XATransaction.class.cast(tx);
- xaTransactions.put(xat.getXid(), xat);
- }
-
- LOG.info("Loaded Queue " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize());
- }
-
- if (!txns.isEmpty()) {
- //TODO Based on transaction state this is generally ok, anyway the orphaned entries should be
- //deleted:
- LOG.warn("Recovered transactions without backing queues: " + txns.keySet());
- }
- }
-
- private Transaction loadTransaction(Buffer b, IQueue<Long, TxOp> queue) throws IOException {
- //TODO move the serialization into the transaction itself:
- DataByteArrayInputStream bais = new DataByteArrayInputStream(b.getData());
- byte type = bais.readByte();
- byte state = bais.readByte();
- long tid = bais.readLong();
-
- Transaction tx = null;
- switch (type) {
- case Transaction.TYPE_LOCAL:
- tx = new LocalTransaction(this, tid, queue);
- break;
- case Transaction.TYPE_XA:
- int length = bais.readByte() & 0xFF;
- Buffer xid = new Buffer(new byte[length]);
- bais.readFully(xid.data);
- tx = new XATransaction(this, tid, xid, queue);
- break;
- default:
- throw new IOException("Invalid transaction type: " + type);
-
- }
- tx.setState(state, null);
- return tx;
-
- }
-
- public ListenableFuture<?> persistTransaction(Transaction tx) {
-
- //TODO move the serialization into the transaction itself:
- DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
- try {
- baos.writeByte(tx.getType());
- baos.writeByte(tx.getState());
- baos.writeLong(tx.getTid());
- if (tx.getType() == Transaction.TYPE_XA) {
- Buffer xid = ((XATransaction) tx).getXid();
- // An XID max size is around 140 bytes, byte SHOULD be big enough to frame it.
- baos.writeByte(xid.length & 0xFF);
- baos.write(xid.data, xid.offset, xid.length);
- }
- OperationContext<?> ctx = database.updateMapEntry(TXN_MAP, tx.getBackingQueueName(), new Buffer(baos.getData(), 0, baos.size()));
- ctx.requestFlush();
- return ctx;
- } catch (IOException ioe) {
- //Shouldn't happen
- throw new RuntimeException(ioe);
- }
- }
-
- private IQueue<Long, TxOp> createRestoredTxQueue(QueueQueryResult loaded) throws IOException {
-
- IQueue<Long, TxOp> queue = createTxQueueInternal(loaded.getDescriptor().getQueueName().toString(), loaded.getDescriptor().getQueueType());
- queue.initialize(loaded.getFirstSequence(), loaded.getLastSequence(), loaded.getCount(), loaded.getSize());
- return queue;
+ public TransactionManager(VirtualHost virtualHost) {
}
- private final IQueue<Long, TxOp> createTransactionQueue(long tid) {
- IQueue<Long, TxOp> queue = createTxQueueInternal(TX_QUEUE_PREFIX + tid, BrokerQueueStore.TRANSACTION_QUEUE_TYPE);
- queue.initialize(0, 0, 0, 0);
- txStore.addQueue(queue.getDescriptor());
- return queue;
+ public void loadTransactions() {
}
-
- private IQueue<Long, TxOp> createTxQueueInternal(final String name, short type) {
- ExclusivePersistentQueue<Long, TxOp> queue;
-
- SizeLimiter<TxOp> limiter = new SizeLimiter<TxOp>(DEFAULT_TX_QUEUE_SIZE, DEFAULT_TX_QUEUE_RESUME_THRESHOLD) {
- @Override
- public int getElementSize(TxOp elem) {
- return elem.getLimiterSize();
- }
- };
- queue = new ExclusivePersistentQueue<Long, TxOp>(name, limiter);
- queue.setStore(txStore);
- queue.setPersistencePolicy(DEFAULT_TX_QUEUE_PERSISTENCE_POLICY);
- queue.setExpirationMapper(EXPIRATION_MAPPER);
- queue.getDescriptor().setApplicationType(type);
- return queue;
- }
-
- final QueueStore<Long, Transaction.TxOp> getTxnStore() {
- return txStore;
- }
-
- private class TransactionStore implements QueueStore<Long, Transaction.TxOp> {
- private final BrokerDatabase database;
-
- private final BrokerDatabase.MessageRecordMarshaller<TxOp> TX_OP_MARSHALLER = new BrokerDatabase.MessageRecordMarshaller<TxOp>() {
- public MessageRecord marshal(TxOp element) {
- return element.createMessageRecord();
- }
-
- public TxOp unMarshall(MessageRecord record, QueueDescriptor queue) {
- Transaction t = transactionsByQueue.get(queue.getQueueName());
- return Transaction.createTxOp(record, t);
- }
- };
-
- TransactionStore(BrokerDatabase database) {
- this.database = database;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.activemq.queue.QueueStore#addQueue(org.apache.activemq
- * .queue.QueueDescriptor)
- */
- public void addQueue(QueueDescriptor queue) {
- database.addQueue(queue);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.activemq.queue.QueueStore#deleteQueue(org.apache.activemq
- * .queue.QueueDescriptor)
- */
- public void deleteQueue(QueueDescriptor queue) {
- database.deleteQueue(queue);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.activemq.queue.QueueStore#deleteQueueElement(org.apache
- * .activemq.queue.QueueDescriptor, java.lang.Object)
- */
- public void deleteQueueElement(SaveableQueueElement<TxOp> sqe) {
- database.deleteQueueElement(sqe);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.activemq.queue.QueueStore#isFromStore(java.lang.Object)
- */
- public boolean isFromStore(TxOp elem) {
- return elem.isFromStore();
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.activemq.queue.QueueStore#persistQueueElement(org.apache
- * .activemq.queue.SaveableQueueElement,
- * org.apache.activemq.flow.ISourceController, boolean)
- */
- public void persistQueueElement(SaveableQueueElement<TxOp> sqe, ISourceController<?> source, boolean delayable) {
- database.saveQeueuElement(sqe, source, false, TX_OP_MARSHALLER);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.activemq.queue.QueueStore#restoreQueueElements(org.apache
- * .activemq.queue.QueueDescriptor, boolean, long, long, int,
- * org.apache.activemq.queue.RestoreListener)
- */
- public void restoreQueueElements(QueueDescriptor queue, boolean recordOnly, long firstSequence, long maxSequence, int maxCount, RestoreListener<TxOp> listener) {
- database.restoreQueueElements(queue, recordOnly, firstSequence, maxSequence, maxCount, listener, TX_OP_MARSHALLER);
- }
- }
-
}