You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/08/26 19:01:08 UTC

svn commit: r1620659 [1/2] - in /qpid/trunk/qpid/java: broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/message/ broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/src/test/jav...

Author: rgodfrey
Date: Tue Aug 26 17:01:07 2014
New Revision: 1620659

URL: http://svn.apache.org/r1620659
Log:
QPID-6040 : [Java Broker] [Java Client] add the ability to create a single consumer that is consuming across a collection of queues

Modified:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
    qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
    qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
    qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Tue Aug 26 17:01:07 2014
@@ -20,16 +20,24 @@
  */
 package org.apache.qpid.server.consumer;
 
-import org.apache.qpid.server.util.StateChangeListener;
-
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.qpid.server.util.StateChangeListener;
 
 public abstract class AbstractConsumerTarget implements ConsumerTarget
 {
 
     private final AtomicReference<State> _state;
-    private final AtomicReference<StateChangeListener<ConsumerTarget, State>> _stateListener =
-            new AtomicReference<StateChangeListener<ConsumerTarget, State>>();
+
+    private final Set<StateChangeListener<ConsumerTarget, State>> _stateChangeListeners = new
+            CopyOnWriteArraySet<>();
+
+    private final Lock _stateChangeLock = new ReentrantLock();
+
 
     protected AbstractConsumerTarget(final State initialState)
     {
@@ -46,8 +54,7 @@ public abstract class AbstractConsumerTa
     {
         if(_state.compareAndSet(from, to))
         {
-            StateChangeListener<ConsumerTarget, State> listener = _stateListener.get();
-            if(listener != null)
+            for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
             {
                 listener.stateChanged(this, from, to);
             }
@@ -59,15 +66,38 @@ public abstract class AbstractConsumerTa
         }
     }
 
+    public final void notifyCurrentState()
+    {
+        for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
+        {
+            State state = getState();
+            listener.stateChanged(this, state, state);
+        }
+    }
+    public final void addStateListener(StateChangeListener<ConsumerTarget, State> listener)
+    {
+        _stateChangeListeners.add(listener);
+    }
+
+    @Override
+    public void removeStateChangeListener(final StateChangeListener<ConsumerTarget, State> listener)
+    {
+        _stateChangeListeners.remove(listener);
+    }
+
+    public final boolean trySendLock()
+    {
+        return _stateChangeLock.tryLock();
+    }
 
-    public final void setStateListener(StateChangeListener<ConsumerTarget, State> listener)
+    public final void getSendLock()
     {
-        _stateListener.set(listener);
+        _stateChangeLock.lock();
     }
 
-    public final StateChangeListener<ConsumerTarget, State> getStateListener()
+    public final void releaseSendLock()
     {
-        return _stateListener.get();
+        _stateChangeLock.unlock();
     }
 
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java Tue Aug 26 17:01:07 2014
@@ -31,6 +31,8 @@ public interface ConsumerTarget
 
     void acquisitionRemoved(MessageInstance node);
 
+    void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> listener);
+
     enum State
     {
         ACTIVE, SUSPENDED, CLOSED
@@ -42,7 +44,7 @@ public interface ConsumerTarget
 
     void consumerRemoved(ConsumerImpl sub);
 
-    void setStateListener(StateChangeListener<ConsumerTarget, State> listener);
+    void addStateListener(StateChangeListener<ConsumerTarget, State> listener);
 
     long getUnacknowledgedBytes();
 
@@ -50,7 +52,7 @@ public interface ConsumerTarget
 
     AMQSessionModel getSessionModel();
 
-    long send(MessageInstance entry, boolean batch);
+    long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
 
     void flushBatched();
 
@@ -65,4 +67,11 @@ public interface ConsumerTarget
     boolean isSuspended();
 
     boolean close();
+
+    boolean trySendLock();
+
+    void getSendLock();
+
+    void releaseSendLock();
+
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java Tue Aug 26 17:01:07 2014
@@ -79,6 +79,8 @@ public interface MessageInstance
 
     Filterable asFilterable();
 
+    ConsumerImpl getAcquiringConsumer();
+
     public static enum State
     {
         AVAILABLE,

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Tue Aug 26 17:01:07 2014
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.model.Consumer;
 
@@ -52,4 +53,6 @@ public interface QueueConsumer<X extends
     MessageInstance.ConsumerAcquiredState<X> getOwningState();
 
     QueueContext getQueueContext();
+
+    ConsumerTarget getTarget();
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Tue Aug 26 17:01:07 2014
@@ -31,8 +31,6 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.log4j.Logger;
 
@@ -65,7 +63,6 @@ class QueueConsumerImpl
     private final AtomicBoolean _targetClosed = new AtomicBoolean(false);
     private final AtomicBoolean _closed = new AtomicBoolean(false);
     private final long _consumerNumber;
-    private final Lock _stateChangeLock = new ReentrantLock();
     private final long _createTime = System.currentTimeMillis();
     private final MessageInstance.ConsumerAcquiredState<QueueConsumerImpl> _owningState = new MessageInstance.ConsumerAcquiredState<QueueConsumerImpl>(this);
     private final boolean _acquires;
@@ -90,6 +87,8 @@ class QueueConsumerImpl
 
     private final ConsumerTarget _target;
     private final SubFlushRunner _runner = new SubFlushRunner(this);
+    private final StateChangeListener<ConsumerTarget, ConsumerTarget.State>
+            _listener;
     private volatile QueueContext _queueContext;
     private StateChangeListener<? super QueueConsumerImpl, State> _stateListener = new StateChangeListener<QueueConsumerImpl, State>()
     {
@@ -134,17 +133,17 @@ class QueueConsumerImpl
 
         setupLogging();
 
-        _target.setStateListener(
-                new StateChangeListener<ConsumerTarget, ConsumerTarget.State>()
-                    {
-                        @Override
-                        public void stateChanged(final ConsumerTarget object,
-                                                 final ConsumerTarget.State oldState,
-                                                 final ConsumerTarget.State newState)
-                        {
-                            targetStateChanged(oldState, newState);
-                        }
-                    });
+        _listener = new StateChangeListener<ConsumerTarget, ConsumerTarget.State>()
+        {
+            @Override
+            public void stateChanged(final ConsumerTarget object,
+                                     final ConsumerTarget.State oldState,
+                                     final ConsumerTarget.State newState)
+            {
+                targetStateChanged(oldState, newState);
+            }
+        };
+        _target.addStateListener(_listener);
     }
 
     private static Map<String, Object> createAttributeMap(String name, FilterManager filters, EnumSet<Option> optionSet)
@@ -202,6 +201,7 @@ class QueueConsumerImpl
         }
     }
 
+    @Override
     public ConsumerTarget getTarget()
     {
         return _target;
@@ -248,17 +248,17 @@ class QueueConsumerImpl
     {
         if(_closed.compareAndSet(false,true))
         {
-            getSendLock();
+            _target.getSendLock();
             try
             {
-                _target.close();
                 _target.consumerRemoved(this);
+                _target.removeStateChangeListener(_listener);
                 _queue.unregisterConsumer(this);
                 deleted();
             }
             finally
             {
-                releaseSendLock();
+                _target.releaseSendLock();
             }
 
         }
@@ -420,17 +420,17 @@ class QueueConsumerImpl
 
     public final boolean trySendLock()
     {
-        return _stateChangeLock.tryLock();
+        return getTarget().trySendLock();
     }
 
     public final void getSendLock()
     {
-        _stateChangeLock.lock();
+        getTarget().getSendLock();
     }
 
     public final void releaseSendLock()
     {
-        _stateChangeLock.unlock();
+        getTarget().releaseSendLock();
     }
 
     public final long getCreateTime()
@@ -471,7 +471,7 @@ class QueueConsumerImpl
     public final void send(final QueueEntry entry, final boolean batch)
     {
         _deliveredCount.incrementAndGet();
-        long size = _target.send(entry, batch);
+        long size = _target.send(this, entry, batch);
         _deliveredBytes.addAndGet(size);
     }
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue Aug 26 17:01:07 2014
@@ -247,6 +247,26 @@ public abstract class QueueEntryImpl imp
     }
 
     @Override
+    public ConsumerImpl getAcquiringConsumer()
+    {
+        ConsumerImpl consumer;
+        EntryState state = _state;
+        if(state instanceof ConsumerAcquiredState)
+        {
+            consumer = ((ConsumerAcquiredState)state).getConsumer();
+        }
+        else if(state instanceof LockedAcquiredState)
+        {
+            consumer = ((LockedAcquiredState)state).getConsumer();
+        }
+        else
+        {
+            consumer = null;
+        }
+        return consumer;
+    }
+
+    @Override
     public boolean isAcquiredBy(ConsumerImpl consumer)
     {
         EntryState state = _state;

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java Tue Aug 26 17:01:07 2014
@@ -21,18 +21,18 @@
 package org.apache.qpid.server.queue;
 
 import java.security.PrivilegedAction;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import javax.security.auth.Subject;
+
 import org.apache.log4j.Logger;
+
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.transport.TransportException;
 
-import javax.security.auth.Subject;
-
 /**
  * QueueRunners are Runnables used to process a queue when requiring
  * asynchronous message delivery to consumers, which is necessary
@@ -111,7 +111,7 @@ public class QueueRunner implements Runn
 
     public String toString()
     {
-        return "QueueRunner-" + _queue.getLogSubject();
+        return "QueueRunner-" + _queue.getLogSubject().toLogString();
     }
 
     public void execute()

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java Tue Aug 26 17:01:07 2014
@@ -21,18 +21,18 @@
 package org.apache.qpid.server.queue;
 
 
+import java.security.PrivilegedAction;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.security.auth.Subject;
+
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.transport.TransportException;
 
-import javax.security.auth.Subject;
-import java.security.PrivilegedAction;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
 
 class SubFlushRunner implements Runnable
 {

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Tue Aug 26 17:01:07 2014
@@ -167,7 +167,7 @@ public class MockConsumer implements Con
     {
     }
 
-    public long send(MessageInstance entry, boolean batch)
+    public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
     {
         long size = entry.getMessage().getSize();
         if (messages.contains(entry))
@@ -202,7 +202,7 @@ public class MockConsumer implements Con
     @Override
     public void consumerRemoved(final ConsumerImpl sub)
     {
-
+       close();
     }
 
     public void setState(State state)
@@ -216,11 +216,20 @@ public class MockConsumer implements Con
     }
 
     @Override
-    public void setStateListener(final StateChangeListener<ConsumerTarget, State> listener)
+    public void addStateListener(final StateChangeListener<ConsumerTarget, State> listener)
     {
         _listener = listener;
     }
 
+    @Override
+    public void removeStateChangeListener(final StateChangeListener<ConsumerTarget, State> listener)
+    {
+        if(_listener == listener)
+        {
+            _listener = null;
+        }
+    }
+
     public ArrayList<MessageInstance> getMessages()
     {
         return messages;
@@ -242,6 +251,23 @@ public class MockConsumer implements Con
         _isActive = isActive;
     }
 
+
+    public final boolean trySendLock()
+    {
+        return _stateChangeLock.tryLock();
+    }
+
+    public final void getSendLock()
+    {
+        _stateChangeLock.lock();
+    }
+
+    public final void releaseSendLock()
+    {
+        _stateChangeLock.unlock();
+    }
+
+
     private static class MockSessionModel implements AMQSessionModel
     {
         private final UUID _id = UUID.randomUUID();

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java Tue Aug 26 17:01:07 2014
@@ -60,6 +60,12 @@ public class MockMessageInstance impleme
     }
 
     @Override
+    public ConsumerImpl getAcquiringConsumer()
+    {
+        return null;
+    }
+
+    @Override
     public boolean isAcquiredBy(final ConsumerImpl consumer)
     {
         return false;

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Tue Aug 26 17:01:07 2014
@@ -205,12 +205,13 @@ public class StandardQueueTest extends A
         {
             /**
              * Send a message and decrement latch
+             * @param consumer
              * @param entry
              * @param batch
              */
-            public long send(MessageInstance entry, boolean batch)
+            public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
             {
-                long size = super.send(entry, batch);
+                long size = super.send(consumer, entry, batch);
                 latch.countDown();
                 return size;
             }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Tue Aug 26 17:01:07 2014
@@ -22,7 +22,9 @@ package org.apache.qpid.server.protocol.
 
 import java.nio.ByteBuffer;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -37,6 +39,7 @@ import org.apache.qpid.server.model.Exch
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueConsumer;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -77,7 +80,7 @@ public class ConsumerTarget_0_10 extends
     private final Map<String, Object> _arguments;
     private int _deferredMessageCredit;
     private long _deferredSizeCredit;
-    private ConsumerImpl _consumer;
+    private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
 
 
     public ConsumerTarget_0_10(ServerSession session,
@@ -101,11 +104,6 @@ public class ConsumerTarget_0_10 extends
         _name = name;
     }
 
-    public ConsumerImpl getConsumer()
-    {
-        return _consumer;
-    }
-
     public boolean isSuspended()
     {
         return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension
@@ -116,11 +114,7 @@ public class ConsumerTarget_0_10 extends
         boolean closed = false;
         State state = getState();
 
-        final ConsumerImpl consumer = getConsumer();
-        if(consumer != null)
-        {
-            consumer.getSendLock();
-        }
+        getSendLock();
         try
         {
             while(!closed && state != State.CLOSED)
@@ -135,10 +129,7 @@ public class ConsumerTarget_0_10 extends
             }
         finally
         {
-            if(consumer != null)
-            {
-                consumer.releaseSendLock();
-            }
+            releaseSendLock();
         }
 
         return closed;
@@ -153,7 +144,7 @@ public class ConsumerTarget_0_10 extends
             if(!updateState(State.SUSPENDED, State.ACTIVE))
             {
                 // this is a hack to get round the issue of increasing bytes credit
-                getStateListener().stateChanged(this, State.ACTIVE, State.ACTIVE);
+                notifyCurrentState();
             }
         }
         else
@@ -200,7 +191,7 @@ public class ConsumerTarget_0_10 extends
 
     private final AddMessageDispositionListenerAction _postIdSettingAction;
 
-    public long send(final MessageInstance entry, boolean batch)
+    public long send(final ConsumerImpl consumer, final MessageInstance entry, boolean batch)
     {
         ServerMessage serverMsg = entry.getMessage();
 
@@ -303,12 +294,12 @@ public class ConsumerTarget_0_10 extends
 
         Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
 
-        xfr = batch ? new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header, body, BATCHED)
-                    : new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header, body);
+        xfr = batch ? new MessageTransfer(_name,_acceptMode,_acquireMode,header, body, BATCHED)
+                    : new MessageTransfer(_name,_acceptMode,_acquireMode,header, body);
 
         if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED)
         {
-            xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW));
+            xfr.setCompletionListener(new MessageAcceptCompletionListener(this, consumer, _session, entry, _flowMode == MessageFlowMode.WINDOW));
         }
         else if(_flowMode == MessageFlowMode.WINDOW)
         {
@@ -325,11 +316,11 @@ public class ConsumerTarget_0_10 extends
         _postIdSettingAction.setXfr(xfr);
         if(_acceptMode == MessageAcceptMode.EXPLICIT)
         {
-            _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this));
+            _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this, consumer));
         }
         else if(_acquireMode != MessageAcquireMode.PRE_ACQUIRED)
         {
-            _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this));
+            _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this, consumer));
         }
         else
         {
@@ -401,12 +392,23 @@ public class ConsumerTarget_0_10 extends
     {
         entry.setRedelivered();
         entry.routeToAlternate(null, null);
-        if(entry.isAcquiredBy(getConsumer()))
+        if(isAcquiredByConsumer(entry))
         {
             entry.delete();
         }
     }
 
+    private boolean isAcquiredByConsumer(final MessageInstance entry)
+    {
+        ConsumerImpl acquiringConsumer = entry.getAcquiringConsumer();
+        if(acquiringConsumer instanceof QueueConsumer)
+        {
+            return ((QueueConsumer)acquiringConsumer).getTarget() == this;
+        }
+
+        return false;
+    }
+
     void release(final MessageInstance entry, final boolean setRedelivered)
     {
         if (setRedelivered)
@@ -503,7 +505,7 @@ public class ConsumerTarget_0_10 extends
     {
         try
         {
-            getConsumer().getSendLock();
+            getSendLock();
 
             updateState(State.ACTIVE, State.SUSPENDED);
             _stopped.set(true);
@@ -512,7 +514,7 @@ public class ConsumerTarget_0_10 extends
         }
         finally
         {
-            getConsumer().releaseSendLock();
+            releaseSendLock();
         }
     }
 
@@ -572,7 +574,7 @@ public class ConsumerTarget_0_10 extends
 
     public boolean deleteAcquired(MessageInstance entry)
     {
-        if(entry.isAcquiredBy(getConsumer()))
+        if(isAcquiredByConsumer(entry))
         {
             acquisitionRemoved(entry);
             entry.delete();
@@ -594,7 +596,10 @@ public class ConsumerTarget_0_10 extends
     public void flush()
     {
         flushCreditState(true);
-        getConsumer().flush();
+        for(ConsumerImpl consumer : _consumers)
+        {
+            consumer.flush();
+        }
         stop();
     }
 
@@ -626,12 +631,17 @@ public class ConsumerTarget_0_10 extends
     @Override
     public void consumerAdded(final ConsumerImpl sub)
     {
-        _consumer = sub;
+        _consumers.add(sub);
     }
 
     @Override
     public void consumerRemoved(final ConsumerImpl sub)
     {
+        _consumers.remove(sub);
+        if(_consumers.isEmpty())
+        {
+            close();
+        }
     }
 
     public long getUnacknowledgedBytes()

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java Tue Aug 26 17:01:07 2014
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.
 
 import org.apache.log4j.Logger;
 
+import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
 
 
@@ -32,16 +33,20 @@ class ExplicitAcceptDispositionChangeLis
 
     private final MessageInstance _entry;
     private final ConsumerTarget_0_10 _target;
+    private final ConsumerImpl _consumer;
 
-    public ExplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target)
+    public ExplicitAcceptDispositionChangeListener(MessageInstance entry,
+                                                   ConsumerTarget_0_10 target,
+                                                   final ConsumerImpl consumer)
     {
         _entry = entry;
         _target = target;
+        _consumer = consumer;
     }
 
     public void onAccept()
     {
-        if(_target != null && _entry.isAcquiredBy(_target.getConsumer()) && _entry.lockAcquisition())
+        if(_target != null && _entry.isAcquiredBy(_consumer) && _entry.lockAcquisition())
         {
             _target.getSessionModel().acknowledge(_target, _entry);
         }
@@ -54,7 +59,7 @@ class ExplicitAcceptDispositionChangeLis
 
     public void onRelease(boolean setRedelivered)
     {
-        if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
+        if(_target != null && _entry.isAcquiredBy(_consumer))
         {
             _target.release(_entry, setRedelivered);
         }
@@ -66,7 +71,7 @@ class ExplicitAcceptDispositionChangeLis
 
     public void onReject()
     {
-        if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
+        if(_target != null && _entry.isAcquiredBy(_consumer))
         {
             _target.reject(_entry);
         }
@@ -79,7 +84,7 @@ class ExplicitAcceptDispositionChangeLis
 
     public boolean acquire()
     {
-        return _entry.acquire(_target.getConsumer());
+        return _entry.acquire(_consumer);
     }
 
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java Tue Aug 26 17:01:07 2014
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.
 
 import org.apache.log4j.Logger;
 
+import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
 
 class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
@@ -30,12 +31,16 @@ class ImplicitAcceptDispositionChangeLis
 
 
     private final MessageInstance _entry;
-    private ConsumerTarget_0_10 _target;
+    private final ConsumerTarget_0_10 _target;
+    private final ConsumerImpl _consumer;
 
-    public ImplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target)
+    public ImplicitAcceptDispositionChangeListener(MessageInstance entry,
+                                                   ConsumerTarget_0_10 target,
+                                                   final ConsumerImpl consumer)
     {
         _entry = entry;
         _target = target;
+        _consumer = consumer;
     }
 
     public void onAccept()
@@ -45,7 +50,7 @@ class ImplicitAcceptDispositionChangeLis
 
     public void onRelease(boolean setRedelivered)
     {
-        if(_entry.isAcquiredBy(_target.getConsumer()))
+        if(_entry.isAcquiredBy(_consumer))
         {
             _target.release(_entry, setRedelivered);
         }
@@ -57,7 +62,7 @@ class ImplicitAcceptDispositionChangeLis
 
     public void onReject()
     {
-        if(_entry.isAcquiredBy(_target.getConsumer()))
+        if(_entry.isAcquiredBy(_consumer))
         {
             _target.reject(_entry);
         }
@@ -70,7 +75,7 @@ class ImplicitAcceptDispositionChangeLis
 
     public boolean acquire()
     {
-        boolean acquired = _entry.acquire(_target.getConsumer());
+        boolean acquired = _entry.acquire(_consumer);
         if(acquired)
         {
             _target.recordUnacknowledged(_entry);

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java Tue Aug 26 17:01:07 2014
@@ -21,6 +21,7 @@
 
 package org.apache.qpid.server.protocol.v0_10;
 
+import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.transport.Method;
 
@@ -29,16 +30,22 @@ public class MessageAcceptCompletionList
     private final ConsumerTarget_0_10 _sub;
     private final MessageInstance _entry;
     private final ServerSession _session;
+    private final ConsumerImpl _consumer;
     private long _messageSize;
     private boolean _restoreCredit;
 
-    public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit)
+    public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub,
+                                           final ConsumerImpl consumer,
+                                           ServerSession session,
+                                           MessageInstance entry,
+                                           boolean restoreCredit)
     {
         super();
         _sub = sub;
         _entry = entry;
         _session = session;
         _restoreCredit = restoreCredit;
+        _consumer = consumer;
         if(restoreCredit)
         {
             _messageSize = entry.getMessage().getSize();
@@ -51,7 +58,7 @@ public class MessageAcceptCompletionList
         {
             _sub.getCreditManager().restoreCredit(1l, _messageSize);
         }
-        if(_entry.isAcquiredBy(_sub.getConsumer()) && _entry.lockAcquisition())
+        if(_entry.isAcquiredBy(_consumer) && _entry.lockAcquisition())
         {
             _session.acknowledge(_sub, _entry);
         }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Tue Aug 26 17:01:07 2014
@@ -25,6 +25,7 @@ import java.security.AccessControlExcept
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -198,15 +199,44 @@ public class ServerSessionDelegate exten
             else
             {
                 String queueName = method.getQueue();
-                VirtualHostImpl vhost = getVirtualHost(session);
+                VirtualHostImpl<?,?,?> vhost = getVirtualHost(session);
 
+                final Collection<MessageSource> sources = new HashSet<>();
                 final MessageSource queue = vhost.getMessageSource(queueName);
+                if(queue != null)
+                {
+                    sources.add(queue);
+                }
+                else if(vhost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers")
+                        && method.getArguments() != null
+                        && method.getArguments().get("x-multiqueue") instanceof Collection)
+                {
+                    for(Object object : (Collection<Object>)method.getArguments().get("x-multiqueue"))
+                    {
+                        String sourceName = String.valueOf(object);
+                        sourceName = sourceName.trim();
+                        if(sourceName.length() != 0)
+                        {
+                            MessageSource source = vhost.getMessageSource(sourceName);
+                            if(source == null)
+                            {
+                                sources.clear();
+                                break;
+                            }
+                            else
+                            {
+                                sources.add(source);
+                            }
+                        }
+                    }
+                    queueName = method.getArguments().get("x-multiqueue").toString();
+                }
 
-                if(queue == null)
+                if(sources.isEmpty())
                 {
-                    exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
+                    exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
                 }
-                else if(!queue.verifySessionAccess((ServerSession)session))
+                else if(!verifySessionAccess((ServerSession) session, sources))
                 {
                     exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
                 }
@@ -250,12 +280,15 @@ public class ServerSessionDelegate exten
                         {
                             options.add(ConsumerImpl.Option.EXCLUSIVE);
                         }
-                        ((ServerSession)session).register(
-                                queue.addConsumer(target,
-                                                  filterManager,
-                                                  MessageTransferMessage.class,
-                                                  destination,
-                                                  options));
+                        for(MessageSource source : sources)
+                        {
+                            ((ServerSession) session).register(
+                                    source.addConsumer(target,
+                                                      filterManager,
+                                                      MessageTransferMessage.class,
+                                                      destination,
+                                                      options));
+                        }
                     }
                     catch (AMQQueue.ExistingExclusiveConsumer existing)
                     {
@@ -278,6 +311,23 @@ public class ServerSessionDelegate exten
         }
     }
 
+    protected boolean verifySessionAccess(final ServerSession session, final Collection<MessageSource> queues)
+    {
+        for(MessageSource source : queues)
+        {
+            if(!verifySessionAccess(session, source))
+            {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    protected boolean verifySessionAccess(final ServerSession session, final MessageSource queue)
+    {
+        return queue.verifySessionAccess(session);
+    }
+
     @Override
     public void messageTransfer(Session ssn, final MessageTransfer xfr)
     {
@@ -820,17 +870,15 @@ public class ServerSessionDelegate exten
         return destination;
     }
 
-    private VirtualHostImpl getVirtualHost(Session session)
+    private VirtualHostImpl<?,?,?> getVirtualHost(Session session)
     {
         ServerConnection conn = getServerConnection(session);
-        VirtualHostImpl vhost = conn.getVirtualHost();
-        return vhost;
+        return conn.getVirtualHost();
     }
 
     private ServerConnection getServerConnection(Session session)
     {
-        ServerConnection conn = (ServerConnection) session.getConnection();
-        return conn;
+        return (ServerConnection) session.getConnection();
     }
 
     @Override
@@ -1238,7 +1286,7 @@ public class ServerSessionDelegate exten
                 exception(session, method, errorCode, description);
 
             }
-            else if (!queue.verifySessionAccess((ServerSession)session))
+            else if (!verifySessionAccess((ServerSession) session, queue))
             {
                 String description = "Cannot declare queue('" + queueName + "'),"
                                                                        + " as exclusive queue with same name "
@@ -1296,7 +1344,7 @@ public class ServerSessionDelegate exten
             catch(QueueExistsException qe)
             {
                 queue = qe.getExistingQueue();
-                if (!queue.verifySessionAccess((ServerSession)session))
+                if (!verifySessionAccess((ServerSession) session, queue))
                 {
                     String description = "Cannot declare queue('" + queueName + "'),"
                                                                            + " as exclusive queue with same name "
@@ -1357,7 +1405,7 @@ public class ServerSessionDelegate exten
             }
             else
             {
-                if(!queue.verifySessionAccess((ServerSession)session))
+                if(!verifySessionAccess((ServerSession) session, queue))
                 {
                     exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
                 }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Tue Aug 26 17:01:07 2014
@@ -61,6 +61,7 @@ import org.apache.qpid.server.Transactio
 import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.connection.SessionPrincipal;
 import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.filter.AMQInvalidArgumentException;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -543,10 +544,9 @@ public class AMQChannel<T extends AMQPro
     }
 
 
-    public ConsumerImpl getSubscription(AMQShortString tag)
+    public ConsumerTarget getSubscription(AMQShortString tag)
     {
-        final ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag);
-        return target == null ? null : target.getConsumer();
+        return _tag2SubscriptionTargetMap.get(tag);
     }
 
     /**
@@ -555,7 +555,7 @@ public class AMQChannel<T extends AMQPro
      *
      *
      * @param tag       the tag chosen by the client (if null, server will generate one)
-     * @param source     the queue to subscribe to
+     * @param sources     the queues to subscribe to
      * @param acks      Are acks enabled for this subscriber
      * @param filters   Filters to apply to this subscriber
      *
@@ -564,7 +564,7 @@ public class AMQChannel<T extends AMQPro
      *
      * @throws org.apache.qpid.AMQException                  if something goes wrong
      */
-    public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks,
+    public AMQShortString consumeFromSource(AMQShortString tag, Collection<MessageSource> sources, boolean acks,
                                             FieldTable filters, boolean exclusive, boolean noLocal)
             throws AMQException, MessageSource.ExistingConsumerPreventsExclusive,
                    MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException,
@@ -632,18 +632,21 @@ public class AMQChannel<T extends AMQPro
                     }
                 });
             }
-            ConsumerImpl sub =
-                    source.addConsumer(target,
-                                       filterManager,
-                                       AMQMessage.class,
-                                       AMQShortString.toString(tag),
-                                       options);
-            if(sub instanceof Consumer<?>)
-            {
-                final Consumer<?> modelConsumer = (Consumer<?>) sub;
-                consumerAdded(modelConsumer);
-                modelConsumer.addChangeListener(_consumerClosedListener);
-                _consumers.add(modelConsumer);
+            for(MessageSource source : sources)
+            {
+                ConsumerImpl sub =
+                        source.addConsumer(target,
+                                           filterManager,
+                                           AMQMessage.class,
+                                           AMQShortString.toString(tag),
+                                           options);
+                if (sub instanceof Consumer<?>)
+                {
+                    final Consumer<?> modelConsumer = (Consumer<?>) sub;
+                    consumerAdded(modelConsumer);
+                    modelConsumer.addChangeListener(_consumerClosedListener);
+                    _consumers.add(modelConsumer);
+                }
             }
         }
         catch (AccessControlException e)
@@ -683,13 +686,16 @@ public class AMQChannel<T extends AMQPro
     {
 
         ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
-        ConsumerImpl sub = target == null ? null : target.getConsumer();
-        if (sub != null)
+        Collection<ConsumerImpl> subs = target == null ? null : target.getConsumers();
+        if (subs != null)
         {
-            sub.close();
-            if(sub instanceof Consumer<?>)
+            for(ConsumerImpl sub : subs)
             {
-                _consumers.remove(sub);
+                sub.close();
+                if (sub instanceof Consumer<?>)
+                {
+                    _consumers.remove(sub);
+                }
             }
             return true;
         }
@@ -763,11 +769,14 @@ public class AMQChannel<T extends AMQPro
                 _logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
             }
 
-            ConsumerImpl sub = me.getValue().getConsumer();
+            Collection<ConsumerImpl> subs = me.getValue().getConsumers();
 
-            if(sub != null)
+            if(subs != null)
             {
-                sub.close();
+                for(ConsumerImpl sub : subs)
+                {
+                    sub.close();
+                }
             }
         }
 
@@ -1032,7 +1041,10 @@ public class AMQChannel<T extends AMQPro
                 // may need to deliver queued messages
                 for (ConsumerTarget_0_8 s : _tag2SubscriptionTargetMap.values())
                 {
-                    s.getConsumer().externalStateChange();
+                    for(ConsumerImpl sub : s.getConsumers())
+                    {
+                        sub.externalStateChange();
+                    }
                 }
             }
 
@@ -1050,11 +1062,11 @@ public class AMQChannel<T extends AMQPro
                 {
                     try
                     {
-                        s.getConsumer().getSendLock();
+                        s.getSendLock();
                     }
                     finally
                     {
-                        s.getConsumer().releaseSendLock();
+                        s.releaseSendLock();
                     }
                 }
             }
@@ -1133,8 +1145,8 @@ public class AMQChannel<T extends AMQPro
         // ensure all subscriptions have seen the change to the channel state
         for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
         {
-            sub.getConsumer().getSendLock();
-            sub.getConsumer().releaseSendLock();
+            sub.getSendLock();
+            sub.releaseSendLock();
         }
 
         try
@@ -1169,9 +1181,12 @@ public class AMQChannel<T extends AMQPro
         if(requiresSuspend)
         {
             _suspended.set(false);
-            for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
+            for(ConsumerTarget_0_8 target : _tag2SubscriptionTargetMap.values())
             {
-                sub.getConsumer().externalStateChange();
+                for(ConsumerImpl sub : target.getConsumers())
+                {
+                    sub.externalStateChange();
+                }
             }
 
         }
@@ -1179,7 +1194,7 @@ public class AMQChannel<T extends AMQPro
 
     public String toString()
     {
-        return "["+_session.toString()+":"+_channelId+"]";
+        return "("+ _suspended.get() + ", " + _closing.get() + ", " + _session.isClosing() + ") "+"["+_session.toString()+":"+_channelId+"]";
     }
 
     public void setDefaultQueue(AMQQueue queue)

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Tue Aug 26 17:01:07 2014
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -73,7 +75,7 @@ public abstract class ConsumerTarget_0_8
 
     private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
     private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
-    private ConsumerImpl _consumer;
+    private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
 
 
     public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel,
@@ -93,6 +95,11 @@ public abstract class ConsumerTarget_0_8
         return new GetNoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
     }
 
+    public List<ConsumerImpl> getConsumers()
+    {
+        return _consumers;
+    }
+
     static final class BrowserConsumer extends ConsumerTarget_0_8
     {
         public BrowserConsumer(AMQChannel channel,
@@ -111,12 +118,14 @@ public abstract class ConsumerTarget_0_8
          * thread safe.
          *
          *
+         *
+         * @param consumer
          * @param entry
          * @param batch
          * @throws org.apache.qpid.AMQException
          */
         @Override
-        public long send(MessageInstance entry, boolean batch)
+        public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
         {
             // We don't decrement the reference here as we don't want to consume the message
             // but we do want to send it to the client.
@@ -124,7 +133,7 @@ public abstract class ConsumerTarget_0_8
             synchronized (getChannel())
             {
                 long deliveryTag = getChannel().getNextDeliveryTag();
-                return sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
+                return sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
             }
 
         }
@@ -173,11 +182,12 @@ public abstract class ConsumerTarget_0_8
          * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
          * thread safe.
          *
+         * @param consumer
          * @param entry   The message to send
          * @param batch
          */
         @Override
-        public long send(MessageInstance entry, boolean batch)
+        public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
         {
             // if we do not need to wait for client acknowledgements
             // we can decrement the reference count immediately.
@@ -200,7 +210,7 @@ public abstract class ConsumerTarget_0_8
                 getChannel().getProtocolSession().setDeferFlush(batch);
                 long deliveryTag = getChannel().getNextDeliveryTag();
 
-                size = sendToClient(message, props, deliveryTag);
+                size = sendToClient(consumer, message, props, deliveryTag);
 
             }
             ref.release();
@@ -287,11 +297,12 @@ public abstract class ConsumerTarget_0_8
          * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
          * thread safe.
          *
+         * @param consumer
          * @param entry   The message to send
          * @param batch
          */
         @Override
-        public long send(MessageInstance entry, boolean batch)
+        public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
         {
 
 
@@ -301,9 +312,9 @@ public abstract class ConsumerTarget_0_8
                 long deliveryTag = getChannel().getNextDeliveryTag();
 
                 addUnacknowledgedMessage(entry);
-                recordMessageDelivery(entry, deliveryTag);
+                recordMessageDelivery(consumer, entry, deliveryTag);
                 entry.addStateChangeListener(getReleasedStateChangeListener());
-                long size = sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
+                long size = sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
                 entry.incrementDeliveryCount();
                 return size;
             }
@@ -366,20 +377,20 @@ public abstract class ConsumerTarget_0_8
         }
     }
 
-    public ConsumerImpl getConsumer()
-    {
-        return _consumer;
-    }
-
     @Override
     public void consumerRemoved(final ConsumerImpl sub)
     {
+        _consumers.remove(sub);
+        if(_consumers.isEmpty())
+        {
+            close();
+        }
     }
 
     @Override
     public void consumerAdded(final ConsumerImpl sub)
     {
-        _consumer = sub;
+        _consumers.add( sub );
     }
 
     public AMQSessionModel getSessionModel()
@@ -426,12 +437,8 @@ public abstract class ConsumerTarget_0_8
         boolean closed = false;
         State state = getState();
 
-        final ConsumerImpl consumer = getConsumer();
+        getSendLock();
 
-        if(consumer != null)
-        {
-            consumer.getSendLock();
-        }
         try
         {
             while(!closed && state != State.CLOSED)
@@ -447,10 +454,7 @@ public abstract class ConsumerTarget_0_8
         }
         finally
         {
-            if(consumer != null)
-            {
-                consumer.releaseSendLock();
-            }
+            releaseSendLock();
         }
     }
 
@@ -493,7 +497,7 @@ public abstract class ConsumerTarget_0_8
             if(!updateState(State.SUSPENDED, State.ACTIVE))
             {
                 // this is a hack to get round the issue of increasing bytes credit
-                getStateListener().stateChanged(this, State.ACTIVE, State.ACTIVE);
+                notifyCurrentState();
             }
         }
         else
@@ -502,16 +506,20 @@ public abstract class ConsumerTarget_0_8
         }
     }
 
-    protected long sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag)
+    protected long sendToClient(final ConsumerImpl consumer, final ServerMessage message,
+                                final InstanceProperties props,
+                                final long deliveryTag)
     {
-        return _deliveryMethod.deliverToClient(getConsumer(), message, props, deliveryTag);
+        return _deliveryMethod.deliverToClient(consumer, message, props, deliveryTag);
 
     }
 
 
-    protected void recordMessageDelivery(final MessageInstance entry, final long deliveryTag)
+    protected void recordMessageDelivery(final ConsumerImpl consumer,
+                                         final MessageInstance entry,
+                                         final long deliveryTag)
     {
-        _recordMethod.recordMessageDelivery(getConsumer(),entry,deliveryTag);
+        _recordMethod.recordMessageDelivery(consumer, entry, deliveryTag);
     }
 
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java Tue Aug 26 17:01:07 2014
@@ -20,6 +20,10 @@
  */
 package org.apache.qpid.server.protocol.v0_8.handler;
 
+import java.security.AccessControlException;
+import java.util.Collection;
+import java.util.HashSet;
+
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
@@ -32,13 +36,11 @@ import org.apache.qpid.server.filter.AMQ
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
+import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
-import java.security.AccessControlException;
-
 public class BasicConsumeMethodHandler implements StateAwareMethodListener<BasicConsumeBody>
 {
     private static final Logger _logger = Logger.getLogger(BasicConsumeMethodHandler.class);
@@ -59,7 +61,7 @@ public class BasicConsumeMethodHandler i
         AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
 
         AMQChannel channel = protocolConnection.getChannel(channelId);
-        VirtualHostImpl vHost = protocolConnection.getVirtualHost();
+        VirtualHostImpl<?,?,?> vHost = protocolConnection.getVirtualHost();
 
         if (channel == null)
         {
@@ -68,25 +70,55 @@ public class BasicConsumeMethodHandler i
         else
         {
             channel.sync();
+            String queueName = body.getQueue() == null ? null : body.getQueue().asString();
             if (_logger.isDebugEnabled())
             {
-                _logger.debug("BasicConsume: from '" + body.getQueue() +
+                _logger.debug("BasicConsume: from '" + queueName +
                               "' for:" + body.getConsumerTag() +
                               " nowait:" + body.getNowait() +
                               " args:" + body.getArguments());
             }
 
-            MessageSource queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString());
+            MessageSource queue = queueName == null ? channel.getDefaultQueue() : vHost.getQueue(queueName);
+            final Collection<MessageSource> sources = new HashSet<>();
+            if(queue != null)
+            {
+                sources.add(queue);
+            }
+            else if(vHost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers")
+                    && body.getArguments() != null
+                    && body.getArguments().get("x-multiqueue") instanceof Collection)
+            {
+                for(Object object : (Collection<Object>)body.getArguments().get("x-multiqueue"))
+                {
+                    String sourceName = String.valueOf(object);
+                    sourceName = sourceName.trim();
+                    if(sourceName.length() != 0)
+                    {
+                        MessageSource source = vHost.getMessageSource(sourceName);
+                        if(source == null)
+                        {
+                            sources.clear();
+                            break;
+                        }
+                        else
+                        {
+                            sources.add(source);
+                        }
+                    }
+                }
+                queueName = body.getArguments().get("x-multiqueue").toString();
+            }
 
-            if (queue == null)
+            if (sources.isEmpty())
             {
                 if (_logger.isDebugEnabled())
                 {
-                    _logger.debug("No queue for '" + body.getQueue() + "'");
+                    _logger.debug("No queue for '" + queueName + "'");
                 }
-                if (body.getQueue() != null)
+                if (queueName != null)
                 {
-                    String msg = "No such queue, '" + body.getQueue() + "'";
+                    String msg = "No such queue, '" + queueName + "'";
                     throw body.getChannelException(AMQConstant.NOT_FOUND, msg);
                 }
                 else
@@ -114,7 +146,7 @@ public class BasicConsumeMethodHandler i
                     {
 
                         AMQShortString consumerTag = channel.consumeFromSource(consumerTagName,
-                                                                               queue,
+                                                                               sources,
                                                                                !body.getNoAck(),
                                                                                body.getArguments(),
                                                                                body.getExclusive(),

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java Tue Aug 26 17:01:07 2014
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.qpid.exchange.ExchangeDefaults;
@@ -138,7 +139,9 @@ public class AcknowledgeTest extends Qpi
         assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size());
 
         //Subscribe to the queue
-        AMQShortString subscriber = _channel.consumeFromSource(null, _queue, true, null, true, false);
+        AMQShortString subscriber = _channel.consumeFromSource(null,
+                                                               Collections.singleton(_queue),
+                                                               true, null, true, false);
 
         getQueue().deliverAsync();
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java Tue Aug 26 17:01:07 2014
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.qpid.common.AMQPFilterTypes;
@@ -143,6 +144,6 @@ public class QueueBrowserUsesNoAckTest e
         FieldTable filters = new FieldTable();
         filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true);
 
-        return channel.consumeFromSource(null, queue, true, filters, true, false);
+        return channel.consumeFromSource(null, Collections.singleton(queue), true, filters, true, false);
     }
 }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Tue Aug 26 17:01:07 2014
@@ -93,7 +93,7 @@ class ConsumerTarget_1_0 extends Abstrac
         boolean closed = false;
         State state = getState();
 
-        getConsumer().getSendLock();
+        getSendLock();
         try
         {
             while(!closed && state != State.CLOSED)
@@ -108,11 +108,11 @@ class ConsumerTarget_1_0 extends Abstrac
         }
         finally
         {
-            getConsumer().releaseSendLock();
+            releaseSendLock();
         }
     }
 
-    public long send(MessageInstance entry, boolean batch)
+    public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
     {
         // TODO
         long size = entry.getMessage().getSize();
@@ -515,7 +515,7 @@ class ConsumerTarget_1_0 extends Abstrac
     @Override
     public void consumerRemoved(final ConsumerImpl sub)
     {
-
+        close();
     }
 
     @Override

Modified: qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Tue Aug 26 17:01:07 2014
@@ -1065,6 +1065,12 @@ class ManagementNode implements MessageS
         }
 
         @Override
+        public ConsumerImpl getAcquiringConsumer()
+        {
+            return null;
+        }
+
+        @Override
         public boolean isAcquiredBy(final ConsumerImpl consumer)
         {
             return false;

Modified: qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java Tue Aug 26 17:01:07 2014
@@ -20,6 +20,10 @@
  */
 package org.apache.qpid.server.management.amqp;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.message.MessageSource;
@@ -27,19 +31,12 @@ import org.apache.qpid.server.message.in
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.util.StateChangeListener;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
 class ManagementNodeConsumer implements ConsumerImpl
 {
     private final long _id = ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement();
     private final ManagementNode _managementNode;
     private final List<ManagementResponse> _queue = Collections.synchronizedList(new ArrayList<ManagementResponse>());
     private final ConsumerTarget _target;
-    private final Lock _stateChangeLock = new ReentrantLock();
     private final String _name;
     private final StateChangeListener<ConsumerTarget, ConsumerTarget.State> _targetChangeListener = new TargetChangeListener();
 
@@ -49,7 +46,7 @@ class ManagementNodeConsumer implements 
         _name = consumerName;
         _managementNode = managementNode;
         _target = target;
-        target.setStateListener(_targetChangeListener);
+        target.addStateListener(_targetChangeListener);
     }
 
     @Override
@@ -133,19 +130,19 @@ class ManagementNodeConsumer implements 
     @Override
     public boolean trySendLock()
     {
-        return _stateChangeLock.tryLock();
+        return _target.trySendLock();
     }
 
     @Override
     public void getSendLock()
     {
-        _stateChangeLock.lock();
+        _target.getSendLock();
     }
 
     @Override
     public void releaseSendLock()
     {
-        _stateChangeLock.unlock();
+        _target.releaseSendLock();
     }
 
 
@@ -174,13 +171,13 @@ class ManagementNodeConsumer implements 
 
     void send(final InternalMessage response)
     {
-        getSendLock();
+        _target.getSendLock();
         try
         {
             final ManagementResponse responseEntry = new ManagementResponse(this, response);
             if(_queue.isEmpty() && _target.allocateCredit(response))
             {
-                _target.send(responseEntry,false);
+                _target.send(this, responseEntry, false);
             }
             else
             {
@@ -189,7 +186,7 @@ class ManagementNodeConsumer implements 
         }
         finally
         {
-            releaseSendLock();
+            _target.releaseSendLock();
         }
     }
 
@@ -209,7 +206,7 @@ class ManagementNodeConsumer implements 
 
     private void deliverMessages()
     {
-        getSendLock();
+        _target.getSendLock();
         try
         {
             while(!_queue.isEmpty())
@@ -219,7 +216,7 @@ class ManagementNodeConsumer implements 
                 if(!_target.isSuspended() && _target.allocateCredit(managementResponse.getMessage()))
                 {
                     _queue.remove(0);
-                    _target.send(managementResponse,false);
+                    _target.send(this, managementResponse, false);
                 }
                 else
                 {
@@ -229,7 +226,7 @@ class ManagementNodeConsumer implements 
         }
         finally
         {
-            releaseSendLock();
+            _target.releaseSendLock();
         }
     }
 }

Modified: qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java Tue Aug 26 17:01:07 2014
@@ -84,6 +84,12 @@ class ManagementResponse implements Mess
     }
 
     @Override
+    public ConsumerImpl getAcquiringConsumer()
+    {
+        return _consumer;
+    }
+
+    @Override
     public boolean isAcquiredBy(final ConsumerImpl consumer)
     {
         return consumer == _consumer && !isDeleted();

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Aug 26 17:01:07 2014
@@ -683,47 +683,48 @@ public abstract class AMQSession<C exten
 
 
             int type = resolveAddressType(dest);
-
+            boolean resolved = false;
             switch (type)
             {
                 case AMQDestination.QUEUE_TYPE:
-                {
+
+                    setLegacyFieldsForQueueType(dest);
                     if(createNode)
                     {
-                        setLegacyFieldsForQueueType(dest);
                         handleQueueNodeCreation(dest,noLocal);
-                        break;
+                        resolved = true;
                     }
                     else if (isQueueExist(dest,assertNode))
                     {
-                        setLegacyFieldsForQueueType(dest);
-                        break;
+                        resolved = true;
                     }
-                }
+                    break;
 
                 case AMQDestination.TOPIC_TYPE:
-                {
+
+                    setLegacyFieldsForTopicType(dest);
                     if(createNode)
                     {
-                        setLegacyFieldsForTopicType(dest);
                         verifySubject(dest);
                         handleExchangeNodeCreation(dest);
-                        break;
+                        resolved = true;
                     }
                     else if (isExchangeExist(dest,assertNode))
                     {
-                        setLegacyFieldsForTopicType(dest);
                         verifySubject(dest);
-                        break;
+                        resolved = true;
                     }
-                }
+                    break;
 
                 default:
                     throw new AMQException(
                             "The name '" + dest.getAddressName() +
                             "' supplied in the address doesn't resolve to an exchange or a queue");
             }
-            dest.setAddressResolved(System.currentTimeMillis());
+            if(resolved)
+            {
+                dest.setAddressResolved(System.currentTimeMillis());
+            }
         }
     }
 

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Tue Aug 26 17:01:07 2014
@@ -645,18 +645,19 @@ public class AMQSession_0_10 extends AMQ
         Link link = destination.getLink();
         if (link != null && link.getSubscription() != null && link.getSubscription().getArgs() != null)
         {
-            arguments.putAll((Map<? extends String, ? extends Object>) link.getSubscription().getArgs());
+            arguments.putAll(link.getSubscription().getArgs());
         }
 
         boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE;
 
+        String queue = queueName == null ? destination.getAddressName() : queueName.toString();
         getQpidSession().messageSubscribe
-            (queueName.toString(), String.valueOf(tag),
+            (queue, String.valueOf(tag),
              acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
              preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments,
              consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
 
-        String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString();
+        String consumerTag = (consumer).getConsumerTagString();
 
         if (capacity == 0)
         {
@@ -1175,7 +1176,8 @@ public class AMQSession_0_10 extends AMQ
         }
         catch(SessionException e)
         {
-            if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED)
+            if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED
+                    || e.getException().getErrorCode() == ExecutionErrorCode.NOT_FOUND)
             {
                 match = false;
             }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Tue Aug 26 17:01:07 2014
@@ -514,6 +514,16 @@ public class AMQSession_0_8 extends AMQS
     {
         queueName = preprocessAddressTopic(consumer, queueName);
 
+        AMQDestination destination = consumer.getDestination();
+
+        Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments());
+
+        Link link = destination.getLink();
+        if (link != null && link.getSubscription() != null && link.getSubscription().getArgs() != null)
+        {
+            arguments.putAll(link.getSubscription().getArgs());
+        }
+
         BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(),
                                                                            queueName,
                                                                            new AMQShortString(String.valueOf(tag)),
@@ -521,7 +531,7 @@ public class AMQSession_0_8 extends AMQS
                                                                            consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
                                                                            consumer.isExclusive(),
                                                                            nowait,
-                                                                           consumer.getArguments());
+                                                                           FieldTable.convertToFieldTable(arguments));
 
 
         AMQFrame jmsConsume = body.generateFrame(getChannelId());

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java Tue Aug 26 17:01:07 2014
@@ -43,7 +43,8 @@ public class AddressHelper
     public static final String LINK = "link";
     public static final String X_DECLARE = "x-declare";
     public static final String X_BINDINGS = "x-bindings";
-    public static final String X_SUBSCRIBE = "x-subscribes";
+    public static final String X_SUBSCRIBES = "x-subscribes";
+    public static final String X_SUBSCRIBE = "x-subscribe";
     public static final String CREATE = "create";
     public static final String ASSERT = "assert";
     public static final String DELETE = "delete";
@@ -265,19 +266,32 @@ public class AddressHelper
             
             Map linkMap = (Map) _address.getOptions().get(LINK);
 
-            if (linkMap != null && linkMap.containsKey(X_SUBSCRIBE))
-            {   
-                Map x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBE);
-                
-                if (x_subscribe.containsKey(ARGUMENTS))
+            if (linkMap != null)
+            {
+                Map x_subscribe = null;
+
+                if(linkMap.containsKey(X_SUBSCRIBE))
                 {
-                    link.getSubscription().setArgs((Map<String,Object>)x_subscribe.get(ARGUMENTS));
+                    x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBE);
+                }
+                else if(linkMap.containsKey(X_SUBSCRIBES))
+                {
+                    // left in for backwards compatibility with old broken constant
+                    x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBES);
+                }
+
+                if(x_subscribe != null)
+                {
+                    if (x_subscribe.containsKey(ARGUMENTS))
+                    {
+                        link.getSubscription().setArgs((Map<String, Object>) x_subscribe.get(ARGUMENTS));
+                    }
+
+                    boolean exclusive = x_subscribe.containsKey(EXCLUSIVE) ?
+                            Boolean.parseBoolean((String) x_subscribe.get(EXCLUSIVE)) : false;
+
+                    link.getSubscription().setExclusive(exclusive);
                 }
-                
-                boolean exclusive = x_subscribe.containsKey(EXCLUSIVE) ?
-                                    Boolean.parseBoolean((String)x_subscribe.get(EXCLUSIVE)): false;
-                
-                link.getSubscription().setExclusive(exclusive);
             }
 
             link.setBindings(getBindings(linkMap));



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org