You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/06/19 06:02:39 UTC

svn commit: r786364 - in /activemq/sandbox/activemq-flow: activemq-broker/src/test/java/org/apache/activemq/broker/ activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/ activemq-openwire/src/test/java/org/apache/activemq/legacy/openwire...

Author: cmacnaug
Date: Fri Jun 19 04:02:38 2009
New Revision: 786364

URL: http://svn.apache.org/viewvc?rev=786364&view=rev
Log:
Adding support for exclusive consumers on shared queues. 

Also:
-Refactored SharedPriorityQueue to extend partitioned queue (eventually we'll
want partitioned queues to assist in exclusivity ... for now each partion is exlusive but it isn't enforced across partitions)
-Fixed legacy openwire exclusive consumer test so that it correctly tests exclusivity. 

Modified:
    activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java
    activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=786364&r1=786363&r2=786364&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Fri Jun 19 04:02:38 2009
@@ -53,8 +53,8 @@
 
     protected final boolean USE_KAHA_DB = true;
     protected final boolean PURGE_STORE = true;
-    protected final boolean PERSISTENT = true;
-    protected final boolean DURABLE = true;
+    protected final boolean PERSISTENT = false;
+    protected final boolean DURABLE = false;
 
     // Set to put senders and consumers on separate brokers.
     protected boolean multibroker = false;

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=786364&r1=786363&r2=786364&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Fri Jun 19 04:02:38 2009
@@ -107,7 +107,7 @@
     public MessageRecord createMessageRecord() {
         MessageRecord record = new MessageRecord();
         record.setEncoding(ENCODING);
-        
+
         ByteSequence bytes;
         try {
             bytes = storeWireFormat.marshal(message);
@@ -130,7 +130,9 @@
         this.storeWireFormat = wireFormat;
     }
 
-    /* (non-Javadoc)
+    /*
+     * (non-Javadoc)
+     * 
      * @see org.apache.activemq.broker.MessageDelivery#getTTE()
      */
     public long getExpiration() {
@@ -140,4 +142,8 @@
     public MessageEvaluationContext createMessageEvaluationContext() {
         return new OpenwireMessageEvaluationContext(message);
     }
+
+    public String toString() {
+        return message.getMessageId().toString();
+    }
 }

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=786364&r1=786363&r2=786364&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Fri Jun 19 04:02:38 2009
@@ -571,6 +571,10 @@
             return isQueueReceiver;
         }
 
+        public boolean isExclusive() {
+            return info.isExclusive();
+        }
+        
         /*
          * (non-Javadoc)
          * 
@@ -704,6 +708,10 @@
             return true;
         }
 
+        public String toString() {
+            return info.getConsumerId().toString();
+        }
+
     }
 
     private static BooleanExpression parseSelector(ConsumerInfo info) throws FilterException {

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java?rev=786364&r1=786363&r2=786364&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java Fri Jun 19 04:02:38 2009
@@ -786,9 +786,7 @@
         connection2.request(consumerInfo2);
 
         // Second message should go to consumer 1 even though consumer 2 is
-        // ready
-        // for dispatch.
-        connection1.send(createMessage(producerInfo, destination, deliveryMode));
+        // ready for dispatch.
         connection1.send(createMessage(producerInfo, destination, deliveryMode));
 
         // Acknowledge the first 2 messages
@@ -803,6 +801,7 @@
 
         // The last two messages should now go the the second consumer.
         connection1.send(createMessage(producerInfo, destination, deliveryMode));
+        connection1.send(createMessage(producerInfo, destination, deliveryMode));
 
         for (int i = 0; i < 2; i++) {
             Message m1 = receiveMessage(connection2);

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java?rev=786364&r1=786363&r2=786364&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java Fri Jun 19 04:02:38 2009
@@ -445,6 +445,10 @@
             return name + " on " + sourceQueue.getResourceName();
         }
 
+        public boolean isExclusive() {
+            return false;
+        }
+
         /*
          * (non-Javadoc)
          * 

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java?rev=786364&r1=786363&r2=786364&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java Fri Jun 19 04:02:38 2009
@@ -21,23 +21,24 @@
 import java.util.HashSet;
 
 import org.apache.activemq.dispatch.IDispatcher;
-import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
-import org.apache.activemq.flow.AbstractLimitedFlowResource;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.util.Mapper;
 
 abstract public class PartitionedQueue<K, V> extends AbstractFlowQueue<V> implements IPartitionedQueue<K, V> {
 
-    private HashSet<Subscription<V>> subscriptions = new HashSet<Subscription<V>>();
+    protected HashSet<Subscription<V>> subscriptions = new HashSet<Subscription<V>>();
     private HashMap<Integer, IQueue<K, V>> partitions = new HashMap<Integer, IQueue<K, V>>();
-    protected Mapper<Integer, V> partitionMapper;
-    private QueueStore<K, V> store;
+    protected QueueStore<K, V> store;
     protected IDispatcher dispatcher;
-    private boolean started;
-    private boolean shutdown = false;
+    protected boolean started;
+    protected boolean shutdown = false;
     protected QueueDescriptor queueDescriptor;
-    private int basePriority = 0;
+    protected PersistencePolicy<V> persistencePolicy;
+    protected Mapper<Long, V> expirationMapper;
+    protected Mapper<K, V> keyMapper;
+    protected Mapper<Integer, V> partitionMapper;
+    protected int basePriority = 0;
 
     public PartitionedQueue(String name) {
         super(name);
@@ -50,7 +51,7 @@
         return queueDescriptor;
     }
 
-    public IQueue<K, V> getPartition(int partitionKey) {
+    protected IQueue<K, V> getPartition(int partitionKey) {
         boolean save = false;
         IQueue<K, V> rc = null;
         checkShutdown();
@@ -70,7 +71,10 @@
 
         return rc;
     }
-
+    
+    
+    abstract public IQueue<K, V> createPartition(int partitionKey);
+    
     /*
      * (non-Javadoc)
      * 
@@ -81,20 +85,20 @@
             if (basePriority != priority) {
                 basePriority = priority;
                 if (!shutdown) {
-                    for (IQueue<K, V> queue : partitions.values()) {
+                    for (IQueue<K, V> queue : getPartitions()) {
                         queue.setDispatchPriority(basePriority);
                     }
                 }
             }
         }
     }
-
+    
     public int getEnqueuedCount() {
         checkShutdown();
-        synchronized (partitions) {
+        synchronized (this) {
 
             int count = 0;
-            for (IQueue<K, V> queue : partitions.values()) {
+            for (IQueue<K, V> queue : getPartitions()) {
                 count += queue.getEnqueuedCount();
             }
             return count;
@@ -103,9 +107,9 @@
 
     public synchronized long getEnqueuedSize() {
         checkShutdown();
-        synchronized (partitions) {
+        synchronized (this) {
             long size = 0;
-            for (IQueue<K, V> queue : partitions.values()) {
+            for (IQueue<K, V> queue : getPartitions()) {
                 if (queue != null) {
                     size += queue.getEnqueuedSize();
                 }
@@ -114,43 +118,44 @@
         }
     }
 
+
     public void setStore(QueueStore<K, V> store) {
         this.store = store;
     }
 
     public void setPersistencePolicy(PersistencePolicy<V> persistencePolicy) {
-        // No-Op for now.
+        this.persistencePolicy = persistencePolicy;
     }
 
     public void setExpirationMapper(Mapper<Long, V> expirationMapper) {
-        // No-Op for now.
-    }
-
-    abstract public IQueue<K, V> createPartition(int partitionKey);
-
-    public void addPartition(int partitionKey, IQueue<K, V> queue) {
-        checkShutdown();
-        synchronized (partitions) {
-            partitions.put(partitionKey, queue);
-            for (Subscription<V> sub : subscriptions) {
-                queue.addSubscription(sub);
-                queue.setDispatchPriority(basePriority);
-            }
-        }
+        this.expirationMapper = expirationMapper;
     }
 
     public void initialize(long sequenceMin, long sequenceMax, int count, long size) {
+        checkShutdown();
         // No-op, only partitions should have stored values.
         if (count > 0 || size > 0) {
             throw new IllegalArgumentException("Partioned queues do not themselves hold values");
         }
+        if (expirationMapper == null) {
+            expirationMapper = new Mapper<Long, V>() {
+
+                public Long map(V element) {
+                    return -1L;
+                }
+            };
+        }
+        if (persistencePolicy == null) {
+            persistencePolicy = new PersistencePolicy.NON_PERSISTENT_POLICY<V>();
+        }
     }
+    
 
     public synchronized void start() {
         if (!started) {
             checkShutdown();
             started = true;
-            for (IQueue<K, V> partition : partitions.values()) {
+            for (IQueue<K, V> partition : getPartitions()) {
                 if (partition != null)
                     partition.start();
             }
@@ -160,26 +165,27 @@
     public synchronized void stop() {
         if (started) {
             started = false;
-            for (IQueue<K, V> partition : partitions.values()) {
+            for (IQueue<K, V> partition : getPartitions()) {
                 if (partition != null)
                     partition.stop();
             }
         }
     }
 
+
     public void shutdown(boolean sync) {
-        HashMap<Integer, IQueue<K, V>> partitions = null;
+        Collection <IQueue<K, V>> partitions = null;
         synchronized (this) {
             if (!shutdown) {
                 shutdown = true;
                 started = false;
             }
-            partitions = this.partitions;
+            partitions = getPartitions();
             this.partitions = null;
         }
 
         if (partitions != null) {
-            for (IQueue<K, V> partition : partitions.values()) {
+            for (IQueue<K, V> partition : partitions) {
                 if (partition != null)
                     partition.shutdown(sync);
             }
@@ -188,9 +194,9 @@
 
     public void addSubscription(Subscription<V> sub) {
         checkShutdown();
-        synchronized (partitions) {
+        synchronized (this) {
             subscriptions.add(sub);
-            Collection<IQueue<K, V>> values = partitions.values();
+            Collection<IQueue<K, V>> values = getPartitions();
             for (IQueue<K, V> queue : values) {
                 queue.addSubscription(sub);
             }
@@ -199,9 +205,9 @@
 
     public boolean removeSubscription(Subscription<V> sub) {
         checkShutdown();
-        synchronized (partitions) {
+        synchronized (this) {
             if (subscriptions.remove(sub)) {
-                Collection<IQueue<K, V>> values = partitions.values();
+                Collection<IQueue<K, V>> values = getPartitions();
                 for (IQueue<K, V> queue : values) {
                     queue.removeSubscription(sub);
                 }
@@ -219,30 +225,42 @@
         return partitionMapper;
     }
 
+    
     public void add(V value, ISourceController<?> source) {
         int partitionKey = partitionMapper.map(value);
-        IQueue<K, V> partition = getPartition(partitionKey);
-        partition.add(value, source);
+        getPartition(partitionKey).add(value, source);
     }
 
     public boolean offer(V value, ISourceController<?> source) {
         int partitionKey = partitionMapper.map(value);
-        IQueue<K, V> partition = getPartition(partitionKey);
-        return partition.offer(value, source);
+        return getPartition(partitionKey).offer(value, source);
+    }
+    
+    public void setKeyMapper(Mapper<K, V> keyMapper) {
+        this.keyMapper = keyMapper;
+    }
+
+    public void setAutoRelease(boolean autoRelease) {
+        this.autoRelease = autoRelease;
     }
 
     public void setDispatcher(IDispatcher dispatcher) {
         checkShutdown();
         this.dispatcher = dispatcher;
-        synchronized (partitions) {
-            Collection<IQueue<K, V>> values = partitions.values();
+        synchronized (this) {
+            Collection<IQueue<K, V>> values = getPartitions();
             for (IQueue<K, V> queue : values) {
                 queue.setDispatcher(dispatcher);
             }
         }
     }
+    
+    protected Collection<IQueue<K, V>> getPartitions()
+    {
+        return partitions.values();
+    }
 
-    private void checkShutdown() {
+    protected void checkShutdown() {
         if (shutdown) {
             throw new IllegalStateException(this + " is shutdown");
         }

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java?rev=786364&r1=786363&r2=786364&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java Fri Jun 19 04:02:38 2009
@@ -17,32 +17,13 @@
 package org.apache.activemq.queue;
 
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-
-import org.apache.activemq.dispatch.IDispatcher;
-import org.apache.activemq.flow.AbstractLimitedFlowResource;
-import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.PrioritySizeLimiter;
 import org.apache.activemq.protobuf.AsciiBuffer;
-import org.apache.activemq.util.Mapper;
 
-public class SharedPriorityQueue<K, V> extends AbstractFlowQueue<V> implements IPartitionedQueue<K, V> {
+public class SharedPriorityQueue<K, V> extends PartitionedQueue<K, V> {
 
-    private final HashSet<Subscription<V>> subscriptions = new HashSet<Subscription<V>>();
-    private final Mapper<Integer, V> priorityMapper;
     private final ArrayList<SharedQueue<K, V>> partitions = new ArrayList<SharedQueue<K, V>>();
-    private Mapper<K, V> keyMapper;
-    private boolean autoRelease;
-    private IDispatcher dispatcher;
     private final PrioritySizeLimiter<V> limiter;
-    private QueueStore<K, V> store;
-    private PersistencePolicy<V> persistencePolicy;
-    private boolean started;
-    private QueueDescriptor queueDescriptor;
-    private Mapper<Long, V> expirationMapper;
-    private int basePriority = 0;
-    private boolean shutdown = false;
 
     public SharedPriorityQueue(String name, PrioritySizeLimiter<V> limiter) {
         super(name);
@@ -50,154 +31,36 @@
         queueDescriptor.setQueueName(new AsciiBuffer(super.getResourceName()));
         queueDescriptor.setQueueType(QueueDescriptor.SHARED_PRIORITY);
         this.limiter = limiter;
-        priorityMapper = limiter.getPriorityMapper();
+        super.setPartitionMapper(limiter.getPriorityMapper());
         for (int i = 0; i < limiter.getPriorities(); i++) {
             partitions.add(null);
         }
     }
 
-    public synchronized void start() {
-        if (!started) {
-            checkShutdown();
-            started = true;
-            for (SharedQueue<K, V> partition : partitions) {
-                if (partition != null)
-                    partition.start();
-            }
-        }
-    }
-
-    public synchronized void stop() {
-        if (started) {
-            started = false;
-            for (SharedQueue<K, V> partition : partitions) {
-                if (partition != null)
-                    partition.stop();
-            }
-        }
-    }
-
+    @Override
     public void shutdown(boolean sync) {
-        ArrayList<SharedQueue<K, V>> partitions = null;
-        synchronized (this) {
-            if (!shutdown) {
-                shutdown = true;
-                started = false;
-            }
-            partitions = this.partitions;
-        }
-
-        if (partitions != null) {
-            for (IQueue<K, V> partition : partitions) {
-                if (partition != null)
-                    partition.shutdown(sync);
-            }
-        }
-    }
-
-    public void initialize(long sequenceMin, long sequenceMax, int count, long size) {
-        checkShutdown();
-        // No-op, only partitions should have stored values.
-        if (count > 0 || size > 0) {
-            throw new IllegalArgumentException("Partioned queues do not themselves hold values");
-        }
-        if (expirationMapper == null) {
-            expirationMapper = new Mapper<Long, V>() {
-
-                public Long map(V element) {
-                    return -1L;
-                }
-
-            };
-        }
-        if (persistencePolicy == null) {
-            persistencePolicy = new PersistencePolicy.NON_PERSISTENT_POLICY<V>();
-        }
-    }
-
-    public synchronized int getEnqueuedCount() {
-        checkShutdown();
-        int count = 0;
-        for (SharedQueue<K, V> queue : partitions) {
-            if (queue != null) {
-                count += queue.getEnqueuedCount();
-            }
+        try {
+            super.shutdown(sync);
+        } finally {
+            partitions.clear();
         }
-        return count;
     }
 
+    /**
+     * Override with more efficient limiter lookup:
+     */
+    @Override
     public synchronized long getEnqueuedSize() {
         return limiter.getSize();
     }
 
-    public void setStore(QueueStore<K, V> store) {
-        this.store = store;
-    }
-
-    public void setPersistencePolicy(PersistencePolicy<V> persistencePolicy) {
-        this.persistencePolicy = persistencePolicy;
-    }
-
-    public void setExpirationMapper(Mapper<Long, V> expirationMapper) {
-        this.expirationMapper = expirationMapper;
-    }
-
-
-    @Override
-    public void addSubscription(Subscription<V> sub) {
-        synchronized (this) {
-            checkShutdown();
-            subscriptions.add(sub);
-            for (SharedQueue<K, V> queue : partitions) {
-                if (queue != null) {
-                    queue.addSubscription(sub);
-                }
-            }
-        }
-    }
-    
-    @Override
-    public boolean removeSubscription(Subscription<V> sub) {
-        synchronized (this) {
-            if (subscriptions.remove(sub)) {
-                for (SharedQueue<K, V> queue : partitions) {
-                    if (queue != null) {
-                        queue.removeSubscription(sub);
-                    }
-                }
-                return true;
-            }
-        }
-        return false;
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.queue.IQueue#setDispatchPriority(int)
-     */
-    public void setDispatchPriority(int priority) {
+    public IQueue<K, V> createPartition(int prio) {
         synchronized (this) {
-            if (basePriority != priority) {
-                basePriority = priority;
-                if (shutdown) {
-                    return;
-                }
-                for (int i = 0; i < limiter.getPriorities(); i++) {
-                    SharedQueue<K, V> queue = partitions.get(i);
-                    if (queue != null) {
-                        queue.setDispatchPriority(basePriority + i);
-                    }
-                }
-            }
+            return getPartition(prio, started);
         }
     }
 
-    public IQueue<K, V> createPartition(int prio) {
-        return getPartition(prio, false);
-    }
-
-    private IQueue<K, V> getPartition(int prio, boolean initialize) {
+    protected IQueue<K, V> getPartition(int prio, boolean initialize) {
         synchronized (this) {
             checkShutdown();
             SharedQueue<K, V> queue = partitions.get(prio);
@@ -231,71 +94,4 @@
             return queue;
         }
     }
-
-    public QueueDescriptor getDescriptor() {
-        return queueDescriptor;
-    }
-
-    public void add(V value, ISourceController<?> source) {
-        int prio = priorityMapper.map(value);
-        getPartition(prio, true).add(value, source);
-    }
-
-    public boolean offer(V value, ISourceController<?> source) {
-        int prio = priorityMapper.map(value);
-        return getPartition(prio, true).offer(value, source);
-    }
-
-    public void setKeyMapper(Mapper<K, V> keyMapper) {
-        this.keyMapper = keyMapper;
-    }
-
-    public void setAutoRelease(boolean autoRelease) {
-        this.autoRelease = autoRelease;
-    }
-
-    public void setDispatcher(IDispatcher dispatcher) {
-        this.dispatcher = dispatcher;
-        super.setFlowExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1));
-    }
-
-    private void checkShutdown() {
-        if (shutdown) {
-            throw new IllegalStateException(this + " is shutdown");
-        }
-    }
-
-    /* (non-Javadoc)
-     * @see org.apache.activemq.queue.IPollableFlowSource#isDispatchReady()
-     */
-    public boolean isDispatchReady() {
-        // TODO Auto-generated method stub
-        throw new UnsupportedOperationException();
-    }
-
-    /* (non-Javadoc)
-     * @see org.apache.activemq.queue.IPollableFlowSource#poll()
-     */
-    public V poll() {
-        // TODO Auto-generated method stub
-        throw new UnsupportedOperationException();
-    }
-
-    /* (non-Javadoc)
-     * @see org.apache.activemq.queue.IPollableFlowSource#pollingDispatch()
-     */
-    public boolean pollingDispatch() {
-        // TODO Auto-generated method stub
-        throw new UnsupportedOperationException();
-    }
-
-    /* (non-Javadoc)
-     * @see org.apache.activemq.flow.ISinkController.FlowControllable#flowElemAccepted(org.apache.activemq.flow.ISourceController, java.lang.Object)
-     */
-    public void flowElemAccepted(ISourceController<V> source, V elem) {
-        // TODO Remove
-        throw new UnsupportedOperationException();
-        
-    }
-
 }

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=786364&r1=786363&r2=786364&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java Fri Jun 19 04:02:38 2009
@@ -67,6 +67,9 @@
     private QueueStore<K, V> store;
     private PersistencePolicy<V> persistencePolicy;
 
+    private SubscriptionContext exclusiveConsumer = null;
+    private int exclusiveConsumerCount = 0;
+
     // Open consumers:
     private final HashMap<Subscription<V>, SubscriptionContext> consumers = new HashMap<Subscription<V>, SubscriptionContext>();
     private int startedConsumers = 0;
@@ -172,8 +175,15 @@
             SubscriptionContext context = new SubscriptionContext(subscription);
             SubscriptionContext old = consumers.put(subscription, context);
             if (old != null) {
+                context.close();
                 consumers.put(subscription, old);
             } else {
+                if (exclusiveConsumer == null) {
+                    if (context.isExclusive()) {
+                        exclusiveConsumer = context;
+                    }
+                }
+
                 context.start();
             }
         }
@@ -184,6 +194,29 @@
             SubscriptionContext old = consumers.remove(subscription);
             if (old != null) {
                 old.close();
+
+                //Was this the exclusive consumer?
+                if (old == exclusiveConsumer) {
+                    if (exclusiveConsumerCount > 0) {
+                        for (SubscriptionContext context : consumers.values()) {
+                            if (context.isExclusive()) {
+                                exclusiveConsumer = context;
+                                //Update the dispatch list:
+                                context.updateDispatchList();
+                                break;
+                            }
+                        }
+                    } else {
+                        //Otherwise add the remaining subs to appropriate dispatch
+                        //lists:
+                        exclusiveConsumer = null;
+                        for (SubscriptionContext context : consumers.values()) {
+                            if (!context.sub.isBrowser()) {
+                                context.updateDispatchList();
+                            }
+                        }
+                    }
+                }
                 return true;
             }
             return false;
@@ -397,6 +430,9 @@
                         SubscriptionContext nextConsumer = consumer.getNext();
                         switch (consumer.offer(next)) {
                         case ACCEPTED:
+                            if (DEBUG)
+                                System.out.println("Dispatched " + next.getElement() + " to " + consumer);
+
                             // Rotate list so this one is last next time:
                             sharedConsumers.rotate();
                             interested = true;
@@ -469,6 +505,9 @@
         SubscriptionContext(Subscription<V> target) {
             this.sub = target;
             this.cursor = openCursor(target.toString(), true, !sub.isBrowser());
+            if (isExclusive()) {
+                exclusiveConsumerCount++;
+            }
             cursor.setCursorReadyListener(new CursorReadyListener() {
                 public void onElementReady() {
                     if (!isLinked()) {
@@ -478,10 +517,15 @@
             });
         }
 
+        public boolean isExclusive() {
+            return sub.isExclusive() && !sub.isBrowser();
+        }
+
         public void start() {
             if (!isStarted) {
                 isStarted = true;
                 if (!sub.isBrowser()) {
+
                     if (sub.hasSelector()) {
                         activeSelectorSubs++;
                     }
@@ -504,6 +548,7 @@
             // If started remove this from any dispatch list
             if (isStarted) {
                 if (!sub.isBrowser()) {
+
                     if (sub.hasSelector()) {
                         activeSelectorSubs--;
                     }
@@ -517,6 +562,10 @@
         }
 
         public void close() {
+            if (isExclusive()) {
+                exclusiveConsumerCount--;
+            }
+
             stop();
         }
 
@@ -550,13 +599,22 @@
                 return false;
             }
 
-            // TODO Even if there are subscriptions with selectors present
-            // we can still join the shared cursor as long as there is at
-            // least one ready selector-less sub.
             boolean join = false;
-            if (activeSelectorSubs == 0) {
+            //If we are the exlusive consumer then we join the shared
+            //cursor:
+            if (exclusiveConsumer == this) {
+                join = true;
+            }
+            //Otherwise if we aren't we won't be joining anything!
+            else if (exclusiveConsumer != null) {
+                return false;
+            } else if (activeSelectorSubs == 0) {
                 join = true;
             } else {
+
+                // TODO Even if there are subscriptions with selectors present
+                // we can still join the shared cursor as long as there is at
+                // least one ready selector-less sub.
                 cursor.getNext();
                 if (queue.isEmpty() || cursor.compareTo(sharedCursor) >= 0) {
                     join = true;
@@ -578,9 +636,14 @@
         /**
          * Adds to subscription to the appropriate dispatch list:
          */
-        private final void updateDispatchList() {
+        final void updateDispatchList() {
 
             if (!checkJoinShared()) {
+                //Otherwise if we're not the exclusive consumer
+                if (!sub.isBrowser() && exclusiveConsumer != null) {
+                    return;
+                }
+
                 // Make sure our cursor is activated:
                 cursor.activate();
                 // If our next element is paged out
@@ -641,9 +704,7 @@
                     if (callback == null) {
                         qe.acknowledge();
                     }
-                }
-                else
-                {
+                } else {
                     qe.setAcquired(null);
                 }
 

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java?rev=786364&r1=786363&r2=786364&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java Fri Jun 19 04:02:38 2009
@@ -57,6 +57,15 @@
      * @return True if this is a subscription browser. 
      */
     public boolean isBrowser();
+    
+    /**
+     * Indicates that the subscription is exclusive. When there at least one 
+     * exclusive subscription on a shared queue, the queue will dispatch to
+     * only one such consumer while there is at least one connected.
+     * 
+     * @return True if the Subscription is exclusive.
+     */
+    public boolean isExclusive();
 
     /**
      * Returns true if the Subscription has a selector. If true

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java?rev=786364&r1=786363&r2=786364&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java Fri Jun 19 04:02:38 2009
@@ -36,7 +36,7 @@
     private final MockStoreAdapater store = new MockStoreAdapater();
     private static final PersistencePolicy<Message> NO_PERSISTENCE = new PersistencePolicy.NON_PERSISTENT_POLICY<Message>();
     private static final boolean USE_OLD_QUEUE = false;
-    
+
     private IQueue<Long, Message> createQueue() {
 
         if (partitionMapper != null) {
@@ -117,6 +117,10 @@
                 return true;
             }
 
+            public boolean isExclusive() {
+                return false;
+            }
+
             public IFlowSink<Message> getSink() {
                 return dt.getSink();
             }
@@ -133,7 +137,7 @@
             public boolean offer(Message elem, ISourceController<?> controller, SubscriptionDeliveryCallback ackCallback) {
                 return getSink().offer(elem, controller);
             }
-            
+
             public void add(Message elem, ISourceController<?> controller, SubscriptionDeliveryCallback ackCallback) {
                 getSink().add(elem, controller);
             }

Modified: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=786364&r1=786363&r2=786364&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Fri Jun 19 04:02:38 2009
@@ -489,6 +489,13 @@
 		public boolean autoCreateDestination() {
 			return true;
 		}
+
+        /* (non-Javadoc)
+         * @see org.apache.activemq.queue.Subscription#isExclusive()
+         */
+        public boolean isExclusive() {
+            return false;
+        }
     }
 
     private void sendError(String message) {