You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/10 12:32:18 UTC
svn commit: r1566585 - in /qpid/branches/java-broker-amqp-1-0-management: ./
java/ java/broker-core/
java/broker-core/src/main/java/org/apache/qpid/server/filter/
java/broker-core/src/main/java/org/apache/qpid/server/queue/
java/broker-plugins/amqp-0-1...
Author: rgodfrey
Date: Mon Feb 10 11:32:18 2014
New Revision: 1566585
URL: http://svn.apache.org/r1566585
Log:
merge from trunk
Modified:
qpid/branches/java-broker-amqp-1-0-management/ (props changed)
qpid/branches/java-broker-amqp-1-0-management/java/ (props changed)
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/ (props changed)
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/ (props changed)
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
qpid/branches/java-broker-amqp-1-0-management/java/test-profiles/ (props changed)
qpid/branches/java-broker-amqp-1-0-management/java/test-profiles/python_tests/Java010PythonExcludes
Propchange: qpid/branches/java-broker-amqp-1-0-management/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid:r1566479-1566480,1566531,1566535,1566543,1566579
Propchange: qpid/branches/java-broker-amqp-1-0-management/java/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java:r1566479,1566531,1566535,1566543,1566579
Propchange: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker-core:r1566479,1566535
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java?rev=1566585&r1=1566584&r2=1566585&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java Mon Feb 10 11:32:18 2014
@@ -30,6 +30,7 @@ import org.apache.qpid.common.AMQPFilter
import org.apache.qpid.filter.SelectorParsingException;
import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.filter.selector.TokenMgrError;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
@@ -118,11 +119,11 @@ public class FilterSupport
}
}
- static final class NoLocalFilter implements MessageFilter
+ public static final class NoLocalFilter implements MessageFilter
{
- private final AMQQueue _queue;
+ private final MessageSource _queue;
- public NoLocalFilter(AMQQueue queue)
+ public NoLocalFilter(MessageSource queue)
{
_queue = queue;
}
Propchange: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue:r1566479
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1566585&r1=1566584&r2=1566585&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Mon Feb 10 11:32:18 2014
@@ -107,7 +107,11 @@ public class ConsumerTarget_0_10 extends
boolean closed = false;
State state = getState();
- getConsumer().getSendLock();
+ final Consumer consumer = getConsumer();
+ if(consumer != null)
+ {
+ consumer.getSendLock();
+ }
try
{
while(!closed && state != State.CLOSED)
@@ -122,7 +126,10 @@ public class ConsumerTarget_0_10 extends
}
finally
{
- getConsumer().releaseSendLock();
+ if(consumer != null)
+ {
+ consumer.releaseSendLock();
+ }
}
return closed;
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1566585&r1=1566584&r2=1566585&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Mon Feb 10 11:32:18 2014
@@ -44,7 +44,10 @@ import org.apache.qpid.server.Transactio
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
+import org.apache.qpid.server.filter.FilterSupport;
+import org.apache.qpid.server.filter.SimpleFilterManager;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.logging.LogActor;
@@ -512,7 +515,7 @@ public class AMQChannel implements AMQSe
* @throws AMQException if something goes wrong
*/
public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks,
- FieldTable filters, boolean exclusive) throws AMQException
+ FieldTable filters, boolean exclusive, boolean noLocal) throws AMQException
{
if (tag == null)
{
@@ -549,6 +552,7 @@ public class AMQChannel implements AMQSe
options.add(Consumer.Option.EXCLUSIVE);
}
+
// So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
// We add before we register as the Async Delivery process may AutoClose the subscriber
// so calling _cT2QM.remove before we have done put which was after the register succeeded.
@@ -558,9 +562,18 @@ public class AMQChannel implements AMQSe
try
{
+ FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(filters));
+ if(noLocal)
+ {
+ if(filterManager == null)
+ {
+ filterManager = new SimpleFilterManager();
+ }
+ filterManager.add(new FilterSupport.NoLocalFilter(source));
+ }
Consumer sub =
source.addConsumer(target,
- FilterManagerFactory.createManager(FieldTable.convertToMap(filters)),
+ filterManager,
AMQMessage.class,
AMQShortString.toString(tag),
options);
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1566585&r1=1566584&r2=1566585&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Mon Feb 10 11:32:18 2014
@@ -31,6 +31,7 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.v0_8.handler.BasicGetMethodHandler;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
@@ -80,6 +81,16 @@ public abstract class ConsumerTarget_0_8
return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
}
+ public static ConsumerTarget_0_8 createGetNoAckTarget(final AMQChannel channel,
+ final AMQShortString consumerTag,
+ final FieldTable filters,
+ final FlowCreditManager creditManager,
+ final ClientDeliveryMethod deliveryMethod,
+ final RecordDeliveryMethod recordMethod) throws AMQException
+ {
+ return new GetNoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+ }
+
static final class BrowserConsumer extends ConsumerTarget_0_8
{
public BrowserConsumer(AMQChannel channel,
@@ -132,10 +143,10 @@ public abstract class ConsumerTarget_0_8
}
public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
- AMQShortString consumerTag, FieldTable filters,
- FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod) throws AMQException
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod) throws AMQException
{
return new NoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
}
@@ -223,9 +234,9 @@ public abstract class ConsumerTarget_0_8
*/
public static final class GetNoAckConsumer extends NoAckConsumer
{
- public GetNoAckConsumer(AMQChannel channel, AMQProtocolSession protocolSession,
+ public GetNoAckConsumer(AMQChannel channel,
AMQShortString consumerTag, FieldTable filters,
- boolean noLocal, FlowCreditManager creditManager,
+ FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
RecordDeliveryMethod recordMethod)
throws AMQException
@@ -417,7 +428,12 @@ public abstract class ConsumerTarget_0_8
boolean closed = false;
State state = getState();
- getConsumer().getSendLock();
+ final Consumer consumer = getConsumer();
+
+ if(consumer != null)
+ {
+ consumer.getSendLock();
+ }
try
{
while(!closed && state != State.CLOSED)
@@ -433,7 +449,10 @@ public abstract class ConsumerTarget_0_8
}
finally
{
- getConsumer().releaseSendLock();
+ if(consumer != null)
+ {
+ consumer.releaseSendLock();
+ }
}
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java?rev=1566585&r1=1566584&r2=1566585&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java Mon Feb 10 11:32:18 2014
@@ -125,7 +125,8 @@ public class BasicConsumeMethodHandler i
queue,
!body.getNoAck(),
body.getArguments(),
- body.getExclusive());
+ body.getExclusive(),
+ body.getNoLocal());
if (!body.getNowait())
{
MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java?rev=1566585&r1=1566584&r2=1566585&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java Mon Feb 10 11:32:18 2014
@@ -128,24 +128,8 @@ public class BasicGetMethodHandler imple
final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
- final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod()
- {
-
- @Override
- public void deliverToClient(final Consumer sub, final ServerMessage message, final
- InstanceProperties props, final long deliveryTag)
- throws AMQException
- {
- singleMessageCredit.useCreditForMessage(message.getSize());
- session.getProtocolOutputConverter().writeGetOk(message,
- props,
- channel.getChannelId(),
- deliveryTag,
- queue.getMessageCount());
-
-
- }
- };
+ final GetDeliveryMethod getDeliveryMethod =
+ new GetDeliveryMethod(singleMessageCredit, session, channel, queue);
final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
{
@@ -167,7 +151,7 @@ public class BasicGetMethodHandler imple
}
else
{
- target = ConsumerTarget_0_8.createNoAckTarget(channel,
+ target = ConsumerTarget_0_8.createGetNoAckTarget(channel,
AMQShortString.EMPTY_STRING, null,
singleMessageCredit, getDeliveryMethod, getRecordMethod);
}
@@ -175,10 +159,48 @@ public class BasicGetMethodHandler imple
Consumer sub = queue.addConsumer(target, null, AMQMessage.class, "", options);
sub.flush();
sub.close();
- return(!singleMessageCredit.hasCredit());
+ return(getDeliveryMethod.hasDeliveredMessage());
}
+ private static class GetDeliveryMethod implements ClientDeliveryMethod
+ {
+
+ private final FlowCreditManager _singleMessageCredit;
+ private final AMQProtocolSession _session;
+ private final AMQChannel _channel;
+ private final AMQQueue _queue;
+ private boolean _deliveredMessage;
+
+ public GetDeliveryMethod(final FlowCreditManager singleMessageCredit,
+ final AMQProtocolSession session,
+ final AMQChannel channel, final AMQQueue queue)
+ {
+ _singleMessageCredit = singleMessageCredit;
+ _session = session;
+ _channel = channel;
+ _queue = queue;
+ }
+
+ @Override
+ public void deliverToClient(final Consumer sub, final ServerMessage message,
+ final InstanceProperties props, final long deliveryTag) throws AMQException
+ {
+ _singleMessageCredit.useCreditForMessage(message.getSize());
+ _session.getProtocolOutputConverter().writeGetOk(message,
+ props,
+ _channel.getChannelId(),
+ deliveryTag,
+ _queue.getMessageCount());
+
+ _deliveredMessage = true;
+ }
+
+ public boolean hasDeliveredMessage()
+ {
+ return _deliveredMessage;
+ }
+ }
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java?rev=1566585&r1=1566584&r2=1566585&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java Mon Feb 10 11:32:18 2014
@@ -140,7 +140,7 @@ public class AcknowledgeTest extends Qpi
assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size());
//Subscribe to the queue
- AMQShortString subscriber = _channel.consumeFromSource(null, _queue, true, null, true);
+ AMQShortString subscriber = _channel.consumeFromSource(null, _queue, true, null, true, false);
getQueue().deliverAsync();
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java?rev=1566585&r1=1566584&r2=1566585&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java Mon Feb 10 11:32:18 2014
@@ -141,6 +141,6 @@ public class QueueBrowserUsesNoAckTest e
FieldTable filters = new FieldTable();
filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true);
- return channel.consumeFromSource(null, queue, true, filters, true);
+ return channel.consumeFromSource(null, queue, true, filters, true, false);
}
}
Propchange: qpid/branches/java-broker-amqp-1-0-management/java/test-profiles/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/test-profiles:r1566579
Modified: qpid/branches/java-broker-amqp-1-0-management/java/test-profiles/python_tests/Java010PythonExcludes
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/test-profiles/python_tests/Java010PythonExcludes?rev=1566585&r1=1566584&r2=1566585&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/test-profiles/python_tests/Java010PythonExcludes (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/test-profiles/python_tests/Java010PythonExcludes Mon Feb 10 11:32:18 2014
@@ -50,6 +50,9 @@ qpid_tests.broker_0_10.priority.Priority
#The broker does not support the autodelete property on exchanges
qpid_tests.broker_0_10.exchange.AutodeleteTests.testAutodelete*
+# QPID-5531 : Changes to the C++ behaviour in having a default timeout for every transaction not implemented in Java Broker
+qpid_tests.broker_0_10.dtx.DtxTests.test_get_timeout
+
###### Behavioural differences between Java & CPP Broker ######
#Tests changed/added in QPID-5280 and QPID-5283
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org