You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/08/26 19:01:08 UTC
svn commit: r1620659 [1/2] - in /qpid/trunk/qpid/java:
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/message/
broker-core/src/main/java/org/apache/qpid/server/queue/
broker-core/src/test/jav...
Author: rgodfrey
Date: Tue Aug 26 17:01:07 2014
New Revision: 1620659
URL: http://svn.apache.org/r1620659
Log:
QPID-6040 : [Java Broker] [Java Client] Add the ability to create a single consumer that consumes from a collection of queues
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Tue Aug 26 17:01:07 2014
@@ -20,16 +20,24 @@
*/
package org.apache.qpid.server.consumer;
-import org.apache.qpid.server.util.StateChangeListener;
-
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.qpid.server.util.StateChangeListener;
public abstract class AbstractConsumerTarget implements ConsumerTarget
{
private final AtomicReference<State> _state;
- private final AtomicReference<StateChangeListener<ConsumerTarget, State>> _stateListener =
- new AtomicReference<StateChangeListener<ConsumerTarget, State>>();
+
+ private final Set<StateChangeListener<ConsumerTarget, State>> _stateChangeListeners = new
+ CopyOnWriteArraySet<>();
+
+ private final Lock _stateChangeLock = new ReentrantLock();
+
protected AbstractConsumerTarget(final State initialState)
{
@@ -46,8 +54,7 @@ public abstract class AbstractConsumerTa
{
if(_state.compareAndSet(from, to))
{
- StateChangeListener<ConsumerTarget, State> listener = _stateListener.get();
- if(listener != null)
+ for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
{
listener.stateChanged(this, from, to);
}
@@ -59,15 +66,38 @@ public abstract class AbstractConsumerTa
}
}
+ public final void notifyCurrentState()
+ {
+ for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
+ {
+ State state = getState();
+ listener.stateChanged(this, state, state);
+ }
+ }
+ public final void addStateListener(StateChangeListener<ConsumerTarget, State> listener)
+ {
+ _stateChangeListeners.add(listener);
+ }
+
+ @Override
+ public void removeStateChangeListener(final StateChangeListener<ConsumerTarget, State> listener)
+ {
+ _stateChangeListeners.remove(listener);
+ }
+
+ public final boolean trySendLock()
+ {
+ return _stateChangeLock.tryLock();
+ }
- public final void setStateListener(StateChangeListener<ConsumerTarget, State> listener)
+ public final void getSendLock()
{
- _stateListener.set(listener);
+ _stateChangeLock.lock();
}
- public final StateChangeListener<ConsumerTarget, State> getStateListener()
+ public final void releaseSendLock()
{
- return _stateListener.get();
+ _stateChangeLock.unlock();
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java Tue Aug 26 17:01:07 2014
@@ -31,6 +31,8 @@ public interface ConsumerTarget
void acquisitionRemoved(MessageInstance node);
+ void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> listener);
+
enum State
{
ACTIVE, SUSPENDED, CLOSED
@@ -42,7 +44,7 @@ public interface ConsumerTarget
void consumerRemoved(ConsumerImpl sub);
- void setStateListener(StateChangeListener<ConsumerTarget, State> listener);
+ void addStateListener(StateChangeListener<ConsumerTarget, State> listener);
long getUnacknowledgedBytes();
@@ -50,7 +52,7 @@ public interface ConsumerTarget
AMQSessionModel getSessionModel();
- long send(MessageInstance entry, boolean batch);
+ long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
void flushBatched();
@@ -65,4 +67,11 @@ public interface ConsumerTarget
boolean isSuspended();
boolean close();
+
+ boolean trySendLock();
+
+ void getSendLock();
+
+ void releaseSendLock();
+
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java Tue Aug 26 17:01:07 2014
@@ -79,6 +79,8 @@ public interface MessageInstance
Filterable asFilterable();
+ ConsumerImpl getAcquiringConsumer();
+
public static enum State
{
AVAILABLE,
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Tue Aug 26 17:01:07 2014
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.model.Consumer;
@@ -52,4 +53,6 @@ public interface QueueConsumer<X extends
MessageInstance.ConsumerAcquiredState<X> getOwningState();
QueueContext getQueueContext();
+
+ ConsumerTarget getTarget();
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Tue Aug 26 17:01:07 2014
@@ -31,8 +31,6 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
@@ -65,7 +63,6 @@ class QueueConsumerImpl
private final AtomicBoolean _targetClosed = new AtomicBoolean(false);
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final long _consumerNumber;
- private final Lock _stateChangeLock = new ReentrantLock();
private final long _createTime = System.currentTimeMillis();
private final MessageInstance.ConsumerAcquiredState<QueueConsumerImpl> _owningState = new MessageInstance.ConsumerAcquiredState<QueueConsumerImpl>(this);
private final boolean _acquires;
@@ -90,6 +87,8 @@ class QueueConsumerImpl
private final ConsumerTarget _target;
private final SubFlushRunner _runner = new SubFlushRunner(this);
+ private final StateChangeListener<ConsumerTarget, ConsumerTarget.State>
+ _listener;
private volatile QueueContext _queueContext;
private StateChangeListener<? super QueueConsumerImpl, State> _stateListener = new StateChangeListener<QueueConsumerImpl, State>()
{
@@ -134,17 +133,17 @@ class QueueConsumerImpl
setupLogging();
- _target.setStateListener(
- new StateChangeListener<ConsumerTarget, ConsumerTarget.State>()
- {
- @Override
- public void stateChanged(final ConsumerTarget object,
- final ConsumerTarget.State oldState,
- final ConsumerTarget.State newState)
- {
- targetStateChanged(oldState, newState);
- }
- });
+ _listener = new StateChangeListener<ConsumerTarget, ConsumerTarget.State>()
+ {
+ @Override
+ public void stateChanged(final ConsumerTarget object,
+ final ConsumerTarget.State oldState,
+ final ConsumerTarget.State newState)
+ {
+ targetStateChanged(oldState, newState);
+ }
+ };
+ _target.addStateListener(_listener);
}
private static Map<String, Object> createAttributeMap(String name, FilterManager filters, EnumSet<Option> optionSet)
@@ -202,6 +201,7 @@ class QueueConsumerImpl
}
}
+ @Override
public ConsumerTarget getTarget()
{
return _target;
@@ -248,17 +248,17 @@ class QueueConsumerImpl
{
if(_closed.compareAndSet(false,true))
{
- getSendLock();
+ _target.getSendLock();
try
{
- _target.close();
_target.consumerRemoved(this);
+ _target.removeStateChangeListener(_listener);
_queue.unregisterConsumer(this);
deleted();
}
finally
{
- releaseSendLock();
+ _target.releaseSendLock();
}
}
@@ -420,17 +420,17 @@ class QueueConsumerImpl
public final boolean trySendLock()
{
- return _stateChangeLock.tryLock();
+ return getTarget().trySendLock();
}
public final void getSendLock()
{
- _stateChangeLock.lock();
+ getTarget().getSendLock();
}
public final void releaseSendLock()
{
- _stateChangeLock.unlock();
+ getTarget().releaseSendLock();
}
public final long getCreateTime()
@@ -471,7 +471,7 @@ class QueueConsumerImpl
public final void send(final QueueEntry entry, final boolean batch)
{
_deliveredCount.incrementAndGet();
- long size = _target.send(entry, batch);
+ long size = _target.send(this, entry, batch);
_deliveredBytes.addAndGet(size);
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue Aug 26 17:01:07 2014
@@ -247,6 +247,26 @@ public abstract class QueueEntryImpl imp
}
@Override
+ public ConsumerImpl getAcquiringConsumer()
+ {
+ ConsumerImpl consumer;
+ EntryState state = _state;
+ if(state instanceof ConsumerAcquiredState)
+ {
+ consumer = ((ConsumerAcquiredState)state).getConsumer();
+ }
+ else if(state instanceof LockedAcquiredState)
+ {
+ consumer = ((LockedAcquiredState)state).getConsumer();
+ }
+ else
+ {
+ consumer = null;
+ }
+ return consumer;
+ }
+
+ @Override
public boolean isAcquiredBy(ConsumerImpl consumer)
{
EntryState state = _state;
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java Tue Aug 26 17:01:07 2014
@@ -21,18 +21,18 @@
package org.apache.qpid.server.queue;
import java.security.PrivilegedAction;
-import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import javax.security.auth.Subject;
+
import org.apache.log4j.Logger;
+
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.transport.TransportException;
-import javax.security.auth.Subject;
-
/**
* QueueRunners are Runnables used to process a queue when requiring
* asynchronous message delivery to consumers, which is necessary
@@ -111,7 +111,7 @@ public class QueueRunner implements Runn
public String toString()
{
- return "QueueRunner-" + _queue.getLogSubject();
+ return "QueueRunner-" + _queue.getLogSubject().toLogString();
}
public void execute()
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java Tue Aug 26 17:01:07 2014
@@ -21,18 +21,18 @@
package org.apache.qpid.server.queue;
+import java.security.PrivilegedAction;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.security.auth.Subject;
+
import org.apache.log4j.Logger;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.transport.TransportException;
-import javax.security.auth.Subject;
-import java.security.PrivilegedAction;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
class SubFlushRunner implements Runnable
{
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Tue Aug 26 17:01:07 2014
@@ -167,7 +167,7 @@ public class MockConsumer implements Con
{
}
- public long send(MessageInstance entry, boolean batch)
+ public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
long size = entry.getMessage().getSize();
if (messages.contains(entry))
@@ -202,7 +202,7 @@ public class MockConsumer implements Con
@Override
public void consumerRemoved(final ConsumerImpl sub)
{
-
+ close();
}
public void setState(State state)
@@ -216,11 +216,20 @@ public class MockConsumer implements Con
}
@Override
- public void setStateListener(final StateChangeListener<ConsumerTarget, State> listener)
+ public void addStateListener(final StateChangeListener<ConsumerTarget, State> listener)
{
_listener = listener;
}
+ @Override
+ public void removeStateChangeListener(final StateChangeListener<ConsumerTarget, State> listener)
+ {
+ if(_listener == listener)
+ {
+ _listener = null;
+ }
+ }
+
public ArrayList<MessageInstance> getMessages()
{
return messages;
@@ -242,6 +251,23 @@ public class MockConsumer implements Con
_isActive = isActive;
}
+
+ public final boolean trySendLock()
+ {
+ return _stateChangeLock.tryLock();
+ }
+
+ public final void getSendLock()
+ {
+ _stateChangeLock.lock();
+ }
+
+ public final void releaseSendLock()
+ {
+ _stateChangeLock.unlock();
+ }
+
+
private static class MockSessionModel implements AMQSessionModel
{
private final UUID _id = UUID.randomUUID();
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java Tue Aug 26 17:01:07 2014
@@ -60,6 +60,12 @@ public class MockMessageInstance impleme
}
@Override
+ public ConsumerImpl getAcquiringConsumer()
+ {
+ return null;
+ }
+
+ @Override
public boolean isAcquiredBy(final ConsumerImpl consumer)
{
return false;
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Tue Aug 26 17:01:07 2014
@@ -205,12 +205,13 @@ public class StandardQueueTest extends A
{
/**
* Send a message and decrement latch
+ * @param consumer
* @param entry
* @param batch
*/
- public long send(MessageInstance entry, boolean batch)
+ public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
- long size = super.send(entry, batch);
+ long size = super.send(consumer, entry, batch);
latch.countDown();
return size;
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Tue Aug 26 17:01:07 2014
@@ -22,7 +22,9 @@ package org.apache.qpid.server.protocol.
import java.nio.ByteBuffer;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -37,6 +39,7 @@ import org.apache.qpid.server.model.Exch
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueConsumer;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -77,7 +80,7 @@ public class ConsumerTarget_0_10 extends
private final Map<String, Object> _arguments;
private int _deferredMessageCredit;
private long _deferredSizeCredit;
- private ConsumerImpl _consumer;
+ private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
public ConsumerTarget_0_10(ServerSession session,
@@ -101,11 +104,6 @@ public class ConsumerTarget_0_10 extends
_name = name;
}
- public ConsumerImpl getConsumer()
- {
- return _consumer;
- }
-
public boolean isSuspended()
{
return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension
@@ -116,11 +114,7 @@ public class ConsumerTarget_0_10 extends
boolean closed = false;
State state = getState();
- final ConsumerImpl consumer = getConsumer();
- if(consumer != null)
- {
- consumer.getSendLock();
- }
+ getSendLock();
try
{
while(!closed && state != State.CLOSED)
@@ -135,10 +129,7 @@ public class ConsumerTarget_0_10 extends
}
finally
{
- if(consumer != null)
- {
- consumer.releaseSendLock();
- }
+ releaseSendLock();
}
return closed;
@@ -153,7 +144,7 @@ public class ConsumerTarget_0_10 extends
if(!updateState(State.SUSPENDED, State.ACTIVE))
{
// this is a hack to get round the issue of increasing bytes credit
- getStateListener().stateChanged(this, State.ACTIVE, State.ACTIVE);
+ notifyCurrentState();
}
}
else
@@ -200,7 +191,7 @@ public class ConsumerTarget_0_10 extends
private final AddMessageDispositionListenerAction _postIdSettingAction;
- public long send(final MessageInstance entry, boolean batch)
+ public long send(final ConsumerImpl consumer, final MessageInstance entry, boolean batch)
{
ServerMessage serverMsg = entry.getMessage();
@@ -303,12 +294,12 @@ public class ConsumerTarget_0_10 extends
Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
- xfr = batch ? new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header, body, BATCHED)
- : new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header, body);
+ xfr = batch ? new MessageTransfer(_name,_acceptMode,_acquireMode,header, body, BATCHED)
+ : new MessageTransfer(_name,_acceptMode,_acquireMode,header, body);
if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED)
{
- xfr.setCompletionListener(new MessageAcceptCompletionListener(this, _session, entry, _flowMode == MessageFlowMode.WINDOW));
+ xfr.setCompletionListener(new MessageAcceptCompletionListener(this, consumer, _session, entry, _flowMode == MessageFlowMode.WINDOW));
}
else if(_flowMode == MessageFlowMode.WINDOW)
{
@@ -325,11 +316,11 @@ public class ConsumerTarget_0_10 extends
_postIdSettingAction.setXfr(xfr);
if(_acceptMode == MessageAcceptMode.EXPLICIT)
{
- _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this));
+ _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this, consumer));
}
else if(_acquireMode != MessageAcquireMode.PRE_ACQUIRED)
{
- _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this));
+ _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this, consumer));
}
else
{
@@ -401,12 +392,23 @@ public class ConsumerTarget_0_10 extends
{
entry.setRedelivered();
entry.routeToAlternate(null, null);
- if(entry.isAcquiredBy(getConsumer()))
+ if(isAcquiredByConsumer(entry))
{
entry.delete();
}
}
+ private boolean isAcquiredByConsumer(final MessageInstance entry)
+ {
+ ConsumerImpl acquiringConsumer = entry.getAcquiringConsumer();
+ if(acquiringConsumer instanceof QueueConsumer)
+ {
+ return ((QueueConsumer)acquiringConsumer).getTarget() == this;
+ }
+
+ return false;
+ }
+
void release(final MessageInstance entry, final boolean setRedelivered)
{
if (setRedelivered)
@@ -503,7 +505,7 @@ public class ConsumerTarget_0_10 extends
{
try
{
- getConsumer().getSendLock();
+ getSendLock();
updateState(State.ACTIVE, State.SUSPENDED);
_stopped.set(true);
@@ -512,7 +514,7 @@ public class ConsumerTarget_0_10 extends
}
finally
{
- getConsumer().releaseSendLock();
+ releaseSendLock();
}
}
@@ -572,7 +574,7 @@ public class ConsumerTarget_0_10 extends
public boolean deleteAcquired(MessageInstance entry)
{
- if(entry.isAcquiredBy(getConsumer()))
+ if(isAcquiredByConsumer(entry))
{
acquisitionRemoved(entry);
entry.delete();
@@ -594,7 +596,10 @@ public class ConsumerTarget_0_10 extends
public void flush()
{
flushCreditState(true);
- getConsumer().flush();
+ for(ConsumerImpl consumer : _consumers)
+ {
+ consumer.flush();
+ }
stop();
}
@@ -626,12 +631,17 @@ public class ConsumerTarget_0_10 extends
@Override
public void consumerAdded(final ConsumerImpl sub)
{
- _consumer = sub;
+ _consumers.add(sub);
}
@Override
public void consumerRemoved(final ConsumerImpl sub)
{
+ _consumers.remove(sub);
+ if(_consumers.isEmpty())
+ {
+ close();
+ }
}
public long getUnacknowledgedBytes()
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java Tue Aug 26 17:01:07 2014
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.
import org.apache.log4j.Logger;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
@@ -32,16 +33,20 @@ class ExplicitAcceptDispositionChangeLis
private final MessageInstance _entry;
private final ConsumerTarget_0_10 _target;
+ private final ConsumerImpl _consumer;
- public ExplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target)
+ public ExplicitAcceptDispositionChangeListener(MessageInstance entry,
+ ConsumerTarget_0_10 target,
+ final ConsumerImpl consumer)
{
_entry = entry;
_target = target;
+ _consumer = consumer;
}
public void onAccept()
{
- if(_target != null && _entry.isAcquiredBy(_target.getConsumer()) && _entry.lockAcquisition())
+ if(_target != null && _entry.isAcquiredBy(_consumer) && _entry.lockAcquisition())
{
_target.getSessionModel().acknowledge(_target, _entry);
}
@@ -54,7 +59,7 @@ class ExplicitAcceptDispositionChangeLis
public void onRelease(boolean setRedelivered)
{
- if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
+ if(_target != null && _entry.isAcquiredBy(_consumer))
{
_target.release(_entry, setRedelivered);
}
@@ -66,7 +71,7 @@ class ExplicitAcceptDispositionChangeLis
public void onReject()
{
- if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
+ if(_target != null && _entry.isAcquiredBy(_consumer))
{
_target.reject(_entry);
}
@@ -79,7 +84,7 @@ class ExplicitAcceptDispositionChangeLis
public boolean acquire()
{
- return _entry.acquire(_target.getConsumer());
+ return _entry.acquire(_consumer);
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java Tue Aug 26 17:01:07 2014
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.
import org.apache.log4j.Logger;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
@@ -30,12 +31,16 @@ class ImplicitAcceptDispositionChangeLis
private final MessageInstance _entry;
- private ConsumerTarget_0_10 _target;
+ private final ConsumerTarget_0_10 _target;
+ private final ConsumerImpl _consumer;
- public ImplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target)
+ public ImplicitAcceptDispositionChangeListener(MessageInstance entry,
+ ConsumerTarget_0_10 target,
+ final ConsumerImpl consumer)
{
_entry = entry;
_target = target;
+ _consumer = consumer;
}
public void onAccept()
@@ -45,7 +50,7 @@ class ImplicitAcceptDispositionChangeLis
public void onRelease(boolean setRedelivered)
{
- if(_entry.isAcquiredBy(_target.getConsumer()))
+ if(_entry.isAcquiredBy(_consumer))
{
_target.release(_entry, setRedelivered);
}
@@ -57,7 +62,7 @@ class ImplicitAcceptDispositionChangeLis
public void onReject()
{
- if(_entry.isAcquiredBy(_target.getConsumer()))
+ if(_entry.isAcquiredBy(_consumer))
{
_target.reject(_entry);
}
@@ -70,7 +75,7 @@ class ImplicitAcceptDispositionChangeLis
public boolean acquire()
{
- boolean acquired = _entry.acquire(_target.getConsumer());
+ boolean acquired = _entry.acquire(_consumer);
if(acquired)
{
_target.recordUnacknowledged(_entry);
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java Tue Aug 26 17:01:07 2014
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol.v0_10;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.transport.Method;
@@ -29,16 +30,22 @@ public class MessageAcceptCompletionList
private final ConsumerTarget_0_10 _sub;
private final MessageInstance _entry;
private final ServerSession _session;
+ private final ConsumerImpl _consumer;
private long _messageSize;
private boolean _restoreCredit;
- public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit)
+ public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub,
+ final ConsumerImpl consumer,
+ ServerSession session,
+ MessageInstance entry,
+ boolean restoreCredit)
{
super();
_sub = sub;
_entry = entry;
_session = session;
_restoreCredit = restoreCredit;
+ _consumer = consumer;
if(restoreCredit)
{
_messageSize = entry.getMessage().getSize();
@@ -51,7 +58,7 @@ public class MessageAcceptCompletionList
{
_sub.getCreditManager().restoreCredit(1l, _messageSize);
}
- if(_entry.isAcquiredBy(_sub.getConsumer()) && _entry.lockAcquisition())
+ if(_entry.isAcquiredBy(_consumer) && _entry.lockAcquisition())
{
_session.acknowledge(_sub, _entry);
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Tue Aug 26 17:01:07 2014
@@ -25,6 +25,7 @@ import java.security.AccessControlExcept
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -198,15 +199,44 @@ public class ServerSessionDelegate exten
else
{
String queueName = method.getQueue();
- VirtualHostImpl vhost = getVirtualHost(session);
+ VirtualHostImpl<?,?,?> vhost = getVirtualHost(session);
+ final Collection<MessageSource> sources = new HashSet<>();
final MessageSource queue = vhost.getMessageSource(queueName);
+ if(queue != null)
+ {
+ sources.add(queue);
+ }
+ else if(vhost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers")
+ && method.getArguments() != null
+ && method.getArguments().get("x-multiqueue") instanceof Collection)
+ {
+ for(Object object : (Collection<Object>)method.getArguments().get("x-multiqueue"))
+ {
+ String sourceName = String.valueOf(object);
+ sourceName = sourceName.trim();
+ if(sourceName.length() != 0)
+ {
+ MessageSource source = vhost.getMessageSource(sourceName);
+ if(source == null)
+ {
+ sources.clear();
+ break;
+ }
+ else
+ {
+ sources.add(source);
+ }
+ }
+ }
+ queueName = method.getArguments().get("x-multiqueue").toString();
+ }
- if(queue == null)
+ if(sources.isEmpty())
{
- exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
}
- else if(!queue.verifySessionAccess((ServerSession)session))
+ else if(!verifySessionAccess((ServerSession) session, sources))
{
exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
}
@@ -250,12 +280,15 @@ public class ServerSessionDelegate exten
{
options.add(ConsumerImpl.Option.EXCLUSIVE);
}
- ((ServerSession)session).register(
- queue.addConsumer(target,
- filterManager,
- MessageTransferMessage.class,
- destination,
- options));
+ for(MessageSource source : sources)
+ {
+ ((ServerSession) session).register(
+ source.addConsumer(target,
+ filterManager,
+ MessageTransferMessage.class,
+ destination,
+ options));
+ }
}
catch (AMQQueue.ExistingExclusiveConsumer existing)
{
@@ -278,6 +311,23 @@ public class ServerSessionDelegate exten
}
}
+ protected boolean verifySessionAccess(final ServerSession session, final Collection<MessageSource> queues)
+ {
+ for(MessageSource source : queues)
+ {
+ if(!verifySessionAccess(session, source))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ protected boolean verifySessionAccess(final ServerSession session, final MessageSource queue)
+ {
+ return queue.verifySessionAccess(session);
+ }
+
@Override
public void messageTransfer(Session ssn, final MessageTransfer xfr)
{
@@ -820,17 +870,15 @@ public class ServerSessionDelegate exten
return destination;
}
- private VirtualHostImpl getVirtualHost(Session session)
+ private VirtualHostImpl<?,?,?> getVirtualHost(Session session)
{
ServerConnection conn = getServerConnection(session);
- VirtualHostImpl vhost = conn.getVirtualHost();
- return vhost;
+ return conn.getVirtualHost();
}
private ServerConnection getServerConnection(Session session)
{
- ServerConnection conn = (ServerConnection) session.getConnection();
- return conn;
+ return (ServerConnection) session.getConnection();
}
@Override
@@ -1238,7 +1286,7 @@ public class ServerSessionDelegate exten
exception(session, method, errorCode, description);
}
- else if (!queue.verifySessionAccess((ServerSession)session))
+ else if (!verifySessionAccess((ServerSession) session, queue))
{
String description = "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
@@ -1296,7 +1344,7 @@ public class ServerSessionDelegate exten
catch(QueueExistsException qe)
{
queue = qe.getExistingQueue();
- if (!queue.verifySessionAccess((ServerSession)session))
+ if (!verifySessionAccess((ServerSession) session, queue))
{
String description = "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
@@ -1357,7 +1405,7 @@ public class ServerSessionDelegate exten
}
else
{
- if(!queue.verifySessionAccess((ServerSession)session))
+ if(!verifySessionAccess((ServerSession) session, queue))
{
exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Tue Aug 26 17:01:07 2014
@@ -61,6 +61,7 @@ import org.apache.qpid.server.Transactio
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -543,10 +544,9 @@ public class AMQChannel<T extends AMQPro
}
- public ConsumerImpl getSubscription(AMQShortString tag)
+ public ConsumerTarget getSubscription(AMQShortString tag)
{
- final ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag);
- return target == null ? null : target.getConsumer();
+ return _tag2SubscriptionTargetMap.get(tag);
}
/**
@@ -555,7 +555,7 @@ public class AMQChannel<T extends AMQPro
*
*
* @param tag the tag chosen by the client (if null, server will generate one)
- * @param source the queue to subscribe to
+ * @param sources the queues to subscribe to
* @param acks Are acks enabled for this subscriber
* @param filters Filters to apply to this subscriber
*
@@ -564,7 +564,7 @@ public class AMQChannel<T extends AMQPro
*
* @throws org.apache.qpid.AMQException if something goes wrong
*/
- public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks,
+ public AMQShortString consumeFromSource(AMQShortString tag, Collection<MessageSource> sources, boolean acks,
FieldTable filters, boolean exclusive, boolean noLocal)
throws AMQException, MessageSource.ExistingConsumerPreventsExclusive,
MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException,
@@ -632,18 +632,21 @@ public class AMQChannel<T extends AMQPro
}
});
}
- ConsumerImpl sub =
- source.addConsumer(target,
- filterManager,
- AMQMessage.class,
- AMQShortString.toString(tag),
- options);
- if(sub instanceof Consumer<?>)
- {
- final Consumer<?> modelConsumer = (Consumer<?>) sub;
- consumerAdded(modelConsumer);
- modelConsumer.addChangeListener(_consumerClosedListener);
- _consumers.add(modelConsumer);
+ for(MessageSource source : sources)
+ {
+ ConsumerImpl sub =
+ source.addConsumer(target,
+ filterManager,
+ AMQMessage.class,
+ AMQShortString.toString(tag),
+ options);
+ if (sub instanceof Consumer<?>)
+ {
+ final Consumer<?> modelConsumer = (Consumer<?>) sub;
+ consumerAdded(modelConsumer);
+ modelConsumer.addChangeListener(_consumerClosedListener);
+ _consumers.add(modelConsumer);
+ }
}
}
catch (AccessControlException e)
@@ -683,13 +686,16 @@ public class AMQChannel<T extends AMQPro
{
ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
- ConsumerImpl sub = target == null ? null : target.getConsumer();
- if (sub != null)
+ Collection<ConsumerImpl> subs = target == null ? null : target.getConsumers();
+ if (subs != null)
{
- sub.close();
- if(sub instanceof Consumer<?>)
+ for(ConsumerImpl sub : subs)
{
- _consumers.remove(sub);
+ sub.close();
+ if (sub instanceof Consumer<?>)
+ {
+ _consumers.remove(sub);
+ }
}
return true;
}
@@ -763,11 +769,14 @@ public class AMQChannel<T extends AMQPro
_logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
}
- ConsumerImpl sub = me.getValue().getConsumer();
+ Collection<ConsumerImpl> subs = me.getValue().getConsumers();
- if(sub != null)
+ if(subs != null)
{
- sub.close();
+ for(ConsumerImpl sub : subs)
+ {
+ sub.close();
+ }
}
}
@@ -1032,7 +1041,10 @@ public class AMQChannel<T extends AMQPro
// may need to deliver queued messages
for (ConsumerTarget_0_8 s : _tag2SubscriptionTargetMap.values())
{
- s.getConsumer().externalStateChange();
+ for(ConsumerImpl sub : s.getConsumers())
+ {
+ sub.externalStateChange();
+ }
}
}
@@ -1050,11 +1062,11 @@ public class AMQChannel<T extends AMQPro
{
try
{
- s.getConsumer().getSendLock();
+ s.getSendLock();
}
finally
{
- s.getConsumer().releaseSendLock();
+ s.releaseSendLock();
}
}
}
@@ -1133,8 +1145,8 @@ public class AMQChannel<T extends AMQPro
// ensure all subscriptions have seen the change to the channel state
for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
{
- sub.getConsumer().getSendLock();
- sub.getConsumer().releaseSendLock();
+ sub.getSendLock();
+ sub.releaseSendLock();
}
try
@@ -1169,9 +1181,12 @@ public class AMQChannel<T extends AMQPro
if(requiresSuspend)
{
_suspended.set(false);
- for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
+ for(ConsumerTarget_0_8 target : _tag2SubscriptionTargetMap.values())
{
- sub.getConsumer().externalStateChange();
+ for(ConsumerImpl sub : target.getConsumers())
+ {
+ sub.externalStateChange();
+ }
}
}
@@ -1179,7 +1194,7 @@ public class AMQChannel<T extends AMQPro
public String toString()
{
- return "["+_session.toString()+":"+_channelId+"]";
+ return "("+ _suspended.get() + ", " + _closing.get() + ", " + _session.isClosing() + ") "+"["+_session.toString()+":"+_channelId+"]";
}
public void setDefaultQueue(AMQQueue queue)
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Tue Aug 26 17:01:07 2014
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -73,7 +75,7 @@ public abstract class ConsumerTarget_0_8
private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
- private ConsumerImpl _consumer;
+ private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel,
@@ -93,6 +95,11 @@ public abstract class ConsumerTarget_0_8
return new GetNoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
}
+ public List<ConsumerImpl> getConsumers()
+ {
+ return _consumers;
+ }
+
static final class BrowserConsumer extends ConsumerTarget_0_8
{
public BrowserConsumer(AMQChannel channel,
@@ -111,12 +118,14 @@ public abstract class ConsumerTarget_0_8
* thread safe.
*
*
+ *
+ * @param consumer
* @param entry
* @param batch
* @throws org.apache.qpid.AMQException
*/
@Override
- public long send(MessageInstance entry, boolean batch)
+ public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
// We don't decrement the reference here as we don't want to consume the message
// but we do want to send it to the client.
@@ -124,7 +133,7 @@ public abstract class ConsumerTarget_0_8
synchronized (getChannel())
{
long deliveryTag = getChannel().getNextDeliveryTag();
- return sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
+ return sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
}
}
@@ -173,11 +182,12 @@ public abstract class ConsumerTarget_0_8
* This method can be called by each of the publisher threads. As a result all changes to the channel object must be
* thread safe.
*
+ * @param consumer
* @param entry The message to send
* @param batch
*/
@Override
- public long send(MessageInstance entry, boolean batch)
+ public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
// if we do not need to wait for client acknowledgements
// we can decrement the reference count immediately.
@@ -200,7 +210,7 @@ public abstract class ConsumerTarget_0_8
getChannel().getProtocolSession().setDeferFlush(batch);
long deliveryTag = getChannel().getNextDeliveryTag();
- size = sendToClient(message, props, deliveryTag);
+ size = sendToClient(consumer, message, props, deliveryTag);
}
ref.release();
@@ -287,11 +297,12 @@ public abstract class ConsumerTarget_0_8
* This method can be called by each of the publisher threads. As a result all changes to the channel object must be
* thread safe.
*
+ * @param consumer
* @param entry The message to send
* @param batch
*/
@Override
- public long send(MessageInstance entry, boolean batch)
+ public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
@@ -301,9 +312,9 @@ public abstract class ConsumerTarget_0_8
long deliveryTag = getChannel().getNextDeliveryTag();
addUnacknowledgedMessage(entry);
- recordMessageDelivery(entry, deliveryTag);
+ recordMessageDelivery(consumer, entry, deliveryTag);
entry.addStateChangeListener(getReleasedStateChangeListener());
- long size = sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
+ long size = sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
entry.incrementDeliveryCount();
return size;
}
@@ -366,20 +377,20 @@ public abstract class ConsumerTarget_0_8
}
}
- public ConsumerImpl getConsumer()
- {
- return _consumer;
- }
-
@Override
public void consumerRemoved(final ConsumerImpl sub)
{
+ _consumers.remove(sub);
+ if(_consumers.isEmpty())
+ {
+ close();
+ }
}
@Override
public void consumerAdded(final ConsumerImpl sub)
{
- _consumer = sub;
+ _consumers.add( sub );
}
public AMQSessionModel getSessionModel()
@@ -426,12 +437,8 @@ public abstract class ConsumerTarget_0_8
boolean closed = false;
State state = getState();
- final ConsumerImpl consumer = getConsumer();
+ getSendLock();
- if(consumer != null)
- {
- consumer.getSendLock();
- }
try
{
while(!closed && state != State.CLOSED)
@@ -447,10 +454,7 @@ public abstract class ConsumerTarget_0_8
}
finally
{
- if(consumer != null)
- {
- consumer.releaseSendLock();
- }
+ releaseSendLock();
}
}
@@ -493,7 +497,7 @@ public abstract class ConsumerTarget_0_8
if(!updateState(State.SUSPENDED, State.ACTIVE))
{
// this is a hack to get round the issue of increasing bytes credit
- getStateListener().stateChanged(this, State.ACTIVE, State.ACTIVE);
+ notifyCurrentState();
}
}
else
@@ -502,16 +506,20 @@ public abstract class ConsumerTarget_0_8
}
}
- protected long sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag)
+ protected long sendToClient(final ConsumerImpl consumer, final ServerMessage message,
+ final InstanceProperties props,
+ final long deliveryTag)
{
- return _deliveryMethod.deliverToClient(getConsumer(), message, props, deliveryTag);
+ return _deliveryMethod.deliverToClient(consumer, message, props, deliveryTag);
}
- protected void recordMessageDelivery(final MessageInstance entry, final long deliveryTag)
+ protected void recordMessageDelivery(final ConsumerImpl consumer,
+ final MessageInstance entry,
+ final long deliveryTag)
{
- _recordMethod.recordMessageDelivery(getConsumer(),entry,deliveryTag);
+ _recordMethod.recordMessageDelivery(consumer, entry, deliveryTag);
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java Tue Aug 26 17:01:07 2014
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.server.protocol.v0_8.handler;
+import java.security.AccessControlException;
+import java.util.Collection;
+import java.util.HashSet;
+
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -32,13 +36,11 @@ import org.apache.qpid.server.filter.AMQ
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import java.security.AccessControlException;
-
public class BasicConsumeMethodHandler implements StateAwareMethodListener<BasicConsumeBody>
{
private static final Logger _logger = Logger.getLogger(BasicConsumeMethodHandler.class);
@@ -59,7 +61,7 @@ public class BasicConsumeMethodHandler i
AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
AMQChannel channel = protocolConnection.getChannel(channelId);
- VirtualHostImpl vHost = protocolConnection.getVirtualHost();
+ VirtualHostImpl<?,?,?> vHost = protocolConnection.getVirtualHost();
if (channel == null)
{
@@ -68,25 +70,55 @@ public class BasicConsumeMethodHandler i
else
{
channel.sync();
+ String queueName = body.getQueue() == null ? null : body.getQueue().asString();
if (_logger.isDebugEnabled())
{
- _logger.debug("BasicConsume: from '" + body.getQueue() +
+ _logger.debug("BasicConsume: from '" + queueName +
"' for:" + body.getConsumerTag() +
" nowait:" + body.getNowait() +
" args:" + body.getArguments());
}
- MessageSource queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString());
+ MessageSource queue = queueName == null ? channel.getDefaultQueue() : vHost.getQueue(queueName);
+ final Collection<MessageSource> sources = new HashSet<>();
+ if(queue != null)
+ {
+ sources.add(queue);
+ }
+ else if(vHost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers")
+ && body.getArguments() != null
+ && body.getArguments().get("x-multiqueue") instanceof Collection)
+ {
+ for(Object object : (Collection<Object>)body.getArguments().get("x-multiqueue"))
+ {
+ String sourceName = String.valueOf(object);
+ sourceName = sourceName.trim();
+ if(sourceName.length() != 0)
+ {
+ MessageSource source = vHost.getMessageSource(sourceName);
+ if(source == null)
+ {
+ sources.clear();
+ break;
+ }
+ else
+ {
+ sources.add(source);
+ }
+ }
+ }
+ queueName = body.getArguments().get("x-multiqueue").toString();
+ }
- if (queue == null)
+ if (sources.isEmpty())
{
if (_logger.isDebugEnabled())
{
- _logger.debug("No queue for '" + body.getQueue() + "'");
+ _logger.debug("No queue for '" + queueName + "'");
}
- if (body.getQueue() != null)
+ if (queueName != null)
{
- String msg = "No such queue, '" + body.getQueue() + "'";
+ String msg = "No such queue, '" + queueName + "'";
throw body.getChannelException(AMQConstant.NOT_FOUND, msg);
}
else
@@ -114,7 +146,7 @@ public class BasicConsumeMethodHandler i
{
AMQShortString consumerTag = channel.consumeFromSource(consumerTagName,
- queue,
+ sources,
!body.getNoAck(),
body.getArguments(),
body.getExclusive(),
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java Tue Aug 26 17:01:07 2014
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import java.util.Collections;
import java.util.List;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -138,7 +139,9 @@ public class AcknowledgeTest extends Qpi
assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size());
//Subscribe to the queue
- AMQShortString subscriber = _channel.consumeFromSource(null, _queue, true, null, true, false);
+ AMQShortString subscriber = _channel.consumeFromSource(null,
+ Collections.singleton(_queue),
+ true, null, true, false);
getQueue().deliverAsync();
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java Tue Aug 26 17:01:07 2014
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import java.util.Collections;
import java.util.List;
import org.apache.qpid.common.AMQPFilterTypes;
@@ -143,6 +144,6 @@ public class QueueBrowserUsesNoAckTest e
FieldTable filters = new FieldTable();
filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true);
- return channel.consumeFromSource(null, queue, true, filters, true, false);
+ return channel.consumeFromSource(null, Collections.singleton(queue), true, filters, true, false);
}
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Tue Aug 26 17:01:07 2014
@@ -93,7 +93,7 @@ class ConsumerTarget_1_0 extends Abstrac
boolean closed = false;
State state = getState();
- getConsumer().getSendLock();
+ getSendLock();
try
{
while(!closed && state != State.CLOSED)
@@ -108,11 +108,11 @@ class ConsumerTarget_1_0 extends Abstrac
}
finally
{
- getConsumer().releaseSendLock();
+ releaseSendLock();
}
}
- public long send(MessageInstance entry, boolean batch)
+ public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
// TODO
long size = entry.getMessage().getSize();
@@ -515,7 +515,7 @@ class ConsumerTarget_1_0 extends Abstrac
@Override
public void consumerRemoved(final ConsumerImpl sub)
{
-
+ close();
}
@Override
Modified: qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Tue Aug 26 17:01:07 2014
@@ -1065,6 +1065,12 @@ class ManagementNode implements MessageS
}
@Override
+ public ConsumerImpl getAcquiringConsumer()
+ {
+ return null;
+ }
+
+ @Override
public boolean isAcquiredBy(final ConsumerImpl consumer)
{
return false;
Modified: qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java Tue Aug 26 17:01:07 2014
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.server.management.amqp;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.message.MessageSource;
@@ -27,19 +31,12 @@ import org.apache.qpid.server.message.in
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.util.StateChangeListener;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
class ManagementNodeConsumer implements ConsumerImpl
{
private final long _id = ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement();
private final ManagementNode _managementNode;
private final List<ManagementResponse> _queue = Collections.synchronizedList(new ArrayList<ManagementResponse>());
private final ConsumerTarget _target;
- private final Lock _stateChangeLock = new ReentrantLock();
private final String _name;
private final StateChangeListener<ConsumerTarget, ConsumerTarget.State> _targetChangeListener = new TargetChangeListener();
@@ -49,7 +46,7 @@ class ManagementNodeConsumer implements
_name = consumerName;
_managementNode = managementNode;
_target = target;
- target.setStateListener(_targetChangeListener);
+ target.addStateListener(_targetChangeListener);
}
@Override
@@ -133,19 +130,19 @@ class ManagementNodeConsumer implements
@Override
public boolean trySendLock()
{
- return _stateChangeLock.tryLock();
+ return _target.trySendLock();
}
@Override
public void getSendLock()
{
- _stateChangeLock.lock();
+ _target.getSendLock();
}
@Override
public void releaseSendLock()
{
- _stateChangeLock.unlock();
+ _target.releaseSendLock();
}
@@ -174,13 +171,13 @@ class ManagementNodeConsumer implements
void send(final InternalMessage response)
{
- getSendLock();
+ _target.getSendLock();
try
{
final ManagementResponse responseEntry = new ManagementResponse(this, response);
if(_queue.isEmpty() && _target.allocateCredit(response))
{
- _target.send(responseEntry,false);
+ _target.send(this, responseEntry, false);
}
else
{
@@ -189,7 +186,7 @@ class ManagementNodeConsumer implements
}
finally
{
- releaseSendLock();
+ _target.releaseSendLock();
}
}
@@ -209,7 +206,7 @@ class ManagementNodeConsumer implements
private void deliverMessages()
{
- getSendLock();
+ _target.getSendLock();
try
{
while(!_queue.isEmpty())
@@ -219,7 +216,7 @@ class ManagementNodeConsumer implements
if(!_target.isSuspended() && _target.allocateCredit(managementResponse.getMessage()))
{
_queue.remove(0);
- _target.send(managementResponse,false);
+ _target.send(this, managementResponse, false);
}
else
{
@@ -229,7 +226,7 @@ class ManagementNodeConsumer implements
}
finally
{
- releaseSendLock();
+ _target.releaseSendLock();
}
}
}
Modified: qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java Tue Aug 26 17:01:07 2014
@@ -84,6 +84,12 @@ class ManagementResponse implements Mess
}
@Override
+ public ConsumerImpl getAcquiringConsumer()
+ {
+ return _consumer;
+ }
+
+ @Override
public boolean isAcquiredBy(final ConsumerImpl consumer)
{
return consumer == _consumer && !isDeleted();
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Aug 26 17:01:07 2014
@@ -683,47 +683,48 @@ public abstract class AMQSession<C exten
int type = resolveAddressType(dest);
-
+ boolean resolved = false;
switch (type)
{
case AMQDestination.QUEUE_TYPE:
- {
+
+ setLegacyFieldsForQueueType(dest);
if(createNode)
{
- setLegacyFieldsForQueueType(dest);
handleQueueNodeCreation(dest,noLocal);
- break;
+ resolved = true;
}
else if (isQueueExist(dest,assertNode))
{
- setLegacyFieldsForQueueType(dest);
- break;
+ resolved = true;
}
- }
+ break;
case AMQDestination.TOPIC_TYPE:
- {
+
+ setLegacyFieldsForTopicType(dest);
if(createNode)
{
- setLegacyFieldsForTopicType(dest);
verifySubject(dest);
handleExchangeNodeCreation(dest);
- break;
+ resolved = true;
}
else if (isExchangeExist(dest,assertNode))
{
- setLegacyFieldsForTopicType(dest);
verifySubject(dest);
- break;
+ resolved = true;
}
- }
+ break;
default:
throw new AMQException(
"The name '" + dest.getAddressName() +
"' supplied in the address doesn't resolve to an exchange or a queue");
}
- dest.setAddressResolved(System.currentTimeMillis());
+ if(resolved)
+ {
+ dest.setAddressResolved(System.currentTimeMillis());
+ }
}
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Tue Aug 26 17:01:07 2014
@@ -645,18 +645,19 @@ public class AMQSession_0_10 extends AMQ
Link link = destination.getLink();
if (link != null && link.getSubscription() != null && link.getSubscription().getArgs() != null)
{
- arguments.putAll((Map<? extends String, ? extends Object>) link.getSubscription().getArgs());
+ arguments.putAll(link.getSubscription().getArgs());
}
boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE;
+ String queue = queueName == null ? destination.getAddressName() : queueName.toString();
getQpidSession().messageSubscribe
- (queueName.toString(), String.valueOf(tag),
+ (queue, String.valueOf(tag),
acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments,
consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
- String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString();
+ String consumerTag = (consumer).getConsumerTagString();
if (capacity == 0)
{
@@ -1175,7 +1176,8 @@ public class AMQSession_0_10 extends AMQ
}
catch(SessionException e)
{
- if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED)
+ if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED
+ || e.getException().getErrorCode() == ExecutionErrorCode.NOT_FOUND)
{
match = false;
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Tue Aug 26 17:01:07 2014
@@ -514,6 +514,16 @@ public class AMQSession_0_8 extends AMQS
{
queueName = preprocessAddressTopic(consumer, queueName);
+ AMQDestination destination = consumer.getDestination();
+
+ Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments());
+
+ Link link = destination.getLink();
+ if (link != null && link.getSubscription() != null && link.getSubscription().getArgs() != null)
+ {
+ arguments.putAll(link.getSubscription().getArgs());
+ }
+
BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(),
queueName,
new AMQShortString(String.valueOf(tag)),
@@ -521,7 +531,7 @@ public class AMQSession_0_8 extends AMQS
consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
consumer.isExclusive(),
nowait,
- consumer.getArguments());
+ FieldTable.convertToFieldTable(arguments));
AMQFrame jmsConsume = body.generateFrame(getChannelId());
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java?rev=1620659&r1=1620658&r2=1620659&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java Tue Aug 26 17:01:07 2014
@@ -43,7 +43,8 @@ public class AddressHelper
public static final String LINK = "link";
public static final String X_DECLARE = "x-declare";
public static final String X_BINDINGS = "x-bindings";
- public static final String X_SUBSCRIBE = "x-subscribes";
+ public static final String X_SUBSCRIBES = "x-subscribes";
+ public static final String X_SUBSCRIBE = "x-subscribe";
public static final String CREATE = "create";
public static final String ASSERT = "assert";
public static final String DELETE = "delete";
@@ -265,19 +266,32 @@ public class AddressHelper
Map linkMap = (Map) _address.getOptions().get(LINK);
- if (linkMap != null && linkMap.containsKey(X_SUBSCRIBE))
- {
- Map x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBE);
-
- if (x_subscribe.containsKey(ARGUMENTS))
+ if (linkMap != null)
+ {
+ Map x_subscribe = null;
+
+ if(linkMap.containsKey(X_SUBSCRIBE))
{
- link.getSubscription().setArgs((Map<String,Object>)x_subscribe.get(ARGUMENTS));
+ x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBE);
+ }
+ else if(linkMap.containsKey(X_SUBSCRIBES))
+ {
+ // left in for backwards compatibility with old broken constant
+ x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBES);
+ }
+
+ if(x_subscribe != null)
+ {
+ if (x_subscribe.containsKey(ARGUMENTS))
+ {
+ link.getSubscription().setArgs((Map<String, Object>) x_subscribe.get(ARGUMENTS));
+ }
+
+ boolean exclusive = x_subscribe.containsKey(EXCLUSIVE) ?
+ Boolean.parseBoolean((String) x_subscribe.get(EXCLUSIVE)) : false;
+
+ link.getSubscription().setExclusive(exclusive);
}
-
- boolean exclusive = x_subscribe.containsKey(EXCLUSIVE) ?
- Boolean.parseBoolean((String)x_subscribe.get(EXCLUSIVE)): false;
-
- link.getSubscription().setExclusive(exclusive);
}
link.setBindings(getBindings(linkMap));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org