You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/05/21 04:34:24 UTC

svn commit: r776934 [1/2] - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ main/java/org/apache/activemq/broker/protocol/ main/java/org/apache/activemq/broker/stomp/ main/jav...

Author: chirino
Date: Thu May 21 02:34:23 2009
New Revision: 776934

URL: http://svn.apache.org/viewvc?rev=776934&view=rev
Log:
Applying Coling patch at: https://issues.apache.org/activemq/browse/AMQ-2261
Thanks!


Added:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerSubscription.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DurableSubscription.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicSubscription.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java Thu May 21 02:34:23 2009
@@ -20,8 +20,6 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Set;
-import java.util.Map.Entry;
 
 import org.apache.activemq.broker.store.BrokerDatabase;
 import org.apache.activemq.broker.store.BrokerDatabase.OperationContext;

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerSubscription.java?rev=776934&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerSubscription.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerSubscription.java Thu May 21 02:34:23 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import org.apache.activemq.queue.Subscription;
+
+public interface BrokerSubscription {
+
+    public void connect(Subscription<MessageDelivery> subscription) throws UserAlreadyConnectedException ;
+
+    public void disconnect(Subscription<MessageDelivery> subscription);
+    
+    @SuppressWarnings("serial")
+    public class UserAlreadyConnectedException extends Exception {
+
+    }
+}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java Thu May 21 02:34:23 2009
@@ -17,18 +17,13 @@
 package org.apache.activemq.broker;
 
 import org.apache.activemq.broker.MessageDelivery;
-import org.apache.activemq.flow.IFlowSink;
 import org.apache.activemq.flow.ISourceController;
 
 public interface DeliveryTarget {
     
-    public void deliver(MessageDelivery delivery, ISourceController<?> source);
-    
-    public IFlowSink<MessageDelivery> getSink();
+    public void deliver(MessageDelivery message, ISourceController<?> source);
     
     public boolean hasSelector();
     
-    public boolean match(MessageDelivery message);
-    
-    public boolean isDurable();
+    public boolean matches(MessageDelivery message);
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java Thu May 21 02:34:23 2009
@@ -19,6 +19,9 @@
 import java.util.Collection;
 
 import org.apache.activemq.broker.Destination;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.protobuf.AsciiBuffer;
 
 public interface Destination {
@@ -26,7 +29,8 @@
     AsciiBuffer getDomain();
     AsciiBuffer getName();
     Collection<Destination> getDestinations();
-    
+    public ActiveMQDestination asActiveMQDestination();
+
     public class SingleDestination implements Destination {
 
         private AsciiBuffer domain;
@@ -67,6 +71,18 @@
         private void setDomain(String domain) {
             setDomain(new AsciiBuffer(domain));
         }
+
+        public ActiveMQDestination asActiveMQDestination() {
+            if(domain.equals(Router.TOPIC_DOMAIN))
+            {
+                return new ActiveMQTopic(name.toString());
+            }
+            else if(domain.equals(Router.QUEUE_DOMAIN))
+            {
+                return new ActiveMQQueue(name.toString());
+            }
+            return null;
+        }
     }
     
     public class MultiDestination implements Destination {
@@ -95,7 +111,12 @@
         public AsciiBuffer getName() {
             return null;
         }
+        
+        public ActiveMQDestination asActiveMQDestination() {
+            throw new UnsupportedOperationException("Not yet implemented");
+        }
 
     }
     
+    
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java Thu May 21 02:34:23 2009
@@ -34,8 +34,10 @@
     
     public Object remove(AsciiBuffer destinationName);
 
-    public void bind(AsciiBuffer destinationName, DeliveryTarget deliveryTarget);
-
+    public void bind(AsciiBuffer destinationName, DeliveryTarget target);
+    
+    public void unbind(AsciiBuffer destinationName, DeliveryTarget target);
+    
     public Collection<DeliveryTarget> route(AsciiBuffer destinationName, MessageDelivery message);
     
 }

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DurableSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DurableSubscription.java?rev=776934&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DurableSubscription.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DurableSubscription.java Thu May 21 02:34:23 2009
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.Message;
+import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.queue.ExclusivePersistentQueue;
+import org.apache.activemq.queue.Subscription;
+
+public class DurableSubscription implements BrokerSubscription, DeliveryTarget {
+
+    private final ExclusivePersistentQueue<Long, MessageDelivery> queue;
+    private final VirtualHost host;
+    private final Destination destination;
+    private Subscription<MessageDelivery> connectedSub;
+    boolean started = false;
+    BooleanExpression selector;
+
+    DurableSubscription(VirtualHost host, Destination destination, BooleanExpression selector, ExclusivePersistentQueue<Long, MessageDelivery> queue) {
+        this.host = host;
+        this.queue = queue;
+        this.destination = destination;
+        this.selector = selector;
+        this.host.getRouter().bind(destination, this);
+    }
+    
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq.broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
+     */
+    public void deliver(MessageDelivery message, ISourceController<?> source) {
+        queue.add(message, source);
+    }
+
+    public synchronized void connect(final Subscription<MessageDelivery> subscription) throws UserAlreadyConnectedException {
+        if (this.connectedSub == null) {
+            this.connectedSub = subscription;
+            queue.addSubscription(connectedSub);
+        } else if (connectedSub != subscription) {
+            throw new UserAlreadyConnectedException();
+        }
+    }
+
+    public synchronized void disconnect(final Subscription<MessageDelivery> subscription) {
+        if (connectedSub != null && connectedSub == subscription) {
+            queue.removeSubscription(connectedSub);
+            connectedSub = null;
+        }
+    }
+
+    public boolean matches(MessageDelivery message) {
+        if (selector == null) {
+            return true;
+        }
+
+        Message msg = message.asType(Message.class);
+        if (msg == null) {
+            return false;
+        }
+
+        MessageEvaluationContext selectorContext = new MessageEvaluationContext();
+        selectorContext.setMessageReference(msg);
+        selectorContext.setDestination(destination.asActiveMQDestination());
+        try {
+            return (selector.matches(selectorContext));
+        } catch (JMSException e) {
+            e.printStackTrace();
+            return false;
+        }
+    }
+
+    public IFlowSink<MessageDelivery> getSink() {
+        return queue;
+    }
+
+    public boolean hasSelector() {
+        return selector != null;
+    }
+}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java Thu May 21 02:34:23 2009
@@ -20,7 +20,7 @@
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.queue.QueueStore;
+import org.apache.activemq.queue.QueueStore.QueueDescriptor;
 import org.apache.activemq.queue.QueueStore.SaveableQueueElement;
 
 public interface MessageDelivery {
@@ -108,7 +108,7 @@
      * @param queue
      *            The queue for which to acknowledge the message.
      */
-    public void acknowledge(QueueStore.QueueDescriptor queue);
+    public void acknowledge(QueueDescriptor queue);
 
     /**
      * Gets the tracking number used to identify this message in the message

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDeliveryWrapper.java Thu May 21 02:34:23 2009
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.broker;
 
-import java.io.IOException;
-
 import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.protobuf.AsciiBuffer;

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java Thu May 21 02:34:23 2009
@@ -16,52 +16,37 @@
  */
 package org.apache.activemq.broker;
 
-import java.io.IOException;
-import java.util.HashMap;
-
 import org.apache.activemq.broker.DeliveryTarget;
 import org.apache.activemq.broker.Destination;
 import org.apache.activemq.broker.MessageDelivery;
-import org.apache.activemq.flow.IFlowSink;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.queue.IQueue;
-import org.apache.activemq.queue.QueueStore;
 import org.apache.activemq.queue.Subscription;
-import org.apache.activemq.queue.QueueStore.SaveableQueueElement;
-import org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback;
 
 public class Queue implements DeliveryTarget {
 
-    HashMap<DeliveryTarget, Subscription<MessageDelivery>> subs = new HashMap<DeliveryTarget, Subscription<MessageDelivery>>();
     private Destination destination;
-    private IQueue<Long, MessageDelivery> queue;
+    private final IQueue<Long, MessageDelivery> queue;
     private VirtualHost virtualHost;
 
     Queue(IQueue<Long, MessageDelivery> queue) {
         this.queue = queue;
     }
+    
 
-    public final void deliver(MessageDelivery delivery, ISourceController<?> source) {
-        queue.add(delivery, source);
+    /* (non-Javadoc)
+     * @see org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq.broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
+     */
+    public void deliver(MessageDelivery message, ISourceController<?> source) {
+        queue.add(message, source);
     }
-
-    public final void addConsumer(final DeliveryTarget dt) {
-        Subscription<MessageDelivery> sub = new QueueSubscription(dt);
-
-        Subscription<MessageDelivery> old = subs.put(dt, sub);
-        if (old == null) {
-            queue.addSubscription(sub);
-        } else {
-            subs.put(dt, old);
-        }
+    
+    public final void addSubscription(final Subscription<MessageDelivery> sub) {
+        queue.addSubscription(sub);
     }
 
-    public boolean removeSubscirption(final DeliveryTarget dt) {
-        Subscription<MessageDelivery> sub = subs.remove(dt);
-        if (sub != null) {
-            return queue.removeSubscription(sub);
-        }
-        return false;
+    public boolean removeSubscription(final Subscription<MessageDelivery> sub) {
+        return queue.removeSubscription(sub);
     }
 
     public void start() throws Exception {
@@ -74,15 +59,11 @@
         }
     }
 
-    public IFlowSink<MessageDelivery> getSink() {
-        return queue;
-    }
-
     public boolean hasSelector() {
         return false;
     }
 
-    public boolean match(MessageDelivery message) {
+    public boolean matches(MessageDelivery message) {
         return true;
     }
 
@@ -106,62 +87,28 @@
         return true;
     }
 
-    public static class QueueSubscription implements Subscription<MessageDelivery> {
-        final DeliveryTarget target;
-
-        public QueueSubscription(DeliveryTarget dt) {
-            this.target = dt;
-        }
-
-        public boolean matches(MessageDelivery message) {
-            return target.match(message);
-        }
-
-        public boolean hasSelector() {
-            return target.hasSelector();
-        }
-
-        public boolean isRemoveOnDispatch(MessageDelivery delivery) {
-            return !delivery.isPersistent();
-        }
-
-        public IFlowSink<MessageDelivery> getSink() {
-            return target.getSink();
-        }
-
-        @Override
-        public String toString() {
-            return target.getSink().toString();
-        }
-
-        public boolean offer(MessageDelivery elem, ISourceController<MessageDelivery> controller, SubscriptionDeliveryCallback callback) {
-            return target.getSink().offer(new QueueDelivery(elem, callback), controller);
-        }
-
-        public boolean isBrowser() {
-            return false;
+    public static class QueueSubscription implements BrokerSubscription {
+        Subscription<MessageDelivery> subscription;
+        final Queue queue;
+
+        public QueueSubscription(Queue queue) {
+            this.queue = queue;
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.activemq.broker.BrokerSubscription#connect(org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext)
+         */
+        public void connect(Subscription<MessageDelivery> subscription) throws UserAlreadyConnectedException {
+            this.subscription = subscription;
+            queue.addSubscription(subscription);
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext)
+         */
+        public void disconnect(Subscription<MessageDelivery> context) {
+            queue.removeSubscription(subscription);
         }
     }
 
-    private static class QueueDelivery extends MessageDeliveryWrapper {
-        private final SubscriptionDeliveryCallback callback;
-
-        QueueDelivery(MessageDelivery delivery, SubscriptionDeliveryCallback callback) {
-            super(delivery);
-            this.callback = callback;
-        }
-
-        @Override
-        public void persist(SaveableQueueElement<MessageDelivery> elem, ISourceController<?> controller, boolean delayable) {
-            // We override this for queue deliveries as the sub needn't
-            // persist the message
-        }
-
-        public void acknowledge(QueueStore.QueueDescriptor queue) {
-            if (callback != null) {
-                callback.acknowledge();
-            }
-        }
-
-    }
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java Thu May 21 02:34:23 2009
@@ -27,7 +27,6 @@
 import org.apache.activemq.protobuf.AsciiBuffer;
 
 public class QueueDomain implements Domain {
-    
     final HashMap<AsciiBuffer, Queue> queues = new HashMap<AsciiBuffer, Queue>();
 
     public void add(AsciiBuffer name, Object queue) {
@@ -37,8 +36,12 @@
         return queues.remove(name);
     }
 
-    public void bind(AsciiBuffer name, DeliveryTarget deliveryTarget) {
-        queues.get(name).addConsumer(deliveryTarget);
+    public void bind(AsciiBuffer name, DeliveryTarget subscription) {
+        
+    }
+    
+    public void unbind(AsciiBuffer name, DeliveryTarget subscription) {
+        
     }
 
     public Collection<DeliveryTarget> route(AsciiBuffer name, MessageDelivery delivery) {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java Thu May 21 02:34:23 2009
@@ -57,36 +57,18 @@
         return domains.remove(name);
     }
 
-    public synchronized void bind(Destination destination, DeliveryTarget dt) {
+    public synchronized void bind(Destination destination, DeliveryTarget target) {
         Domain domain = domains.get(destination.getDomain());
-        domain.bind(destination.getName(), dt);
+        domain.bind(destination.getName(), target);
+    }
+    
+    public synchronized void unbind(Destination destination, DeliveryTarget target) {
+        Domain domain = domains.get(destination.getDomain());
+        domain.unbind(destination.getName(), target);
     }
 
     public void route(final BrokerMessageDelivery msg, ISourceController<?> controller) {
 
-        // final Buffer transactionId = msg.getTransactionId();
-        // if( msg.isPersistent() ) {
-        // VoidCallback<RuntimeException> tx = new
-        // VoidCallback<RuntimeException>() {
-        // @Override
-        // public void run(Session session) throws RuntimeException {
-        // Long messageKey = session.messageAdd(msg.createMessageRecord());
-        // if( transactionId!=null ) {
-        // session.transactionAddMessage(transactionId, messageKey);
-        // }
-        // }
-        // };
-        // Runnable onFlush = new Runnable() {
-        // public void run() {
-        // if( msg.isResponseRequired() ) {
-        // // Let the client know the broker got the message.
-        // msg.onMessagePersisted();
-        // }
-        // }
-        // };
-        // virtualHost.getStore().execute(tx, onFlush);
-        // }
-        //        
         Collection<DeliveryTarget> targets = route(msg.getDestination(), msg);
 
         //Set up the delivery for persistence:
@@ -100,8 +82,8 @@
             if (targets != null) {
                 // The sinks will request persistence via MessageDelivery.persist()
                 // if they require persistence:
-                for (DeliveryTarget dt : targets) {
-                    dt.deliver(msg, controller);
+                for (DeliveryTarget target : targets) {
+                    target.deliver(msg, controller);
                 }
             }
         }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java Thu May 21 02:34:23 2009
@@ -26,11 +26,12 @@
 import org.apache.activemq.protobuf.AsciiBuffer;
 
 public class TopicDomain implements Domain {
-    
+
     final HashMap<AsciiBuffer, ArrayList<DeliveryTarget>> topicsTargets = new HashMap<AsciiBuffer, ArrayList<DeliveryTarget>>();
 
     public void add(AsciiBuffer name, Object queue) {
     }
+
     public Object remove(AsciiBuffer name) {
         return null;
     }
@@ -44,7 +45,17 @@
         targets.add(target);
     }
 
-    public Collection<DeliveryTarget> route(AsciiBuffer name, MessageDelivery delivery) {
+    public void unbind(AsciiBuffer name, DeliveryTarget target) {
+        ArrayList<DeliveryTarget> targets = topicsTargets.get(name);
+        if (targets != null) {
+            targets.remove(target);
+            if (targets.isEmpty()) {
+                topicsTargets.remove(topicsTargets);
+            }
+        }
+    }
+
+    public Collection<DeliveryTarget> route(AsciiBuffer name, MessageDelivery message) {
         return topicsTargets.get(name);
     }
 

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicSubscription.java?rev=776934&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicSubscription.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicSubscription.java Thu May 21 02:34:23 2009
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.Message;
+import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.queue.Subscription;
+
+public class TopicSubscription implements BrokerSubscription, DeliveryTarget {
+
+    protected final BooleanExpression selector;
+    protected final Destination destination;
+    protected Subscription<MessageDelivery> connectedSub;
+    private final VirtualHost host;
+
+    TopicSubscription(VirtualHost host, Destination destination, BooleanExpression selector) {
+       this.host = host;
+       this.selector = selector;
+       this.destination = destination;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq.broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
+     */
+    public final void deliver(MessageDelivery message, ISourceController<?> source) {
+        connectedSub.add(message, source, null);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.broker.DeliveryTarget#hasSelector()
+     */
+    public boolean hasSelector() {
+        return selector != null;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.activemq.broker.BrokerSubscription#connect(org.apache.activemq
+     * .broker.protocol.ProtocolHandler.ConsumerContext)
+     */
+    public synchronized void connect(Subscription<MessageDelivery> context) throws UserAlreadyConnectedException {
+        host.getRouter().bind(destination, this);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache.activemq
+     * .broker.protocol.ProtocolHandler.ConsumerContext)
+     */
+    public synchronized void disconnect(Subscription<MessageDelivery> context) {
+        host.getRouter().unbind(destination, this);
+    }
+
+    public boolean matches(MessageDelivery message) {
+        if (selector == null) {
+            return true;
+        }
+
+        Message msg = message.asType(Message.class);
+        if (msg == null) {
+            return false;
+        }
+
+        MessageEvaluationContext selectorContext = new MessageEvaluationContext();
+        selectorContext.setMessageReference(msg);
+        selectorContext.setDestination(destination.asActiveMQDestination());
+        try {
+            return (selector.matches(selectorContext));
+        } catch (JMSException e) {
+            e.printStackTrace();
+            return false;
+        }
+    }
+}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java Thu May 21 02:34:23 2009
@@ -20,7 +20,9 @@
 import java.util.HashMap;
 
 import org.apache.activemq.Service;
+import org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext;
 import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.ExclusivePersistentQueue;
 import org.apache.activemq.queue.IQueue;
 
 /**
@@ -31,6 +33,7 @@
     final private BrokerQueueStore queueStore;
     final private MessageBroker broker;
     final private HashMap<AsciiBuffer, Queue> queues = new HashMap<AsciiBuffer, Queue>();
+    final private HashMap<String, DurableSubscription> durableSubs = new HashMap<String, DurableSubscription>();
     private ArrayList<AsciiBuffer> hostNames = new ArrayList<AsciiBuffer>();
     private Router router;
     private boolean started;
@@ -98,12 +101,11 @@
     }
 
     public synchronized Queue createQueue(Destination dest) throws Exception {
-        if(!started)
-        {
+        if (!started) {
             //Queues from the store must be loaded before we can create new ones:
             throw new IllegalStateException("Can't create queue on unstarted host");
         }
-        
+
         Queue queue = queues.get(dest);
         // If the queue doesn't exist create it:
         if (queue == null) {
@@ -121,4 +123,28 @@
     public BrokerQueueStore getQueueStore() {
         return queueStore;
     }
+
+    public BrokerSubscription createSubscription(ConsumerContext consumer) {
+        Destination destination = consumer.getDestination();
+        BrokerSubscription sub = null;
+        if(destination.getDomain().equals(Router.TOPIC_DOMAIN))
+        {
+            if (consumer.isDurable()) {
+                sub = durableSubs.get(consumer.getSubscriptionName());
+                if (sub == null) {
+                    ExclusivePersistentQueue<Long, MessageDelivery> queue = queueStore.createDurableQueue(consumer.getSubscriptionName());
+                    queue.start();
+                    DurableSubscription dsub = new DurableSubscription(this, destination, consumer.getSelectorExpression(), queue);
+                    durableSubs.put(consumer.getSubscriptionName(), dsub);
+                    sub = dsub;
+                }
+            } else if (consumer.getDestination().getDomain().equals(Router.TOPIC_DOMAIN)) {
+                sub = new TopicSubscription(this, destination, consumer.getSelectorExpression());
+            }
+        } else {
+            Queue queue = queues.get(destination.getName());
+            sub = new Queue.QueueSubscription(queue);
+        }
+        return sub;
+    }
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Thu May 21 02:34:23 2009
@@ -20,7 +20,6 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
-import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
@@ -28,11 +27,12 @@
 import org.apache.activemq.WindowLimiter;
 import org.apache.activemq.broker.BrokerConnection;
 import org.apache.activemq.broker.BrokerMessageDelivery;
-import org.apache.activemq.broker.DeliveryTarget;
+import org.apache.activemq.broker.BrokerSubscription;
 import org.apache.activemq.broker.Destination;
 import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.broker.Router;
 import org.apache.activemq.broker.VirtualHost;
+import org.apache.activemq.broker.BrokerSubscription.UserAlreadyConnectedException;
 import org.apache.activemq.broker.openwire.OpenWireMessageDelivery.PersistListener;
 import org.apache.activemq.broker.protocol.ProtocolHandler;
 import org.apache.activemq.broker.store.Store.MessageRecord;
@@ -76,7 +76,6 @@
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowController;
-import org.apache.activemq.flow.IFlowDrain;
 import org.apache.activemq.flow.IFlowLimiter;
 import org.apache.activemq.flow.IFlowResource;
 import org.apache.activemq.flow.IFlowSink;
@@ -86,10 +85,6 @@
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.queue.QueueStore;
-import org.apache.activemq.queue.SingleFlowRelay;
-import org.apache.activemq.queue.QueueStore.QueueDescriptor;
-import org.apache.activemq.queue.QueueStore.SaveableQueueElement;
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.state.CommandVisitor;
 import org.apache.activemq.transport.WireFormatNegotiator;
@@ -409,158 +404,109 @@
         }
     }
 
-    class ConsumerContext implements DeliveryTarget {
+    class ConsumerContext extends AbstractLimitedFlowResource<MessageDelivery> implements ProtocolHandler.ConsumerContext {
 
         private final ConsumerInfo info;
         private String name;
         private BooleanExpression selector;
         private boolean isDurable;
         private boolean isQueueReceiver;
-        private QueueStore.QueueDescriptor durableQueueId;
 
-        private SingleFlowRelay<MessageDelivery> queue;
-        public WindowLimiter<MessageDelivery> limiter;
-        private AtomicLong deliverySequence = new AtomicLong(0);
+        private final FlowController<MessageDelivery> controller;
+        private final WindowLimiter<MessageDelivery> limiter;
 
-        HashMap<MessageId, MessageDelivery> pendingMessages = new HashMap<MessageId, MessageDelivery>();
+        HashMap<MessageId, SubscriptionDeliveryCallback> pendingMessages = new HashMap<MessageId, SubscriptionDeliveryCallback>();
         LinkedList<MessageId> pendingMessageIds = new LinkedList<MessageId>();
 
-        public ConsumerContext(final ConsumerInfo info) throws InvalidSelectorException {
+        public ConsumerContext(final ConsumerInfo info) throws InvalidSelectorException, UserAlreadyConnectedException {
             this.info = info;
             this.name = info.getConsumerId().toString();
 
-            isDurable = info.isDurable();
-            if (isDurable) {
-                durableQueueId = new QueueStore.QueueDescriptor();
-                durableQueueId.setQueueName(new AsciiBuffer(info.getSubscriptionName()));
-                try {
-                    connection.getBroker().getDefaultVirtualHost().getQueueStore().addQueue(durableQueueId);
-                } catch (Throwable thrown) {
-                    thrown.printStackTrace();
-                }
-            }
-
-            selector = parseSelector(info);
-
             Flow flow = new Flow("broker-" + name + "-outbound", false);
+            if (info.isDurable())
+
+                selector = parseSelector(info);
             limiter = new WindowLimiter<MessageDelivery>(true, flow, info.getPrefetchSize(), info.getPrefetchSize() / 2) {
                 public int getElementSize(MessageDelivery m) {
                     return m.getFlowLimiterSize();
                 }
             };
-            queue = new SingleFlowRelay<MessageDelivery>(flow, flow.getFlowName(), limiter);
-            queue.setDrain(new IFlowDrain<MessageDelivery>() {
-                public void drain(final MessageDelivery message, ISourceController<MessageDelivery> controller) {
-                    Message msg = message.asType(Message.class);
-                    MessageDispatch md = new MessageDispatch();
-                    md.setConsumerId(info.getConsumerId());
-                    md.setMessage(msg);
-                    md.setDestination(msg.getDestination());
-                    // Add to the pending list if persistent and we are durable:
-                    if (message.isPersistent() && (isDurable() || isQueueReceiver())) {
-                        synchronized (queue) {
-                            Object old = pendingMessages.put(msg.getMessageId(), message);
-                            if (old != null) {
-                                new Exception("Duplicate message id: " + msg.getMessageId()).printStackTrace();
-                            }
-                            pendingMessageIds.add(msg.getMessageId());
-                            connection.write(md);
-                        }
-                    } else {
-                        if (isQueueReceiver()) {
-                            message.acknowledge(durableQueueId);
-                        }
-                        connection.write(md);
-                    }
-                };
-            });
 
-            // Subscribe
-            if (info.getDestination().isQueue()) {
-                isQueueReceiver = true;
+            controller = new FlowController<MessageDelivery>(null, flow, limiter, this);
+            controller.useOverFlowQueue(false);
+            controller.setExecutor(connection.getDispatcher().createPriorityExecutor(connection.getDispatcher().getDispatchPriorities() - 1));
+            super.onFlowOpened(controller);
+            
+            BrokerSubscription sub = host.createSubscription(this);
+            sub.connect(this);
+        }
+
+        public boolean offer(final MessageDelivery message, ISourceController<?> source, SubscriptionDeliveryCallback callback) {
+            if (!controller.offer(message, source)) {
+                return false;
+            } else {
+                sendInternal(message, controller, callback);
+                return true;
+            }
+        }
+
+        public void add(final MessageDelivery message, ISourceController<?> source, SubscriptionDeliveryCallback callback) {
+            controller.add(message, source);
+            sendInternal(message, controller, callback);
+        }
+
+        private void sendInternal(final MessageDelivery message, ISourceController<?> controller, SubscriptionDeliveryCallback callback) {
+            Message msg = message.asType(Message.class);
+            MessageDispatch md = new MessageDispatch();
+            md.setConsumerId(info.getConsumerId());
+            md.setMessage(msg);
+            md.setDestination(msg.getDestination());
+            // Add to the pending list if persistent and we are durable:
+            if (callback != null) {
+                synchronized (this) {
+                    Object old = pendingMessages.put(msg.getMessageId(), callback);
+                    if (old != null) {
+                        new Exception("Duplicate message id: " + msg.getMessageId()).printStackTrace();
+                    }
+                    pendingMessageIds.add(msg.getMessageId());
+                    connection.write(md);
+                }
+            } else {
+                connection.write(md);
             }
-            router.bind(convert(info.getDestination()), this);
         }
 
         public void ack(MessageAck info) {
             // TODO: The pending message queue could probably be optimized to
             // avoid having to create a new list here.
-            LinkedList<MessageDelivery> acked = new LinkedList<MessageDelivery>();
-            synchronized (queue) {
-                if (isDurable() || isQueueReceiver()) {
-                    MessageId id = info.getLastMessageId();
+            LinkedList<SubscriptionDeliveryCallback> acked = new LinkedList<SubscriptionDeliveryCallback>();
+            synchronized (this) {
+                MessageId id = info.getLastMessageId();
+                if (isDurable() || isQueueReceiver())
                     while (!pendingMessageIds.isEmpty()) {
                         MessageId pendingId = pendingMessageIds.getFirst();
-                        MessageDelivery delivery = pendingMessages.remove(pendingId);
-                        acked.add(delivery);
+                        SubscriptionDeliveryCallback callback = pendingMessages.remove(pendingId);
+                        acked.add(callback);
                         pendingMessageIds.removeFirst();
                         if (pendingId.equals(id)) {
                             break;
                         }
                     }
-
-                }
                 limiter.onProtocolCredit(info.getMessageCount());
             }
 
             // Delete outside of synchronization on queue to avoid contention
             // with enqueueing threads.
-            for (MessageDelivery delivery : acked) {
-                delivery.acknowledge(durableQueueId);
+            for (SubscriptionDeliveryCallback callback : acked) {
+                callback.acknowledge();
             }
         }
 
-        public IFlowSink<MessageDelivery> getSink() {
-            return queue;
-        }
-
-        public final void deliver(final MessageDelivery delivery, ISourceController<?> source) {
-            if (!match(delivery)) {
-                return;
-            }
-
-            if (isDurable() && delivery.isPersistent()) {
-                try {
-
-                    final long sequence = deliverySequence.incrementAndGet();
-                    //TODO saveable queue element here is temporary: We should replace this 
-                    //with an actual queue implementation:
-                    delivery.persist(new SaveableQueueElement<MessageDelivery>() {
-
-                        public MessageDelivery getElement() {
-                            return delivery;
-                        }
-
-                        public QueueDescriptor getQueueDescriptor() {
-                            return durableQueueId;
-                        }
-
-                        public long getSequenceNumber() {
-                            return sequence;
-                        }
-
-                        public void notifySave() {
-                            //noop
-                        }
-                        public boolean requestSaveNotify() {
-                            return false;
-                        }
-
-                    }, null, true);
-                } catch (Exception e) {
-                    // TODO Auto-generated catch restoreBlock
-                    e.printStackTrace();
-                }
-            }
-
-            queue.add(delivery, source);
-        }
-
         public boolean hasSelector() {
             return selector != null;
         }
 
-        public boolean match(MessageDelivery message) {
+        public boolean matches(MessageDelivery message) {
             Message msg = message.asType(Message.class);
             if (msg == null) {
                 return false;
@@ -578,15 +524,137 @@
         }
 
         public boolean isDurable() {
-            return isDurable;
+            return info.isDurable();
         }
 
         public boolean isQueueReceiver() {
             return isQueueReceiver;
         }
 
-        public AsciiBuffer getPersistentQueueName() {
-            return null;
+        /*
+         * (non-Javadoc)
+         * 
+         * @see org.apache.activemq.queue.Subscription#isBrowser()
+         */
+        public boolean isBrowser() {
+            return info.isBrowser();
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.activemq.queue.Subscription#isRemoveOnDispatch(java.lang
+         * .Object)
+         */
+        public boolean isRemoveOnDispatch(MessageDelivery elem) {
+            return !elem.isPersistent() || !(isDurable || isQueueReceiver);
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext
+         * #getDestination()
+         */
+        public Destination getDestination() {
+            return convert(info.getDestination());
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext
+         * #getJMSSelector()
+         */
+        public String getSelectorString() {
+            return info.getSelector();
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext
+         * #getSubscriptionName()
+         */
+        public String getSubscriptionName() {
+            return info.getSubscriptionName();
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext
+         * #getFullSelector()
+         */
+        public BooleanExpression getSelectorExpression() {
+            return selector;
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext
+         * #getJMSSelector()
+         */
+        public String getSelector() {
+            return info.getSelector();
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext
+         * #getConnection()
+         */
+        public BrokerConnection getConnection() {
+            return connection;
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext
+         * #getConsumerId()
+         */
+        public String getConsumerId() {
+            return name;
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see org.apache.activemq.queue.Subscription#getSink()
+         */
+        public IFlowSink<MessageDelivery> getSink() {
+            return this;
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see org.apache.activemq.flow.IFlowSink#add(java.lang.Object,
+         * org.apache.activemq.flow.ISourceController)
+         */
+        public void add(MessageDelivery message, ISourceController<?> source) {
+            add(message, source, null);
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see org.apache.activemq.flow.IFlowSink#offer(java.lang.Object,
+         * org.apache.activemq.flow.ISourceController)
+         */
+        public boolean offer(MessageDelivery message, ISourceController<?> source) {
+            return offer(message, source, null);
         }
 
     }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java Thu May 21 02:34:23 2009
@@ -21,16 +21,40 @@
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.BrokerConnection;
 import org.apache.activemq.broker.BrokerMessageDelivery;
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.broker.store.Store.MessageRecord;
+import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.queue.Subscription;
 import org.apache.activemq.wireformat.WireFormat;
 
 public interface ProtocolHandler extends Service {
-    
+
     public void setConnection(BrokerConnection connection);
+
+    public BrokerConnection getConnection();
+
     public void onCommand(Object command);
+
     public void onException(Exception error);
+
     public void setWireFormat(WireFormat wf);
-    
+
     public BrokerMessageDelivery createMessageDelivery(MessageRecord record) throws IOException;
 
+    public interface ConsumerContext extends Subscription<MessageDelivery>, IFlowSink<MessageDelivery>{
+        public String getConsumerId();
+        
+        public Destination getDestination();
+
+        public String getSelector();
+        
+        public BooleanExpression getSelectorExpression();
+        
+        public boolean isDurable();
+        
+        public String getSubscriptionName();
+    }
+
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Thu May 21 02:34:23 2009
@@ -25,14 +25,13 @@
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.InvalidSelectorException;
 
 import org.apache.activemq.WindowLimiter;
 import org.apache.activemq.broker.BrokerConnection;
 import org.apache.activemq.broker.BrokerMessageDelivery;
-import org.apache.activemq.broker.DeliveryTarget;
+import org.apache.activemq.broker.BrokerSubscription;
 import org.apache.activemq.broker.Destination;
 import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.broker.Router;
@@ -51,10 +50,7 @@
 import org.apache.activemq.flow.SizeLimiter;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
 import org.apache.activemq.protobuf.AsciiBuffer;
-import org.apache.activemq.queue.QueueStore;
 import org.apache.activemq.queue.SingleFlowRelay;
-import org.apache.activemq.queue.QueueStore.QueueDescriptor;
-import org.apache.activemq.queue.QueueStore.SaveableQueueElement;
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.transport.stomp.Stomp;
 import org.apache.activemq.transport.stomp.StompFrame;
@@ -118,7 +114,6 @@
             public void onStompFrame(StompFrame frame) throws Exception {
                 ConsumerContext ctx = new ConsumerContext(frame);
                 consumers.put(ctx.stompDestination, ctx);
-                router.bind(ctx.destination, ctx);
                 ack(frame);
             }
         });
@@ -266,10 +261,11 @@
         }
     }
 
-    class ConsumerContext implements DeliveryTarget {
+    class ConsumerContext extends AbstractLimitedFlowResource<MessageDelivery> implements ProtocolHandler.ConsumerContext {
 
         private BooleanExpression selector;
-
+        private String selectorString;
+        
         private SingleFlowRelay<MessageDelivery> queue;
         public WindowLimiter<MessageDelivery> limiter;
         private FrameTranslator translator;
@@ -278,11 +274,9 @@
         private Destination destination;
         private String ackMode;
 
-        private LinkedHashMap<AsciiBuffer, AsciiBuffer> sentMessageIds = new LinkedHashMap<AsciiBuffer, AsciiBuffer>();
+        private LinkedHashMap<AsciiBuffer, SubscriptionDeliveryCallback> sentMessageIds = new LinkedHashMap<AsciiBuffer, SubscriptionDeliveryCallback>();
 
         private boolean durable;
-        private QueueStore.QueueDescriptor durableQueueId;
-        private AtomicLong deliverySequence = new AtomicLong(0);
 
         public ConsumerContext(final StompFrame subscribe) throws Exception {
             translator = translator(subscribe);
@@ -291,7 +285,7 @@
             stompDestination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
             destination = translator.convertToDestination(StompProtocolHandler.this, stompDestination);
             subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
-
+            
             ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
             if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
                 ackMode = StompSubscription.CLIENT_ACK;
@@ -304,33 +298,25 @@
                 ackMode = StompSubscription.AUTO_ACK;
             }
 
-            selector = parseSelector(subscribe);
+            selectorString = subscribe.getHeaders().get(Stomp.Headers.Subscribe.SELECTOR);
+            selector = parseSelector(selectorString);
 
             if (ackMode != StompSubscription.AUTO_ACK) {
                 Flow flow = new Flow("broker-" + subscriptionId + "-outbound", false);
-                limiter = new WindowLimiter<MessageDelivery>(true, flow, 1000, 500) {
+                limiter = new WindowLimiter<MessageDelivery>(true, flow, connection.getOutputWindowSize(), connection.getOutputResumeThreshold()) {
                     public int getElementSize(MessageDelivery m) {
                         return m.getFlowLimiterSize();
                     }
                 };
-                queue = new SingleFlowRelay<MessageDelivery>(flow, flow.getFlowName(), limiter);
-                queue.setDrain(new IFlowDrain<MessageDelivery>() {
-                    public void drain(final MessageDelivery message, ISourceController<MessageDelivery> controller) {
-                        StompFrame frame = message.asType(StompFrame.class);
-                        if (ackMode == StompSubscription.CLIENT_ACK || ackMode == StompSubscription.INDIVIDUAL_ACK) {
-                            synchronized (allSentMessageIds) {
-                                AsciiBuffer msgId = message.getMsgId();
-                                sentMessageIds.put(msgId, msgId);
-                                allSentMessageIds.put(msgId, ConsumerContext.this);
-                            }
-                        }
-                        connection.write(frame);
-                    };
-                });
+                
+              //FIXME need to keep track of actual size:
+              //And Create a flow controller:
             } else {
                 queue = outboundQueue;
             }
-
+            
+            BrokerSubscription sub = router.getVirtualHost().createSubscription(this);
+            sub.connect(this);
         }
 
         public void ack(StompFrame info) throws Exception {
@@ -342,6 +328,7 @@
                         AsciiBuffer next = iterator.next();
                         iterator.remove();
                         allSentMessageIds.remove(next);
+                        //FIXME need to keep track of actual size:
                         credits++;
                         if (next.equals(mid)) {
                             break;
@@ -369,7 +356,7 @@
             return false;
         }
 
-        public boolean match(MessageDelivery message) {
+        public boolean matches(MessageDelivery message) {
             StompFrame stompMessage = message.asType(StompFrame.class);
             if (stompMessage == null) {
                 return false;
@@ -396,60 +383,109 @@
             // return false;
             // }
         }
-
-        public void deliver(final MessageDelivery delivery, ISourceController<?> source) {
-            if (!match(delivery)) {
-                return;
+        
+        /* (non-Javadoc)
+         * @see org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext#send(org.apache.activemq.broker.MessageDelivery, org.apache.activemq.flow.ISourceController, org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback)
+         */
+        public void add(MessageDelivery message, ISourceController<?> controller, SubscriptionDeliveryCallback callback) {
+            addInternal(message, controller, callback);
+        }
+        
+        /* (non-Javadoc)
+         * @see org.apache.activemq.queue.Subscription#offer(java.lang.Object, org.apache.activemq.flow.ISourceController, org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback)
+         */
+        public boolean offer(MessageDelivery message, ISourceController<?> controller, SubscriptionDeliveryCallback callback) {
+            //FIXME need a controller:
+            return false;
+        }
+        
+        private void addInternal(MessageDelivery message, ISourceController<?> controller, SubscriptionDeliveryCallback callback)
+        {
+            StompFrame frame = message.asType(StompFrame.class);
+            if (ackMode == StompSubscription.CLIENT_ACK || ackMode == StompSubscription.INDIVIDUAL_ACK) {
+                synchronized (allSentMessageIds) {
+                    AsciiBuffer msgId = message.getMsgId();
+                    sentMessageIds.put(msgId, callback);
+                    allSentMessageIds.put(msgId, ConsumerContext.this);
+                }
             }
+            connection.write(frame);
+        }
+        
+        public boolean isDurable() {
+            return durable;
+        }
 
-            if (isDurable() && delivery.isPersistent()) {
-                try {
-                    final long sequence = deliverySequence.incrementAndGet();
-                    //TODO saveable queue element here is temporary: We should replace this 
-                    //with an actual queue implementation:
-                    delivery.persist(new SaveableQueueElement<MessageDelivery>() {
-
-                        public MessageDelivery getElement() {
-                            // TODO Auto-generated method stub
-                            return delivery;
-                        }
+        /* (non-Javadoc)
+         * @see org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext#getConnection()
+         */
+        public BrokerConnection getConnection() {
+            return connection;
+        }
 
-                        public QueueDescriptor getQueueDescriptor() {
-                            // TODO Auto-generated method stub
-                            return durableQueueId;
-                        }
+        /* (non-Javadoc)
+         * @see org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext#getDestination()
+         */
+        public Destination getDestination() {
+            return destination;
+        }
 
-                        public long getSequenceNumber() {
-                            return sequence;
-                        }
+        /* (non-Javadoc)
+         * @see org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext#getFullSelector()
+         */
+        public BooleanExpression getSelectorExpression() {
+            return selector;
+        }
 
-                        public void notifySave() {
-                            //noop
-                        }
-                        public boolean requestSaveNotify() {
-                            // TODO Auto-generated method stub
-                            return false;
-                        }
+        /* (non-Javadoc)
+         * @see org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext#getSelector()
+         */
+        public String getSelector() {
+            return selectorString;
+        }
 
-                    }, null, true);
-                } catch (Exception e) {
-                    // TODO Auto-generated catch block
-                    e.printStackTrace();
-                }
-            }
+        /* (non-Javadoc)
+         * @see org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext#getSubscriptionName()
+         */
+        public String getSubscriptionName() {
+            return subscriptionId;
+        }
 
-            queue.add(delivery, source);
+        /* (non-Javadoc)
+         * @see org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext#getConsumerId()
+         */
+        public String getConsumerId() {
+            return subscriptionId;
+        }
 
+        /* (non-Javadoc)
+         * @see org.apache.activemq.queue.Subscription#isBrowser()
+         */
+        public boolean isBrowser() {
+            return false;
         }
 
-        public boolean isDurable() {
-            return durable;
+        /* (non-Javadoc)
+         * @see org.apache.activemq.queue.Subscription#isRemoveOnDispatch(java.lang.Object)
+         */
+        public boolean isRemoveOnDispatch(MessageDelivery elem) {
+            //TODO fix this.
+            return true;
         }
 
-        public AsciiBuffer getPersistentQueueName() {
-            return null;
+        /* (non-Javadoc)
+         * @see org.apache.activemq.flow.IFlowSink#add(java.lang.Object, org.apache.activemq.flow.ISourceController)
+         */
+        public void add(MessageDelivery elem, ISourceController<?> source) {
+            add(elem, source, null);
         }
 
+        /* (non-Javadoc)
+         * @see org.apache.activemq.flow.IFlowSink#offer(java.lang.Object, org.apache.activemq.flow.ISourceController)
+         */
+        public boolean offer(MessageDelivery elem, ISourceController<?> source) {
+            return offer(elem, source, null);
+        }
     }
 
     private void sendError(String message) {
@@ -513,9 +549,8 @@
         return new Destination.SingleDestination(domain, new AsciiBuffer(dest.getPhysicalName()));
     }
 
-    private static BooleanExpression parseSelector(StompFrame frame) throws InvalidSelectorException {
+    private static BooleanExpression parseSelector(String selector) throws InvalidSelectorException {
         BooleanExpression rc = null;
-        String selector = frame.getHeaders().get(Stomp.Headers.Subscribe.SELECTOR);
         if (selector != null) {
             rc = SelectorParser.parse(selector);
         }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java Thu May 21 02:34:23 2009
@@ -47,7 +47,6 @@
 import org.apache.activemq.flow.SizeLimiter;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
 import org.apache.activemq.queue.QueueStore;
-import org.apache.activemq.queue.QueueStore.QueueDescriptor;
 import org.apache.activemq.queue.QueueStore.RestoreListener;
 import org.apache.activemq.queue.QueueStore.RestoredElement;
 import org.apache.activemq.queue.QueueStore.SaveableQueueElement;

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java Thu May 21 02:34:23 2009
@@ -26,9 +26,11 @@
     /**
 	 * Used by a FlowSource that is being dispatched to drain it's elements.
 	 * The implementor is responsible for calling {@link ISourceController#elementDispatched(Object)
-	 * when the element has been dispatched to all downstream sinks if IFlowSource
-	 * @param elem
-	 * @param controller
+	 * when the element has been dispatched to all downstream sinks unless the 
+	 * IFlowSource#getAutoRelease() is set to true.
+	 * 
+	 * @param elem The element being drained
+	 * @param controller The source's controller
 	 */
     public void drain(E elem, ISourceController<E> controller);
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/CursoredQueue.java Thu May 21 02:34:23 2009
@@ -54,10 +54,8 @@
     private long nextSequenceNumber = 0;
     private int totalQueueCount;
 
-    // For now each queue element is assigned a restoreBlock number
-    // which is used for tracking page in requests. A trailing
-    // consumer will request messages from at most one restoreBlock
-    // at a time from the database.
+    //Dictates the chunk size of messages or place holders
+    //pulled in from the database:
     private static final int RESTORE_BLOCK_SIZE = 1000;
     private final PersistencePolicy<V> persistencePolicy;
     private final Mapper<Long, V> expirationMapper;
@@ -65,6 +63,7 @@
     private final QueueStore<?, V> queueStore;
     private final ElementLoader loader;
     public final QueueDescriptor queueDescriptor;
+    private final Object mutex;
 
     public CursoredQueue(PersistencePolicy<V> persistencePolicy, Mapper<Long, V> expirationMapper, Flow flow, QueueDescriptor queueDescriptor, QueueStore<?, V> store, Object mutex) {
         this.persistencePolicy = persistencePolicy;
@@ -100,8 +99,6 @@
         }
     }
 
-    private final Object mutex;
-
     protected abstract void requestDispatch();
 
     protected abstract void acknowledge(QueueElement<V> elem);
@@ -699,7 +696,8 @@
         }
 
         /**
-         * @param l
+         * @param l Set the highest sequence number to which this 
+         * cursor can advance.
          */
         public void setLimit(long l) {
             limit = l;
@@ -1346,18 +1344,20 @@
             // element loaded (until it is deleted)
             if (!persistencePolicy.isPagingEnabled()) {
                 qe.addHardRef();
-                // Persist the element if required:
-                if (persistencePolicy.isPersistent(qe.elem)) {
-                    // For now base decision on whether to delay flush on
-                    // whether or not there are consumers ready:
-                    // TODO should actually change this to active cursors:
-                    boolean delayable = !openCursors.isEmpty();
-                    qe.save(source, delayable);
-                }
             }
+            
+            // Persist the element if required:
+            if (persistencePolicy.isPersistent(qe.elem)) {
+                // For now base decision on whether to delay flush on
+                // whether or not there are consumers ready:
+                // TODO should actually change this to active cursors:
+                boolean delayable = !openCursors.isEmpty();
+                qe.save(source, delayable);
+            }
+            
             // Check with cursors to see if any of them have room for it
             // in memory:
-            else {
+            if(persistencePolicy.isPagingEnabled()) {
 
                 // Otherwise check with any other open cursor to see if
                 // it can hang on to the element:
@@ -1394,6 +1394,8 @@
                     qe.unload(source);
                 }
             }
+            
+            
 
         }
 
@@ -1552,6 +1554,7 @@
             synchronized (fromDatabase) {
                 fromDatabase.addAll(msgs);
             }
+            requestDispatch();
         }
 
         public String toString() {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java Thu May 21 02:34:23 2009
@@ -28,12 +28,13 @@
 import org.apache.activemq.queue.CursoredQueue.QueueElement;
 import org.apache.activemq.queue.QueueStore.PersistentQueue;
 import org.apache.activemq.queue.QueueStore.QueueDescriptor;
+import org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback;
 import org.apache.activemq.util.Mapper;
 
 public class ExclusivePersistentQueue<K, E> extends AbstractFlowQueue<E> implements PersistentQueue<K, E> {
     private CursoredQueue<E> queue;
     private final FlowController<E> controller;
-    private IFlowSizeLimiter<E> limiter;
+    private final IFlowSizeLimiter<E> limiter;
     private boolean started = true;
     private Cursor<E> cursor;
     private final QueueDescriptor queueDescriptor;
@@ -41,9 +42,12 @@
     private QueueStore<K, E> queueStore;
     private Mapper<Long, E> expirationMapper;
     private boolean initialized;
+    private Subscription<E> subscription;
+    private ISourceController<E> sourceController;
+    protected boolean subBlocked = false;
 
     /**
-     * Creates a flow queue that can handle multiple flows.
+     * 
      * 
      * @param flow
      *            The {@link Flow}
@@ -55,12 +59,46 @@
     public ExclusivePersistentQueue(String name, IFlowSizeLimiter<E> limiter) {
         super(name);
         this.queueDescriptor = new QueueStore.QueueDescriptor();
+        this.limiter = limiter;
         queueDescriptor.setQueueName(new AsciiBuffer(super.getResourceName()));
         queueDescriptor.setQueueType(QueueDescriptor.EXCLUSIVE);
 
         //TODO flow should be serialized as part of the subscription. 
         this.controller = new FlowController<E>(null, new Flow(name, false), limiter, this);
+        this.controller.useOverFlowQueue(false);
         super.onFlowOpened(controller);
+
+        sourceController = new ISourceController<E>() {
+
+            public void elementDispatched(E elem) {
+                // TODO Auto-generated method stub
+            }
+
+            public Flow getFlow() {
+                // TODO Auto-generated method stub
+                return controller.getFlow();
+            }
+
+            public IFlowResource getFlowResource() {
+                // TODO Auto-generated method stub
+                return ExclusivePersistentQueue.this;
+            }
+
+            public void onFlowBlock(ISinkController<?> sinkController) {
+                synchronized (ExclusivePersistentQueue.this) {
+                    subBlocked = true;
+                }
+            }
+
+            public void onFlowResume(ISinkController<?> sinkController) {
+                synchronized (ExclusivePersistentQueue.this) {
+                    subBlocked = false;
+                    if (isDispatchReady()) {
+                        notifyReady();
+                    }
+                }
+            }
+        };
     }
 
     /*
@@ -106,11 +144,7 @@
         };
 
         queue.initialize(sequenceMin, sequenceMax, count, size);
-        initialized = true;
-    }
-    
-    public void connect(Subscription<E> sub)
-    {
+
         //Open a cursor for the queue:
         FlowController<QueueElement<E>> memoryController = null;
         if (persistencePolicy.isPagingEnabled()) {
@@ -131,9 +165,32 @@
         }
 
         cursor = queue.openCursor(getResourceName(), memoryController, true, true);
+        cursor.reset(sequenceMin);
+        cursor.activate();
+
+        initialized = true;
+    }
+
+    public synchronized void addSubscription(Subscription<E> sub) {
+        if (subscription != null) {
+            if (subscription != sub) {
+                //TODO change this to something other than a runtime exception:
+                throw new IllegalStateException();
+            }
+            return;
+        }
+        this.subscription = sub;
+        subBlocked = false;
+        if (isDispatchReady()) {
+            notifyReady();
+        }
+    }
+
+    public synchronized void removeSubscription(Subscription<E> sub) {
+        if (sub == subscription) {
+            sub = null;
+        }
     }
-    
-    
 
     protected final ISinkController<E> getSinkController(E elem, ISourceController<?> source) {
         return controller;
@@ -167,7 +224,7 @@
 
     private final void accepted(ISourceController<?> source, E elem) {
         queue.add(source, elem);
-        if (started) {
+        if (isDispatchReady()) {
             notifyReady();
         }
     }
@@ -197,39 +254,72 @@
     }
 
     public final boolean isDispatchReady() {
-        return started && !cursor.isReady();
-    }
-
-    public final boolean pollingDispatch() {
-        E elem = poll();
+        if (started && subscription != null && !subBlocked && cursor.isReady()) {
+            return true;
+        }
 
-        if (elem != null) {
-            drain.drain(elem, controller);
+        if (queue.needsDispatch()) {
             return true;
-        } else {
-            return false;
         }
-    }
 
-    public final E poll() {
-        synchronized (this) {
-            if (!started) {
-                return null;
-            }
+        return false;
+    }
 
+    private QueueElement last = null;
+    public synchronized final boolean pollingDispatch() {
+        queue.dispatch();
+        if (started && subscription != null && !subBlocked) {
             QueueElement<E> qe = cursor.getNext();
-
-            // FIXME the release should really be done after dispatch.
-            // doing it here saves us from having to resynchronize
-            // after dispatch, but release limiter space too soon.
             if (qe != null) {
-                if (autoRelease) {
+                // If the sub doesn't remove on dispatch set an ack listener:
+                SubscriptionDeliveryCallback callback = subscription.isRemoveOnDispatch(qe.elem) ? null : qe;
+
+                if(qe.acquired || limiter.getSize() == 0 || (last != null && last.sequence >= qe.sequence))
+                {
+                    System.out.println("Offering" + qe + limiter.getSize());
+                }
+                
+                // See if the sink has room:
+                if (subscription.offer(qe.elem, sourceController, callback)) {
+                    if(limiter.getElementSize(qe.getElement()) > 1048)
+                    {
+                        System.out.println("Offering" + qe);
+                    }
+                    qe.setAcquired(true);
                     controller.elementDispatched(qe.getElement());
+                    last = qe;
+                    // If remove on dispatch acknowledge now:
+                    if (callback == null) {
+                        qe.acknowledge();
+                    }
                 }
-                return qe.getElement();
             }
-            return null;
         }
+
+        return isDispatchReady();
+    }
+
+    public E poll() {
+        throw new UnsupportedOperationException("poll not supported for exclusive queue");
+        //        
+        //        synchronized (this) {
+        //            if (!started) {
+        //                return null;
+        //            }
+        //
+        //            QueueElement<E> qe = cursor.getNext();
+        //
+        //            // FIXME the release should really be done after dispatch.
+        //            // doing it here saves us from having to resynchronize
+        //            // after dispatch, but release limiter space too soon.
+        //            if (qe != null) {
+        //                if (autoRelease) {
+        //                    controller.elementDispatched(qe.getElement());
+        //                }
+        //                return qe.getElement();
+        //            }
+        //            return null;
+        //        }
     }
 
     @Override
@@ -280,8 +370,7 @@
      * @return The size of the elements in this queue or -1 if not yet known.
      */
     public synchronized long getEnqueuedSize() {
-        if(!initialized)
-        {
+        if (!initialized) {
             return -1;
         }
         return limiter.getSize();
@@ -291,8 +380,7 @@
      * @return The count of the elements in this queue or -1 if not yet known.
      */
     public synchronized long getEnqueuedCount() {
-        if(!initialized)
-        {
+        if (!initialized) {
             return -1;
         }
         return queue.getEnqueuedCount();

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java?rev=776934&r1=776933&r2=776934&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java Thu May 21 02:34:23 2009
@@ -94,7 +94,26 @@
      *         {@link ISourceController#onFlowBlock(ISinkController)} prior to
      *         returning false.
      */
-    public boolean offer(E element, ISourceController<E> controller, SubscriptionDeliveryCallback callback);
+    public boolean offer(E element, ISourceController<?> controller, SubscriptionDeliveryCallback callback);
+    
+    /**
+     * Pushes an item to the subscription. If the subscription is not remove on
+     * dispatch, then it must call acknowledge method on the callback when it
+     * has acknowledged the message.
+     * 
+     * @param element
+     *            The delivery container the offered element.
+     * @param controller
+     *            The queue's controller, which must be used if the added
+     *            element exceeds the subscription's buffer limits.
+     * @param callback
+     *            The {@link SubscriptionDeliveryCallback} associated with the element
+     * @return true if the element was accepted false otherwise, if false is
+     *         returned the caller must have called
+     *         {@link ISourceController#onFlowBlock(ISinkController)} prior to
+     *         returning false.
+     */
+    public void add(E element, ISourceController<?> controller, SubscriptionDeliveryCallback callback);
 
     @Deprecated
     public IFlowSink<E> getSink();