You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:39:04 UTC
svn commit: r961067 [3/5] - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/java/org/apache/activemq/apollo/
activemq-broker/src/main/java/org/apache/activemq/apollo/broker/
activemq-broker/src/test/java/org/apache/activemq/broker/...
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java?rev=961067&r1=961066&r2=961067&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java Wed Jul 7 03:39:03 2010
@@ -21,262 +21,260 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import org.apache.activemq.apollo.broker.BrokerDatabase.OperationContext;
import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.broker.store.Store.MessageRecord;
-import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.queue.SaveableQueueElement;
import org.apache.activemq.util.buffer.AsciiBuffer;
import org.apache.activemq.util.buffer.Buffer;
public abstract class BrokerMessageDelivery implements MessageDelivery {
- // True while the message is being dispatched to the delivery targets:
- boolean dispatching = false;
-
- // A non null pending save indicates that the message is the
- // saver queue and that the message
- OperationContext<?> pendingSave;
-
- // List of persistent targets for which the message should be saved
- // when dispatch is complete:
- HashMap<QueueDescriptor, SaveableQueueElement<MessageDelivery>> persistentTargets;
- SaveableQueueElement<MessageDelivery> singleTarget;
-
- long storeTracking = -1;
- BrokerDatabase store;
- boolean fromStore = false;
- boolean enableFlushDelay = true;
- private int limiterSize = -1;
- private long tid=-1;
-
- public void setFromDatabase(BrokerDatabase database, MessageRecord mRecord) {
- fromStore = true;
- store = database;
- storeTracking = mRecord.getKey();
- limiterSize = mRecord.getSize();
- }
-
- public final int getFlowLimiterSize() {
- if (limiterSize == -1) {
- limiterSize = getMemorySize();
- }
- return limiterSize;
- }
-
- /**
- * When an application wishes to include a message in a broker transaction
- * it must set this the tid returned by {@link Transaction#getTid()}
- *
- * @param tid
- * Sets the tid used to identify the transaction at the broker.
- */
- public void setTransactionId(long tid) {
- this.tid = tid;
- }
-
- /**
- * @return The tid used to identify the transaction at the broker.
- */
- public final long getTransactionId() {
- return tid;
- }
-
- public final void clearTransactionId() {
- tid = -1;
- }
-
- /**
- * Subclass must implement this to return their current memory size
- * estimate.
- *
- * @return The memory size of the message.
- */
- public abstract int getMemorySize();
-
- public final boolean isFromStore() {
- return fromStore;
- }
-
- public final void persist(SaveableQueueElement<MessageDelivery> sqe, ISourceController<?> controller, boolean delayable) {
- synchronized (this) {
- // Can flush of this message to the store be delayed?
- if (enableFlushDelay && !delayable) {
- enableFlushDelay = false;
- }
- // If this message is being dispatched then add the queue to the
- // list of queues for which to save the message when dispatch is
- // finished:
- if (dispatching) {
- addPersistentTarget(sqe);
- return;
- }
- // Otherwise, if it is still in the saver queue, we can add this
- // queue to the queue list:
- else if (pendingSave != null) {
- addPersistentTarget(sqe);
- if (!delayable) {
- pendingSave.requestFlush();
- }
- return;
- }
- }
-
- store.saveMessage(sqe, controller, delayable);
- }
-
- public final void acknowledge(SaveableQueueElement<MessageDelivery> sqe) {
- boolean firePersistListener = false;
- boolean deleted = false;
- synchronized (this) {
- // If the message hasn't been saved to the database
- // then we don't need to issue a delete:
- if (dispatching || pendingSave != null) {
-
- deleted = true;
-
- removePersistentTarget(sqe.getQueueDescriptor());
- // We get a save context when we place the message in the
- // database queue. If it has been added to the queue,
- // and we've removed the last queue, see if we can cancel
- // the save:
- if (pendingSave != null && !hasPersistentTargets()) {
- if (pendingSave.cancel()) {
- pendingSave = null;
- if (isPersistent()) {
- firePersistListener = true;
- }
- }
- }
- }
- }
-
- if (!deleted) {
- store.deleteQueueElement(sqe);
- }
-
- if (firePersistListener) {
- onMessagePersisted();
- }
-
- }
-
- public final void setStoreTracking(long tracking) {
- if (storeTracking == -1) {
- storeTracking = tracking;
- }
- }
-
- public final void beginDispatch(BrokerDatabase database) {
- this.store = database;
- dispatching = true;
- setStoreTracking(database.allocateStoreTracking());
- }
-
- public long getStoreTracking() {
- return storeTracking;
- }
-
- public synchronized Collection<SaveableQueueElement<MessageDelivery>> getPersistentQueues() {
- if (singleTarget != null) {
- ArrayList<SaveableQueueElement<MessageDelivery>> list = new ArrayList<SaveableQueueElement<MessageDelivery>>(1);
- list.add(singleTarget);
- return list;
- } else if (persistentTargets != null) {
- return persistentTargets.values();
- }
- return null;
- }
-
- public void beginStore() {
- synchronized (this) {
- pendingSave = null;
- }
- }
-
- private final boolean hasPersistentTargets() {
- return (persistentTargets != null && !persistentTargets.isEmpty()) || singleTarget != null;
- }
-
- private final void removePersistentTarget(QueueDescriptor queue) {
- if (persistentTargets != null) {
- persistentTargets.remove(queue);
- return;
- }
-
- if (singleTarget != null && singleTarget.getQueueDescriptor().equals(queue)) {
- singleTarget = null;
- }
- }
-
- private final void addPersistentTarget(SaveableQueueElement<MessageDelivery> elem) {
- if (persistentTargets != null) {
- persistentTargets.put(elem.getQueueDescriptor(), elem);
- return;
- }
-
- if (singleTarget == null) {
- singleTarget = elem;
- return;
- }
-
- if (elem.getQueueDescriptor() != singleTarget.getQueueDescriptor()) {
- persistentTargets = new HashMap<QueueDescriptor, SaveableQueueElement<MessageDelivery>>();
- persistentTargets.put(elem.getQueueDescriptor(), elem);
- persistentTargets.put(singleTarget.getQueueDescriptor(), singleTarget);
- singleTarget = null;
- }
- }
-
- public final void finishDispatch(ISourceController<?> controller) throws IOException {
- boolean firePersistListener = false;
- synchronized (this) {
- // If any of the targets requested save then save the message
- // Note that this could be the case even if the message isn't
- // persistent if a target requested that the message be spooled
- // for some other reason such as queue memory overflow.
- if (hasPersistentTargets()) {
- pendingSave = store.persistReceivedMessage(this, controller);
- }
-
- // If none of the targets required persistence, then fire the
- // persist listener:
- if (pendingSave == null || !isPersistent()) {
- firePersistListener = true;
- }
- dispatching = false;
- }
-
- if (firePersistListener) {
- onMessagePersisted();
- }
- }
-
- public final MessageRecord createMessageRecord() {
-
- MessageRecord record = new MessageRecord();
- record.setEncoding(getStoreEncoding());
- record.setBuffer(getStoreEncoded());
- record.setStreamKey((long) 0);
- record.setMessageId(getMsgId());
- record.setSize(getFlowLimiterSize());
- record.setKey(getStoreTracking());
- return record;
- }
-
- /**
- * @return A buffer representation of the message to be stored in the store.
- * @throws
- */
- protected abstract Buffer getStoreEncoded();
-
- /**
- * @return The encoding scheme used to store the message.
- */
- protected abstract AsciiBuffer getStoreEncoding();
-
- public boolean isFlushDelayable() {
- // TODO Auto-generated method stub
- return enableFlushDelay;
- }
+// TODO:
+// // True while the message is being dispatched to the delivery targets:
+// boolean dispatching = false;
+//
+// // A non null pending save indicates that the message is in the
+// // saver queue and that its save has not yet begun or been cancelled
+// OperationContext<?> pendingSave;
+//
+// // List of persistent targets for which the message should be saved
+// // when dispatch is complete:
+// HashMap<QueueDescriptor, SaveableQueueElement<MessageDelivery>> persistentTargets;
+// SaveableQueueElement<MessageDelivery> singleTarget;
+//
+// long storeTracking = -1;
+// BrokerDatabase store;
+// boolean fromStore = false;
+// boolean enableFlushDelay = true;
+// private int limiterSize = -1;
+// private long tid=-1;
+//
+// public void setFromDatabase(BrokerDatabase database, MessageRecord mRecord) {
+// fromStore = true;
+// store = database;
+// storeTracking = mRecord.getKey();
+// limiterSize = mRecord.getSize();
+// }
+//
+// public final int getFlowLimiterSize() {
+// if (limiterSize == -1) {
+// limiterSize = getMemorySize();
+// }
+// return limiterSize;
+// }
+//
+// /**
+// * When an application wishes to include a message in a broker transaction
+// * it must set this to the tid returned by {@link Transaction#getTid()}
+// *
+// * @param tid
+// * Sets the tid used to identify the transaction at the broker.
+// */
+// public void setTransactionId(long tid) {
+// this.tid = tid;
+// }
+//
+// /**
+// * @return The tid used to identify the transaction at the broker.
+// */
+// public final long getTransactionId() {
+// return tid;
+// }
+//
+// public final void clearTransactionId() {
+// tid = -1;
+// }
+//
+// /**
+// * Subclass must implement this to return their current memory size
+// * estimate.
+// *
+// * @return The memory size of the message.
+// */
+// public abstract int getMemorySize();
+//
+// public final boolean isFromStore() {
+// return fromStore;
+// }
+//
+// public final void persist(SaveableQueueElement<MessageDelivery> sqe, ISourceController<?> controller, boolean delayable) {
+// synchronized (this) {
+// // Can flush of this message to the store be delayed?
+// if (enableFlushDelay && !delayable) {
+// enableFlushDelay = false;
+// }
+// // If this message is being dispatched then add the queue to the
+// // list of queues for which to save the message when dispatch is
+// // finished:
+// if (dispatching) {
+// addPersistentTarget(sqe);
+// return;
+// }
+// // Otherwise, if it is still in the saver queue, we can add this
+// // queue to the queue list:
+// else if (pendingSave != null) {
+// addPersistentTarget(sqe);
+// if (!delayable) {
+// pendingSave.requestFlush();
+// }
+// return;
+// }
+// }
+//
+// store.saveMessage(sqe, controller, delayable);
+// }
+//
+// public final void acknowledge(SaveableQueueElement<MessageDelivery> sqe) {
+// boolean firePersistListener = false;
+// boolean deleted = false;
+// synchronized (this) {
+// // If the message hasn't been saved to the database
+// // then we don't need to issue a delete:
+// if (dispatching || pendingSave != null) {
+//
+// deleted = true;
+//
+// removePersistentTarget(sqe.getQueueDescriptor());
+// // We get a save context when we place the message in the
+// // database queue. If it has been added to the queue,
+// // and we've removed the last queue, see if we can cancel
+// // the save:
+// if (pendingSave != null && !hasPersistentTargets()) {
+// if (pendingSave.cancel()) {
+// pendingSave = null;
+// if (isPersistent()) {
+// firePersistListener = true;
+// }
+// }
+// }
+// }
+// }
+//
+// if (!deleted) {
+// store.deleteQueueElement(sqe);
+// }
+//
+// if (firePersistListener) {
+// onMessagePersisted();
+// }
+//
+// }
+//
+// public final void setStoreTracking(long tracking) {
+// if (storeTracking == -1) {
+// storeTracking = tracking;
+// }
+// }
+//
+// public final void beginDispatch(BrokerDatabase database) {
+// this.store = database;
+// dispatching = true;
+// setStoreTracking(database.allocateStoreTracking());
+// }
+//
+// public long getStoreTracking() {
+// return storeTracking;
+// }
+//
+// public synchronized Collection<SaveableQueueElement<MessageDelivery>> getPersistentQueues() {
+// if (singleTarget != null) {
+// ArrayList<SaveableQueueElement<MessageDelivery>> list = new ArrayList<SaveableQueueElement<MessageDelivery>>(1);
+// list.add(singleTarget);
+// return list;
+// } else if (persistentTargets != null) {
+// return persistentTargets.values();
+// }
+// return null;
+// }
+//
+// public void beginStore() {
+// synchronized (this) {
+// pendingSave = null;
+// }
+// }
+//
+// private final boolean hasPersistentTargets() {
+// return (persistentTargets != null && !persistentTargets.isEmpty()) || singleTarget != null;
+// }
+//
+// private final void removePersistentTarget(QueueDescriptor queue) {
+// if (persistentTargets != null) {
+// persistentTargets.remove(queue);
+// return;
+// }
+//
+// if (singleTarget != null && singleTarget.getQueueDescriptor().equals(queue)) {
+// singleTarget = null;
+// }
+// }
+//
+// private final void addPersistentTarget(SaveableQueueElement<MessageDelivery> elem) {
+// if (persistentTargets != null) {
+// persistentTargets.put(elem.getQueueDescriptor(), elem);
+// return;
+// }
+//
+// if (singleTarget == null) {
+// singleTarget = elem;
+// return;
+// }
+//
+// if (elem.getQueueDescriptor() != singleTarget.getQueueDescriptor()) {
+// persistentTargets = new HashMap<QueueDescriptor, SaveableQueueElement<MessageDelivery>>();
+// persistentTargets.put(elem.getQueueDescriptor(), elem);
+// persistentTargets.put(singleTarget.getQueueDescriptor(), singleTarget);
+// singleTarget = null;
+// }
+// }
+//
+// public final void finishDispatch(ISourceController<?> controller) throws IOException {
+// boolean firePersistListener = false;
+// synchronized (this) {
+// // If any of the targets requested save then save the message
+// // Note that this could be the case even if the message isn't
+// // persistent if a target requested that the message be spooled
+// // for some other reason such as queue memory overflow.
+// if (hasPersistentTargets()) {
+// pendingSave = store.persistReceivedMessage(this, controller);
+// }
+//
+// // If none of the targets required persistence, then fire the
+// // persist listener:
+// if (pendingSave == null || !isPersistent()) {
+// firePersistListener = true;
+// }
+// dispatching = false;
+// }
+//
+// if (firePersistListener) {
+// onMessagePersisted();
+// }
+// }
+//
+// public final MessageRecord createMessageRecord() {
+//
+// MessageRecord record = new MessageRecord();
+// record.setEncoding(getStoreEncoding());
+// record.setBuffer(getStoreEncoded());
+// record.setStreamKey((long) 0);
+// record.setMessageId(getMsgId());
+// record.setSize(getFlowLimiterSize());
+// record.setKey(getStoreTracking());
+// return record;
+// }
+//
+// /**
+// * @return A buffer representation of the message to be stored in the store.
+// * @throws
+// */
+// protected abstract Buffer getStoreEncoded();
+//
+// /**
+// * @return The encoding scheme used to store the message.
+// */
+// protected abstract AsciiBuffer getStoreEncoding();
+//
+// public boolean isFlushDelayable() {
+// // TODO Auto-generated method stub
+// return enableFlushDelay;
+// }
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java?rev=961067&r1=961066&r2=961067&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java Wed Jul 7 03:39:03 2010
@@ -25,460 +25,455 @@ import java.util.Iterator;
import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.broker.store.Store.MessageRecord;
import org.apache.activemq.broker.store.Store.QueueQueryResult;
-import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.flow.PrioritySizeLimiter;
-import org.apache.activemq.flow.SizeLimiter;
-import org.apache.activemq.queue.ExclusivePersistentQueue;
-import org.apache.activemq.queue.IPartitionedQueue;
-import org.apache.activemq.queue.IQueue;
-import org.apache.activemq.queue.PartitionedQueue;
-import org.apache.activemq.queue.PersistencePolicy;
-import org.apache.activemq.queue.QueueStore;
-import org.apache.activemq.queue.RestoreListener;
-import org.apache.activemq.queue.SaveableQueueElement;
-import org.apache.activemq.queue.SharedPriorityQueue;
-import org.apache.activemq.queue.SharedQueue;
-import org.apache.activemq.queue.SharedQueueOld;
import org.apache.activemq.util.Mapper;
import org.apache.activemq.util.buffer.AsciiBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.fusesource.hawtdispatch.DispatchQueue;
-public class BrokerQueueStore implements QueueStore<Long, MessageDelivery> {
-
- private static final Log LOG = LogFactory.getLog(BrokerQueueStore.class);
- private static final boolean USE_OLD_QUEUE = false;
- private static final boolean USE_PRIORITY_QUEUES = true;
-
- private BrokerDatabase database;
- private DispatchQueue dispatchQueue;
-
- private static HashMap<String, ProtocolHandler> protocolHandlers = new HashMap<String, ProtocolHandler>();
- private static final BrokerDatabase.MessageRecordMarshaller<MessageDelivery> MESSAGE_MARSHALLER = new BrokerDatabase.MessageRecordMarshaller<MessageDelivery>() {
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.activemq.apollo.broker.BrokerDatabase.MessageRecordMarshaller
- * #marshal(java.lang.Object)
- */
- public MessageRecord marshal(MessageDelivery element) {
- return element.createMessageRecord();
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.activemq.apollo.broker.BrokerDatabase.MessageRecordMarshaller
- * #unMarshall(org.apache.activemq.broker.store.Store.MessageRecord)
- */
- public MessageDelivery unMarshall(MessageRecord record, QueueDescriptor queue) {
- ProtocolHandler handler = protocolHandlers.get(record.getEncoding().toString());
- if (handler == null) {
- try {
- handler = ProtocolHandlerFactory.createProtocolHandler(record.getEncoding().toString());
- protocolHandlers.put(record.getEncoding().toString(), handler);
- } catch (Throwable thrown) {
- throw new RuntimeException("Unknown message format" + record.getEncoding().toString(), thrown);
- }
- }
- try {
- return handler.createMessageDelivery(record);
- } catch (IOException ioe) {
- throw new RuntimeException(ioe);
- }
- }
- };
-
- final BrokerDatabase.MessageRecordMarshaller<MessageDelivery> getMessageMarshaller() {
- return MESSAGE_MARSHALLER;
- }
-
- private static final Mapper<Long, MessageDelivery> EXPIRATION_MAPPER = new Mapper<Long, MessageDelivery>() {
- public Long map(MessageDelivery element) {
- return element.getExpiration();
- }
- };
-
- private static final Mapper<Integer, MessageDelivery> SIZE_MAPPER = new Mapper<Integer, MessageDelivery>() {
- public Integer map(MessageDelivery element) {
- return element.getFlowLimiterSize();
- }
- };
-
- public static final Mapper<Integer, MessageDelivery> PRIORITY_MAPPER = new Mapper<Integer, MessageDelivery>() {
- public Integer map(MessageDelivery element) {
- return element.getPriority();
- }
- };
-
- static public final Mapper<Long, MessageDelivery> KEY_MAPPER = new Mapper<Long, MessageDelivery>() {
- public Long map(MessageDelivery element) {
- return element.getStoreTracking();
- }
- };
-
- static public final Mapper<Integer, MessageDelivery> PARTITION_MAPPER = new Mapper<Integer, MessageDelivery>() {
- public Integer map(MessageDelivery element) {
- // we modulo 10 to have at most 10 partitions which the producers
- // gets split across.
- return (int) (element.getProducerId().hashCode() % 10);
- }
- };
-
- public static final short SUBPARTITION_TYPE = 0;
- public static final short SHARED_QUEUE_TYPE = 1;
- public static final short DURABLE_QUEUE_TYPE = 2;
- public static short TRANSACTION_QUEUE_TYPE = 3;
-
- private final HashMap<String, IQueue<Long, MessageDelivery>> sharedQueues = new HashMap<String, IQueue<Long, MessageDelivery>>();
- private final HashMap<String, IQueue<Long, MessageDelivery>> durableQueues = new HashMap<String, IQueue<Long, MessageDelivery>>();
-
- private Mapper<Integer, MessageDelivery> partitionMapper;
-
- private static final int DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD = 1024 * 1024 * 1;
- private static final int DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD = 1;
- // Be default we don't page out elements to disk.
- private static final int DEFAULT_SHARED_QUEUE_SIZE = DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
- //private static final int DEFAULT_SHARED_QUEUE_SIZE = 1024 * 1024 * 10;
-
- private static long dynamicQueueCounter = 0;
-
- private static final PersistencePolicy<MessageDelivery> SHARED_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<MessageDelivery>() {
-
- private static final boolean PAGING_ENABLED = DEFAULT_SHARED_QUEUE_SIZE > DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
-
- public boolean isPersistent(MessageDelivery elem) {
- return elem.isPersistent();
- }
-
- public boolean isPageOutPlaceHolders() {
- return true;
- }
-
- public boolean isPagingEnabled() {
- return PAGING_ENABLED;
- }
-
- public int getPagingInMemorySize() {
- return DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
- }
-
- public boolean isThrottleSourcesToMemoryLimit() {
- // Keep the queue in memory.
- return true;
- }
-
- public int getDisconnectedThrottleRate() {
- // By default don't throttle consumers when disconnected.
- return 0;
- }
-
- public int getRecoveryBias() {
- return 8;
- }
- };
-
- private static final int DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD = 100 * 1024 * 1;
- private static final int DEFAULT_DURABLE_QUEUE_RESUME_THRESHOLD = 1;
- // Be default we don't page out elements to disk.
- //private static final int DEFAULT_DURABLE_QUEUE_SIZE = DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
- private static final int DEFAULT_DURABLE_QUEUE_SIZE = 1024 * 1024 * 10;
-
- private static final PersistencePolicy<MessageDelivery> DURABLE_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<MessageDelivery>() {
-
- private static final boolean PAGING_ENABLED = DEFAULT_DURABLE_QUEUE_SIZE > DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
-
- public boolean isPersistent(MessageDelivery elem) {
- return elem.isPersistent();
- }
-
- public boolean isPageOutPlaceHolders() {
- return true;
- }
-
- public boolean isPagingEnabled() {
- return PAGING_ENABLED;
- }
-
- public int getPagingInMemorySize() {
- return DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
- }
-
- public boolean isThrottleSourcesToMemoryLimit() {
- // Keep the queue in memory.
- return true;
- }
-
- public int getDisconnectedThrottleRate() {
- // By default don't throttle consumers when disconnected.
- return 0;
- }
-
- public int getRecoveryBias() {
- return 8;
- }
- };
+public class BrokerQueueStore { // implements QueueStore<Long, MessageDelivery> {
+// TODO:
+// private static final Log LOG = LogFactory.getLog(BrokerQueueStore.class);
+// private static final boolean USE_OLD_QUEUE = false;
+// private static final boolean USE_PRIORITY_QUEUES = true;
+//
+// private BrokerDatabase database;
+// private DispatchQueue dispatchQueue;
+//
+// private static HashMap<String, ProtocolHandler> protocolHandlers = new HashMap<String, ProtocolHandler>();
+// private static final BrokerDatabase.MessageRecordMarshaller<MessageDelivery> MESSAGE_MARSHALLER = new BrokerDatabase.MessageRecordMarshaller<MessageDelivery>() {
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.apollo.broker.BrokerDatabase.MessageRecordMarshaller
+// * #marshal(java.lang.Object)
+// */
+// public MessageRecord marshal(MessageDelivery element) {
+// return element.createMessageRecord();
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.apollo.broker.BrokerDatabase.MessageRecordMarshaller
+// * #unMarshall(org.apache.activemq.broker.store.Store.MessageRecord)
+// */
+// public MessageDelivery unMarshall(MessageRecord record, QueueDescriptor queue) {
+// ProtocolHandler handler = protocolHandlers.get(record.getEncoding().toString());
+// if (handler == null) {
+// try {
+// handler = ProtocolHandlerFactory.createProtocolHandler(record.getEncoding().toString());
+// protocolHandlers.put(record.getEncoding().toString(), handler);
+// } catch (Throwable thrown) {
+// throw new RuntimeException("Unknown message format" + record.getEncoding().toString(), thrown);
+// }
+// }
+// try {
+// return handler.createMessageDelivery(record);
+// } catch (IOException ioe) {
+// throw new RuntimeException(ioe);
+// }
+// }
+// };
+//
+// final BrokerDatabase.MessageRecordMarshaller<MessageDelivery> getMessageMarshaller() {
+// return MESSAGE_MARSHALLER;
+// }
+//
+// private static final Mapper<Long, MessageDelivery> EXPIRATION_MAPPER = new Mapper<Long, MessageDelivery>() {
+// public Long map(MessageDelivery element) {
+// return element.getExpiration();
+// }
+// };
+//
+// private static final Mapper<Integer, MessageDelivery> SIZE_MAPPER = new Mapper<Integer, MessageDelivery>() {
+// public Integer map(MessageDelivery element) {
+// return element.getFlowLimiterSize();
+// }
+// };
+//
+// public static final Mapper<Integer, MessageDelivery> PRIORITY_MAPPER = new Mapper<Integer, MessageDelivery>() {
+// public Integer map(MessageDelivery element) {
+// return element.getPriority();
+// }
+// };
+//
+// static public final Mapper<Long, MessageDelivery> KEY_MAPPER = new Mapper<Long, MessageDelivery>() {
+// public Long map(MessageDelivery element) {
+// return element.getStoreTracking();
+// }
+// };
+//
+// static public final Mapper<Integer, MessageDelivery> PARTITION_MAPPER = new Mapper<Integer, MessageDelivery>() {
+// public Integer map(MessageDelivery element) {
+// // we modulo 10 to have at most 10 partitions which the producers
+// // get split across.
+// return (int) (element.getProducerId().hashCode() % 10);
+// }
+// };
+//
+// public static final short SUBPARTITION_TYPE = 0;
+// public static final short SHARED_QUEUE_TYPE = 1;
+// public static final short DURABLE_QUEUE_TYPE = 2;
+// public static short TRANSACTION_QUEUE_TYPE = 3;
+//
+// private final HashMap<String, IQueue<Long, MessageDelivery>> sharedQueues = new HashMap<String, IQueue<Long, MessageDelivery>>();
+// private final HashMap<String, IQueue<Long, MessageDelivery>> durableQueues = new HashMap<String, IQueue<Long, MessageDelivery>>();
+//
+// private Mapper<Integer, MessageDelivery> partitionMapper;
+//
+// private static final int DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD = 1024 * 1024 * 1;
+// private static final int DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD = 1;
+// // By default we don't page out elements to disk.
+// private static final int DEFAULT_SHARED_QUEUE_SIZE = DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
+// //private static final int DEFAULT_SHARED_QUEUE_SIZE = 1024 * 1024 * 10;
+//
+// private static long dynamicQueueCounter = 0;
+//
+// private static final PersistencePolicy<MessageDelivery> SHARED_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<MessageDelivery>() {
+//
+// private static final boolean PAGING_ENABLED = DEFAULT_SHARED_QUEUE_SIZE > DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
+//
+// public boolean isPersistent(MessageDelivery elem) {
+// return elem.isPersistent();
+// }
+//
+// public boolean isPageOutPlaceHolders() {
+// return true;
+// }
+//
+// public boolean isPagingEnabled() {
+// return PAGING_ENABLED;
+// }
+//
+// public int getPagingInMemorySize() {
+// return DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
+// }
+//
+// public boolean isThrottleSourcesToMemoryLimit() {
+// // Keep the queue in memory.
+// return true;
+// }
+//
+// public int getDisconnectedThrottleRate() {
+// // By default don't throttle consumers when disconnected.
+// return 0;
+// }
+//
+// public int getRecoveryBias() {
+// return 8;
+// }
+// };
+//
+// private static final int DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD = 100 * 1024 * 1;
+// private static final int DEFAULT_DURABLE_QUEUE_RESUME_THRESHOLD = 1;
+// // By default we don't page out elements to disk.
+// //private static final int DEFAULT_DURABLE_QUEUE_SIZE = DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
+// private static final int DEFAULT_DURABLE_QUEUE_SIZE = 1024 * 1024 * 10;
+//
+// private static final PersistencePolicy<MessageDelivery> DURABLE_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<MessageDelivery>() {
+//
+// private static final boolean PAGING_ENABLED = DEFAULT_DURABLE_QUEUE_SIZE > DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
+//
+// public boolean isPersistent(MessageDelivery elem) {
+// return elem.isPersistent();
+// }
+//
+// public boolean isPageOutPlaceHolders() {
+// return true;
+// }
+//
+// public boolean isPagingEnabled() {
+// return PAGING_ENABLED;
+// }
+//
+// public int getPagingInMemorySize() {
+// return DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
+// }
+//
+// public boolean isThrottleSourcesToMemoryLimit() {
+// // Keep the queue in memory.
+// return true;
+// }
+//
+// public int getDisconnectedThrottleRate() {
+// // By default don't throttle consumers when disconnected.
+// return 0;
+// }
+//
+// public int getRecoveryBias() {
+// return 8;
+// }
+// };
+//
+// public void setDatabase(BrokerDatabase database) {
+// this.database = database;
+// }
+//
+// public void setDispatchQueue(DispatchQueue dispatchQueue) {
+// this.dispatchQueue = dispatchQueue;
+// }
+//
+// public void loadQueues() throws Exception {
+//
+// // Load shared queues
+// Iterator<QueueQueryResult> results = database.listQueues(SHARED_QUEUE_TYPE);
+// while (results.hasNext()) {
+// QueueQueryResult loaded = results.next();
+// IQueue<Long, MessageDelivery> queue = createRestoredQueue(null, loaded);
+// sharedQueues.put(queue.getDescriptor().getQueueName().toString(), queue);
+// LOG.info("Loaded Queue " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize());
+// }
+//
+// // Load durable queues
+// results = database.listQueues(DURABLE_QUEUE_TYPE);
+// while (results.hasNext()) {
+// QueueQueryResult loaded = results.next();
+// IQueue<Long, MessageDelivery> queue = createRestoredDurableQueue(loaded);
+// durableQueues.put(queue.getDescriptor().getQueueName().toString(), queue);
+// LOG.info("Loaded Durable " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize());
+//
+// }
+// }
+//
+// private IQueue<Long, MessageDelivery> createRestoredQueue(IPartitionedQueue<Long, MessageDelivery> parent, QueueQueryResult loaded) throws IOException {
+//
+// IQueue<Long, MessageDelivery> queue;
+// if (parent != null) {
+// queue = parent.createPartition(loaded.getDescriptor().getPartitionKey());
+// } else {
+// queue = createSharedQueueInternal(loaded.getDescriptor().getQueueName().toString(), loaded.getDescriptor().getQueueType());
+// }
+//
+// queue.initialize(loaded.getFirstSequence(), loaded.getLastSequence(), loaded.getCount(), loaded.getSize());
+//
+// // Create the child queues
+// Collection<QueueQueryResult> children = loaded.getPartitions();
+// if (children != null) {
+// try {
+// IPartitionedQueue<Long, MessageDelivery> pQueue = (IPartitionedQueue<Long, MessageDelivery>) queue;
+// for (QueueQueryResult child : children) {
+// createRestoredQueue(pQueue, child);
+// }
+// } catch (ClassCastException cce) {
+// LOG.error("Loaded partition for unpartitionable queue: " + queue.getResourceName());
+// throw cce;
+// }
+// }
+//
+// return queue;
+//
+// }
+//
+// private IQueue<Long, MessageDelivery> createRestoredDurableQueue(QueueQueryResult loaded) throws IOException {
+//
+// ExclusivePersistentQueue<Long, MessageDelivery> queue = createDurableQueueInternal(loaded.getDescriptor().getQueueName().toString(), loaded.getDescriptor().getQueueType());
+// queue.initialize(loaded.getFirstSequence(), loaded.getLastSequence(), loaded.getCount(), loaded.getSize());
+//
+// //TODO implement this for priority queue:
+// // Create the child queues
+// /*
+// * Collection<QueueQueryResult> children = loaded.getPartitions(); if
+// * (children != null) { try { IPartitionedQueue<Long, MessageDelivery>
+// * pQueue = (IPartitionedQueue<Long, MessageDelivery>) queue; for
+// * (QueueQueryResult child : children) { createRestoredQueue(pQueue,
+// * child); } } catch (ClassCastException cce) {
+// * LOG.error("Loaded partition for unpartitionable queue: " +
+// * queue.getResourceName()); throw cce; } }
+// */
+//
+// return queue;
+//
+// }
+//
+// public IQueue<Long, MessageDelivery> getQueue(AsciiBuffer queueName) {
+// //TODO
+// return null;
+// }
+//
+// public Collection<IQueue<Long, MessageDelivery>> getSharedQueues() {
+// synchronized (this) {
+// Collection<IQueue<Long, MessageDelivery>> c = sharedQueues.values();
+// ArrayList<IQueue<Long, MessageDelivery>> ret = new ArrayList<IQueue<Long, MessageDelivery>>(c.size());
+// ret.addAll(c);
+// return ret;
+// }
+// }
+//
+// public IQueue<Long, MessageDelivery> createDurableQueue(String name) {
+// IQueue<Long, MessageDelivery> queue = null;
+// synchronized (this) {
+// queue = durableQueues.get(name);
+// if (queue == null) {
+// queue = createDurableQueueInternal(name, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
+// queue.getDescriptor().setApplicationType(DURABLE_QUEUE_TYPE);
+// queue.initialize(0, 0, 0, 0);
+// durableQueues.put(name, queue);
+// addQueue(queue.getDescriptor());
+// }
+// }
+//
+// return queue;
+// }
+//
+// public ExclusivePersistentQueue<Long, MessageDelivery> createExclusivePersistentQueue() {
+// ExclusivePersistentQueue<Long, MessageDelivery> queue = null;
+// synchronized (this) {
+// String name = "temp:" + (dynamicQueueCounter++);
+// queue = createDurableQueueInternal(name, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
+// queue.getDescriptor().setApplicationType(DURABLE_QUEUE_TYPE);
+// queue.initialize(0, 0, 0, 0);
+// addQueue(queue.getDescriptor());
+// }
+// return queue;
+// }
+//
+// public Collection<IQueue<Long, MessageDelivery>> getDurableQueues() {
+// synchronized (this) {
+// Collection<IQueue<Long, MessageDelivery>> c = durableQueues.values();
+// ArrayList<IQueue<Long, MessageDelivery>> ret = new ArrayList<IQueue<Long, MessageDelivery>>(c.size());
+// ret.addAll(c);
+// return ret;
+// }
+// }
+//
+// public IQueue<Long, MessageDelivery> createSharedQueue(String name) {
+//
+// IQueue<Long, MessageDelivery> queue = null;
+// synchronized (this) {
+// queue = sharedQueues.get(name);
+// if (queue == null) {
+// queue = createSharedQueueInternal(name, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
+// queue.getDescriptor().setApplicationType(SHARED_QUEUE_TYPE);
+// queue.initialize(0, 0, 0, 0);
+// sharedQueues.put(name, queue);
+// addQueue(queue.getDescriptor());
+// }
+// }
+//
+// return queue;
+// }
+//
+// private ExclusivePersistentQueue<Long, MessageDelivery> createDurableQueueInternal(final String name, short type) {
+// ExclusivePersistentQueue<Long, MessageDelivery> queue;
+//
+// SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(DEFAULT_DURABLE_QUEUE_SIZE, DEFAULT_DURABLE_QUEUE_RESUME_THRESHOLD) {
+// @Override
+// public int getElementSize(MessageDelivery elem) {
+// return elem.getFlowLimiterSize();
+// }
+// };
+// queue = new ExclusivePersistentQueue<Long, MessageDelivery>(name, limiter);
+// queue.setStore(this);
+// queue.setPersistencePolicy(DURABLE_QUEUE_PERSISTENCE_POLICY);
+// queue.setExpirationMapper(EXPIRATION_MAPPER);
+// return queue;
+// }
+//
+// private IQueue<Long, MessageDelivery> createSharedQueueInternal(final String name, short type) {
+//
+// IQueue<Long, MessageDelivery> ret;
+//
+// switch (type) {
+// case QueueDescriptor.PARTITIONED: {
+// PartitionedQueue<Long, MessageDelivery> queue = new PartitionedQueue<Long, MessageDelivery>(name) {
+// @Override
+// public IQueue<Long, MessageDelivery> createPartition(int partitionKey) {
+// IQueue<Long, MessageDelivery> queue = createSharedQueueInternal(name + "$" + partitionKey, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
+// queue.getDescriptor().setPartitionId(partitionKey);
+// queue.getDescriptor().setParent(this.getDescriptor().getQueueName());
+// return queue;
+// }
+//
+// };
+// queue.setPartitionMapper(partitionMapper);
+//
+// ret = queue;
+// break;
+// }
+// case QueueDescriptor.SHARED_PRIORITY: {
+// PrioritySizeLimiter<MessageDelivery> limiter = new PrioritySizeLimiter<MessageDelivery>(DEFAULT_SHARED_QUEUE_SIZE, DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD, 10);
+// limiter.setPriorityMapper(PRIORITY_MAPPER);
+// limiter.setSizeMapper(SIZE_MAPPER);
+// SharedPriorityQueue<Long, MessageDelivery> queue = new SharedPriorityQueue<Long, MessageDelivery>(name, limiter);
+// ret = queue;
+// queue.setKeyMapper(KEY_MAPPER);
+// queue.setAutoRelease(true);
+// break;
+// }
+// case QueueDescriptor.SHARED: {
+// SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(DEFAULT_SHARED_QUEUE_SIZE, DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD) {
+// @Override
+// public int getElementSize(MessageDelivery elem) {
+// return elem.getFlowLimiterSize();
+// }
+// };
+//
+// if (!USE_OLD_QUEUE) {
+// SharedQueue<Long, MessageDelivery> sQueue = new SharedQueue<Long, MessageDelivery>(name, limiter);
+// sQueue.setKeyMapper(KEY_MAPPER);
+// sQueue.setAutoRelease(true);
+// ret = sQueue;
+// } else {
+// SharedQueueOld<Long, MessageDelivery> sQueue = new SharedQueueOld<Long, MessageDelivery>(name, limiter);
+// sQueue.setKeyMapper(KEY_MAPPER);
+// sQueue.setAutoRelease(true);
+// ret = sQueue;
+// }
+// break;
+// }
+// default: {
+// throw new IllegalArgumentException("Unknown queue type" + type);
+// }
+// }
+// ret.getDescriptor().setApplicationType(SUBPARTITION_TYPE);
+// ret.setStore(this);
+// ret.setPersistencePolicy(SHARED_QUEUE_PERSISTENCE_POLICY);
+// ret.setExpirationMapper(EXPIRATION_MAPPER);
+//
+// return ret;
+// }
+//
+// public final void deleteQueueElement(SaveableQueueElement<MessageDelivery> sqe) {
+// MessageDelivery md = sqe.getElement();
+// //If the message delivery isn't null, funnel through it
+// //since the message may not yet be in the store:
+// if (md != null) {
+// md.acknowledge(sqe);
+// } else {
+// database.deleteQueueElement(sqe);
+// }
+//
+// }
+//
+// public final boolean isFromStore(MessageDelivery elem) {
+// return elem.isFromStore();
+// }
+//
+// public final void persistQueueElement(SaveableQueueElement<MessageDelivery> elem, ISourceController<?> controller, boolean delayable) {
+// elem.getElement().persist(elem, controller, delayable);
+// }
+//
+// public final void restoreQueueElements(QueueDescriptor queue, boolean recordsOnly, long firstSequence, long maxSequence, int maxCount, RestoreListener<MessageDelivery> listener) {
+// database.restoreQueueElements(queue, recordsOnly, firstSequence, maxSequence, maxCount, listener, MESSAGE_MARSHALLER);
+// }
+//
+// public final void addQueue(QueueDescriptor queue) {
+// database.addQueue(queue);
+// }
+//
+// public final void deleteQueue(QueueDescriptor queue) {
+// database.deleteQueue(queue);
+// }
public void setDatabase(BrokerDatabase database) {
- this.database = database;
}
public void setDispatchQueue(DispatchQueue dispatchQueue) {
- this.dispatchQueue = dispatchQueue;
- }
-
- public void loadQueues() throws Exception {
-
- // Load shared queues
- Iterator<QueueQueryResult> results = database.listQueues(SHARED_QUEUE_TYPE);
- while (results.hasNext()) {
- QueueQueryResult loaded = results.next();
- IQueue<Long, MessageDelivery> queue = createRestoredQueue(null, loaded);
- sharedQueues.put(queue.getDescriptor().getQueueName().toString(), queue);
- LOG.info("Loaded Queue " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize());
- }
-
- // Load durable queues
- results = database.listQueues(DURABLE_QUEUE_TYPE);
- while (results.hasNext()) {
- QueueQueryResult loaded = results.next();
- IQueue<Long, MessageDelivery> queue = createRestoredDurableQueue(loaded);
- durableQueues.put(queue.getDescriptor().getQueueName().toString(), queue);
- LOG.info("Loaded Durable " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize());
-
- }
- }
-
- private IQueue<Long, MessageDelivery> createRestoredQueue(IPartitionedQueue<Long, MessageDelivery> parent, QueueQueryResult loaded) throws IOException {
-
- IQueue<Long, MessageDelivery> queue;
- if (parent != null) {
- queue = parent.createPartition(loaded.getDescriptor().getPartitionKey());
- } else {
- queue = createSharedQueueInternal(loaded.getDescriptor().getQueueName().toString(), loaded.getDescriptor().getQueueType());
- }
-
- queue.initialize(loaded.getFirstSequence(), loaded.getLastSequence(), loaded.getCount(), loaded.getSize());
-
- // Create the child queues
- Collection<QueueQueryResult> children = loaded.getPartitions();
- if (children != null) {
- try {
- IPartitionedQueue<Long, MessageDelivery> pQueue = (IPartitionedQueue<Long, MessageDelivery>) queue;
- for (QueueQueryResult child : children) {
- createRestoredQueue(pQueue, child);
- }
- } catch (ClassCastException cce) {
- LOG.error("Loaded partition for unpartitionable queue: " + queue.getResourceName());
- throw cce;
- }
- }
-
- return queue;
-
- }
-
- private IQueue<Long, MessageDelivery> createRestoredDurableQueue(QueueQueryResult loaded) throws IOException {
-
- ExclusivePersistentQueue<Long, MessageDelivery> queue = createDurableQueueInternal(loaded.getDescriptor().getQueueName().toString(), loaded.getDescriptor().getQueueType());
- queue.initialize(loaded.getFirstSequence(), loaded.getLastSequence(), loaded.getCount(), loaded.getSize());
-
- //TODO implement this for priority queue:
- // Create the child queues
- /*
- * Collection<QueueQueryResult> children = loaded.getPartitions(); if
- * (children != null) { try { IPartitionedQueue<Long, MessageDelivery>
- * pQueue = (IPartitionedQueue<Long, MessageDelivery>) queue; for
- * (QueueQueryResult child : children) { createRestoredQueue(pQueue,
- * child); } } catch (ClassCastException cce) {
- * LOG.error("Loaded partition for unpartitionable queue: " +
- * queue.getResourceName()); throw cce; } }
- */
-
- return queue;
-
- }
-
- public IQueue<Long, MessageDelivery> getQueue(AsciiBuffer queueName) {
- //TODO
- return null;
- }
-
- public Collection<IQueue<Long, MessageDelivery>> getSharedQueues() {
- synchronized (this) {
- Collection<IQueue<Long, MessageDelivery>> c = sharedQueues.values();
- ArrayList<IQueue<Long, MessageDelivery>> ret = new ArrayList<IQueue<Long, MessageDelivery>>(c.size());
- ret.addAll(c);
- return ret;
- }
- }
-
- public IQueue<Long, MessageDelivery> createDurableQueue(String name) {
- IQueue<Long, MessageDelivery> queue = null;
- synchronized (this) {
- queue = durableQueues.get(name);
- if (queue == null) {
- queue = createDurableQueueInternal(name, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
- queue.getDescriptor().setApplicationType(DURABLE_QUEUE_TYPE);
- queue.initialize(0, 0, 0, 0);
- durableQueues.put(name, queue);
- addQueue(queue.getDescriptor());
- }
- }
-
- return queue;
- }
-
- public ExclusivePersistentQueue<Long, MessageDelivery> createExclusivePersistentQueue() {
- ExclusivePersistentQueue<Long, MessageDelivery> queue = null;
- synchronized (this) {
- String name = "temp:" + (dynamicQueueCounter++);
- queue = createDurableQueueInternal(name, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
- queue.getDescriptor().setApplicationType(DURABLE_QUEUE_TYPE);
- queue.initialize(0, 0, 0, 0);
- addQueue(queue.getDescriptor());
- }
- return queue;
- }
-
- public Collection<IQueue<Long, MessageDelivery>> getDurableQueues() {
- synchronized (this) {
- Collection<IQueue<Long, MessageDelivery>> c = durableQueues.values();
- ArrayList<IQueue<Long, MessageDelivery>> ret = new ArrayList<IQueue<Long, MessageDelivery>>(c.size());
- ret.addAll(c);
- return ret;
- }
- }
-
- public IQueue<Long, MessageDelivery> createSharedQueue(String name) {
-
- IQueue<Long, MessageDelivery> queue = null;
- synchronized (this) {
- queue = sharedQueues.get(name);
- if (queue == null) {
- queue = createSharedQueueInternal(name, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
- queue.getDescriptor().setApplicationType(SHARED_QUEUE_TYPE);
- queue.initialize(0, 0, 0, 0);
- sharedQueues.put(name, queue);
- addQueue(queue.getDescriptor());
- }
- }
-
- return queue;
- }
-
- private ExclusivePersistentQueue<Long, MessageDelivery> createDurableQueueInternal(final String name, short type) {
- ExclusivePersistentQueue<Long, MessageDelivery> queue;
-
- SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(DEFAULT_DURABLE_QUEUE_SIZE, DEFAULT_DURABLE_QUEUE_RESUME_THRESHOLD) {
- @Override
- public int getElementSize(MessageDelivery elem) {
- return elem.getFlowLimiterSize();
- }
- };
- queue = new ExclusivePersistentQueue<Long, MessageDelivery>(name, limiter);
- queue.setStore(this);
- queue.setPersistencePolicy(DURABLE_QUEUE_PERSISTENCE_POLICY);
- queue.setExpirationMapper(EXPIRATION_MAPPER);
- return queue;
- }
-
- private IQueue<Long, MessageDelivery> createSharedQueueInternal(final String name, short type) {
-
- IQueue<Long, MessageDelivery> ret;
-
- switch (type) {
- case QueueDescriptor.PARTITIONED: {
- PartitionedQueue<Long, MessageDelivery> queue = new PartitionedQueue<Long, MessageDelivery>(name) {
- @Override
- public IQueue<Long, MessageDelivery> createPartition(int partitionKey) {
- IQueue<Long, MessageDelivery> queue = createSharedQueueInternal(name + "$" + partitionKey, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
- queue.getDescriptor().setPartitionId(partitionKey);
- queue.getDescriptor().setParent(this.getDescriptor().getQueueName());
- return queue;
- }
-
- };
- queue.setPartitionMapper(partitionMapper);
-
- ret = queue;
- break;
- }
- case QueueDescriptor.SHARED_PRIORITY: {
- PrioritySizeLimiter<MessageDelivery> limiter = new PrioritySizeLimiter<MessageDelivery>(DEFAULT_SHARED_QUEUE_SIZE, DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD, 10);
- limiter.setPriorityMapper(PRIORITY_MAPPER);
- limiter.setSizeMapper(SIZE_MAPPER);
- SharedPriorityQueue<Long, MessageDelivery> queue = new SharedPriorityQueue<Long, MessageDelivery>(name, limiter);
- ret = queue;
- queue.setKeyMapper(KEY_MAPPER);
- queue.setAutoRelease(true);
- break;
- }
- case QueueDescriptor.SHARED: {
- SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(DEFAULT_SHARED_QUEUE_SIZE, DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD) {
- @Override
- public int getElementSize(MessageDelivery elem) {
- return elem.getFlowLimiterSize();
- }
- };
-
- if (!USE_OLD_QUEUE) {
- SharedQueue<Long, MessageDelivery> sQueue = new SharedQueue<Long, MessageDelivery>(name, limiter);
- sQueue.setKeyMapper(KEY_MAPPER);
- sQueue.setAutoRelease(true);
- ret = sQueue;
- } else {
- SharedQueueOld<Long, MessageDelivery> sQueue = new SharedQueueOld<Long, MessageDelivery>(name, limiter);
- sQueue.setKeyMapper(KEY_MAPPER);
- sQueue.setAutoRelease(true);
- ret = sQueue;
- }
- break;
- }
- default: {
- throw new IllegalArgumentException("Unknown queue type" + type);
- }
- }
- ret.getDescriptor().setApplicationType(SUBPARTITION_TYPE);
- ret.setStore(this);
- ret.setPersistencePolicy(SHARED_QUEUE_PERSISTENCE_POLICY);
- ret.setExpirationMapper(EXPIRATION_MAPPER);
-
- return ret;
- }
-
- public final void deleteQueueElement(SaveableQueueElement<MessageDelivery> sqe) {
- MessageDelivery md = sqe.getElement();
- //If the message delivery isn't null, funnel through it
- //since the message may not yet be in the store:
- if (md != null) {
- md.acknowledge(sqe);
- } else {
- database.deleteQueueElement(sqe);
- }
-
- }
-
- public final boolean isFromStore(MessageDelivery elem) {
- return elem.isFromStore();
- }
-
- public final void persistQueueElement(SaveableQueueElement<MessageDelivery> elem, ISourceController<?> controller, boolean delayable) {
- elem.getElement().persist(elem, controller, delayable);
- }
-
- public final void restoreQueueElements(QueueDescriptor queue, boolean recordsOnly, long firstSequence, long maxSequence, int maxCount, RestoreListener<MessageDelivery> listener) {
- database.restoreQueueElements(queue, recordsOnly, firstSequence, maxSequence, maxCount, listener, MESSAGE_MARSHALLER);
- }
-
- public final void addQueue(QueueDescriptor queue) {
- database.addQueue(queue);
}
- public final void deleteQueue(QueueDescriptor queue) {
- database.deleteQueue(queue);
+ public void loadQueues() {
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DeliveryTarget.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DeliveryTarget.java?rev=961067&r1=961066&r2=961067&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DeliveryTarget.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DeliveryTarget.java Wed Jul 7 03:39:03 2010
@@ -17,11 +17,10 @@
package org.apache.activemq.apollo.broker;
import org.apache.activemq.apollo.broker.MessageDelivery;
-import org.apache.activemq.flow.ISourceController;
public interface DeliveryTarget {
- public void deliver(MessageDelivery message, ISourceController<?> source);
+ public void deliver(MessageDelivery message);
public boolean hasSelector();
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java?rev=961067&r1=961066&r2=961067&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java Wed Jul 7 03:39:03 2010
@@ -20,22 +20,19 @@ import org.apache.activemq.apollo.broker
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.FilterException;
import org.apache.activemq.filter.MessageEvaluationContext;
-import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.queue.IQueue;
-import org.apache.activemq.queue.Subscription;
public class DurableSubscription implements BrokerSubscription, DeliveryTarget {
- private final IQueue<Long, MessageDelivery> queue;
+// private final IQueue<Long, MessageDelivery> queue;
private final VirtualHost host;
private final Destination destination;
- private Subscription<MessageDelivery> connectedSub;
+// private Subscription<MessageDelivery> connectedSub;
boolean started = false;
BooleanExpression selector;
- DurableSubscription(VirtualHost host, Destination destination, BooleanExpression selector, IQueue<Long, MessageDelivery> queue) {
+ DurableSubscription(VirtualHost host, Destination destination, BooleanExpression selector) {
this.host = host;
- this.queue = queue;
+// this.queue = queue;
this.destination = destination;
this.selector = selector;
//TODO If a durable subscribes to a queue
@@ -52,24 +49,27 @@ public class DurableSubscription impleme
/* (non-Javadoc)
* @see org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq.broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
*/
- public void deliver(MessageDelivery message, ISourceController<?> source) {
- queue.add(message, source);
+ public void deliver(MessageDelivery message) {
+// TODO:
+// queue.add(message, source);
}
public synchronized void connect(final ConsumerContext subscription) throws UserAlreadyConnectedException {
- if (this.connectedSub == null) {
- this.connectedSub = subscription;
- queue.addSubscription(connectedSub);
- } else if (connectedSub != subscription) {
- throw new UserAlreadyConnectedException();
- }
+// TODO:
+// if (this.connectedSub == null) {
+// this.connectedSub = subscription;
+// queue.addSubscription(connectedSub);
+// } else if (connectedSub != subscription) {
+// throw new UserAlreadyConnectedException();
+// }
}
public synchronized void disconnect(final ConsumerContext subscription) {
- if (connectedSub != null && connectedSub == subscription) {
- queue.removeSubscription(connectedSub);
- connectedSub = null;
- }
+// TODO:
+// if (connectedSub != null && connectedSub == subscription) {
+// queue.removeSubscription(connectedSub);
+// connectedSub = null;
+// }
}
public boolean matches(MessageDelivery message) {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java?rev=961067&r1=961066&r2=961067&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java Wed Jul 7 03:39:03 2010
@@ -23,7 +23,6 @@ import java.util.concurrent.Future;
import javax.transaction.xa.XAException;
-import org.apache.activemq.queue.IQueue;
import org.apache.activemq.util.FutureListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -41,91 +40,92 @@ public class LocalTransaction extends Tr
private static final Log LOG = LogFactory.getLog(LocalTransaction.class);
- LocalTransaction(TransactionManager manager, long tid, IQueue<Long, TxOp> opQueue) {
- super(manager, tid, opQueue);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.apollo.broker.Transaction#commit(boolean)
- */
- @Override
- public void commit(boolean onePhase, final TransactionListener listener) throws XAException, IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("commit: " + this);
- }
-
- synchronized(this)
- {
- // Get ready for commit.
- try {
- prePrepare();
- } catch (XAException e) {
- throw e;
- } catch (Throwable e) {
- LOG.warn("COMMIT FAILED: ", e);
- rollback(null);
- // Let them know we rolled back.
- XAException xae = new XAException("COMMIT FAILED: Transaction rolled back.");
- xae.errorCode = XAException.XA_RBOTHER;
- xae.initCause(e);
- throw xae;
- }
-
- //Add the listener for commit
- if(listeners == null)
- {
- listeners = new HashSet<TransactionListener>();
- }
- listeners.add(listener);
-
- //Update the transaction state to committed,
- //and on complete process the commit:
- setState(COMMITED_STATE, new FutureListener<Object>()
- {
- public void onFutureComplete(Future<? extends Object> dbCommitResult) {
- try {
- fireAfterCommit();
- startTransactionProcessor();
- } catch (InterruptedException e) {
- //Shouldn't happen
- LOG.warn(new AssertionError(e));
- } catch (ExecutionException e) {
- LOG.warn("COMMIT FAILED: ", e);
- }
- catch (Exception e)
- {
- }
- }
- });
- }
- }
-
-
- public int prepare(TransactionListener listener) throws XAException {
- XAException xae = new XAException("Prepare not implemented on Local Transactions.");
- xae.errorCode = XAException.XAER_RMERR;
- throw xae;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.apollo.broker.Transaction#rollback()
- */
- @Override
- public void rollback(TransactionListener listener) throws XAException, IOException {
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("Not yet implemnted");
- }
-
- /* (non-Javadoc)
- * @see org.apache.activemq.apollo.broker.Transaction#getType()
- */
- @Override
- public byte getType() {
- return TYPE_LOCAL;
- }
+// TODO:
+// LocalTransaction(TransactionManager manager, long tid, IQueue<Long, TxOp> opQueue) {
+// super(manager, tid, opQueue);
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see org.apache.activemq.apollo.broker.Transaction#commit(boolean)
+// */
+// @Override
+// public void commit(boolean onePhase, final TransactionListener listener) throws XAException, IOException {
+// if (LOG.isDebugEnabled()) {
+// LOG.debug("commit: " + this);
+// }
+//
+// synchronized(this)
+// {
+// // Get ready for commit.
+// try {
+// prePrepare();
+// } catch (XAException e) {
+// throw e;
+// } catch (Throwable e) {
+// LOG.warn("COMMIT FAILED: ", e);
+// rollback(null);
+// // Let them know we rolled back.
+// XAException xae = new XAException("COMMIT FAILED: Transaction rolled back.");
+// xae.errorCode = XAException.XA_RBOTHER;
+// xae.initCause(e);
+// throw xae;
+// }
+//
+// //Add the listener for commit
+// if(listeners == null)
+// {
+// listeners = new HashSet<TransactionListener>();
+// }
+// listeners.add(listener);
+//
+// //Update the transaction state to committed,
+// //and on complete process the commit:
+// setState(COMMITED_STATE, new FutureListener<Object>()
+// {
+// public void onFutureComplete(Future<? extends Object> dbCommitResult) {
+// try {
+// fireAfterCommit();
+// startTransactionProcessor();
+// } catch (InterruptedException e) {
+// //Shouldn't happen
+// LOG.warn(new AssertionError(e));
+// } catch (ExecutionException e) {
+// LOG.warn("COMMIT FAILED: ", e);
+// }
+// catch (Exception e)
+// {
+// }
+// }
+// });
+// }
+// }
+//
+//
+// public int prepare(TransactionListener listener) throws XAException {
+// XAException xae = new XAException("Prepare not implemented on Local Transactions.");
+// xae.errorCode = XAException.XAER_RMERR;
+// throw xae;
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see org.apache.activemq.apollo.broker.Transaction#rollback()
+// */
+// @Override
+// public void rollback(TransactionListener listener) throws XAException, IOException {
+// // TODO Auto-generated method stub
+// throw new UnsupportedOperationException("Not yet implemnted");
+// }
+//
+// /* (non-Javadoc)
+// * @see org.apache.activemq.apollo.broker.Transaction#getType()
+// */
+// @Override
+// public byte getType() {
+// return TYPE_LOCAL;
+// }
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java?rev=961067&r1=961066&r2=961067&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java Wed Jul 7 03:39:03 2010
@@ -20,8 +20,6 @@ import java.io.IOException;
import org.apache.activemq.broker.store.Store;
import org.apache.activemq.filter.MessageEvaluationContext;
-import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.queue.SaveableQueueElement;
import org.apache.activemq.util.buffer.AsciiBuffer;
public interface MessageDelivery {
@@ -90,7 +88,7 @@ public interface MessageDelivery {
public void beginDispatch(BrokerDatabase database);
- public void finishDispatch(ISourceController<?> controller) throws IOException;
+// public void finishDispatch(ISourceController<?> controller) throws IOException;
/**
* Sets the unique id used to identify this message in the store.
@@ -98,36 +96,36 @@ public interface MessageDelivery {
*/
public void setStoreTracking(long tracking);
- /**
- * Called by a queue to request that the element be persisted. The save is
- * done asynchronously, and depending on the state of the message delivery
- * may not even be issued to the underlying persistence store until a later
- * date. As such callers should use the acknowledge method to delete this
- * message rather than directly issuing a delete through the message store
- * itself. Direct delete from the message store is only safe once the
- * message has been saved to the store, so callers should request
- * notification of the save via the
- * {@link SaveableQueueElement#requestSaveNotify()} method before attempting
- * to access the store directly.
- *
- * @param sqe
- * The element to save
- * @param controller
- * A flow controller to use in the event that there isn't room in
- * the database.
- * @param delayable
- * Whether or not the save operation can be delayed.
- */
- public void persist(SaveableQueueElement<MessageDelivery> sqe, ISourceController<?> controller, boolean delayable);
-
- /**
- * Acknowledges the message for a particular queue. This will cause it to be
- * deleted from the message store.
- *
- * @param sqe
- * The queue element to delete
- */
- public void acknowledge(SaveableQueueElement<MessageDelivery> sqe);
+// /**
+// * Called by a queue to request that the element be persisted. The save is
+// * done asynchronously, and depending on the state of the message delivery
+// * may not even be issued to the underlying persistence store until a later
+// * date. As such callers should use the acknowledge method to delete this
+// * message rather than directly issuing a delete through the message store
+// * itself. Direct delete from the message store is only safe once the
+// * message has been saved to the store, so callers should request
+// * notification of the save via the
+// * {@link SaveableQueueElement#requestSaveNotify()} method before attempting
+// * to access the store directly.
+// *
+// * @param sqe
+// * The element to save
+// * @param controller
+// * A flow controller to use in the event that there isn't room in
+// * the database.
+// * @param delayable
+// * Whether or not the save operation can be delayed.
+// */
+// public void persist(SaveableQueueElement<MessageDelivery> sqe, ISourceController<?> controller, boolean delayable);
+//
+// /**
+// * Acknowledges the message for a particular queue. This will cause it to be
+// * deleted from the message store.
+// *
+// * @param sqe
+// * The queue element to delete
+// */
+// public void acknowledge(SaveableQueueElement<MessageDelivery> sqe);
/**
* Gets the tracking number used to identify this message in the message
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java?rev=961067&r1=961066&r2=961067&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java Wed Jul 7 03:39:03 2010
@@ -20,8 +20,6 @@ import java.io.IOException;
import org.apache.activemq.broker.store.Store.MessageRecord;
import org.apache.activemq.filter.MessageEvaluationContext;
-import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.queue.SaveableQueueElement;
import org.apache.activemq.util.buffer.AsciiBuffer;
/**
@@ -31,117 +29,51 @@ import org.apache.activemq.util.buffer.A
public class MessageDeliveryWrapper implements MessageDelivery {
private final MessageDelivery delegate;
+//
+// public void acknowledge(SaveableQueueElement<MessageDelivery> sqe) {
+// delegate.acknowledge(sqe);
+// }
- public void acknowledge(SaveableQueueElement<MessageDelivery> sqe) {
- delegate.acknowledge(sqe);
- }
-
- /**
- * (non-Javadoc)
- *
- * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
- * org.apache.activemq.flow.ISourceController, boolean)
- */
public <T> T asType(Class<T> type) {
return delegate.asType(type);
}
- /**
- * (non-Javadoc)
- *
- * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
- * org.apache.activemq.flow.ISourceController, boolean)
- */
public MessageRecord createMessageRecord() {
return delegate.createMessageRecord();
}
- /**
- * (non-Javadoc)
- *
- * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
- * org.apache.activemq.flow.ISourceController, boolean)
- */
public Destination getDestination() {
return delegate.getDestination();
}
- /**
- * (non-Javadoc)
- *
- * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
- * org.apache.activemq.flow.ISourceController, boolean)
- */
public int getFlowLimiterSize() {
return delegate.getFlowLimiterSize();
}
- /**
- * (non-Javadoc)
- *
- * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
- * org.apache.activemq.flow.ISourceController, boolean)
- */
public AsciiBuffer getMsgId() {
return delegate.getMsgId();
}
- /**
- * (non-Javadoc)
- *
- * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
- * org.apache.activemq.flow.ISourceController, boolean)
- */
public int getPriority() {
return delegate.getPriority();
}
- /**
- * (non-Javadoc)
- *
- * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
- * org.apache.activemq.flow.ISourceController, boolean)
- */
public AsciiBuffer getProducerId() {
return delegate.getProducerId();
}
- /**
- * (non-Javadoc)
- *
- * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
- * org.apache.activemq.flow.ISourceController, boolean)
- */
public long getStoreTracking() {
return delegate.getStoreTracking();
}
- /**
- * (non-Javadoc)
- *
- * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
- * org.apache.activemq.flow.ISourceController, boolean)
- */
public long getTransactionId() {
return delegate.getTransactionId();
}
- /**
- * (non-Javadoc)
- *
- * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
- * org.apache.activemq.flow.ISourceController, boolean)
- */
public boolean isFromStore() {
return delegate.isFromStore();
}
- /**
- * (non-Javadoc)
- *
- * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
- * org.apache.activemq.flow.ISourceController, boolean)
- */
public boolean isPersistent() {
return delegate.isPersistent();
}
@@ -155,71 +87,33 @@ public class MessageDeliveryWrapper impl
return delegate.getExpiration();
}
- /**
- * (non-Javadoc)
- *
- * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
- * org.apache.activemq.flow.ISourceController, boolean)
- */
public boolean isResponseRequired() {
return delegate.isResponseRequired();
}
- /**
- * (non-Javadoc)
- *
- * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
- * org.apache.activemq.flow.ISourceController, boolean)
- */
public void onMessagePersisted() {
delegate.onMessagePersisted();
}
- /**
- * (non-Javadoc)
- *
- * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
- * org.apache.activemq.flow.ISourceController, boolean)
- */
- public void persist(SaveableQueueElement<MessageDelivery> elem, ISourceController<?> controller, boolean delayable) {
- delegate.persist(elem, controller, delayable);
- }
-
+// public void persist(SaveableQueueElement<MessageDelivery> elem, ISourceController<?> controller, boolean delayable) {
+// delegate.persist(elem, controller, delayable);
+// }
+//
public MessageEvaluationContext createMessageEvaluationContext() {
return delegate.createMessageEvaluationContext();
}
- /**
- * (non-Javadoc)
- *
- * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
- * org.apache.activemq.flow.ISourceController, boolean)
- */
MessageDeliveryWrapper(MessageDelivery delivery) {
delegate = delivery;
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.activemq.apollo.broker.MessageDelivery#beginDispatch(org.apache
- * .activemq.apollo.broker.BrokerDatabase)
- */
public void beginDispatch(BrokerDatabase database) {
delegate.beginDispatch(database);
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.activemq.apollo.broker.MessageDelivery#finishDispatch(org.
- * apache.activemq.flow.ISourceController)
- */
- public void finishDispatch(ISourceController<?> controller) throws IOException {
- delegate.finishDispatch(controller);
- }
+// public void finishDispatch(ISourceController<?> controller) throws IOException {
+// delegate.finishDispatch(controller);
+// }
/*
* (non-Javadoc)
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java?rev=961067&r1=961066&r2=961067&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java Wed Jul 7 03:39:03 2010
@@ -23,98 +23,102 @@ import java.util.HashSet;
import org.apache.activemq.Service;
import org.apache.activemq.broker.store.Store.MessageRecord;
import org.apache.activemq.filter.BooleanExpression;
-import org.apache.activemq.flow.AbstractLimitedFlowResource;
-import org.apache.activemq.flow.IFlowSink;
-import org.apache.activemq.queue.Subscription;
import org.apache.activemq.wireformat.WireFormat;
public interface ProtocolHandler extends Service {
+ void onCommand(Object command);
+ void setConnection(BrokerConnection brokerConnection);
- public void setConnection(BrokerConnection connection);
+ void setWireFormat(WireFormat wireformat);
- public BrokerConnection getConnection();
+ void onException(Exception error);
- public void onCommand(Object command);
-
- public void onException(Exception error);
-
- public void setWireFormat(WireFormat wf);
-
- public BrokerMessageDelivery createMessageDelivery(MessageRecord record) throws IOException;
-
- /**
- * ClientContext
- * <p>
- * Description: Base interface describing a channel on a physical
- * connection.
- * </p>
- *
- * @author cmacnaug
- * @version 1.0
- */
- public interface ClientContext {
- public ClientContext getParent();
-
- public Collection<ClientContext> getChildren();
-
- public void addChild(ClientContext context);
-
- public void removeChild(ClientContext context);
-
- public void close();
-
- }
-
- public abstract class AbstractClientContext<E extends MessageDelivery> extends AbstractLimitedFlowResource<E> implements ClientContext {
- protected final HashSet<ClientContext> children = new HashSet<ClientContext>();
- protected final ClientContext parent;
- protected boolean closed = false;
-
- public AbstractClientContext(String name, ClientContext parent) {
- super(name);
- this.parent = parent;
- if (parent != null) {
- parent.addChild(this);
- }
- }
-
- public ClientContext getParent() {
- return parent;
- }
-
- public void addChild(ClientContext child) {
- if (!closed) {
- children.add(child);
- }
- }
-
- public void removeChild(ClientContext child) {
- if (!closed) {
- children.remove(child);
- }
- }
-
- public Collection<ClientContext> getChildren() {
- return children;
- }
-
- public void close() {
-
- closed = true;
-
- for (ClientContext c : children) {
- c.close();
- }
-
- if (parent != null) {
- parent.removeChild(this);
- }
-
- super.close();
- }
- }
-
- public interface ConsumerContext extends ClientContext, Subscription<MessageDelivery>, IFlowSink<MessageDelivery> {
+// TODO:
+// public void setConnection(BrokerConnection connection);
+//
+// public BrokerConnection getConnection();
+//
+// public void onCommand(Object command);
+//
+// public void onException(Exception error);
+//
+// public void setWireFormat(WireFormat wf);
+//
+// public BrokerMessageDelivery createMessageDelivery(MessageRecord record) throws IOException;
+//
+// /**
+// * ClientContext
+// * <p>
+// * Description: Base interface describing a channel on a physical
+// * connection.
+// * </p>
+// *
+// * @author cmacnaug
+// * @version 1.0
+// */
+// public interface ClientContext {
+// public ClientContext getParent();
+//
+// public Collection<ClientContext> getChildren();
+//
+// public void addChild(ClientContext context);
+//
+// public void removeChild(ClientContext context);
+//
+// public void close();
+//
+// }
+//
+// public abstract class AbstractClientContext<E extends MessageDelivery> extends AbstractLimitedFlowResource<E> implements ClientContext {
+// protected final HashSet<ClientContext> children = new HashSet<ClientContext>();
+// protected final ClientContext parent;
+// protected boolean closed = false;
+//
+// public AbstractClientContext(String name, ClientContext parent) {
+// super(name);
+// this.parent = parent;
+// if (parent != null) {
+// parent.addChild(this);
+// }
+// }
+//
+// public ClientContext getParent() {
+// return parent;
+// }
+//
+// public void addChild(ClientContext child) {
+// if (!closed) {
+// children.add(child);
+// }
+// }
+//
+// public void removeChild(ClientContext child) {
+// if (!closed) {
+// children.remove(child);
+// }
+// }
+//
+// public Collection<ClientContext> getChildren() {
+// return children;
+// }
+//
+// public void close() {
+//
+// closed = true;
+//
+// for (ClientContext c : children) {
+// c.close();
+// }
+//
+// if (parent != null) {
+// parent.removeChild(this);
+// }
+//
+// super.close();
+// }
+// }
+//
+ public interface ConsumerContext { // extends ClientContext, Subscription<MessageDelivery>, IFlowSink<MessageDelivery> {
public String getConsumerId();
@@ -131,7 +135,7 @@ public interface ProtocolHandler extends
/**
* If the destination does not exist, should it automatically be
* created?
- *
+ *
* @return true if a destination that does not exist should be created automatically
*/
public boolean autoCreateDestination();