You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/07 17:57:52 UTC
svn commit: r1565726 [5/6] - in /qpid/trunk/qpid/java: ./
amqp-1-0-client-jms/ amqp-1-0-client/ amqp-1-0-common/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrad...
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java Fri Feb 7 16:57:49 2014
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol.v0_8;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.QueueEntry;
import java.util.Collection;
@@ -36,24 +37,24 @@ public interface UnacknowledgedMessageMa
*@param message the message being iterated over @return true to stop iteration, false to continue
* @throws AMQException
*/
- boolean callback(final long deliveryTag, QueueEntry message) throws AMQException;
+ boolean callback(final long deliveryTag, MessageInstance message) throws AMQException;
void visitComplete();
}
void visit(Visitor visitor) throws AMQException;
- void add(long deliveryTag, QueueEntry message);
+ void add(long deliveryTag, MessageInstance message);
- QueueEntry remove(long deliveryTag);
+ MessageInstance remove(long deliveryTag);
- Collection<QueueEntry> cancelAllMessages();
+ Collection<MessageInstance> cancelAllMessages();
int size();
void clear();
- QueueEntry get(long deliveryTag);
+ MessageInstance get(long deliveryTag);
/**
* Get the set of delivery tags that are outstanding.
@@ -62,7 +63,7 @@ public interface UnacknowledgedMessageMa
*/
Set<Long> getDeliveryTags();
- Collection<QueueEntry> acknowledge(long deliveryTag, boolean multiple);
+ Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple);
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java Fri Feb 7 16:57:49 2014
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol.v0_8;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.QueueEntry;
import java.util.Collection;
@@ -34,7 +35,7 @@ public class UnacknowledgedMessageMapImp
private long _unackedSize;
- private Map<Long, QueueEntry> _map;
+ private Map<Long, MessageInstance> _map;
private long _lastDeliveryTag;
@@ -43,10 +44,10 @@ public class UnacknowledgedMessageMapImp
public UnacknowledgedMessageMapImpl(int prefetchLimit)
{
_prefetchLimit = prefetchLimit;
- _map = new LinkedHashMap<Long, QueueEntry>(prefetchLimit);
+ _map = new LinkedHashMap<Long, MessageInstance>(prefetchLimit);
}
- public void collect(long deliveryTag, boolean multiple, Map<Long, QueueEntry> msgs)
+ public void collect(long deliveryTag, boolean multiple, Map<Long, MessageInstance> msgs)
{
if (multiple)
{
@@ -54,7 +55,7 @@ public class UnacknowledgedMessageMapImp
}
else
{
- final QueueEntry entry = get(deliveryTag);
+ final MessageInstance entry = get(deliveryTag);
if(entry != null)
{
msgs.put(deliveryTag, entry);
@@ -63,7 +64,7 @@ public class UnacknowledgedMessageMapImp
}
- public void remove(Map<Long,QueueEntry> msgs)
+ public void remove(Map<Long,MessageInstance> msgs)
{
synchronized (_lock)
{
@@ -74,12 +75,12 @@ public class UnacknowledgedMessageMapImp
}
}
- public QueueEntry remove(long deliveryTag)
+ public MessageInstance remove(long deliveryTag)
{
synchronized (_lock)
{
- QueueEntry message = _map.remove(deliveryTag);
+ MessageInstance message = _map.remove(deliveryTag);
if(message != null)
{
_unackedSize -= message.getMessage().getSize();
@@ -94,8 +95,8 @@ public class UnacknowledgedMessageMapImp
{
synchronized (_lock)
{
- Set<Map.Entry<Long, QueueEntry>> currentEntries = _map.entrySet();
- for (Map.Entry<Long, QueueEntry> entry : currentEntries)
+ Set<Map.Entry<Long, MessageInstance>> currentEntries = _map.entrySet();
+ for (Map.Entry<Long, MessageInstance> entry : currentEntries)
{
visitor.callback(entry.getKey().longValue(), entry.getValue());
}
@@ -103,7 +104,7 @@ public class UnacknowledgedMessageMapImp
}
}
- public void add(long deliveryTag, QueueEntry message)
+ public void add(long deliveryTag, MessageInstance message)
{
synchronized (_lock)
{
@@ -113,12 +114,12 @@ public class UnacknowledgedMessageMapImp
}
}
- public Collection<QueueEntry> cancelAllMessages()
+ public Collection<MessageInstance> cancelAllMessages()
{
synchronized (_lock)
{
- Collection<QueueEntry> currentEntries = _map.values();
- _map = new LinkedHashMap<Long, QueueEntry>(_prefetchLimit);
+ Collection<MessageInstance> currentEntries = _map.values();
+ _map = new LinkedHashMap<Long, MessageInstance>(_prefetchLimit);
_unackedSize = 0l;
return currentEntries;
}
@@ -141,7 +142,7 @@ public class UnacknowledgedMessageMapImp
}
}
- public QueueEntry get(long key)
+ public MessageInstance get(long key)
{
synchronized (_lock)
{
@@ -157,19 +158,19 @@ public class UnacknowledgedMessageMapImp
}
}
- public Collection<QueueEntry> acknowledge(long deliveryTag, boolean multiple)
+ public Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple)
{
- Map<Long, QueueEntry> ackedMessageMap = new LinkedHashMap<Long,QueueEntry>();
+ Map<Long, MessageInstance> ackedMessageMap = new LinkedHashMap<Long,MessageInstance>();
collect(deliveryTag, multiple, ackedMessageMap);
remove(ackedMessageMap);
return ackedMessageMap.values();
}
- private void collect(long key, Map<Long, QueueEntry> msgs)
+ private void collect(long key, Map<Long, MessageInstance> msgs)
{
synchronized (_lock)
{
- for (Map.Entry<Long, QueueEntry> entry : _map.entrySet())
+ for (Map.Entry<Long, MessageInstance> entry : _map.entrySet())
{
msgs.put(entry.getKey(),entry.getValue());
if (entry.getKey() == key)
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java Fri Feb 7 16:57:49 2014
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortS
import org.apache.qpid.framing.BasicConsumeBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -73,7 +74,7 @@ public class BasicConsumeMethodHandler i
" args:" + body.getArguments());
}
- AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString());
+ MessageSource queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString());
if (queue == null)
{
@@ -120,8 +121,11 @@ public class BasicConsumeMethodHandler i
if(consumerTagName == null || channel.getSubscription(consumerTagName) == null)
{
- AMQShortString consumerTag = channel.subscribeToQueue(consumerTagName, queue, !body.getNoAck(),
- body.getArguments(), body.getNoLocal(), body.getExclusive());
+ AMQShortString consumerTag = channel.consumeFromSource(consumerTagName,
+ queue,
+ !body.getNoAck(),
+ body.getArguments(),
+ body.getExclusive());
if (!body.getNowait())
{
MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
@@ -156,14 +160,14 @@ public class BasicConsumeMethodHandler i
}
- catch (AMQQueue.ExistingExclusiveSubscription e)
+ catch (AMQQueue.ExistingExclusiveConsumer e)
{
throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
"Cannot subscribe to queue "
+ queue.getName()
+ " as it already has an existing exclusive consumer");
}
- catch (AMQQueue.ExistingSubscriptionPreventsExclusive e)
+ catch (AMQQueue.ExistingConsumerPreventsExclusive e)
{
throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
"Cannot subscribe to queue "
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java Fri Feb 7 16:57:49 2014
@@ -24,27 +24,31 @@ package org.apache.qpid.server.protocol.
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicGetBody;
import org.apache.qpid.framing.BasicGetEmptyBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.MessageOnlyCreditManager;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.RecordDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.protocol.v0_8.SubscriptionFactoryImpl;
+import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
+import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.EnumSet;
+
public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetBody>
{
private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class);
@@ -128,7 +132,7 @@ public class BasicGetMethodHandler imple
{
@Override
- public void deliverToClient(final Subscription sub, final ServerMessage message, final
+ public void deliverToClient(final Consumer sub, final ServerMessage message, final
InstanceProperties props, final long deliveryTag)
throws AMQException
{
@@ -145,25 +149,32 @@ public class BasicGetMethodHandler imple
final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
{
- public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+ public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag)
{
channel.addUnacknowledgedMessage(entry, deliveryTag, null);
}
};
- Subscription sub;
+ ConsumerTarget_0_8 target;
+ EnumSet<Consumer.Option> options = EnumSet.of(Consumer.Option.TRANSIENT, Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES);
if(acks)
{
- sub = SubscriptionFactoryImpl.INSTANCE.createSubscription(channel, session, null, acks, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod);
+
+ target = ConsumerTarget_0_8.createAckTarget(channel,
+ AMQShortString.EMPTY_STRING, null,
+ singleMessageCredit, getDeliveryMethod, getRecordMethod);
}
else
{
- sub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(channel, session, null, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod);
+ target = ConsumerTarget_0_8.createNoAckTarget(channel,
+ AMQShortString.EMPTY_STRING, null,
+ singleMessageCredit, getDeliveryMethod, getRecordMethod);
}
- queue.registerSubscription(sub,false);
- queue.flushSubscription(sub);
- queue.unregisterSubscription(sub);
+ Consumer sub = queue.addConsumer(target, null, AMQMessage.class, "", options);
+ sub.flush();
+ sub.close();
return(!singleMessageCredit.hasCredit());
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java Fri Feb 7 16:57:49 2014
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortS
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
@@ -67,7 +68,7 @@ public class BasicPublishMethodHandler i
}
VirtualHost vHost = session.getVirtualHost();
- Exchange exch = vHost.getExchange(exchangeName.toString());
+ MessageDestination exch = vHost.getMessageDestination(exchangeName.toString());
// if the exchange does not exist we raise a channel exception
if (exch == null)
{
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java Fri Feb 7 16:57:49 2014
@@ -56,7 +56,7 @@ public class BasicRecoverMethodHandler i
throw body.getChannelNotFoundException(channelId);
}
- channel.resend(body.getRequeue());
+ channel.resend();
// Qpid 0-8 hacks a synchronous -ok onto recover.
// In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java Fri Feb 7 16:57:49 2014
@@ -58,7 +58,7 @@ public class BasicRecoverSyncMethodHandl
throw body.getChannelNotFoundException(channelId);
}
channel.sync();
- channel.resend(body.getRequeue());
+ channel.resend();
// Qpid 0-8 hacks a synchronous -ok onto recover.
// In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java Fri Feb 7 16:57:49 2014
@@ -24,6 +24,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicRejectBody;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueEntry;
@@ -65,7 +66,7 @@ public class BasicRejectMethodHandler im
long deliveryTag = body.getDeliveryTag();
- QueueEntry message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
+ MessageInstance message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
if (message == null)
{
@@ -73,16 +74,6 @@ public class BasicRejectMethodHandler im
}
else
{
- if (message.isQueueDeleted())
- {
- _logger.warn("Message's Queue has already been purged, dropping message");
- message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
- if(message != null)
- {
- message.delete();
- }
- return;
- }
if (message.getMessage() == null)
{
@@ -98,41 +89,43 @@ public class BasicRejectMethodHandler im
" on channel:" + channel.debugIdentity());
}
- message.reject();
-
if (body.getRequeue())
{
- channel.requeue(deliveryTag);
-
//this requeue represents a message rejected from the pre-dispatch queue
//therefore we need to amend the delivery counter.
message.decrementDeliveryCount();
+
+ channel.requeue(deliveryTag);
}
else
{
- final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag);
- _logger.debug("maxDeliveryCountEnabled: " + maxDeliveryCountEnabled + " deliveryTag " + deliveryTag);
- if (maxDeliveryCountEnabled)
- {
- final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag);
- _logger.debug("deliveredTooManyTimes: " + deliveredTooManyTimes + " deliveryTag " + deliveryTag);
- if (deliveredTooManyTimes)
- {
- channel.deadLetter(body.getDeliveryTag());
- }
- else
- {
- //this requeue represents a message rejected because of a recover/rollback that we
- //are not ready to DLQ. We rely on the reject command to resend from the unacked map
- //and therefore need to increment the delivery counter so we cancel out the effect
- //of the AMQChannel#resend() decrement.
- message.incrementDeliveryCount();
- }
- }
- else
- {
- channel.requeue(deliveryTag);
- }
+ // Since the Java client abuses the reject flag for requeuing after rollback, we won't set reject here
+ // as it would prevent redelivery
+ // message.reject();
+
+ final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag);
+ _logger.debug("maxDeliveryCountEnabled: " + maxDeliveryCountEnabled + " deliveryTag " + deliveryTag);
+ if (maxDeliveryCountEnabled)
+ {
+ final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag);
+ _logger.debug("deliveredTooManyTimes: " + deliveredTooManyTimes + " deliveryTag " + deliveryTag);
+ if (deliveredTooManyTimes)
+ {
+ channel.deadLetter(body.getDeliveryTag());
+ }
+ else
+ {
+ //this requeue represents a message rejected because of a recover/rollback that we
+ //are not ready to DLQ. We rely on the reject command to resend from the unacked map
+ //and therefore need to increment the delivery counter so we cancel out the effect
+ //of the AMQChannel#resend() decrement.
+ message.incrementDeliveryCount();
+ }
+ }
+ else
+ {
+ channel.requeue(deliveryTag);
+ }
}
}
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java Fri Feb 7 16:57:49 2014
@@ -41,6 +41,7 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
@@ -134,8 +135,8 @@ public class QueueDeclareHandler impleme
}
};
protocolConnection.addSessionCloseTask(sessionCloseTask);
- queue.addQueueDeleteTask(new AMQQueue.Task() {
- public void doTask(AMQQueue queue) throws AMQException
+ queue.addQueueDeleteTask(new Action<AMQQueue>() {
+ public void performAction(AMQQueue queue)
{
protocolConnection.removeSessionCloseTask(sessionCloseTask);
}
@@ -245,9 +246,9 @@ public class QueueDeclareHandler impleme
session.addSessionCloseTask(deleteQueueTask);
- queue.addQueueDeleteTask(new AMQQueue.Task()
+ queue.addQueueDeleteTask(new Action<AMQQueue>()
{
- public void doTask(AMQQueue queue)
+ public void performAction(AMQQueue queue)
{
session.removeSessionCloseTask(deleteQueueTask);
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java Fri Feb 7 16:57:49 2014
@@ -74,7 +74,7 @@ public class TxRollbackHandler implement
//Now resend all the unacknowledged messages back to the original subscribers.
//(Must be done after the TxnRollback-ok response).
// Why, are we not allowed to send messages back to client before the ok method?
- channel.resend(false);
+ channel.resend();
}
catch (AMQException e)
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java Fri Feb 7 16:57:49 2014
@@ -28,11 +28,11 @@ import org.apache.qpid.framing.ContentHe
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.flow.LimitlessCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.BrokerTestHelper;
@@ -40,6 +40,7 @@ import org.apache.qpid.server.virtualhos
import org.apache.qpid.test.utils.QpidTestCase;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.Set;
/**
@@ -47,7 +48,8 @@ import java.util.Set;
*/
public class AckTest extends QpidTestCase
{
- private Subscription _subscription;
+ private ConsumerTarget_0_8 _subscriptionTarget;
+ private Consumer _consumer;
private AMQProtocolSession _protocolSession;
@@ -86,7 +88,6 @@ public class AckTest extends QpidTestCas
private void publishMessages(int count, boolean persistent) throws AMQException
{
- _queue.registerSubscription(_subscription,false);
for (int i = 1; i <= count; i++)
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
@@ -144,7 +145,7 @@ public class AckTest extends QpidTestCas
try
{
- _queue.enqueue(message);
+ _queue.enqueue(message,null);
}
catch (AMQException e)
{
@@ -178,7 +179,13 @@ public class AckTest extends QpidTestCas
*/
public void testAckChannelAssociationTest() throws AMQException
{
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager());
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount, true);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
@@ -190,8 +197,8 @@ public class AckTest extends QpidTestCas
{
assertTrue(deliveryTag == i);
i++;
- QueueEntry unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.getQueue() == _queue);
+ MessageInstance unackedMsg = map.get(deliveryTag);
+ assertTrue(unackedMsg.getOwningResource() == _queue);
}
}
@@ -202,7 +209,16 @@ public class AckTest extends QpidTestCas
public void testNoAckMode() throws AMQException
{
// false arg means no acks expected
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new LimitlessCreditManager());
+ _subscriptionTarget = ConsumerTarget_0_8.createNoAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget,
+ null,
+ AMQMessage.class,
+ DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
@@ -218,7 +234,13 @@ public class AckTest extends QpidTestCas
public void testPersistentNoAckMode() throws AMQException
{
// false arg means no acks expected
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new LimitlessCreditManager());
+
+ _subscriptionTarget = ConsumerTarget_0_8.createNoAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES, Consumer.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount, true);
@@ -235,7 +257,15 @@ public class AckTest extends QpidTestCas
*/
public void testSingleAckReceivedTest() throws AMQException
{
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
+
final int msgCount = 10;
publishMessages(msgCount);
@@ -248,8 +278,8 @@ public class AckTest extends QpidTestCas
for (long deliveryTag : deliveryTagSet)
{
assertTrue(deliveryTag == i);
- QueueEntry unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.getQueue() == _queue);
+ MessageInstance unackedMsg = map.get(deliveryTag);
+ assertTrue(unackedMsg.getOwningResource() == _queue);
// 5 is the delivery tag of the message that *should* be removed
if (++i == 5)
{
@@ -264,7 +294,15 @@ public class AckTest extends QpidTestCas
*/
public void testMultiAckReceivedTest() throws AMQException
{
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
+
final int msgCount = 10;
publishMessages(msgCount);
@@ -279,8 +317,8 @@ public class AckTest extends QpidTestCas
for (long deliveryTag : deliveryTagSet)
{
assertTrue(deliveryTag == i + 5);
- QueueEntry unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.getQueue() == _queue);
+ MessageInstance unackedMsg = map.get(deliveryTag);
+ assertTrue(unackedMsg.getOwningResource() == _queue);
++i;
}
}
@@ -290,7 +328,15 @@ public class AckTest extends QpidTestCas
*/
public void testMultiAckAllReceivedTest() throws AMQException
{
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
+
final int msgCount = 10;
publishMessages(msgCount);
@@ -303,8 +349,8 @@ public class AckTest extends QpidTestCas
for (long deliveryTag : deliveryTagSet)
{
assertTrue(deliveryTag == i + 5);
- QueueEntry unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.getQueue() == _queue);
+ MessageInstance unackedMsg = map.get(deliveryTag);
+ assertTrue(unackedMsg.getOwningResource() == _queue);
++i;
}
}
@@ -319,12 +365,16 @@ public class AckTest extends QpidTestCas
// Send 10 messages
Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1);
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession,
- DEFAULT_CONSUMER_TAG, true, null, false, creditManager);
+
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager);
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
+
final int msgCount = 1;
publishMessages(msgCount);
- _queue.deliverAsync(_subscription);
+ _consumer.externalStateChange();
_channel.acknowledgeMessage(1, false);
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java Fri Feb 7 16:57:49 2014
@@ -140,7 +140,7 @@ public class AcknowledgeTest extends Qpi
assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size());
//Subscribe to the queue
- AMQShortString subscriber = _channel.subscribeToQueue(null, _queue, true, null, false, true);
+ AMQShortString subscriber = _channel.consumeFromSource(null, _queue, true, null, true);
getQueue().deliverAsync();
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java Fri Feb 7 16:57:49 2014
@@ -23,20 +23,22 @@ package org.apache.qpid.server.protocol.
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MockAMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.QueueEntryIterator;
-import org.apache.qpid.server.queue.SimpleQueueEntryList;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TestMemoryMessageStore;
-import org.apache.qpid.server.subscription.MockSubscription;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* QPID-1385 : Race condition between added to unacked map and resending due to a rollback.
*
@@ -59,40 +61,50 @@ public class ExtractResendAndRequeueTest
private UnacknowledgedMessageMapImpl _unacknowledgedMessageMap;
private static final int INITIAL_MSG_COUNT = 10;
- private AMQQueue _queue = new MockAMQQueue(getName());
- private MessageStore _messageStore = new TestMemoryMessageStore();
- private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>();
+ private AMQQueue _queue;
+ private LinkedList<MessageInstance> _referenceList = new LinkedList<MessageInstance>();
+ private Consumer _consumer;
+ private boolean _queueDeleted;
@Override
public void setUp() throws AMQException
{
+ _queueDeleted = false;
_unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(100);
+ _queue = mock(AMQQueue.class);
+ when(_queue.getName()).thenReturn(getName());
+ when(_queue.isDeleted()).thenReturn(_queueDeleted);
+ _consumer = mock(Consumer.class);
+ when(_consumer.getId()).thenReturn(Consumer.SUB_ID_GENERATOR.getAndIncrement());
+
long id = 0;
- SimpleQueueEntryList list = new SimpleQueueEntryList(_queue);
// Add initial messages to QueueEntryList
for (int count = 0; count < INITIAL_MSG_COUNT; count++)
{
- AMQMessage msg = new MockAMQMessage(id);
-
- list.add(msg);
+ ServerMessage msg = mock(ServerMessage.class);
+ when(msg.getMessageNumber()).thenReturn(id);
+ final QueueEntry entry = mock(QueueEntry.class);
+ when(entry.getMessage()).thenReturn(msg);
+ when(entry.getQueue()).thenReturn(_queue);
+ when(entry.isQueueDeleted()).thenReturn(_queueDeleted);
+ doAnswer(new Answer()
+ {
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable
+ {
+ when(entry.isDeleted()).thenReturn(true);
+ return null;
+ }
+ }).when(entry).delete();
+ _unacknowledgedMessageMap.add(id, entry);
+ _referenceList.add(entry);
//Increment ID;
id++;
}
- // Iterate through the QueueEntryList and add entries to unacknowledgedMessageMap and referenceList
- QueueEntryIterator queueEntries = list.iterator();
- while(queueEntries.advance())
- {
- QueueEntry entry = queueEntries.getNode();
- _unacknowledgedMessageMap.add(entry.getMessage().getMessageNumber(), entry);
-
- // Store the entry for future inspection
- _referenceList.add(entry);
- }
-
assertEquals("Map does not contain correct setup data", INITIAL_MSG_COUNT, _unacknowledgedMessageMap.size());
}
@@ -103,17 +115,14 @@ public class ExtractResendAndRequeueTest
*
* @return Subscription that performed the acquire
*/
- private Subscription createSubscriptionAndAcquireMessages(LinkedList<QueueEntry> messageList)
+ private void acquireMessages(LinkedList<MessageInstance> messageList)
{
- Subscription subscription = new MockSubscription();
- // Aquire messages in subscription
- for (QueueEntry entry : messageList)
+ // Acquire messages in subscription
+ for(MessageInstance entry : messageList)
{
- entry.acquire(subscription);
+ when(entry.getDeliveredConsumer()).thenReturn(_consumer);
}
-
- return subscription;
}
/**
@@ -128,14 +137,14 @@ public class ExtractResendAndRequeueTest
public void testResend() throws AMQException
{
//We don't need the subscription object here.
- createSubscriptionAndAcquireMessages(_referenceList);
+ acquireMessages(_referenceList);
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
+ final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>();
+ final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>();
// requeueIfUnableToResend doesn't matter here.
_unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, true, _messageStore));
+ msgToResend));
assertEquals("Message count for resend not correct.", INITIAL_MSG_COUNT, msgToResend.size());
assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
@@ -154,100 +163,22 @@ public class ExtractResendAndRequeueTest
*/
public void testRequeueDueToSubscriptionClosure() throws AMQException
{
- Subscription subscription = createSubscriptionAndAcquireMessages(_referenceList);
+ acquireMessages(_referenceList);
// Close subscription
- subscription.close();
+ when(_consumer.isClosed()).thenReturn(true);
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
+ final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>();
+ final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>();
// requeueIfUnableToResend doesn't matter here.
_unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, true, _messageStore));
-
- assertEquals("Message count for resend not correct.", 0, msgToResend.size());
- assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size());
- assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
- }
-
- /**
- * If the subscription is null, due to message being retrieved via a GET, And we request that messages are requeued
- * requeueIfUnableToResend(set to true) then all messages should be sent to the msgToRequeue map.
- *
- * @throws AMQException the visit interface throws this
- */
-
- public void testRequeueDueToMessageHavingNoConsumerTag() throws AMQException
- {
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
-
- // requeueIfUnableToResend = true so all messages should go to msgToRequeue
- _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, true, _messageStore));
+ msgToResend));
assertEquals("Message count for resend not correct.", 0, msgToResend.size());
assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size());
assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
}
- /**
- * If the subscription is null, due to message being retrieved via a GET, And we request that we don't
- * requeueIfUnableToResend(set to false) then all messages should be dropped as we do not have a dead letter queue.
- *
- * @throws AMQException the visit interface throws this
- */
-
- public void testDrop() throws AMQException
- {
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
-
- // requeueIfUnableToResend = false so all messages should be dropped all maps should be empty
- _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, false, _messageStore));
-
- assertEquals("Message count for resend not correct.", 0, msgToResend.size());
- assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
- assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
-
-
- for (QueueEntry entry : _referenceList)
- {
- assertTrue("Message was not discarded", entry.isDeleted());
- }
-
- }
-
- /**
- * If the subscription is null, due to message being retrieved via a GET, AND the queue upon which the message was
- * delivered has been deleted then it is not possible to requeue. Currently we simply discard the message but in the
- * future we may wish to dead letter the message.
- *
- * Validate that at the end of the visit all Maps are empty and all messages are marked as deleted
- *
- * @throws AMQException the visit interface throws this
- */
- public void testDiscard() throws AMQException
- {
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
-
- _queue.delete();
-
- // requeueIfUnableToResend : value doesn't matter here as queue has been deleted
- _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, false, _messageStore));
-
- assertEquals("Message count for resend not correct.", 0, msgToResend.size());
- assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
- assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
-
- for (QueueEntry entry : _referenceList)
- {
- assertTrue("Message was not discarded", entry.isDeleted());
- }
- }
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java Fri Feb 7 16:57:49 2014
@@ -47,11 +47,9 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -60,7 +58,7 @@ public class InternalTestProtocolSession
{
private static final Logger _logger = Logger.getLogger(InternalTestProtocolSession.class);
// ChannelID(LIST) -> LinkedList<Pair>
- private final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers;
+ private final Map<Integer, Map<String, LinkedList<DeliveryPair>>> _channelDelivers;
private AtomicInteger _deliveryCount = new AtomicInteger(0);
private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
@@ -68,7 +66,7 @@ public class InternalTestProtocolSession
{
super(broker, new TestNetworkConnection(), ID_GENERATOR.getAndIncrement(), null, null);
- _channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>();
+ _channelDelivers = new HashMap<Integer, Map<String, LinkedList<DeliveryPair>>>();
setTestAuthorizedSubject();
setVirtualHost(virtualHost);
@@ -117,7 +115,7 @@ public class InternalTestProtocolSession
{
synchronized (_channelDelivers)
{
- List<DeliveryPair> all =_channelDelivers.get(channelId).get(consumerTag);
+ List<DeliveryPair> all =_channelDelivers.get(channelId).get(AMQShortString.toString(consumerTag));
if (all == null)
{
@@ -153,23 +151,23 @@ public class InternalTestProtocolSession
synchronized (_channelDelivers)
{
- Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(channelId);
+ Map<String, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(channelId);
if (consumers == null)
{
- consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>();
+ consumers = new HashMap<String, LinkedList<DeliveryPair>>();
_channelDelivers.put(channelId, consumers);
}
- LinkedList<DeliveryPair> consumerDelivers = consumers.get(consumerTag);
+ LinkedList<DeliveryPair> consumerDelivers = consumers.get(AMQShortString.toString(consumerTag));
if (consumerDelivers == null)
{
consumerDelivers = new LinkedList<DeliveryPair>();
- consumers.put(consumerTag, consumerDelivers);
+ consumers.put(consumerTag.toString(), consumerDelivers);
}
- consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)msg));
+ consumerDelivers.add(new DeliveryPair(deliveryTag, msg));
}
}
@@ -247,27 +245,27 @@ public class InternalTestProtocolSession
@Override
- public void deliverToClient(Subscription sub, ServerMessage message,
+ public void deliverToClient(Consumer sub, ServerMessage message,
InstanceProperties props, long deliveryTag) throws AMQException
{
_deliveryCount.incrementAndGet();
synchronized (_channelDelivers)
{
- Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(_channelId);
+ Map<String, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(_channelId);
if (consumers == null)
{
- consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>();
+ consumers = new HashMap<String, LinkedList<DeliveryPair>>();
_channelDelivers.put(_channelId, consumers);
}
- LinkedList<DeliveryPair> consumerDelivers = consumers.get(((SubscriptionImpl)sub).getConsumerTag());
+ LinkedList<DeliveryPair> consumerDelivers = consumers.get(sub.getName());
if (consumerDelivers == null)
{
consumerDelivers = new LinkedList<DeliveryPair>();
- consumers.put(((SubscriptionImpl)sub).getConsumerTag(), consumerDelivers);
+ consumers.put(sub.getName(), consumerDelivers);
}
consumerDelivers.add(new DeliveryPair(deliveryTag, message));
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java Fri Feb 7 16:57:49 2014
@@ -29,7 +29,7 @@ import org.apache.qpid.server.queue.AMQQ
import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -130,8 +130,7 @@ public class QueueBrowserUsesNoAckTest e
//Check the process didn't suspend the subscription as this would
// indicate we are using the prefetch credit. i.e. using acks not No-Ack
assertTrue("The subscription has been suspended",
- !getChannel().getSubscription(browser).getState()
- .equals(Subscription.State.SUSPENDED));
+ !getChannel().getSubscription(browser).isSuspended());
}
private void checkStoreContents(int messageCount)
@@ -144,6 +143,6 @@ public class QueueBrowserUsesNoAckTest e
FieldTable filters = new FieldTable();
filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true);
- return channel.subscribeToQueue(null, queue, true, filters, false, true);
+ return channel.consumeFromSource(null, queue, true, filters, true);
}
}
Propchange: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/
('svn:mergeinfo' removed)
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Fri Feb 7 16:57:49 2014
@@ -34,6 +34,7 @@ import org.apache.qpid.server.model.Tran
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
@@ -53,16 +54,8 @@ public class Connection_1_0 implements C
private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
private final Object _reference = new Object();
-
-
- public static interface Task
- {
- public void doTask(Connection_1_0 connection);
- }
-
-
- private List<Task> _closeTasks =
- Collections.synchronizedList(new ArrayList<Task>());
+ private List<Action<Connection_1_0>> _closeTasks =
+ Collections.synchronizedList(new ArrayList<Action<Connection_1_0>>());
@@ -98,26 +91,26 @@ public class Connection_1_0 implements C
_sessions.remove(session);
}
- void removeConnectionCloseTask(final Task task)
+ void removeConnectionCloseTask(final Action<Connection_1_0> task)
{
_closeTasks.remove( task );
}
- void addConnectionCloseTask(final Task task)
+ void addConnectionCloseTask(final Action<Connection_1_0> task)
{
_closeTasks.add( task );
}
public void closeReceived()
{
- List<Task> taskCopy;
+ List<Action<Connection_1_0>> taskCopy;
synchronized (_closeTasks)
{
- taskCopy = new ArrayList<Task>(_closeTasks);
+ taskCopy = new ArrayList<Action<Connection_1_0>>(_closeTasks);
}
- for(Task task : taskCopy)
+ for(Action<Connection_1_0> task : taskCopy)
{
- task.doTask(this);
+ task.performAction(this);
}
synchronized (_closeTasks)
{
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java Fri Feb 7 16:57:49 2014
@@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.ListIterator;
@@ -286,7 +285,7 @@ public abstract class MessageConverter_t
Binary dataEncoding = sectionEncoder.getEncoding();
final ByteBuffer allData = ByteBuffer.allocate(headerSize + dataEncoding.getLength());
- metaData.writeToBuffer(0,allData);
+ metaData.writeToBuffer(allData);
allData.put(dataEncoding.getArray(),dataEncoding.getArrayOffset(),dataEncoding.getLength());
return allData;
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java Fri Feb 7 16:57:49 2014
@@ -314,7 +314,7 @@ public class MessageMetaData_1_0 impleme
return buf;
}
- public int writeToBuffer(int offsetInMetaData, ByteBuffer dest)
+ public int writeToBuffer(ByteBuffer dest)
{
ByteBuffer buf = _encoded;
@@ -326,7 +326,7 @@ public class MessageMetaData_1_0 impleme
buf = buf.duplicate();
- buf.position(offsetInMetaData);
+ buf.position(0);
if(dest.remaining() < buf.limit())
{
Propchange: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
('svn:mergeinfo' removed)
Propchange: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
('svn:mergeinfo' removed)
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java Fri Feb 7 16:57:49 2014
@@ -24,22 +24,21 @@ import org.apache.log4j.Logger;
import org.apache.qpid.amqp_1_0.type.Outcome;
import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.txn.ServerTransaction;
-public class QueueDestination implements SendingDestination, ReceivingDestination
+public class QueueDestination extends MessageSourceDestination implements SendingDestination, ReceivingDestination
{
private static final Logger _logger = Logger.getLogger(QueueDestination.class);
private static final Accepted ACCEPTED = new Accepted();
private static final Outcome[] OUTCOMES = new Outcome[] { ACCEPTED };
- private AMQQueue _queue;
-
public QueueDestination(AMQQueue queue)
{
- _queue = queue;
+ super(queue);
}
public Outcome[] getOutcomes()
@@ -52,7 +51,7 @@ public class QueueDestination implements
try
{
- txn.enqueue(_queue,message, new ServerTransaction.Action()
+ txn.enqueue(getQueue(),message, new ServerTransaction.Action()
{
@@ -60,8 +59,7 @@ public class QueueDestination implements
{
try
{
-
- _queue.enqueue(message);
+ getQueue().enqueue(message,null);
}
catch (Exception e)
{
@@ -93,7 +91,7 @@ public class QueueDestination implements
public AMQQueue getQueue()
{
- return _queue;
+ return (AMQQueue) super.getQueue();
}
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Fri Feb 7 16:57:49 2014
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.
import java.util.ArrayList;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -64,11 +65,14 @@ import org.apache.qpid.server.exchange.E
import org.apache.qpid.server.exchange.TopicExchange;
import org.apache.qpid.server.filter.JMSSelectorFilter;
import org.apache.qpid.server.filter.SimpleFilterManager;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryStateHandler
@@ -78,18 +82,22 @@ public class SendingLink_1_0 implements
private VirtualHost _vhost;
private SendingDestination _destination;
- private Subscription_1_0 _subscription;
+ private Consumer _consumer;
+ private ConsumerTarget_1_0 _target;
+
private boolean _draining;
- private final Map<Binary, QueueEntry> _unsettledMap =
- new HashMap<Binary, QueueEntry>();
+ private final Map<Binary, MessageInstance> _unsettledMap =
+ new HashMap<Binary, MessageInstance>();
private final ConcurrentHashMap<Binary, UnsettledAction> _unsettledActionMap =
new ConcurrentHashMap<Binary, UnsettledAction>();
private volatile SendingLinkAttachment _linkAttachment;
private TerminusDurability _durability;
- private List<QueueEntry> _resumeFullTransfers = new ArrayList<QueueEntry>();
+ private List<MessageInstance> _resumeFullTransfers = new ArrayList<MessageInstance>();
private List<Binary> _resumeAcceptedTransfers = new ArrayList<Binary>();
private Runnable _closeAction;
+ private final MessageSource _queue;
+
public SendingLink_1_0(final SendingLinkAttachment linkAttachment,
final VirtualHost vhost,
@@ -103,24 +111,22 @@ public class SendingLink_1_0 implements
_durability = source.getDurable();
linkAttachment.setDeliveryStateHandler(this);
QueueDestination qd = null;
- AMQQueue queue = null;
+ EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
boolean noLocal = false;
JMSSelectorFilter messageFilter = null;
- if(destination instanceof QueueDestination)
+ if(destination instanceof MessageSourceDestination)
{
- queue = ((QueueDestination) _destination).getQueue();
+ _queue = ((MessageSourceDestination) _destination).getQueue();
- if(queue.getAvailableAttributes().contains("topic"))
+ if(_queue instanceof AMQQueue && ((AMQQueue)_queue).getAvailableAttributes().contains("topic"))
{
source.setDistributionMode(StdDistMode.COPY);
}
- qd = (QueueDestination) destination;
-
Map<Symbol,Filter> filters = source.getFilter();
Map<Symbol,Filter> actualFilters = new HashMap<Symbol,Filter>();
@@ -167,7 +173,13 @@ public class SendingLink_1_0 implements
}
source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
- _subscription = new Subscription_1_0(this, qd, source.getDistributionMode() != StdDistMode.COPY);
+ _target = new ConsumerTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY);
+ if(source.getDistributionMode() != StdDistMode.COPY)
+ {
+ options.add(Consumer.Option.ACQUIRES);
+ options.add(Consumer.Option.SEES_REQUEUES);
+ }
+
}
else if(destination instanceof ExchangeDestination)
{
@@ -199,7 +211,7 @@ public class SendingLink_1_0 implements
name = UUID.randomUUID().toString();
}
- queue = _vhost.getQueue(name);
+ AMQQueue queue = _vhost.getQueue(name);
Exchange exchange = exchangeDestination.getExchange();
if(queue == null)
@@ -299,9 +311,10 @@ public class SendingLink_1_0 implements
}
}
}
+ _queue = queue;
source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
- exchange.addBinding(binding,queue,null);
+ exchange.addBinding(binding, queue,null);
source.setDistributionMode(StdDistMode.COPY);
if(!isDurable)
@@ -309,10 +322,10 @@ public class SendingLink_1_0 implements
final String queueName = name;
final AMQQueue tempQueue = queue;
- final Connection_1_0.Task deleteQueueTask =
- new Connection_1_0.Task()
+ final Action<Connection_1_0> deleteQueueTask =
+ new Action<Connection_1_0>()
{
- public void doTask(Connection_1_0 session)
+ public void performAction(Connection_1_0 session)
{
if (_vhost.getQueue(queueName) == tempQueue)
{
@@ -331,9 +344,9 @@ public class SendingLink_1_0 implements
getSession().getConnection().addConnectionCloseTask(deleteQueueTask);
- queue.addQueueDeleteTask(new AMQQueue.Task()
+ queue.addQueueDeleteTask(new Action<AMQQueue>()
{
- public void doTask(AMQQueue queue)
+ public void performAction(AMQQueue queue)
{
getSession().getConnection().removeConnectionCloseTask(deleteQueueTask);
}
@@ -347,31 +360,46 @@ public class SendingLink_1_0 implements
catch (AMQSecurityException e)
{
_logger.error("Security error", e);
+ throw new RuntimeException(e);
}
catch (AMQInternalException e)
{
_logger.error("Internal error", e);
+ throw new RuntimeException(e);
}
catch (AMQException e)
{
_logger.error("Error", e);
+ throw new RuntimeException(e);
}
- _subscription = new Subscription_1_0(this, qd, true);
+
+ _target = new ConsumerTarget_1_0(this, true);
+ options.add(Consumer.Option.ACQUIRES);
+ options.add(Consumer.Option.SEES_REQUEUES);
+
+ }
+ else
+ {
+ throw new RuntimeException("Unknown destination type");
}
- if(_subscription != null)
+ if(_target != null)
{
- _subscription.setNoLocal(noLocal);
- if(messageFilter!=null)
+ if(noLocal)
{
- _subscription.setFilters(new SimpleFilterManager(messageFilter));
+ options.add(Consumer.Option.NO_LOCAL);
}
+
+ _consumer.setNoLocal(noLocal);
+
+
try
{
-
- queue.registerSubscription(_subscription, false);
+ _consumer = _queue.addConsumer(_target,
+ messageFilter == null ? null : new SimpleFilterManager(messageFilter),
+ Message_1_0.class, getEndpoint().getName(), options);
}
catch (AMQException e)
{
@@ -394,12 +422,11 @@ public class SendingLink_1_0 implements
// if not durable or close
if(!TerminusDurability.UNSETTLED_STATE.equals(_durability))
{
- AMQQueue queue = _subscription.getQueue();
try
{
- queue.unregisterSubscription(_subscription);
+ _consumer.close();
}
catch (AMQException e)
@@ -426,7 +453,7 @@ public class SendingLink_1_0 implements
{
try
{
- queue.getVirtualHost().removeQueue(queue);
+ _vhost.removeQueue((AMQQueue)_queue);
}
catch(AMQException e)
{
@@ -443,7 +470,7 @@ public class SendingLink_1_0 implements
else if(detach == null || detach.getError() != null)
{
_linkAttachment = null;
- _subscription.flowStateChanged();
+ _target.flowStateChanged();
}
else
{
@@ -491,7 +518,7 @@ public class SendingLink_1_0 implements
}
if(_resumeAcceptedTransfers.isEmpty())
{
- _subscription.flowStateChanged();
+ _target.flowStateChanged();
}
}
@@ -531,7 +558,7 @@ public class SendingLink_1_0 implements
}
}
- public void addUnsettled(Binary tag, UnsettledAction unsettledAction, QueueEntry queueEntry)
+ public void addUnsettled(Binary tag, UnsettledAction unsettledAction, MessageInstance queueEntry)
{
_unsettledActionMap.put(tag,unsettledAction);
if(getTransactionId() == null)
@@ -593,9 +620,9 @@ public class SendingLink_1_0 implements
public synchronized void setLinkAttachment(SendingLinkAttachment linkAttachment)
{
- if(_subscription.isActive())
+ if(_consumer.isActive())
{
- _subscription.suspend();
+ _target.suspend();
}
_linkAttachment = linkAttachment;
@@ -603,14 +630,14 @@ public class SendingLink_1_0 implements
SendingLinkEndpoint endpoint = linkAttachment.getEndpoint();
endpoint.setDeliveryStateHandler(this);
Map initialUnsettledMap = endpoint.getInitialUnsettledMap();
- Map<Binary, QueueEntry> unsettledCopy = new HashMap<Binary, QueueEntry>(_unsettledMap);
+ Map<Binary, MessageInstance> unsettledCopy = new HashMap<Binary, MessageInstance>(_unsettledMap);
_resumeAcceptedTransfers.clear();
_resumeFullTransfers.clear();
- for(Map.Entry<Binary, QueueEntry> entry : unsettledCopy.entrySet())
+ for(Map.Entry<Binary, MessageInstance> entry : unsettledCopy.entrySet())
{
Binary deliveryTag = entry.getKey();
- final QueueEntry queueEntry = entry.getValue();
+ final MessageInstance queueEntry = entry.getValue();
if(initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag))
{
queueEntry.setRedelivered();
@@ -624,7 +651,7 @@ public class SendingLink_1_0 implements
if(outcome instanceof Accepted)
{
AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore());
- if(_subscription.acquires())
+ if(_consumer.acquires())
{
txn.dequeue(Collections.singleton(queueEntry),
new ServerTransaction.Action()
@@ -644,7 +671,7 @@ public class SendingLink_1_0 implements
else if(outcome instanceof Released)
{
AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore());
- if(_subscription.acquires())
+ if(_consumer.acquires())
{
txn.dequeue(Collections.singleton(queueEntry),
new ServerTransaction.Action()
@@ -678,9 +705,9 @@ public class SendingLink_1_0 implements
public Map getUnsettledOutcomeMap()
{
- Map<Binary, QueueEntry> unsettled = new HashMap<Binary, QueueEntry>(_unsettledMap);
+ Map<Binary, MessageInstance> unsettled = new HashMap<Binary, MessageInstance>(_unsettledMap);
- for(Map.Entry<Binary, QueueEntry> entry : unsettled.entrySet())
+ for(Map.Entry<Binary, MessageInstance> entry : unsettled.entrySet())
{
entry.setValue(null);
}
@@ -692,4 +719,9 @@ public class SendingLink_1_0 implements
{
_closeAction = action;
}
+
+ public VirtualHost getVirtualHost()
+ {
+ return _vhost;
+ }
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Fri Feb 7 16:57:49 2014
@@ -41,6 +41,8 @@ import org.apache.qpid.AMQSecurityExcept
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -48,6 +50,7 @@ import org.apache.qpid.server.protocol.L
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.*;
@@ -108,11 +111,11 @@ public class Session_1_0 implements Sess
source.setAddress(tempQueue.getName());
}
String addr = source.getAddress();
- AMQQueue queue = _vhost.getQueue(addr);
+ MessageSource queue = _vhost.getMessageSource(addr);
if(queue != null)
{
- destination = new QueueDestination(queue);
+ destination = new MessageSourceDestination(queue);
@@ -249,11 +252,11 @@ public class Session_1_0 implements Sess
}
String addr = target.getAddress();
- Exchange exchg = _vhost.getExchange(addr);
- if(exchg != null)
+ MessageDestination messageDestination = _vhost.getMessageDestination(addr);
+ if(messageDestination != null)
{
- destination = new ExchangeDestination(exchg, target.getDurable(),
- target.getExpiryPolicy());
+ destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
+ target.getExpiryPolicy());
}
else
{
@@ -343,10 +346,10 @@ public class Session_1_0 implements Sess
if (lifetimePolicy == null || lifetimePolicy instanceof DeleteOnClose)
{
- final Connection_1_0.Task deleteQueueTask =
- new Connection_1_0.Task()
+ final Action<Connection_1_0> deleteQueueTask =
+ new Action<Connection_1_0>()
{
- public void doTask(Connection_1_0 session)
+ public void performAction(Connection_1_0 session)
{
if (_vhost.getQueue(queueName) == tempQueue)
{
@@ -365,9 +368,9 @@ public class Session_1_0 implements Sess
_connection.addConnectionCloseTask(deleteQueueTask);
- queue.addQueueDeleteTask(new AMQQueue.Task()
+ queue.addQueueDeleteTask(new Action<AMQQueue>()
{
- public void doTask(AMQQueue queue)
+ public void performAction(AMQQueue queue)
{
_connection.removeConnectionCloseTask(deleteQueueTask);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org