You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/05/21 04:34:24 UTC
svn commit: r776934 [1/2] - in /activemq/sandbox/activemq-flow/src:
main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/openwire/
main/java/org/apache/activemq/broker/protocol/
main/java/org/apache/activemq/broker/stomp/ main/jav...
Author: chirino
Date: Thu May 21 02:34:23 2009
New Revision: 776934
URL: http://svn.apache.org/viewvc?rev=776934&view=rev
Log:
Applying Coling patch at: https://issues.apache.org/activemq/browse/AMQ-2261
Thanks!
Added:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerSubscription.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DurableSubscription.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicSubscription.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java Thu May 21 02:34:23 2009
@@ -20,8 +20,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.Set;
-import java.util.Map.Entry;
import org.apache.activemq.broker.store.BrokerDatabase;
import org.apache.activemq.broker.store.BrokerDatabase.OperationContext;
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerSubscription.java?rev=776934&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerSubscription.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerSubscription.java Thu May 21 02:34:23 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import org.apache.activemq.queue.Subscription;
+
+public interface BrokerSubscription {
+
+ public void connect(Subscription<MessageDelivery> subscription) throws UserAlreadyConnectedException ;
+
+ public void disconnect(Subscription<MessageDelivery> subscription);
+
+ @SuppressWarnings("serial")
+ public class UserAlreadyConnectedException extends Exception {
+
+ }
+}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java Thu May 21 02:34:23 2009
@@ -17,18 +17,13 @@
package org.apache.activemq.broker;
import org.apache.activemq.broker.MessageDelivery;
-import org.apache.activemq.flow.IFlowSink;
import org.apache.activemq.flow.ISourceController;
public interface DeliveryTarget {
- public void deliver(MessageDelivery delivery, ISourceController<?> source);
-
- public IFlowSink<MessageDelivery> getSink();
+ public void deliver(MessageDelivery message, ISourceController<?> source);
public boolean hasSelector();
- public boolean match(MessageDelivery message);
-
- public boolean isDurable();
+ public boolean matches(MessageDelivery message);
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java Thu May 21 02:34:23 2009
@@ -19,6 +19,9 @@
import java.util.Collection;
import org.apache.activemq.broker.Destination;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.protobuf.AsciiBuffer;
public interface Destination {
@@ -26,7 +29,8 @@
AsciiBuffer getDomain();
AsciiBuffer getName();
Collection<Destination> getDestinations();
-
+ public ActiveMQDestination asActiveMQDestination();
+
public class SingleDestination implements Destination {
private AsciiBuffer domain;
@@ -67,6 +71,18 @@
private void setDomain(String domain) {
setDomain(new AsciiBuffer(domain));
}
+
+ public ActiveMQDestination asActiveMQDestination() {
+ if(domain.equals(Router.TOPIC_DOMAIN))
+ {
+ return new ActiveMQTopic(name.toString());
+ }
+ else if(domain.equals(Router.QUEUE_DOMAIN))
+ {
+ return new ActiveMQQueue(name.toString());
+ }
+ return null;
+ }
}
public class MultiDestination implements Destination {
@@ -95,7 +111,12 @@
public AsciiBuffer getName() {
return null;
}
+
+ public ActiveMQDestination asActiveMQDestination() {
+ throw new UnsupportedOperationException("Not yet implemented");
+ }
}
+
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java Thu May 21 02:34:23 2009
@@ -34,8 +34,10 @@
public Object remove(AsciiBuffer destinationName);
- public void bind(AsciiBuffer destinationName, DeliveryTarget deliveryTarget);
-
+ public void bind(AsciiBuffer destinationName, DeliveryTarget target);
+
+ public void unbind(AsciiBuffer destinationName, DeliveryTarget target);
+
public Collection<DeliveryTarget> route(AsciiBuffer destinationName, MessageDelivery message);
}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DurableSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DurableSubscription.java?rev=776934&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DurableSubscription.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DurableSubscription.java Thu May 21 02:34:23 2009
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.Message;
+import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.queue.ExclusivePersistentQueue;
+import org.apache.activemq.queue.Subscription;
+
+public class DurableSubscription implements BrokerSubscription, DeliveryTarget {
+
+ private final ExclusivePersistentQueue<Long, MessageDelivery> queue;
+ private final VirtualHost host;
+ private final Destination destination;
+ private Subscription<MessageDelivery> connectedSub;
+ boolean started = false;
+ BooleanExpression selector;
+
+ DurableSubscription(VirtualHost host, Destination destination, BooleanExpression selector, ExclusivePersistentQueue<Long, MessageDelivery> queue) {
+ this.host = host;
+ this.queue = queue;
+ this.destination = destination;
+ this.selector = selector;
+ this.host.getRouter().bind(destination, this);
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq.broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
+ */
+ public void deliver(MessageDelivery message, ISourceController<?> source) {
+ queue.add(message, source);
+ }
+
+ public synchronized void connect(final Subscription<MessageDelivery> subscription) throws UserAlreadyConnectedException {
+ if (this.connectedSub == null) {
+ this.connectedSub = subscription;
+ queue.addSubscription(connectedSub);
+ } else if (connectedSub != subscription) {
+ throw new UserAlreadyConnectedException();
+ }
+ }
+
+ public synchronized void disconnect(final Subscription<MessageDelivery> subscription) {
+ if (connectedSub != null && connectedSub == subscription) {
+ queue.removeSubscription(connectedSub);
+ connectedSub = null;
+ }
+ }
+
+ public boolean matches(MessageDelivery message) {
+ if (selector == null) {
+ return true;
+ }
+
+ Message msg = message.asType(Message.class);
+ if (msg == null) {
+ return false;
+ }
+
+ MessageEvaluationContext selectorContext = new MessageEvaluationContext();
+ selectorContext.setMessageReference(msg);
+ selectorContext.setDestination(destination.asActiveMQDestination());
+ try {
+ return (selector.matches(selectorContext));
+ } catch (JMSException e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+
+ public IFlowSink<MessageDelivery> getSink() {
+ return queue;
+ }
+
+ public boolean hasSelector() {
+ return selector != null;
+ }
+}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java Thu May 21 02:34:23 2009
@@ -20,7 +20,7 @@
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.queue.QueueStore;
+import org.apache.activemq.queue.QueueStore.QueueDescriptor;
import org.apache.activemq.queue.QueueStore.SaveableQueueElement;
public interface MessageDelivery {
@@ -108,7 +108,7 @@
* @param queue
* The queue for which to acknowledge the message.
*/
- public void acknowledge(QueueStore.QueueDescriptor queue);
+ public void acknowledge(QueueDescriptor queue);
/**
* Gets the tracking number used to identify this message in the message
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java Thu May 21 02:34:23 2009
@@ -16,8 +16,6 @@
*/
package org.apache.activemq.broker;
-import java.io.IOException;
-
import org.apache.activemq.broker.store.Store.MessageRecord;
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.protobuf.AsciiBuffer;
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java Thu May 21 02:34:23 2009
@@ -16,52 +16,37 @@
*/
package org.apache.activemq.broker;
-import java.io.IOException;
-import java.util.HashMap;
-
import org.apache.activemq.broker.DeliveryTarget;
import org.apache.activemq.broker.Destination;
import org.apache.activemq.broker.MessageDelivery;
-import org.apache.activemq.flow.IFlowSink;
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.queue.IQueue;
-import org.apache.activemq.queue.QueueStore;
import org.apache.activemq.queue.Subscription;
-import org.apache.activemq.queue.QueueStore.SaveableQueueElement;
-import org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback;
public class Queue implements DeliveryTarget {
- HashMap<DeliveryTarget, Subscription<MessageDelivery>> subs = new HashMap<DeliveryTarget, Subscription<MessageDelivery>>();
private Destination destination;
- private IQueue<Long, MessageDelivery> queue;
+ private final IQueue<Long, MessageDelivery> queue;
private VirtualHost virtualHost;
Queue(IQueue<Long, MessageDelivery> queue) {
this.queue = queue;
}
+
- public final void deliver(MessageDelivery delivery, ISourceController<?> source) {
- queue.add(delivery, source);
+ /* (non-Javadoc)
+ * @see org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq.broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
+ */
+ public void deliver(MessageDelivery message, ISourceController<?> source) {
+ queue.add(message, source);
}
-
- public final void addConsumer(final DeliveryTarget dt) {
- Subscription<MessageDelivery> sub = new QueueSubscription(dt);
-
- Subscription<MessageDelivery> old = subs.put(dt, sub);
- if (old == null) {
- queue.addSubscription(sub);
- } else {
- subs.put(dt, old);
- }
+
+ public final void addSubscription(final Subscription<MessageDelivery> sub) {
+ queue.addSubscription(sub);
}
- public boolean removeSubscirption(final DeliveryTarget dt) {
- Subscription<MessageDelivery> sub = subs.remove(dt);
- if (sub != null) {
- return queue.removeSubscription(sub);
- }
- return false;
+ public boolean removeSubscription(final Subscription<MessageDelivery> sub) {
+ return queue.removeSubscription(sub);
}
public void start() throws Exception {
@@ -74,15 +59,11 @@
}
}
- public IFlowSink<MessageDelivery> getSink() {
- return queue;
- }
-
public boolean hasSelector() {
return false;
}
- public boolean match(MessageDelivery message) {
+ public boolean matches(MessageDelivery message) {
return true;
}
@@ -106,62 +87,28 @@
return true;
}
- public static class QueueSubscription implements Subscription<MessageDelivery> {
- final DeliveryTarget target;
-
- public QueueSubscription(DeliveryTarget dt) {
- this.target = dt;
- }
-
- public boolean matches(MessageDelivery message) {
- return target.match(message);
- }
-
- public boolean hasSelector() {
- return target.hasSelector();
- }
-
- public boolean isRemoveOnDispatch(MessageDelivery delivery) {
- return !delivery.isPersistent();
- }
-
- public IFlowSink<MessageDelivery> getSink() {
- return target.getSink();
- }
-
- @Override
- public String toString() {
- return target.getSink().toString();
- }
-
- public boolean offer(MessageDelivery elem, ISourceController<MessageDelivery> controller, SubscriptionDeliveryCallback callback) {
- return target.getSink().offer(new QueueDelivery(elem, callback), controller);
- }
-
- public boolean isBrowser() {
- return false;
+ public static class QueueSubscription implements BrokerSubscription {
+ Subscription<MessageDelivery> subscription;
+ final Queue queue;
+
+ public QueueSubscription(Queue queue) {
+ this.queue = queue;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.broker.BrokerSubscription#connect(org.apache.activemq.queue.Subscription)
+ */
+ public void connect(Subscription<MessageDelivery> subscription) throws UserAlreadyConnectedException {
+ this.subscription = subscription;
+ queue.addSubscription(subscription);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache.activemq.queue.Subscription)
+ */
+ public void disconnect(Subscription<MessageDelivery> context) {
+ queue.removeSubscription(subscription);
}
}
- private static class QueueDelivery extends MessageDeliveryWrapper {
- private final SubscriptionDeliveryCallback callback;
-
- QueueDelivery(MessageDelivery delivery, SubscriptionDeliveryCallback callback) {
- super(delivery);
- this.callback = callback;
- }
-
- @Override
- public void persist(SaveableQueueElement<MessageDelivery> elem, ISourceController<?> controller, boolean delayable) {
- // We override this for queue deliveries as the sub needn't
- // persist the message
- }
-
- public void acknowledge(QueueStore.QueueDescriptor queue) {
- if (callback != null) {
- callback.acknowledge();
- }
- }
-
- }
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java Thu May 21 02:34:23 2009
@@ -27,7 +27,6 @@
import org.apache.activemq.protobuf.AsciiBuffer;
public class QueueDomain implements Domain {
-
final HashMap<AsciiBuffer, Queue> queues = new HashMap<AsciiBuffer, Queue>();
public void add(AsciiBuffer name, Object queue) {
@@ -37,8 +36,12 @@
return queues.remove(name);
}
- public void bind(AsciiBuffer name, DeliveryTarget deliveryTarget) {
- queues.get(name).addConsumer(deliveryTarget);
+ public void bind(AsciiBuffer name, DeliveryTarget subscription) {
+
+ }
+
+ public void unbind(AsciiBuffer name, DeliveryTarget subscription) {
+
}
public Collection<DeliveryTarget> route(AsciiBuffer name, MessageDelivery delivery) {
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java Thu May 21 02:34:23 2009
@@ -57,36 +57,18 @@
return domains.remove(name);
}
- public synchronized void bind(Destination destination, DeliveryTarget dt) {
+ public synchronized void bind(Destination destination, DeliveryTarget target) {
Domain domain = domains.get(destination.getDomain());
- domain.bind(destination.getName(), dt);
+ domain.bind(destination.getName(), target);
+ }
+
+ public synchronized void unbind(Destination destination, DeliveryTarget target) {
+ Domain domain = domains.get(destination.getDomain());
+ domain.unbind(destination.getName(), target);
}
public void route(final BrokerMessageDelivery msg, ISourceController<?> controller) {
- // final Buffer transactionId = msg.getTransactionId();
- // if( msg.isPersistent() ) {
- // VoidCallback<RuntimeException> tx = new
- // VoidCallback<RuntimeException>() {
- // @Override
- // public void run(Session session) throws RuntimeException {
- // Long messageKey = session.messageAdd(msg.createMessageRecord());
- // if( transactionId!=null ) {
- // session.transactionAddMessage(transactionId, messageKey);
- // }
- // }
- // };
- // Runnable onFlush = new Runnable() {
- // public void run() {
- // if( msg.isResponseRequired() ) {
- // // Let the client know the broker got the message.
- // msg.onMessagePersisted();
- // }
- // }
- // };
- // virtualHost.getStore().execute(tx, onFlush);
- // }
- //
Collection<DeliveryTarget> targets = route(msg.getDestination(), msg);
//Set up the delivery for persistence:
@@ -100,8 +82,8 @@
if (targets != null) {
// The sinks will request persistence via MessageDelivery.persist()
// if they require persistence:
- for (DeliveryTarget dt : targets) {
- dt.deliver(msg, controller);
+ for (DeliveryTarget target : targets) {
+ target.deliver(msg, controller);
}
}
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java Thu May 21 02:34:23 2009
@@ -26,11 +26,12 @@
import org.apache.activemq.protobuf.AsciiBuffer;
public class TopicDomain implements Domain {
-
+
final HashMap<AsciiBuffer, ArrayList<DeliveryTarget>> topicsTargets = new HashMap<AsciiBuffer, ArrayList<DeliveryTarget>>();
public void add(AsciiBuffer name, Object queue) {
}
+
public Object remove(AsciiBuffer name) {
return null;
}
@@ -44,7 +45,17 @@
targets.add(target);
}
- public Collection<DeliveryTarget> route(AsciiBuffer name, MessageDelivery delivery) {
+ public void unbind(AsciiBuffer name, DeliveryTarget target) { // detach target from the named topic's routing list
+ ArrayList<DeliveryTarget> targets = topicsTargets.get(name);
+ if (targets != null) {
+ targets.remove(target);
+ if (targets.isEmpty()) {
+ topicsTargets.remove(name); // key by topic name; passing the map itself never matched, so empty lists were never dropped
+ }
+ }
+ }
+
+ public Collection<DeliveryTarget> route(AsciiBuffer name, MessageDelivery message) {
return topicsTargets.get(name);
}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicSubscription.java?rev=776934&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicSubscription.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicSubscription.java Thu May 21 02:34:23 2009
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.Message;
+import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.queue.Subscription;
+
+public class TopicSubscription implements BrokerSubscription, DeliveryTarget {
+
+ protected final BooleanExpression selector;
+ protected final Destination destination;
+ protected Subscription<MessageDelivery> connectedSub;
+ private final VirtualHost host;
+
+ TopicSubscription(VirtualHost host, Destination destination, BooleanExpression selector) {
+ this.host = host;
+ this.selector = selector;
+ this.destination = destination;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq.broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
+ */
+ public final void deliver(MessageDelivery message, ISourceController<?> source) {
+ connectedSub.add(message, source, null);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.broker.DeliveryTarget#hasSelector()
+ */
+ public boolean hasSelector() {
+ return selector != null;
+ }
+
+ /*
+ * Records the connecting consumer and binds this subscription to its
+ * destination through the host's router.
+ *
+ * @see org.apache.activemq.broker.BrokerSubscription#connect(org.apache.activemq.queue.Subscription)
+ */
+ public synchronized void connect(Subscription<MessageDelivery> context) throws UserAlreadyConnectedException {
+ connectedSub = context; // deliver() forwards to connectedSub; set it before binding so it is never null once routing starts
+ host.getRouter().bind(destination, this);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache.activemq
+ * .queue.Subscription)
+ */
+ public synchronized void disconnect(Subscription<MessageDelivery> context) {
+ host.getRouter().unbind(destination, this);
+ }
+
+ public boolean matches(MessageDelivery message) {
+ if (selector == null) {
+ return true;
+ }
+
+ Message msg = message.asType(Message.class);
+ if (msg == null) {
+ return false;
+ }
+
+ MessageEvaluationContext selectorContext = new MessageEvaluationContext();
+ selectorContext.setMessageReference(msg);
+ selectorContext.setDestination(destination.asActiveMQDestination());
+ try {
+ return (selector.matches(selectorContext));
+ } catch (JMSException e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java Thu May 21 02:34:23 2009
@@ -20,7 +20,9 @@
import java.util.HashMap;
import org.apache.activemq.Service;
+import org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext;
import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.ExclusivePersistentQueue;
import org.apache.activemq.queue.IQueue;
/**
@@ -31,6 +33,7 @@
final private BrokerQueueStore queueStore;
final private MessageBroker broker;
final private HashMap<AsciiBuffer, Queue> queues = new HashMap<AsciiBuffer, Queue>();
+ final private HashMap<String, DurableSubscription> durableSubs = new HashMap<String, DurableSubscription>();
private ArrayList<AsciiBuffer> hostNames = new ArrayList<AsciiBuffer>();
private Router router;
private boolean started;
@@ -98,12 +101,11 @@
}
public synchronized Queue createQueue(Destination dest) throws Exception {
- if(!started)
- {
+ if (!started) {
//Queues from the store must be loaded before we can create new ones:
throw new IllegalStateException("Can't create queue on unstarted host");
}
-
+
Queue queue = queues.get(dest);
// If the queue doesn't exist create it:
if (queue == null) {
@@ -121,4 +123,28 @@
public BrokerQueueStore getQueueStore() {
return queueStore;
}
+
+ /**
+ * Creates the broker-side subscription that backs the given consumer.
+ * <p>
+ * For a topic destination a durable consumer gets the {@link DurableSubscription}
+ * cached under its subscription name (created, with a started durable queue, on
+ * first use); a non-durable consumer gets a new {@link TopicSubscription}. For any
+ * other domain the consumer is attached to the queue registered under the
+ * destination name.
+ * <p>
+ * Synchronized like {@link #createQueue(Destination)} because both methods work on
+ * the same plain HashMaps.
+ *
+ * @param consumer the consumer requesting a subscription
+ * @return the subscription for the consumer, never null
+ * @throws IllegalStateException if the consumer targets a queue that is not registered on this host
+ */
+ public synchronized BrokerSubscription createSubscription(ConsumerContext consumer) {
+ Destination destination = consumer.getDestination();
+ if (destination.getDomain().equals(Router.TOPIC_DOMAIN)) {
+ if (!consumer.isDurable()) {
+ return new TopicSubscription(this, destination, consumer.getSelectorExpression());
+ }
+ String subscriptionName = consumer.getSubscriptionName();
+ DurableSubscription dsub = durableSubs.get(subscriptionName);
+ if (dsub == null) {
+ ExclusivePersistentQueue<Long, MessageDelivery> queue = queueStore.createDurableQueue(subscriptionName);
+ queue.start();
+ dsub = new DurableSubscription(this, destination, consumer.getSelectorExpression(), queue);
+ durableSubs.put(subscriptionName, dsub);
+ }
+ return dsub;
+ }
+ // Not a topic: fail fast instead of wrapping a null queue in a subscription.
+ Queue queue = queues.get(destination.getName());
+ if (queue == null) {
+ throw new IllegalStateException("No queue found for destination: " + destination.getName());
+ }
+ return new Queue.QueueSubscription(queue);
+ }
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Thu May 21 02:34:23 2009
@@ -20,7 +20,6 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
-import java.util.concurrent.atomic.AtomicLong;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
@@ -28,11 +27,12 @@
import org.apache.activemq.WindowLimiter;
import org.apache.activemq.broker.BrokerConnection;
import org.apache.activemq.broker.BrokerMessageDelivery;
-import org.apache.activemq.broker.DeliveryTarget;
+import org.apache.activemq.broker.BrokerSubscription;
import org.apache.activemq.broker.Destination;
import org.apache.activemq.broker.MessageDelivery;
import org.apache.activemq.broker.Router;
import org.apache.activemq.broker.VirtualHost;
+import org.apache.activemq.broker.BrokerSubscription.UserAlreadyConnectedException;
import org.apache.activemq.broker.openwire.OpenWireMessageDelivery.PersistListener;
import org.apache.activemq.broker.protocol.ProtocolHandler;
import org.apache.activemq.broker.store.Store.MessageRecord;
@@ -76,7 +76,6 @@
import org.apache.activemq.flow.Flow;
import org.apache.activemq.flow.FlowController;
import org.apache.activemq.flow.IFlowController;
-import org.apache.activemq.flow.IFlowDrain;
import org.apache.activemq.flow.IFlowLimiter;
import org.apache.activemq.flow.IFlowResource;
import org.apache.activemq.flow.IFlowSink;
@@ -86,10 +85,6 @@
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.queue.QueueStore;
-import org.apache.activemq.queue.SingleFlowRelay;
-import org.apache.activemq.queue.QueueStore.QueueDescriptor;
-import org.apache.activemq.queue.QueueStore.SaveableQueueElement;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.transport.WireFormatNegotiator;
@@ -409,158 +404,109 @@
}
}
- class ConsumerContext implements DeliveryTarget {
+ class ConsumerContext extends AbstractLimitedFlowResource<MessageDelivery> implements ProtocolHandler.ConsumerContext {
private final ConsumerInfo info;
private String name;
private BooleanExpression selector;
private boolean isDurable;
private boolean isQueueReceiver;
- private QueueStore.QueueDescriptor durableQueueId;
- private SingleFlowRelay<MessageDelivery> queue;
- public WindowLimiter<MessageDelivery> limiter;
- private AtomicLong deliverySequence = new AtomicLong(0);
+ private final FlowController<MessageDelivery> controller;
+ private final WindowLimiter<MessageDelivery> limiter;
- HashMap<MessageId, MessageDelivery> pendingMessages = new HashMap<MessageId, MessageDelivery>();
+ HashMap<MessageId, SubscriptionDeliveryCallback> pendingMessages = new HashMap<MessageId, SubscriptionDeliveryCallback>();
LinkedList<MessageId> pendingMessageIds = new LinkedList<MessageId>();
- public ConsumerContext(final ConsumerInfo info) throws InvalidSelectorException {
+ /**
+ * Sets up the outbound flow control for one OpenWire consumer and connects it
+ * to the subscription the virtual host creates for it.
+ *
+ * @param info the consumer registration received from the client
+ * @throws InvalidSelectorException propagated from parseSelector(info)
+ * @throws UserAlreadyConnectedException declared for sub.connect(this); presumably raised when the
+ * subscription already has a consumer attached (TODO confirm against BrokerSubscription)
+ */
+ public ConsumerContext(final ConsumerInfo info) throws InvalidSelectorException, UserAlreadyConnectedException {
this.info = info;
this.name = info.getConsumerId().toString();
- isDurable = info.isDurable();
- if (isDurable) {
- durableQueueId = new QueueStore.QueueDescriptor();
- durableQueueId.setQueueName(new AsciiBuffer(info.getSubscriptionName()));
- try {
- connection.getBroker().getDefaultVirtualHost().getQueueStore().addQueue(durableQueueId);
- } catch (Throwable thrown) {
- thrown.printStackTrace();
- }
- }
-
- selector = parseSelector(info);
-
Flow flow = new Flow("broker-" + name + "-outbound", false);
+ // Keep the cached flags populated: isQueueReceiver(), ack() and
+ // isRemoveOnDispatch() still read them after this refactoring.
+ isDurable = info.isDurable();
+ isQueueReceiver = info.getDestination().isQueue();
+ // The selector applies to every consumer, not only durable ones.
+ selector = parseSelector(info);
limiter = new WindowLimiter<MessageDelivery>(true, flow, info.getPrefetchSize(), info.getPrefetchSize() / 2) {
public int getElementSize(MessageDelivery m) {
return m.getFlowLimiterSize();
}
};
- queue = new SingleFlowRelay<MessageDelivery>(flow, flow.getFlowName(), limiter);
- queue.setDrain(new IFlowDrain<MessageDelivery>() {
- public void drain(final MessageDelivery message, ISourceController<MessageDelivery> controller) {
- Message msg = message.asType(Message.class);
- MessageDispatch md = new MessageDispatch();
- md.setConsumerId(info.getConsumerId());
- md.setMessage(msg);
- md.setDestination(msg.getDestination());
- // Add to the pending list if persistent and we are durable:
- if (message.isPersistent() && (isDurable() || isQueueReceiver())) {
- synchronized (queue) {
- Object old = pendingMessages.put(msg.getMessageId(), message);
- if (old != null) {
- new Exception("Duplicate message id: " + msg.getMessageId()).printStackTrace();
- }
- pendingMessageIds.add(msg.getMessageId());
- connection.write(md);
- }
- } else {
- if (isQueueReceiver()) {
- message.acknowledge(durableQueueId);
- }
- connection.write(md);
- }
- };
- });
- // Subscribe
- if (info.getDestination().isQueue()) {
- isQueueReceiver = true;
+ controller = new FlowController<MessageDelivery>(null, flow, limiter, this);
+ controller.useOverFlowQueue(false);
+ controller.setExecutor(connection.getDispatcher().createPriorityExecutor(connection.getDispatcher().getDispatchPriorities() - 1));
+ super.onFlowOpened(controller);
+
+ // All state must be initialised before this point: the subscription
+ // receives a reference to this context.
+ BrokerSubscription sub = host.createSubscription(this);
+ sub.connect(this);
+ }
+
+ public boolean offer(final MessageDelivery message, ISourceController<?> source, SubscriptionDeliveryCallback callback) {
+ if (!controller.offer(message, source)) {
+ return false;
+ } else {
+ sendInternal(message, controller, callback);
+ return true;
+ }
+ }
+
+ public void add(final MessageDelivery message, ISourceController<?> source, SubscriptionDeliveryCallback callback) {
+ controller.add(message, source);
+ sendInternal(message, controller, callback);
+ }
+
+ /**
+ * Wraps the delivery in a MessageDispatch addressed to this consumer and
+ * writes it to the connection.
+ * <p>
+ * When a callback is supplied, the message id and callback are recorded while
+ * holding this context's monitor so that ack() can later invoke the callback.
+ * A duplicate message id only prints a stack trace; the dispatch is still written.
+ *
+ * @param message the delivery to send; it is converted with asType(Message.class)
+ * @param controller not used by this method
+ * @param callback acknowledgement hook to remember until the client acks, or null for untracked sends
+ */
+ private void sendInternal(final MessageDelivery message, ISourceController<?> controller, SubscriptionDeliveryCallback callback) {
+ Message msg = message.asType(Message.class);
+ MessageDispatch md = new MessageDispatch();
+ md.setConsumerId(info.getConsumerId());
+ md.setMessage(msg);
+ md.setDestination(msg.getDestination());
+ // Track the message as pending whenever the sender supplied a callback:
+ if (callback != null) {
+ synchronized (this) {
+ Object old = pendingMessages.put(msg.getMessageId(), callback);
+ if (old != null) {
+ new Exception("Duplicate message id: " + msg.getMessageId()).printStackTrace();
+ }
+ pendingMessageIds.add(msg.getMessageId());
+ connection.write(md);
+ }
+ } else {
+ connection.write(md);
}
- router.bind(convert(info.getDestination()), this);
}
public void ack(MessageAck info) {
// TODO: The pending message queue could probably be optimized to
// avoid having to create a new list here.
- LinkedList<MessageDelivery> acked = new LinkedList<MessageDelivery>();
- synchronized (queue) {
- if (isDurable() || isQueueReceiver()) {
- MessageId id = info.getLastMessageId();
+ LinkedList<SubscriptionDeliveryCallback> acked = new LinkedList<SubscriptionDeliveryCallback>();
+ synchronized (this) {
+ MessageId id = info.getLastMessageId();
+ if (isDurable() || isQueueReceiver())
while (!pendingMessageIds.isEmpty()) {
MessageId pendingId = pendingMessageIds.getFirst();
- MessageDelivery delivery = pendingMessages.remove(pendingId);
- acked.add(delivery);
+ SubscriptionDeliveryCallback callback = pendingMessages.remove(pendingId);
+ acked.add(callback);
pendingMessageIds.removeFirst();
if (pendingId.equals(id)) {
break;
}
}
-
- }
limiter.onProtocolCredit(info.getMessageCount());
}
// Delete outside of synchronization on queue to avoid contention
// with enqueueing threads.
- for (MessageDelivery delivery : acked) {
- delivery.acknowledge(durableQueueId);
+ for (SubscriptionDeliveryCallback callback : acked) {
+ callback.acknowledge();
}
}
- public IFlowSink<MessageDelivery> getSink() {
- return queue;
- }
-
- public final void deliver(final MessageDelivery delivery, ISourceController<?> source) {
- if (!match(delivery)) {
- return;
- }
-
- if (isDurable() && delivery.isPersistent()) {
- try {
-
- final long sequence = deliverySequence.incrementAndGet();
- //TODO saveable queue element here is temporary: We should replace this
- //with an actual queue implementation:
- delivery.persist(new SaveableQueueElement<MessageDelivery>() {
-
- public MessageDelivery getElement() {
- return delivery;
- }
-
- public QueueDescriptor getQueueDescriptor() {
- return durableQueueId;
- }
-
- public long getSequenceNumber() {
- return sequence;
- }
-
- public void notifySave() {
- //noop
- }
- public boolean requestSaveNotify() {
- return false;
- }
-
- }, null, true);
- } catch (Exception e) {
- // TODO Auto-generated catch restoreBlock
- e.printStackTrace();
- }
- }
-
- queue.add(delivery, source);
- }
-
public boolean hasSelector() {
return selector != null;
}
- public boolean match(MessageDelivery message) {
+ public boolean matches(MessageDelivery message) {
Message msg = message.asType(Message.class);
if (msg == null) {
return false;
@@ -578,15 +524,137 @@
}
public boolean isDurable() {
- return isDurable;
+ return info.isDurable();
}
public boolean isQueueReceiver() {
return isQueueReceiver;
}
- public AsciiBuffer getPersistentQueueName() {
- return null;
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.queue.Subscription#isBrowser()
+ */
+ public boolean isBrowser() {
+ return info.isBrowser();
+ }
+
+ /**
+ * Returns {@code true} unless the element is persistent and this consumer is
+ * durable or a queue receiver.
+ * <p>
+ * Goes through the accessors rather than the raw fields so the answer always
+ * agrees with isDurable(), which reads the ConsumerInfo directly.
+ *
+ * @param elem the delivery being dispatched
+ * @return whether the element may be removed on dispatch
+ * @see org.apache.activemq.queue.Subscription#isRemoveOnDispatch(java.lang.Object)
+ */
+ public boolean isRemoveOnDispatch(MessageDelivery elem) {
+ boolean ackTracked = isDurable() || isQueueReceiver();
+ return !elem.isPersistent() || !ackTracked;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext
+ * #getDestination()
+ */
+ public Destination getDestination() {
+ return convert(info.getDestination());
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext
+ * #getJMSSelector()
+ */
+ public String getSelectorString() {
+ return info.getSelector();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext
+ * #getSubscriptionName()
+ */
+ public String getSubscriptionName() {
+ return info.getSubscriptionName();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext
+ * #getFullSelector()
+ */
+ public BooleanExpression getSelectorExpression() {
+ return selector;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext
+ * #getJMSSelector()
+ */
+ public String getSelector() {
+ return info.getSelector();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext
+ * #getConnection()
+ */
+ public BrokerConnection getConnection() {
+ return connection;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext
+ * #getConsumerId()
+ */
+ public String getConsumerId() {
+ return name;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.queue.Subscription#getSink()
+ */
+ public IFlowSink<MessageDelivery> getSink() {
+ return this;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.flow.IFlowSink#add(java.lang.Object,
+ * org.apache.activemq.flow.ISourceController)
+ */
+ public void add(MessageDelivery message, ISourceController<?> source) {
+ add(message, source, null);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.flow.IFlowSink#offer(java.lang.Object,
+ * org.apache.activemq.flow.ISourceController)
+ */
+ public boolean offer(MessageDelivery message, ISourceController<?> source) {
+ return offer(message, source, null);
}
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java Thu May 21 02:34:23 2009
@@ -21,16 +21,40 @@
import org.apache.activemq.Service;
import org.apache.activemq.broker.BrokerConnection;
import org.apache.activemq.broker.BrokerMessageDelivery;
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.MessageDelivery;
import org.apache.activemq.broker.store.Store.MessageRecord;
+import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.queue.Subscription;
import org.apache.activemq.wireformat.WireFormat;
public interface ProtocolHandler extends Service {
-
+
public void setConnection(BrokerConnection connection);
+
+ public BrokerConnection getConnection();
+
public void onCommand(Object command);
+
public void onException(Exception error);
+
public void setWireFormat(WireFormat wf);
-
+
public BrokerMessageDelivery createMessageDelivery(MessageRecord record) throws IOException;
+ public interface ConsumerContext extends Subscription<MessageDelivery>, IFlowSink<MessageDelivery>{
+ public String getConsumerId();
+
+ public Destination getDestination();
+
+ public String getSelector();
+
+ public BooleanExpression getSelectorExpression();
+
+ public boolean isDurable();
+
+ public String getSubscriptionName();
+ }
+
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Thu May 21 02:34:23 2009
@@ -25,14 +25,13 @@
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
import javax.jms.InvalidSelectorException;
import org.apache.activemq.WindowLimiter;
import org.apache.activemq.broker.BrokerConnection;
import org.apache.activemq.broker.BrokerMessageDelivery;
-import org.apache.activemq.broker.DeliveryTarget;
+import org.apache.activemq.broker.BrokerSubscription;
import org.apache.activemq.broker.Destination;
import org.apache.activemq.broker.MessageDelivery;
import org.apache.activemq.broker.Router;
@@ -51,10 +50,7 @@
import org.apache.activemq.flow.SizeLimiter;
import org.apache.activemq.flow.ISinkController.FlowControllable;
import org.apache.activemq.protobuf.AsciiBuffer;
-import org.apache.activemq.queue.QueueStore;
import org.apache.activemq.queue.SingleFlowRelay;
-import org.apache.activemq.queue.QueueStore.QueueDescriptor;
-import org.apache.activemq.queue.QueueStore.SaveableQueueElement;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.transport.stomp.StompFrame;
@@ -118,7 +114,6 @@
public void onStompFrame(StompFrame frame) throws Exception {
ConsumerContext ctx = new ConsumerContext(frame);
consumers.put(ctx.stompDestination, ctx);
- router.bind(ctx.destination, ctx);
ack(frame);
}
});
@@ -266,10 +261,11 @@
}
}
- class ConsumerContext implements DeliveryTarget {
+ class ConsumerContext extends AbstractLimitedFlowResource<MessageDelivery> implements ProtocolHandler.ConsumerContext {
private BooleanExpression selector;
-
+ private String selectorString;
+
private SingleFlowRelay<MessageDelivery> queue;
public WindowLimiter<MessageDelivery> limiter;
private FrameTranslator translator;
@@ -278,11 +274,9 @@
private Destination destination;
private String ackMode;
- private LinkedHashMap<AsciiBuffer, AsciiBuffer> sentMessageIds = new LinkedHashMap<AsciiBuffer, AsciiBuffer>();
+ private LinkedHashMap<AsciiBuffer, SubscriptionDeliveryCallback> sentMessageIds = new LinkedHashMap<AsciiBuffer, SubscriptionDeliveryCallback>();
private boolean durable;
- private QueueStore.QueueDescriptor durableQueueId;
- private AtomicLong deliverySequence = new AtomicLong(0);
public ConsumerContext(final StompFrame subscribe) throws Exception {
translator = translator(subscribe);
@@ -291,7 +285,7 @@
stompDestination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
destination = translator.convertToDestination(StompProtocolHandler.this, stompDestination);
subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
-
+
ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
ackMode = StompSubscription.CLIENT_ACK;
@@ -304,33 +298,25 @@
ackMode = StompSubscription.AUTO_ACK;
}
- selector = parseSelector(subscribe);
+ selectorString = subscribe.getHeaders().get(Stomp.Headers.Subscribe.SELECTOR);
+ selector = parseSelector(selectorString);
if (ackMode != StompSubscription.AUTO_ACK) {
Flow flow = new Flow("broker-" + subscriptionId + "-outbound", false);
- limiter = new WindowLimiter<MessageDelivery>(true, flow, 1000, 500) {
+ limiter = new WindowLimiter<MessageDelivery>(true, flow, connection.getOutputWindowSize(), connection.getOutputResumeThreshold()) {
public int getElementSize(MessageDelivery m) {
return m.getFlowLimiterSize();
}
};
- queue = new SingleFlowRelay<MessageDelivery>(flow, flow.getFlowName(), limiter);
- queue.setDrain(new IFlowDrain<MessageDelivery>() {
- public void drain(final MessageDelivery message, ISourceController<MessageDelivery> controller) {
- StompFrame frame = message.asType(StompFrame.class);
- if (ackMode == StompSubscription.CLIENT_ACK || ackMode == StompSubscription.INDIVIDUAL_ACK) {
- synchronized (allSentMessageIds) {
- AsciiBuffer msgId = message.getMsgId();
- sentMessageIds.put(msgId, msgId);
- allSentMessageIds.put(msgId, ConsumerContext.this);
- }
- }
- connection.write(frame);
- };
- });
+
+ //FIXME need to keep track of actual size:
+ //And Create a flow controller:
} else {
queue = outboundQueue;
}
-
+
+ BrokerSubscription sub = router.getVirtualHost().createSubscription(this);
+ sub.connect(this);
}
public void ack(StompFrame info) throws Exception {
@@ -342,6 +328,7 @@
AsciiBuffer next = iterator.next();
iterator.remove();
allSentMessageIds.remove(next);
+ //FIXME need to keep track of actual size:
credits++;
if (next.equals(mid)) {
break;
@@ -369,7 +356,7 @@
return false;
}
- public boolean match(MessageDelivery message) {
+ public boolean matches(MessageDelivery message) {
StompFrame stompMessage = message.asType(StompFrame.class);
if (stompMessage == null) {
return false;
@@ -396,60 +383,109 @@
// return false;
// }
}
-
- public void deliver(final MessageDelivery delivery, ISourceController<?> source) {
- if (!match(delivery)) {
- return;
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext#send(org.apache.activemq.broker.MessageDelivery, org.apache.activemq.flow.ISourceController, org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback)
+ */
+ public void add(MessageDelivery message, ISourceController<?> controller, SubscriptionDeliveryCallback callback) {
+ addInternal(message, controller, callback);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.queue.Subscription#offer(java.lang.Object, org.apache.activemq.flow.ISourceController, org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback)
+ */
+ public boolean offer(MessageDelivery message, ISourceController<?> controller, SubscriptionDeliveryCallback callback) {
+ //FIXME need a controller:
+ return false;
+ }
+
+ /**
+ * Writes the delivery to the connection as a StompFrame.
+ * <p>
+ * For the client and individual ack modes the message id is first recorded,
+ * under the allSentMessageIds lock, in both this consumer's sentMessageIds map
+ * (with its callback, which may be null) and the handler-wide allSentMessageIds map.
+ *
+ * @param message the delivery to send; it is converted with asType(StompFrame.class)
+ * @param controller not used by this method
+ * @param callback acknowledgement hook stored alongside the message id, or null
+ */
+ private void addInternal(MessageDelivery message, ISourceController<?> controller, SubscriptionDeliveryCallback callback) {
+ StompFrame frame = message.asType(StompFrame.class);
+ // Compare ack modes by value; reference equality only works while every
+ // assignment to ackMode uses the very same constant instances.
+ boolean clientAcked = StompSubscription.CLIENT_ACK.equals(ackMode) || StompSubscription.INDIVIDUAL_ACK.equals(ackMode);
+ if (clientAcked) {
+ synchronized (allSentMessageIds) {
+ AsciiBuffer msgId = message.getMsgId();
+ sentMessageIds.put(msgId, callback);
+ allSentMessageIds.put(msgId, ConsumerContext.this);
+ }
+ }
+ connection.write(frame);
+ }
+
+ public boolean isDurable() {
+ return durable;
+ }
- if (isDurable() && delivery.isPersistent()) {
- try {
- final long sequence = deliverySequence.incrementAndGet();
- //TODO saveable queue element here is temporary: We should replace this
- //with an actual queue implementation:
- delivery.persist(new SaveableQueueElement<MessageDelivery>() {
-
- public MessageDelivery getElement() {
- // TODO Auto-generated method stub
- return delivery;
- }
+ /* (non-Javadoc)
+ * @see org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext#getConnection()
+ */
+ public BrokerConnection getConnection() {
+ return connection;
+ }
- public QueueDescriptor getQueueDescriptor() {
- // TODO Auto-generated method stub
- return durableQueueId;
- }
+ /* (non-Javadoc)
+ * @see org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext#getDestination()
+ */
+ public Destination getDestination() {
+ return destination;
+ }
- public long getSequenceNumber() {
- return sequence;
- }
+ /* (non-Javadoc)
+ * @see org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext#getFullSelector()
+ */
+ public BooleanExpression getSelectorExpression() {
+ return selector;
+ }
- public void notifySave() {
- //noop
- }
- public boolean requestSaveNotify() {
- // TODO Auto-generated method stub
- return false;
- }
+ /* (non-Javadoc)
+ * @see org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext#getSelector()
+ */
+ public String getSelector() {
+ return selectorString;
+ }
- }, null, true);
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
+ /* (non-Javadoc)
+ * @see org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext#getSubscriptionName()
+ */
+ public String getSubscriptionName() {
+ return subscriptionId;
+ }
- queue.add(delivery, source);
+ /* (non-Javadoc)
+ * @see org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext#getConsumerId()
+ */
+ public String getConsumerId() {
+ return subscriptionId;
+ }
+ /* (non-Javadoc)
+ * @see org.apache.activemq.queue.Subscription#isBrowser()
+ */
+ public boolean isBrowser() {
+ return false;
}
- public boolean isDurable() {
- return durable;
+ /* (non-Javadoc)
+ * @see org.apache.activemq.queue.Subscription#isRemoveOnDispatch(java.lang.Object)
+ */
+ public boolean isRemoveOnDispatch(MessageDelivery elem) {
+ //TODO fix this.
+ return true;
}
- public AsciiBuffer getPersistentQueueName() {
- return null;
+ /* (non-Javadoc)
+ * @see org.apache.activemq.flow.IFlowSink#add(java.lang.Object, org.apache.activemq.flow.ISourceController)
+ */
+ public void add(MessageDelivery elem, ISourceController<?> source) {
+ add(elem, source, null);
}
+ /* (non-Javadoc)
+ * @see org.apache.activemq.flow.IFlowSink#offer(java.lang.Object, org.apache.activemq.flow.ISourceController)
+ */
+ public boolean offer(MessageDelivery elem, ISourceController<?> source) {
+ return offer(elem, source, null);
+ }
}
private void sendError(String message) {
@@ -513,9 +549,8 @@
return new Destination.SingleDestination(domain, new AsciiBuffer(dest.getPhysicalName()));
}
- private static BooleanExpression parseSelector(StompFrame frame) throws InvalidSelectorException {
+ private static BooleanExpression parseSelector(String selector) throws InvalidSelectorException {
BooleanExpression rc = null;
- String selector = frame.getHeaders().get(Stomp.Headers.Subscribe.SELECTOR);
if (selector != null) {
rc = SelectorParser.parse(selector);
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java Thu May 21 02:34:23 2009
@@ -47,7 +47,6 @@
import org.apache.activemq.flow.SizeLimiter;
import org.apache.activemq.flow.ISinkController.FlowControllable;
import org.apache.activemq.queue.QueueStore;
-import org.apache.activemq.queue.QueueStore.QueueDescriptor;
import org.apache.activemq.queue.QueueStore.RestoreListener;
import org.apache.activemq.queue.QueueStore.RestoredElement;
import org.apache.activemq.queue.QueueStore.SaveableQueueElement;
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java Thu May 21 02:34:23 2009
@@ -26,9 +26,11 @@
/**
* Used by a FlowSource that is being dispatched to drain it's elements.
* The implementor is responsible for calling {@link ISourceController#elementDispatched(Object)
- * when the element has been dispatched to all downstream sinks if IFlowSource
- * @param elem
- * @param controller
+ * when the element has been dispatched to all downstream sinks unless the
+ * IFlowSource#getAutoRelease() is set to true.
+ *
+ * @param elem The element being drained
+ * @param controller The source's controller
*/
public void drain(E elem, ISourceController<E> controller);
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java Thu May 21 02:34:23 2009
@@ -54,10 +54,8 @@
private long nextSequenceNumber = 0;
private int totalQueueCount;
- // For now each queue element is assigned a restoreBlock number
- // which is used for tracking page in requests. A trailing
- // consumer will request messages from at most one restoreBlock
- // at a time from the database.
+ //Dictates the chunk size of messages or place holders
+ //pulled in from the database:
private static final int RESTORE_BLOCK_SIZE = 1000;
private final PersistencePolicy<V> persistencePolicy;
private final Mapper<Long, V> expirationMapper;
@@ -65,6 +63,7 @@
private final QueueStore<?, V> queueStore;
private final ElementLoader loader;
public final QueueDescriptor queueDescriptor;
+ private final Object mutex;
public CursoredQueue(PersistencePolicy<V> persistencePolicy, Mapper<Long, V> expirationMapper, Flow flow, QueueDescriptor queueDescriptor, QueueStore<?, V> store, Object mutex) {
this.persistencePolicy = persistencePolicy;
@@ -100,8 +99,6 @@
}
}
- private final Object mutex;
-
protected abstract void requestDispatch();
protected abstract void acknowledge(QueueElement<V> elem);
@@ -699,7 +696,8 @@
}
/**
- * @param l
+ * @param l Set the highest sequence number to which this
+ * cursor can advance.
*/
public void setLimit(long l) {
limit = l;
@@ -1346,18 +1344,20 @@
// element loaded (until it is deleted)
if (!persistencePolicy.isPagingEnabled()) {
qe.addHardRef();
- // Persist the element if required:
- if (persistencePolicy.isPersistent(qe.elem)) {
- // For now base decision on whether to delay flush on
- // whether or not there are consumers ready:
- // TODO should actually change this to active cursors:
- boolean delayable = !openCursors.isEmpty();
- qe.save(source, delayable);
- }
}
+
+ // Persist the element if required:
+ if (persistencePolicy.isPersistent(qe.elem)) {
+ // For now base decision on whether to delay flush on
+ // whether or not there are consumers ready:
+ // TODO should actually change this to active cursors:
+ boolean delayable = !openCursors.isEmpty();
+ qe.save(source, delayable);
+ }
+
// Check with cursors to see if any of them have room for it
// in memory:
- else {
+ if(persistencePolicy.isPagingEnabled()) {
// Otherwise check with any other open cursor to see if
// it can hang on to the element:
@@ -1394,6 +1394,8 @@
qe.unload(source);
}
}
+
+
}
@@ -1552,6 +1554,7 @@
synchronized (fromDatabase) {
fromDatabase.addAll(msgs);
}
+ requestDispatch();
}
public String toString() {
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java Thu May 21 02:34:23 2009
@@ -28,12 +28,13 @@
import org.apache.activemq.queue.CursoredQueue.QueueElement;
import org.apache.activemq.queue.QueueStore.PersistentQueue;
import org.apache.activemq.queue.QueueStore.QueueDescriptor;
+import org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback;
import org.apache.activemq.util.Mapper;
public class ExclusivePersistentQueue<K, E> extends AbstractFlowQueue<E> implements PersistentQueue<K, E> {
private CursoredQueue<E> queue;
private final FlowController<E> controller;
- private IFlowSizeLimiter<E> limiter;
+ private final IFlowSizeLimiter<E> limiter;
private boolean started = true;
private Cursor<E> cursor;
private final QueueDescriptor queueDescriptor;
@@ -41,9 +42,12 @@
private QueueStore<K, E> queueStore;
private Mapper<Long, E> expirationMapper;
private boolean initialized;
+ private Subscription<E> subscription;
+ private ISourceController<E> sourceController;
+ protected boolean subBlocked = false;
/**
- * Creates a flow queue that can handle multiple flows.
+ *
*
* @param flow
* The {@link Flow}
@@ -55,12 +59,46 @@
public ExclusivePersistentQueue(String name, IFlowSizeLimiter<E> limiter) {
super(name);
this.queueDescriptor = new QueueStore.QueueDescriptor();
+ this.limiter = limiter; // retained: getEnqueuedSize() reads limiter.getSize()
queueDescriptor.setQueueName(new AsciiBuffer(super.getResourceName()));
queueDescriptor.setQueueType(QueueDescriptor.EXCLUSIVE);
//TODO flow should be serialized as part of the subscription.
this.controller = new FlowController<E>(null, new Flow(name, false), limiter, this);
+ this.controller.useOverFlowQueue(false);
super.onFlowOpened(controller);
+
+ sourceController = new ISourceController<E>() { // passed to subscription.offer(...) by pollingDispatch()
+
+ public void elementDispatched(E elem) {
+ // TODO Auto-generated method stub: currently a no-op.
+ }
+
+ public Flow getFlow() {
+ // Exposes the flow of this queue's FlowController.
+ return controller.getFlow();
+ }
+
+ public IFlowResource getFlowResource() {
+ // The enclosing queue itself is the flow resource.
+ return ExclusivePersistentQueue.this;
+ }
+
+ public void onFlowBlock(ISinkController<?> sinkController) {
+ synchronized (ExclusivePersistentQueue.this) {
+ subBlocked = true; // pollingDispatch() offers nothing while this is set
+ }
+ }
+
+ public void onFlowResume(ISinkController<?> sinkController) {
+ synchronized (ExclusivePersistentQueue.this) {
+ subBlocked = false;
+ if (isDispatchReady()) { // re-request dispatch only if there is something to do
+ notifyReady();
+ }
+ }
+ }
+ };
}
/*
@@ -106,11 +144,7 @@
};
queue.initialize(sequenceMin, sequenceMax, count, size);
- initialized = true;
- }
-
- public void connect(Subscription<E> sub)
- {
+
//Open a cursor for the queue:
FlowController<QueueElement<E>> memoryController = null;
if (persistencePolicy.isPagingEnabled()) {
@@ -131,9 +165,32 @@
}
cursor = queue.openCursor(getResourceName(), memoryController, true, true);
+ cursor.reset(sequenceMin);
+ cursor.activate();
+
+ initialized = true;
+ }
+
+ public synchronized void addSubscription(Subscription<E> sub) { // attaches the queue's single subscription
+ if (subscription != null) {
+ if (subscription != sub) { // a different subscription is already attached
+ //TODO change this to something other than a runtime exception:
+ throw new IllegalStateException();
+ }
+ return; // the same subscription was added again: nothing to do
+ }
+ this.subscription = sub;
+ subBlocked = false; // a newly attached subscription starts unblocked
+ if (isDispatchReady()) {
+ notifyReady();
+ }
+ }
+
+ public synchronized void removeSubscription(Subscription<E> sub) { // detaches sub if it is the attached subscription; otherwise a no-op
+ if (sub == subscription) {
+ subscription = null; // clear the field: assigning the parameter left the old subscription attached
+ }
}
-
-
protected final ISinkController<E> getSinkController(E elem, ISourceController<?> source) {
return controller;
@@ -167,7 +224,7 @@
private final void accepted(ISourceController<?> source, E elem) {
queue.add(source, elem);
- if (started) {
+ if (isDispatchReady()) {
notifyReady();
}
}
@@ -197,39 +254,72 @@
}
public final boolean isDispatchReady() {
- return started && !cursor.isReady();
- }
-
- public final boolean pollingDispatch() {
- E elem = poll();
+ if (started && subscription != null && !subBlocked && cursor.isReady()) {
+ return true;
+ }
- if (elem != null) {
- drain.drain(elem, controller);
+ if (queue.needsDispatch()) {
return true;
- } else {
- return false;
}
- }
- public final E poll() {
- synchronized (this) {
- if (!started) {
- return null;
- }
+ return false;
+ }
+ private QueueElement last = null;
+ public synchronized final boolean pollingDispatch() {
+ queue.dispatch();
+ if (started && subscription != null && !subBlocked) {
QueueElement<E> qe = cursor.getNext();
-
- // FIXME the release should really be done after dispatch.
- // doing it here saves us from having to resynchronize
- // after dispatch, but release limiter space too soon.
if (qe != null) {
- if (autoRelease) {
+ // If the sub doesn't remove on dispatch set an ack listener:
+ SubscriptionDeliveryCallback callback = subscription.isRemoveOnDispatch(qe.elem) ? null : qe;
+
+ if(qe.acquired || limiter.getSize() == 0 || (last != null && last.sequence >= qe.sequence))
+ {
+ System.out.println("Offering" + qe + limiter.getSize());
+ }
+
+ // See if the sink has room:
+ if (subscription.offer(qe.elem, sourceController, callback)) {
+ if(limiter.getElementSize(qe.getElement()) > 1048)
+ {
+ System.out.println("Offering" + qe);
+ }
+ qe.setAcquired(true);
controller.elementDispatched(qe.getElement());
+ last = qe;
+ // If remove on dispatch acknowledge now:
+ if (callback == null) {
+ qe.acknowledge();
+ }
}
- return qe.getElement();
}
- return null;
}
+
+ return isDispatchReady();
+ }
+
+ public E poll() { // always throws: elements are pushed to the subscription by pollingDispatch()
+ throw new UnsupportedOperationException("poll not supported for exclusive queue");
+ // Previous implementation, kept for reference:
+ // synchronized (this) {
+ // if (!started) {
+ // return null;
+ // }
+ //
+ // QueueElement<E> qe = cursor.getNext();
+ //
+ // // FIXME the release should really be done after dispatch.
+ // // doing it here saves us from having to resynchronize
+ // // after dispatch, but release limiter space too soon.
+ // if (qe != null) {
+ // if (autoRelease) {
+ // controller.elementDispatched(qe.getElement());
+ // }
+ // return qe.getElement();
+ // }
+ // return null;
+ // }
+ }
@Override
@@ -280,8 +370,7 @@
* @return The size of the elements in this queue or -1 if not yet known.
*/
public synchronized long getEnqueuedSize() {
- if(!initialized)
- {
+ if (!initialized) {
return -1;
}
return limiter.getSize();
@@ -291,8 +380,7 @@
* @return The count of the elements in this queue or -1 if not yet known.
*/
public synchronized long getEnqueuedCount() {
- if(!initialized)
- {
+ if (!initialized) {
return -1;
}
return queue.getEnqueuedCount();
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java Thu May 21 02:34:23 2009
@@ -94,7 +94,26 @@
* {@link ISourceController#onFlowBlock(ISinkController)} prior to
* returning false.
*/
- public boolean offer(E element, ISourceController<E> controller, SubscriptionDeliveryCallback callback);
+ public boolean offer(E element, ISourceController<?> controller, SubscriptionDeliveryCallback callback);
+
+ /**
+ * Pushes an item to the subscription. If the subscription is not remove on
+ * dispatch, then it must call the acknowledge method on the callback when it
+ * has acknowledged the message.
+ * <p>
+ * Unlike {@code offer}, this method returns no value, so the caller gets no
+ * indication of whether the subscription had room for the element.
+ *
+ * @param element
+ * The element being pushed.
+ * @param controller
+ * The queue's controller, which must be used if the added
+ * element exceeds the subscription's buffer limits.
+ * @param callback
+ * The {@link SubscriptionDeliveryCallback} associated with the
+ * element.
+ */
+ public void add(E element, ISourceController<?> controller, SubscriptionDeliveryCallback callback);
@Deprecated
public IFlowSink<E> getSink();