You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/04/27 20:40:49 UTC
svn commit: r769099 [1/5] - in /activemq/sandbox/activemq-flow/src:
main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/openwire/
main/java/org/apache/activemq/broker/protocol/
main/java/org/apache/activemq/broker/stomp/ main/jav...
Author: chirino
Date: Mon Apr 27 18:40:44 2009
New Revision: 769099
URL: http://svn.apache.org/viewvc?rev=769099&view=rev
Log:
Applying colins patch for https://issues.apache.org/activemq/browse/AMQ-2230
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/MessageDeliveryStoreHelper.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityLinkedList.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityMap.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Mapper.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MemoryStore.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStoreHelper.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Store.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/TreeMemoryStore.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/HashList.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/StatefulWireFormat.java
activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStorePerformance.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStoreTest.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java Mon Apr 27 18:40:44 2009
@@ -17,129 +17,173 @@
package org.apache.activemq.broker;
import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.Map.Entry;
import org.apache.activemq.broker.store.BrokerDatabase;
import org.apache.activemq.broker.store.BrokerDatabase.OperationContext;
+import org.apache.activemq.broker.store.Store.MessageRecord;
import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.QueueStore;
+import org.apache.activemq.queue.QueueStore.QueueDescriptor;
public abstract class BrokerMessageDelivery implements MessageDelivery {
- HashSet<AsciiBuffer> persistentTargets;
- // Indicates whether or not the message has been saved to the
- // database, if not then in memory updates can be done.
- boolean saved = false;
+ // True while the message is being dispatched to the delivery targets:
+ boolean dispatching = false;
+
+ // A non null pending save indicates that the message is the
+ // saver queue and that the message
+ OperationContext pendingSave;
+
+ // List of persistent targets for which the message should be saved
+ // when dispatch is complete:
+ HashMap<QueueStore.QueueDescriptor, Long> persistentTargets;
+
long storeTracking = -1;
BrokerDatabase store;
boolean fromStore = false;
boolean enableFlushDelay = true;
- OperationContext saveContext;
- boolean cancelled = false;
+ private int limiterSize = -1;
- public void setFromStore(boolean val) {
+ public void setFromDatabase(BrokerDatabase database, MessageRecord mRecord) {
fromStore = true;
+ store = database;
+ storeTracking = mRecord.getKey();
+ limiterSize = mRecord.getSize();
+ }
+
+ public final int getFlowLimiterSize() {
+ if (limiterSize == -1) {
+ limiterSize = getMemorySize();
+ }
+ return limiterSize;
}
+ /**
+ * Subclass must implement this to return their current memory size
+ * estimate.
+ *
+ * @return The memory size of the message.
+ */
+ public abstract int getMemorySize();
+
public final boolean isFromStore() {
return fromStore;
}
- public final void persist(AsciiBuffer queue, boolean delayable) throws IOException {
-
+ public final void persist(QueueStore.QueueDescriptor queue, ISourceController<?> controller, long queueSequence, boolean delayable) throws IOException {
synchronized (this) {
- if (!saved) {
+ // Can flush of this message to the store be delayed?
+ if (enableFlushDelay && !delayable) {
+ enableFlushDelay = false;
+ }
+ // If this message is being dispatched then add the queue to the
+ // list of queues for which to save the message when dispatch is
+ // finished:
+ if (dispatching) {
if (persistentTargets == null) {
- persistentTargets = new HashSet<AsciiBuffer>();
+ persistentTargets = new HashMap<QueueStore.QueueDescriptor, Long>();
}
- persistentTargets.add(queue);
+ persistentTargets.put(queue, queueSequence);
return;
}
- if (!delayable) {
- enableFlushDelay = false;
+ // Otherwise, if it is still in the saver queue, we can add this
+ // queue to the queue list:
+ else if (pendingSave != null) {
+ persistentTargets.put(queue, queueSequence);
+ if (!delayable) {
+ pendingSave.requestFlush();
+ }
+ return;
}
}
- // TODO probably need to pass in the saving queue's source controller
- // here and treat it like it is dispatching to the saver queue.
- store.saveMessage(this, queue, null);
+ store.saveMessage(this, queue, queueSequence, controller);
}
- public final void delete(AsciiBuffer queue) {
+ public final void acknowledge(QueueStore.QueueDescriptor queue) {
boolean firePersistListener = false;
+ boolean deleted = false;
synchronized (this) {
- if (!saved) {
- persistentTargets.remove(queue);
- if (persistentTargets.isEmpty()) {
- if (saveContext != null) {
+ // If the message hasn't been saved to the database
+ // then we don't need to issue a delete:
+ if (dispatching || pendingSave != null) {
- if (!cancelled) {
- if (saveContext.cancel()) {
- cancelled = true;
- firePersistListener = true;
- }
+ // Remove the queue:
+ persistentTargets.remove(queue);
+ deleted = true;
- saved = true;
+ // We get a save context when we place the message in the
+ // database queue. If it has been added to the queue,
+ // and we've removed the last queue, see if we can cancel
+ // the save:
+ if (pendingSave != null && persistentTargets.isEmpty()) {
+ if (pendingSave.cancel()) {
+ pendingSave = null;
+ if (isPersistent()) {
+ firePersistListener = true;
}
}
}
- } else {
- store.deleteMessage(this, queue);
}
}
+ if (!deleted) {
+ store.deleteMessage(this, queue);
+ }
+
if (firePersistListener) {
onMessagePersisted();
}
}
- public void setStoreTracking(long storeTracking) {
- this.storeTracking = storeTracking;
+ public void beginDispatch(BrokerDatabase database) {
+ this.store = database;
+ dispatching = true;
+ if (storeTracking == -1) {
+ storeTracking = database.allocateStoreTracking();
+ }
}
public long getStoreTracking() {
return storeTracking;
}
- public Collection<AsciiBuffer> getPersistentQueues() {
- return persistentTargets;
+ public Set<Entry<QueueDescriptor, Long>> getPersistentQueues() {
+ return persistentTargets.entrySet();
}
public void beginStore() {
synchronized (this) {
- saved = true;
+ pendingSave = null;
}
}
- public void persistIfNeeded(ISourceController<?> controller) throws IOException {
+ public void finishDispatch(ISourceController<?> controller) throws IOException {
boolean firePersistListener = false;
synchronized (this) {
- boolean saveNeeded = true;
- if (persistentTargets == null || persistentTargets.isEmpty()) {
- saveNeeded = false;
- saved = true;
- }
-
// If any of the targets requested save then save the message
// Note that this could be the case even if the message isn't
// persistent if a target requested that the message be spooled
// for some other reason such as queue memory overflow.
- if (saveNeeded) {
- saveContext = store.persistReceivedMessage(this, controller);
+ if (persistentTargets != null && !persistentTargets.isEmpty()) {
+ pendingSave = store.persistReceivedMessage(this, controller);
}
+
// If none of the targets required persistence, then fire the
// persist listener:
- else if (isResponseRequired() && isPersistent()) {
+ if (pendingSave == null || !isPersistent()) {
firePersistListener = true;
}
+ dispatching = false;
}
if (firePersistListener) {
onMessagePersisted();
}
-
}
public boolean isFlushDelayable() {
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java Mon Apr 27 18:40:44 2009
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.activemq.broker.store.BrokerDatabase;
-import org.apache.activemq.broker.store.BrokerDatabase.MessageRestoreListener;
-import org.apache.activemq.broker.store.BrokerDatabase.RestoredMessage;
-import org.apache.activemq.dispatch.IDispatcher;
-import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
-import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
-import org.apache.activemq.protobuf.AsciiBuffer;
-import org.apache.activemq.queue.Mapper;
-import org.apache.activemq.queue.Store;
-import org.apache.activemq.queue.Subscription;
-
-public class DBQueueStore<K> implements Store<K, MessageDelivery> {
-
- private final BrokerDatabase database;
- private final AsciiBuffer queue;
- private final MessageRetriever retriever;
-
- private long firstKey = -1;
- private long lastKey = -1;
-
- private int count = 0;
- private boolean loading = true;
-
- protected HashMap<K, DBStoreNode> map = new HashMap<K, DBStoreNode>();
- protected TreeMap<Long, DBStoreNode> order = new TreeMap<Long, DBStoreNode>();
- private Mapper<K, MessageDelivery> keyExtractor;
-
- DBQueueStore(BrokerDatabase database, AsciiBuffer queue, IDispatcher dispatcher) {
- this.database = database;
- this.queue = queue;
- retriever = new MessageRetriever(dispatcher);
- retriever.start();
- }
-
- public StoreNode<K, MessageDelivery> add(K key, MessageDelivery delivery) {
-
- // New to this queue?
- if (delivery.getStoreTracking() > lastKey) {
- return addInternal(key, delivery);
- } else {
- throw new IllegalArgumentException(this + " Duplicate key: " + delivery);
- }
- }
-
- public void setKeyMapper(Mapper<K, MessageDelivery> keyExtractor) {
- this.keyExtractor = keyExtractor;
- }
-
- private DBStoreNode addInternal(K key, MessageDelivery delivery) {
- DBStoreNode node = new DBStoreNode(delivery);
- map.put(keyExtractor.map(delivery), node);
- order.put(delivery.getStoreTracking(), node);
- return node;
- }
-
- public boolean isEmpty() {
- return count == 0;
- }
-
- public StoreCursor<K, MessageDelivery> openCursor() {
- return new DBStoreCursor();
- }
-
- public StoreCursor<K, MessageDelivery> openCursorAt(StoreNode<K, MessageDelivery> next) {
- DBStoreCursor cursor = new DBStoreCursor();
- cursor.next = (DBStoreNode) next;
- return cursor;
- }
-
- public StoreNode<K, MessageDelivery> remove(K key) {
- // TODO Auto-generated method stub
- return null;
- }
-
- public int size() {
- return count;
- }
-
- private class DBStoreCursor implements StoreCursor<K, MessageDelivery> {
- private long pos;
- private long last = -1;
-
- private DBStoreNode node;
- private DBStoreNode next;
-
- public StoreNode<K, MessageDelivery> peekNext() {
- // TODO Auto-generated method stub
- return null;
- }
-
- public void setNext(StoreNode<K, MessageDelivery> node) {
- this.next = (DBStoreNode) next;
-
- }
-
- public boolean hasNext() {
- if (next != null)
- return true;
-
- SortedMap<Long, DBStoreNode> m = order.tailMap(last + 1);
- if (m.isEmpty()) {
- next = null;
- } else {
- next = m.get(m.firstKey());
- }
- return next != null;
- }
-
- public StoreNode<K, MessageDelivery> next() {
- try {
- hasNext();
- return next;
- } finally {
- last = next.tracking;
- next = null;
- }
- }
-
- public boolean isReady() {
- return !loading;
- }
-
- public void remove() {
- database.deleteMessage(node.delivery, queue);
- }
- }
-
- private class DBStoreNode implements StoreNode<K, MessageDelivery> {
- private MessageDelivery delivery;
- private K key;
- private long ownerId = -1;
- private final long tracking;
-
- DBStoreNode(MessageDelivery delivery) {
- this.delivery = delivery;
- tracking = delivery.getStoreTracking();
- key = keyExtractor.map(delivery);
- retriever.save(this);
- }
-
- public boolean acquire(Subscription<MessageDelivery> owner) {
- long id = owner.getSink().getResourceId();
- // TODO Auto-generated method stub
- if (ownerId == -1 || id == ownerId) {
- ownerId = owner.getSink().getResourceId();
- return true;
- }
- return false;
- }
-
- public K getKey() {
- return key;
- }
-
- public MessageDelivery getValue() {
- return delivery;
- }
-
- public void unacquire() {
- ownerId = -1;
- }
- }
-
- private class MessageRetriever implements Dispatchable, MessageRestoreListener {
-
- private final DispatchContext dispatchContext;
- private AtomicBoolean loaded = new AtomicBoolean(false);
-
- private long loadCursor = 0;
- private long max = -1;
- private long loadedCount;
-
- private final ConcurrentLinkedQueue<RestoredMessage> restoredMsgs = new ConcurrentLinkedQueue<RestoredMessage>();
-
- MessageRetriever(IDispatcher dispatcher) {
- dispatchContext = dispatcher.register(this, "MessageRetriever-" + queue);
- }
-
- public void save(DBStoreNode node) {
- try {
- node.delivery.persist(queue, false);
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- public void start() {
- if (!loaded.get()) {
- database.restoreMessages(queue, loadCursor, 50, this);
- }
- }
-
- public boolean dispatch() {
- while (true) {
- RestoredMessage restored = restoredMsgs.poll();
-
- if (restored == null) {
- break;
- }
-
- try {
- MessageDelivery delivery = restored.getMessageDelivery();
- addInternal(keyExtractor.map(delivery), delivery);
- if (firstKey == -1) {
- firstKey = delivery.getStoreTracking();
- }
- if (lastKey < delivery.getStoreTracking()) {
- lastKey = delivery.getStoreTracking();
- }
- loadedCount++;
-
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- if (!loaded.get()) {
- database.restoreMessages(queue, loadCursor, 50, this);
- }
- return false;
- }
-
- public void messagesRestored(Collection<RestoredMessage> msgs) {
- if (!msgs.isEmpty()) {
- restoredMsgs.addAll(msgs);
- } else {
- loaded.set(true);
- }
- dispatchContext.requestDispatch();
- }
- }
-}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java Mon Apr 27 18:40:44 2009
@@ -19,7 +19,6 @@
import org.apache.activemq.broker.MessageDelivery;
import org.apache.activemq.flow.IFlowSink;
import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.protobuf.AsciiBuffer;
public interface DeliveryTarget {
@@ -27,9 +26,9 @@
public IFlowSink<MessageDelivery> getSink();
+ public boolean hasSelector();
+
public boolean match(MessageDelivery message);
public boolean isDurable();
-
- public AsciiBuffer getPersistentQueueName();
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java Mon Apr 27 18:40:44 2009
@@ -48,6 +48,7 @@
private String name;
private IDispatcher dispatcher;
private BrokerDatabase database;
+
private final AtomicBoolean stopping = new AtomicBoolean();
public String getName() {
@@ -123,6 +124,14 @@
this.dispatcher = dispatcher;
}
+ public BrokerDatabase getDatabase() {
+ return database;
+ }
+
+ public void setDatabase(BrokerDatabase database) {
+ this.database = database;
+ }
+
public String getBindUri() {
return bindUri;
}
@@ -149,8 +158,7 @@
public VirtualHost getDefaultVirtualHost() {
synchronized (virtualHosts) {
if (defaultVirtualHost == null) {
- defaultVirtualHost = new VirtualHost();
- defaultVirtualHost.setDatabase(database);
+ defaultVirtualHost = new VirtualHost(this);
ArrayList<AsciiBuffer> names = new ArrayList<AsciiBuffer>(1);
names.add(new AsciiBuffer("default"));
defaultVirtualHost.setHostNames(names);
@@ -188,7 +196,6 @@
setDefaultVirtualHost(host);
}
}
- host.setDatabase(database);
}
public synchronized void removeVirtualHost(VirtualHost host) throws Exception {
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java Mon Apr 27 18:40:44 2009
@@ -19,8 +19,10 @@
import java.io.IOException;
import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.queue.QueueStore;
public interface MessageDelivery {
@@ -50,7 +52,7 @@
/**
* Called when the message's persistence requirements have been met. This
- * method must not block.
+ * method must not restoreBlock.
*/
public void onMessagePersisted();
@@ -58,16 +60,31 @@
public Buffer getTransactionId();
- public void persist(AsciiBuffer queue, boolean delayable) throws IOException;
+ /**
+ * Asynchronously persists a message in the store.
+ *
+ * @param queue
+ * The queue against which to save the message.
+ * @param controller
+ * The source of the message.
+ * @param sequenceNumber
+ * The sequence number of the message in the queue
+ * @param delayable
+ * Can be set to indicate that flush of the message can be
+ * delayed in the hopes that an acknowledgement will negate the
+ * need for a delete
+ * @throws IOException If there is an exception serializing the message.
+ */
+ public void persist(QueueStore.QueueDescriptor queue, ISourceController<?> controller, long sequenceNumber, boolean delayable) throws IOException;
- public void delete(AsciiBuffer queue);
-
/**
- * Sets the unique storage tracking number.
- * @param tracking The tracking number.
+ * Acknowledges the message for a particular queue. This will cause it to be
+ * deleted from the message store.
+ *
+ * @param queue The queue for which to acknowledge the message.
*/
- public void setStoreTracking(long tracking);
-
+ public void acknowledge(QueueStore.QueueDescriptor queue);
+
/**
* Gets the tracking number used to identify this message in the message
* store.
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java Mon Apr 27 18:40:44 2009
@@ -24,122 +24,36 @@
import org.apache.activemq.broker.MessageDelivery;
import org.apache.activemq.flow.IFlowSink;
import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.flow.PrioritySizeLimiter;
-import org.apache.activemq.flow.SizeLimiter;
-import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.queue.IQueue;
-import org.apache.activemq.queue.Mapper;
-import org.apache.activemq.queue.PartitionedQueue;
-import org.apache.activemq.queue.SharedPriorityQueue;
-import org.apache.activemq.queue.SharedQueue;
+import org.apache.activemq.queue.QueueStore;
import org.apache.activemq.queue.Subscription;
+import org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback;
public class Queue implements DeliveryTarget {
HashMap<DeliveryTarget, Subscription<MessageDelivery>> subs = new HashMap<DeliveryTarget, Subscription<MessageDelivery>>();
private Destination destination;
- private IQueue<AsciiBuffer, MessageDelivery> queue;
- private MessageBroker broker;
-
- private Mapper<Integer, MessageDelivery> partitionMapper;
- private Mapper<AsciiBuffer, MessageDelivery> keyExtractor;
-
- private IQueue<AsciiBuffer, MessageDelivery> createQueue() {
+ private IQueue<Long, MessageDelivery> queue;
+ private VirtualHost virtualHost;
- if (partitionMapper!=null) {
- PartitionedQueue<Integer, AsciiBuffer, MessageDelivery> queue = new PartitionedQueue<Integer, AsciiBuffer, MessageDelivery>() {
- @Override
- protected IQueue<AsciiBuffer, MessageDelivery> cratePartition(Integer partitionKey) {
- return createSharedFlowQueue();
- }
-
- public boolean isElementPersistent(MessageDelivery elem) {
- return elem.isPersistent();
- }
- };
- queue.setPartitionMapper(partitionMapper);
- queue.setResourceName(destination.getName().toString());
- return queue;
- } else {
- return createSharedFlowQueue();
- }
+ Queue(IQueue<Long, MessageDelivery> queue)
+ {
+ this.queue = queue;
}
-
-
- public static final Mapper<Integer, MessageDelivery> PRIORITY_MAPPER = new Mapper<Integer, MessageDelivery>() {
- public Integer map(MessageDelivery element) {
- return element.getPriority();
- }
- };
- private IQueue<AsciiBuffer, MessageDelivery> createSharedFlowQueue() {
- if (MessageBroker.MAX_PRIORITY > 1) {
- PrioritySizeLimiter<MessageDelivery> limiter = new PrioritySizeLimiter<MessageDelivery>(100, 1, MessageBroker.MAX_PRIORITY);
- limiter.setPriorityMapper(PRIORITY_MAPPER);
- SharedPriorityQueue<AsciiBuffer, MessageDelivery> queue = new SharedPriorityQueue<AsciiBuffer, MessageDelivery>(destination.getName().toString(), limiter);
- queue.setKeyMapper(keyExtractor);
- queue.setAutoRelease(true);
- //DBQueueStore<AsciiBuffer> store = new DBQueueStore<AsciiBuffer>(broker.getDefaultVirtualHost().getDatabase(), queue, broker.getDispatcher());
- //store.setKeyMapper(keyExtractor);
- //queue.setStore(store);
- queue.setDispatcher(broker.getDispatcher());
- return queue;
- } else {
- SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(100, 1);
- SharedQueue<AsciiBuffer, MessageDelivery> queue = new SharedQueue<AsciiBuffer, MessageDelivery>(destination.getName().toString(), limiter);
- queue.setKeyMapper(keyExtractor);
- queue.setAutoRelease(true);
- //DBQueueStore<AsciiBuffer> store = new DBQueueStore<AsciiBuffer>(broker.getDefaultVirtualHost().getDatabase(), queue, broker.getDispatcher());
- //store.setKeyMapper(keyExtractor);
- //queue.setStore(store);
- queue.setDispatcher(broker.getDispatcher());
- return queue;
- }
- }
-
public final void deliver(MessageDelivery delivery, ISourceController<?> source) {
- try {
- if(delivery.isPersistent())
- {
- delivery.persist(destination.getName(), true);
- }
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
queue.add(delivery, source);
}
-
- public final Destination getDestination() {
- return destination;
- }
public final void addConsumer(final DeliveryTarget dt) {
- Subscription<MessageDelivery> sub = new Subscription<MessageDelivery>() {
- public boolean isPreAcquired() {
- return true;
- }
-
- public boolean matches(MessageDelivery message) {
- return dt.match(message);
- }
-
- public boolean isRemoveOnDispatch() {
- return true;
- }
+ Subscription<MessageDelivery> sub = new QueueSubscription(dt);
- public IFlowSink<MessageDelivery> getSink() {
- return dt.getSink();
- }
-
- @Override
- public String toString() {
- return getSink().toString();
- }
- };
- subs.put(dt, sub);
- queue.addSubscription(sub);
+ Subscription<MessageDelivery> old = subs.put(dt, sub);
+ if (old == null) {
+ queue.addSubscription(sub);
+ } else {
+ subs.put(dt, old);
+ }
}
public boolean removeSubscirption(final DeliveryTarget dt) {
@@ -151,54 +65,108 @@
}
public void start() throws Exception {
- queue = createQueue();
+ queue.start();
}
public void stop() throws Exception {
+ if (queue != null) {
+ queue.stop();
+ }
}
public IFlowSink<MessageDelivery> getSink() {
return queue;
}
+ public boolean hasSelector() {
+ return false;
+ }
+
public boolean match(MessageDelivery message) {
return true;
}
- public MessageBroker getBroker() {
- return broker;
+ public VirtualHost getBroker() {
+ return virtualHost;
}
- public void setBroker(MessageBroker broker) {
- this.broker = broker;
+ public void setVirtualHost(VirtualHost virtualHost) {
+ this.virtualHost = virtualHost;
}
- public Mapper<Integer, MessageDelivery> getPartitionMapper() {
- return partitionMapper;
+ public void setDestination(Destination destination) {
+ this.destination = destination;
}
- public void setPartitionMapper(Mapper<Integer, MessageDelivery> partitionMapper) {
- this.partitionMapper = partitionMapper;
+ public final Destination getDestination() {
+ return destination;
}
-
- public Mapper<AsciiBuffer, MessageDelivery> getKeyExtractor() {
- return keyExtractor;
+
+ public boolean isDurable()
+ {
+ return true;
}
- public void setKeyExtractor(Mapper<AsciiBuffer, MessageDelivery> keyExtractor) {
- this.keyExtractor = keyExtractor;
- }
+ public static class QueueSubscription implements Subscription<MessageDelivery> {
+ final DeliveryTarget target;
- public void setDestination(Destination destination) {
- this.destination = destination;
- }
+ public QueueSubscription(DeliveryTarget dt) {
+ this.target = dt;
+ }
+
+ public boolean isPreAcquired() {
+ return true;
+ }
+
+ public boolean matches(MessageDelivery message) {
+ return target.match(message);
+ }
+
+ public boolean hasSelector() {
+ return target.hasSelector();
+ }
+
+ public boolean isRemoveOnDispatch() {
+ return false;
+ }
- public AsciiBuffer getPersistentQueueName() {
- // TODO Auto-generated method stub
- return destination.getName();
+ public IFlowSink<MessageDelivery> getSink() {
+ return target.getSink();
+ }
+
+ @Override
+ public String toString() {
+ return target.getSink().toString();
+ }
+
+ public boolean offer(MessageDelivery elem, ISourceController<MessageDelivery> controller, SubscriptionDeliveryCallback callback) {
+ return target.getSink().offer(new QueueDelivery(elem, callback), controller);
+ }
+
+ public boolean isBrowser() {
+ return false;
+ }
}
- public boolean isDurable() {
- return true;
+ private static class QueueDelivery extends MessageDeliveryWrapper {
+ private final SubscriptionDeliveryCallback callback;
+
+ QueueDelivery(MessageDelivery delivery, SubscriptionDeliveryCallback callback) {
+ super(delivery);
+ this.callback = callback;
+ }
+
+ @Override
+ public void persist(QueueStore.QueueDescriptor queue, ISourceController<?> controller, long sequenceNumber, boolean delayable) throws IOException {
+ // We override this for queue deliveries as the sub needn't
+ // persist the message
+ }
+
+ public void acknowledge(QueueStore.QueueDescriptor queue) {
+ if (callback != null) {
+ callback.acknowledge();
+ }
+ }
+
}
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java Mon Apr 27 18:40:44 2009
@@ -89,37 +89,26 @@
//
Collection<DeliveryTarget> targets = route(msg.getDestination(), msg);
- msg.store = database;
- msg.setStoreTracking(msg.store.allocateStoreTracking());
+ //Set up the delivery for persistence:
+ msg.beginDispatch(database);
- // TODO:
- // Consider doing some caching of this target list. Most producers
- // always send to the same destination.
- if (targets != null) {
-
- if (msg.isResponseRequired()) {
- // We need to ack the message once we ensure we won't loose it.
- // We know we won't loose it once it's persisted or delivered to
- // a consumer Setup a callback to get notifed once one of those
- // happens.
- if (!msg.isPersistent()) {
- // Let the client know the broker got the message.
- msg.onMessagePersisted();
+ try
+ {
+ // TODO:
+ // Consider doing some caching of this sub list. Most producers
+ // always send to the same destination.
+ if (targets != null) {
+ // The sinks will request persistence via MessageDelivery.persist()
+ // if they require persistence:
+ for (DeliveryTarget dt : targets) {
+ dt.deliver(msg, controller);
}
}
-
- // The sinks will request persistence via MessageDelivery.persist()
- // if they require persistence:
- for (DeliveryTarget dt : targets) {
- dt.deliver(msg, controller);
- //if (dt.match(msg)) {
- //
- // dt.getSink().add(msg, controller);
- //}
- }
-
+ }
+ finally
+ {
try {
- msg.persistIfNeeded(controller);
+ msg.finishDispatch(controller);
} catch (IOException ioe) {
//TODO: Error serializing the message, this should trigger an error
//This is a pretty severe error as we've already delivered
@@ -128,13 +117,6 @@
//should persist the message prior to sending to the recips?
ioe.printStackTrace();
}
-
- } else {
- // Let the client know we got the message even though there
- // were no valid targets to deliver the message to.
- if (msg.isResponseRequired()) {
- msg.onMessagePersisted();
- }
}
}
@@ -158,7 +140,6 @@
public void setVirtualHost(VirtualHost virtualHost) {
this.virtualHost = virtualHost;
- this.database = virtualHost.getDatabase();
}
public VirtualHost getVirtualHost() {
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java Mon Apr 27 18:40:44 2009
@@ -20,66 +20,105 @@
import java.util.HashMap;
import org.apache.activemq.Service;
-import org.apache.activemq.broker.store.BrokerDatabase;
import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.IQueue;
/**
* @author chirino
*/
public class VirtualHost implements Service {
-
- final private HashMap<Destination, Queue> queues = new HashMap<Destination, Queue>();
+
+ final private BrokerQueueStore queueStore;
+ final private MessageBroker broker;
+ final private HashMap<AsciiBuffer, Queue> queues = new HashMap<AsciiBuffer, Queue>();
private ArrayList<AsciiBuffer> hostNames = new ArrayList<AsciiBuffer>();
private Router router;
- private BrokerDatabase database;
-
- public VirtualHost() {
+ private boolean started;
+
+ public VirtualHost(MessageBroker broker) {
+ this.broker = broker;
this.router = new Router();
this.router.setVirtualHost(this);
+ this.queueStore = new BrokerQueueStore();
}
-
+
public AsciiBuffer getHostName() {
- if( hostNames.size() > 0 ) {
+ if (hostNames.size() > 0) {
hostNames.get(0);
}
return null;
}
-
+
public ArrayList<AsciiBuffer> getHostNames() {
return hostNames;
}
+
public void setHostNames(ArrayList<AsciiBuffer> hostNames) {
this.hostNames = hostNames;
}
-
+
public Router getRouter() {
return router;
}
- public void start() throws Exception {
+ public synchronized void start() throws Exception {
+
+ if (started) {
+ return;
+ }
+
+ router.setDatabase(broker.getDatabase());
+
+ queueStore.setDatabase(broker.getDatabase());
+ queueStore.setDispatcher(broker.getDispatcher());
+ queueStore.loadQueues();
+ // Create Queue instances
+ for (IQueue<Long, MessageDelivery> iQueue : queueStore.getSharedQueues()) {
+ Queue queue = new Queue(iQueue);
+ Domain domain = router.getDomain(Router.QUEUE_DOMAIN);
+ Destination dest = new Destination.SingleDestination(Router.QUEUE_DOMAIN, iQueue.getDescriptor().getQueueName());
+ queue.setDestination(dest);
+ domain.add(dest.getName(), queue);
+ queues.put(dest.getName(), queue);
+ }
for (Queue queue : queues.values()) {
queue.start();
}
+ started = true;
}
- public void stop() throws Exception {
+
+ public synchronized void stop() throws Exception {
+ if (!started) {
+ return;
+ }
for (Queue queue : queues.values()) {
queue.stop();
}
+ started = false;
}
- public void addQueue(Queue queue) {
- Domain domain = router.getDomain(queue.getDestination().getDomain());
- domain.add(queue.getDestination().getName(), queue);
- }
-
- public BrokerDatabase getDatabase() {
- return database;
+ public synchronized Queue createQueue(Destination dest) throws Exception {
+ if(!started)
+ {
+ //Queues from the store must be loaded before we can create new ones:
+ throw new IllegalStateException("Can't create queue on unstarted host");
+ }
+
+ Queue queue = queues.get(dest);
+ // If the queue doesn't exist create it:
+ if (queue == null) {
+ IQueue<Long, MessageDelivery> iQueue = queueStore.createSharedQueue(dest.getName().toString());
+ queue = new Queue(iQueue);
+ queue.setDestination(dest);
+ Domain domain = router.getDomain(Router.QUEUE_DOMAIN);
+ domain.add(dest.getName(), queue);
+ queues.put(dest.getName(), queue);
+ }
+ queue.start();
+ return queue;
}
- public void setDatabase(BrokerDatabase store) {
- this.database = store;
- router.setDatabase(database);
+ public BrokerQueueStore getQueueStore() {
+ return queueStore;
}
-
-
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Mon Apr 27 18:40:44 2009
@@ -36,6 +36,7 @@
private AsciiBuffer producerId;
private OpenWireFormat storeWireFormat;
private PersistListener persistListener = null;
+ private final int size;
public interface PersistListener {
public void onMessagePersisted(OpenWireMessageDelivery delivery);
@@ -43,6 +44,7 @@
public OpenWireMessageDelivery(Message message) {
this.message = message;
+ this.size = message.getSize();
}
public void setPersistListener(PersistListener listener) {
@@ -56,7 +58,8 @@
return destination;
}
- public int getFlowLimiterSize() {
+ public int getMemorySize() {
+ //return size;
return 1;
}
@@ -112,6 +115,7 @@
record.setBuffer(new Buffer(bytes.getData(), bytes.getOffset(), bytes.getLength()));
record.setStreamKey((long) 0);
record.setMessageId(getMsgId());
+ record.setSize(getFlowLimiterSize());
return record;
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Mon Apr 27 18:40:44 2009
@@ -20,12 +20,14 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicLong;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.WindowLimiter;
import org.apache.activemq.broker.BrokerConnection;
+import org.apache.activemq.broker.BrokerMessageDelivery;
import org.apache.activemq.broker.DeliveryTarget;
import org.apache.activemq.broker.Destination;
import org.apache.activemq.broker.MessageDelivery;
@@ -83,6 +85,7 @@
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.queue.QueueStore;
import org.apache.activemq.queue.SingleFlowRelay;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.state.CommandVisitor;
@@ -100,6 +103,10 @@
private OpenWireFormat storeWireFormat;
private Router router;
+ public OpenwireProtocolHandler() {
+ setStoreWireFormat(new OpenWireFormat());
+ }
+
public void start() throws Exception {
}
@@ -133,7 +140,6 @@
public Response processAddConsumer(ConsumerInfo info) throws Exception {
ConsumerContext ctx = new ConsumerContext(info);
consumers.put(info.getConsumerId(), ctx);
- router.bind(convert(info.getDestination()), ctx);
return ack(command);
}
@@ -403,11 +409,13 @@
private final ConsumerInfo info;
private String name;
private BooleanExpression selector;
- private boolean durable;
- private AsciiBuffer durableQueueName;
+ private boolean isDurable;
+ private boolean isQueueReceiver;
+ private QueueStore.QueueDescriptor durableQueueId;
private SingleFlowRelay<MessageDelivery> queue;
public WindowLimiter<MessageDelivery> limiter;
+ private AtomicLong deliverySequence = new AtomicLong(0);
HashMap<MessageId, MessageDelivery> pendingMessages = new HashMap<MessageId, MessageDelivery>();
LinkedList<MessageId> pendingMessageIds = new LinkedList<MessageId>();
@@ -415,11 +423,13 @@
public ConsumerContext(final ConsumerInfo info) throws InvalidSelectorException {
this.info = info;
this.name = info.getConsumerId().toString();
- durable = info.isDurable();
- if (durable) {
- durableQueueName = new AsciiBuffer(info.getSubscriptionName());
+
+ isDurable = info.isDurable();
+ if (isDurable) {
+ durableQueueId = new QueueStore.QueueDescriptor();
+ durableQueueId.setQueueName(new AsciiBuffer(info.getSubscriptionName()));
try {
- connection.getBroker().getDefaultVirtualHost().getDatabase().addQueue(durableQueueName);
+ connection.getBroker().getDefaultVirtualHost().getQueueStore().addQueue(durableQueueId);
} catch (Throwable thrown) {
thrown.printStackTrace();
}
@@ -442,36 +452,42 @@
md.setMessage(msg);
md.setDestination(msg.getDestination());
// Add to the pending list if persistent and we are durable:
- if (isDurable() && message.isPersistent()) {
+ if (message.isPersistent() && (isDurable() || isQueueReceiver())) {
synchronized (queue) {
Object old = pendingMessages.put(msg.getMessageId(), message);
- if(old != null)
- {
+ if (old != null) {
new Exception("Duplicate message id: " + msg.getMessageId()).printStackTrace();
}
pendingMessageIds.add(msg.getMessageId());
connection.write(md);
}
- }
- else
- {
+ } else {
+ if (isQueueReceiver()) {
+ message.acknowledge(durableQueueId);
+ }
connection.write(md);
}
};
});
+
+ // Subscribe
+ if (info.getDestination().isQueue()) {
+ isQueueReceiver = true;
+ }
+ router.bind(convert(info.getDestination()), this);
}
+
public void ack(MessageAck info) {
- //TODO: The pending message queue could probably be optimized to avoid having
- //to create a new list here.
- LinkedList<MessageDelivery> acked = new LinkedList<MessageDelivery>();
+ // TODO: The pending message queue could probably be optimized to
+ // avoid having to create a new list here.
+ LinkedList<MessageDelivery> acked = new LinkedList<MessageDelivery>();
synchronized (queue) {
- if (isDurable()) {
+ if (isDurable() || isQueueReceiver()) {
MessageId id = info.getLastMessageId();
while (!pendingMessageIds.isEmpty()) {
MessageId pendingId = pendingMessageIds.getFirst();
MessageDelivery delivery = pendingMessages.remove(pendingId);
acked.add(delivery);
- delivery.delete(durableQueueName);
pendingMessageIds.removeFirst();
if (pendingId.equals(id)) {
break;
@@ -481,12 +497,11 @@
}
limiter.onProtocolCredit(info.getMessageCount());
}
-
- //Delete outside of synchronization on queue to avoid contention with enqueueing
- //threads.
- for(MessageDelivery delivery : acked)
- {
- delivery.delete(durableQueueName);
+
+ // Delete outside of synchronization on queue to avoid contention
+ // with enqueueing threads.
+ for (MessageDelivery delivery : acked) {
+ delivery.acknowledge(durableQueueId);
}
}
@@ -501,9 +516,9 @@
if (isDurable() && delivery.isPersistent()) {
try {
- delivery.persist(durableQueueName, true);
+ delivery.persist(durableQueueId, null, deliverySequence.incrementAndGet(), true);
} catch (IOException e) {
- // TODO Auto-generated catch block
+ // TODO Auto-generated catch restoreBlock
e.printStackTrace();
}
}
@@ -511,6 +526,10 @@
queue.add(delivery, source);
}
+ public boolean hasSelector() {
+ return selector != null;
+ }
+
public boolean match(MessageDelivery message) {
Message msg = message.asType(Message.class);
if (msg == null) {
@@ -529,7 +548,11 @@
}
public boolean isDurable() {
- return durable;
+ return isDurable;
+ }
+
+ public boolean isQueueReceiver() {
+ return isQueueReceiver;
}
public AsciiBuffer getPersistentQueueName() {
@@ -550,8 +573,7 @@
AsciiBuffer domain;
if (dest.isQueue()) {
domain = Router.QUEUE_DOMAIN;
- }
- if (dest.isTopic()) {
+ } else if (dest.isTopic()) {
domain = Router.TOPIC_DOMAIN;
} else {
throw new IllegalArgumentException("Unsupported domain type: " + dest);
@@ -592,13 +614,18 @@
public void setWireFormat(WireFormat wireFormat) {
this.wireFormat = (OpenWireFormat) wireFormat;
- this.storeWireFormat = this.wireFormat.copy();
+ setStoreWireFormat(this.wireFormat.copy());
+ }
+
+ private void setStoreWireFormat(OpenWireFormat wireFormat) {
+ this.storeWireFormat = wireFormat;
+ storeWireFormat.setVersion(OpenWireFormat.DEFAULT_VERSION);
storeWireFormat.setCacheEnabled(false);
storeWireFormat.setTightEncodingEnabled(false);
storeWireFormat.setSizePrefixDisabled(false);
}
- public MessageDelivery createMessageDelivery(MessageRecord record) throws IOException {
+ public BrokerMessageDelivery createMessageDelivery(MessageRecord record) throws IOException {
Buffer buf = record.getBuffer();
Message message = (Message) storeWireFormat.unmarshal(new ByteSequence(buf.data, buf.offset, buf.length));
OpenWireMessageDelivery delivery = new OpenWireMessageDelivery(message);
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java Mon Apr 27 18:40:44 2009
@@ -20,7 +20,7 @@
import org.apache.activemq.Service;
import org.apache.activemq.broker.BrokerConnection;
-import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.BrokerMessageDelivery;
import org.apache.activemq.broker.store.Store.MessageRecord;
import org.apache.activemq.wireformat.WireFormat;
@@ -31,6 +31,6 @@
public void onException(Exception error);
public void setWireFormat(WireFormat wf);
- public MessageDelivery createMessageDelivery(MessageRecord record) throws IOException;
+ public BrokerMessageDelivery createMessageDelivery(MessageRecord record) throws IOException;
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java Mon Apr 27 18:40:44 2009
@@ -54,7 +54,7 @@
return destination;
}
- public int getFlowLimiterSize() {
+ public int getMemorySize() {
return frame.getContent().length;
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Mon Apr 27 18:40:44 2009
@@ -25,11 +25,13 @@
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import javax.jms.InvalidSelectorException;
import org.apache.activemq.WindowLimiter;
import org.apache.activemq.broker.BrokerConnection;
+import org.apache.activemq.broker.BrokerMessageDelivery;
import org.apache.activemq.broker.DeliveryTarget;
import org.apache.activemq.broker.Destination;
import org.apache.activemq.broker.MessageDelivery;
@@ -49,6 +51,7 @@
import org.apache.activemq.flow.SizeLimiter;
import org.apache.activemq.flow.ISinkController.FlowControllable;
import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.QueueStore;
import org.apache.activemq.queue.SingleFlowRelay;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.transport.stomp.Stomp;
@@ -99,7 +102,7 @@
}
});
actionHandlers.put(Stomp.Commands.SEND, new ActionHander() {
-
+
public void onStompFrame(StompFrame frame) throws Exception {
String dest = frame.getHeaders().get(Stomp.Headers.Send.DESTINATION);
Destination destination = translator(frame).convertToDestination(StompProtocolHandler.this, dest);
@@ -276,7 +279,8 @@
private LinkedHashMap<AsciiBuffer, AsciiBuffer> sentMessageIds = new LinkedHashMap<AsciiBuffer, AsciiBuffer>();
private boolean durable;
- private AsciiBuffer durableQueueName;
+ private QueueStore.QueueDescriptor durableQueueId;
+ private AtomicLong deliverySequence = new AtomicLong(0);
public ConsumerContext(final StompFrame subscribe) throws Exception {
translator = translator(subscribe);
@@ -359,6 +363,10 @@
return queue;
}
+ public boolean hasSelector() {
+ return false;
+ }
+
public boolean match(MessageDelivery message) {
StompFrame stompMessage = message.asType(StompFrame.class);
if (stompMessage == null) {
@@ -394,7 +402,7 @@
if (isDurable() && delivery.isPersistent()) {
try {
- delivery.persist(durableQueueName, true);
+ delivery.persist(durableQueueId, null, deliverySequence.incrementAndGet(), true);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
@@ -512,7 +520,7 @@
return null;
}
- public MessageDelivery createMessageDelivery(MessageRecord record) {
+ public BrokerMessageDelivery createMessageDelivery(MessageRecord record) {
throw new UnsupportedOperationException();
}
}