You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/10 12:32:18 UTC

svn commit: r1566585 - in /qpid/branches/java-broker-amqp-1-0-management: ./ java/ java/broker-core/ java/broker-core/src/main/java/org/apache/qpid/server/filter/ java/broker-core/src/main/java/org/apache/qpid/server/queue/ java/broker-plugins/amqp-0-1...

Author: rgodfrey
Date: Mon Feb 10 11:32:18 2014
New Revision: 1566585

URL: http://svn.apache.org/r1566585
Log:
Merge from trunk (r1566479-1566480, r1566531, r1566535, r1566543, r1566579)

Modified:
    qpid/branches/java-broker-amqp-1-0-management/   (props changed)
    qpid/branches/java-broker-amqp-1-0-management/java/   (props changed)
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/   (props changed)
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/   (props changed)
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
    qpid/branches/java-broker-amqp-1-0-management/java/test-profiles/   (props changed)
    qpid/branches/java-broker-amqp-1-0-management/java/test-profiles/python_tests/Java010PythonExcludes

Propchange: qpid/branches/java-broker-amqp-1-0-management/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid:r1566479-1566480,1566531,1566535,1566543,1566579

Propchange: qpid/branches/java-broker-amqp-1-0-management/java/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java:r1566479,1566531,1566535,1566543,1566579

Propchange: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker-core:r1566479,1566535

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java?rev=1566585&r1=1566584&r2=1566585&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java Mon Feb 10 11:32:18 2014
@@ -30,6 +30,7 @@ import org.apache.qpid.common.AMQPFilter
 import org.apache.qpid.filter.SelectorParsingException;
 import org.apache.qpid.filter.selector.ParseException;
 import org.apache.qpid.filter.selector.TokenMgrError;
+import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 
@@ -118,11 +119,11 @@ public class FilterSupport
         }
     }
 
-    static final class NoLocalFilter implements MessageFilter
+    public static final class NoLocalFilter implements MessageFilter
     {
-        private final AMQQueue _queue;
+        private final MessageSource _queue;
 
-        public NoLocalFilter(AMQQueue queue)
+        public NoLocalFilter(MessageSource queue)
         {
             _queue = queue;
         }

Propchange: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue:r1566479

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1566585&r1=1566584&r2=1566585&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Mon Feb 10 11:32:18 2014
@@ -107,7 +107,11 @@ public class ConsumerTarget_0_10 extends
         boolean closed = false;
         State state = getState();
 
-        getConsumer().getSendLock();
+        final Consumer consumer = getConsumer();
+        if(consumer != null)
+        {
+            consumer.getSendLock();
+        }
         try
         {
             while(!closed && state != State.CLOSED)
@@ -122,7 +126,10 @@ public class ConsumerTarget_0_10 extends
             }
         finally
         {
-            getConsumer().releaseSendLock();
+            if(consumer != null)
+            {
+                consumer.releaseSendLock();
+            }
         }
 
         return closed;

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1566585&r1=1566584&r2=1566585&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Mon Feb 10 11:32:18 2014
@@ -44,7 +44,10 @@ import org.apache.qpid.server.Transactio
 import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
 import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
+import org.apache.qpid.server.filter.FilterSupport;
+import org.apache.qpid.server.filter.SimpleFilterManager;
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.flow.Pre0_10CreditManager;
 import org.apache.qpid.server.logging.LogActor;
@@ -512,7 +515,7 @@ public class AMQChannel implements AMQSe
      * @throws AMQException                  if something goes wrong
      */
     public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks,
-                                            FieldTable filters, boolean exclusive) throws AMQException
+                                            FieldTable filters, boolean exclusive, boolean noLocal) throws AMQException
     {
         if (tag == null)
         {
@@ -549,6 +552,7 @@ public class AMQChannel implements AMQSe
             options.add(Consumer.Option.EXCLUSIVE);
         }
 
+
         // So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
         // We add before we register as the Async Delivery process may AutoClose the subscriber
         // so calling _cT2QM.remove before we have done put which was after the register succeeded.
@@ -558,9 +562,18 @@ public class AMQChannel implements AMQSe
 
         try
         {
+            FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(filters));
+            if(noLocal)
+            {
+                if(filterManager == null)
+                {
+                    filterManager = new SimpleFilterManager();
+                }
+                filterManager.add(new FilterSupport.NoLocalFilter(source));
+            }
             Consumer sub =
                     source.addConsumer(target,
-                                      FilterManagerFactory.createManager(FieldTable.convertToMap(filters)),
+                                       filterManager,
                                       AMQMessage.class,
                                       AMQShortString.toString(tag),
                                       options);

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1566585&r1=1566584&r2=1566585&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Mon Feb 10 11:32:18 2014
@@ -31,6 +31,7 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.v0_8.handler.BasicGetMethodHandler;
 import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.consumer.AbstractConsumerTarget;
@@ -80,6 +81,16 @@ public abstract class ConsumerTarget_0_8
         return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
     }
 
+    public static ConsumerTarget_0_8 createGetNoAckTarget(final AMQChannel channel,
+                                                          final AMQShortString consumerTag,
+                                                          final FieldTable filters,
+                                                          final FlowCreditManager creditManager,
+                                                          final ClientDeliveryMethod deliveryMethod,
+                                                          final RecordDeliveryMethod recordMethod) throws AMQException
+    {
+        return new GetNoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+    }
+
     static final class BrowserConsumer extends ConsumerTarget_0_8
     {
         public BrowserConsumer(AMQChannel channel,
@@ -132,10 +143,10 @@ public abstract class ConsumerTarget_0_8
     }
 
     public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
-                                                           AMQShortString consumerTag, FieldTable filters,
-                                                           FlowCreditManager creditManager,
-                                                           ClientDeliveryMethod deliveryMethod,
-                                                           RecordDeliveryMethod recordMethod) throws AMQException
+                                                       AMQShortString consumerTag, FieldTable filters,
+                                                       FlowCreditManager creditManager,
+                                                       ClientDeliveryMethod deliveryMethod,
+                                                       RecordDeliveryMethod recordMethod) throws AMQException
     {
         return new NoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
     }
@@ -223,9 +234,9 @@ public abstract class ConsumerTarget_0_8
      */
     public static final class GetNoAckConsumer extends NoAckConsumer
     {
-        public GetNoAckConsumer(AMQChannel channel, AMQProtocolSession protocolSession,
+        public GetNoAckConsumer(AMQChannel channel,
                                 AMQShortString consumerTag, FieldTable filters,
-                                boolean noLocal, FlowCreditManager creditManager,
+                                FlowCreditManager creditManager,
                                 ClientDeliveryMethod deliveryMethod,
                                 RecordDeliveryMethod recordMethod)
             throws AMQException
@@ -417,7 +428,12 @@ public abstract class ConsumerTarget_0_8
         boolean closed = false;
         State state = getState();
 
-        getConsumer().getSendLock();
+        final Consumer consumer = getConsumer();
+
+        if(consumer != null)
+        {
+            consumer.getSendLock();
+        }
         try
         {
             while(!closed && state != State.CLOSED)
@@ -433,7 +449,10 @@ public abstract class ConsumerTarget_0_8
         }
         finally
         {
-            getConsumer().releaseSendLock();
+            if(consumer != null)
+            {
+                consumer.releaseSendLock();
+            }
         }
     }
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java?rev=1566585&r1=1566584&r2=1566585&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java Mon Feb 10 11:32:18 2014
@@ -125,7 +125,8 @@ public class BasicConsumeMethodHandler i
                                                                                queue,
                                                                                !body.getNoAck(),
                                                                                body.getArguments(),
-                                                                               body.getExclusive());
+                                                                               body.getExclusive(),
+                                                                               body.getNoLocal());
                         if (!body.getNowait())
                         {
                             MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java?rev=1566585&r1=1566584&r2=1566585&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java Mon Feb 10 11:32:18 2014
@@ -128,24 +128,8 @@ public class BasicGetMethodHandler imple
 
         final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
 
-        final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod()
-        {
-
-            @Override
-            public void deliverToClient(final Consumer sub, final ServerMessage message, final
-                                        InstanceProperties props, final long deliveryTag)
-            throws AMQException
-            {
-                singleMessageCredit.useCreditForMessage(message.getSize());
-                session.getProtocolOutputConverter().writeGetOk(message,
-                                                                props,
-                                                                channel.getChannelId(),
-                                                                deliveryTag,
-                                                                queue.getMessageCount());
-
-
-            }
-        };
+        final GetDeliveryMethod getDeliveryMethod =
+                new GetDeliveryMethod(singleMessageCredit, session, channel, queue);
         final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
         {
 
@@ -167,7 +151,7 @@ public class BasicGetMethodHandler imple
         }
         else
         {
-            target = ConsumerTarget_0_8.createNoAckTarget(channel,
+            target = ConsumerTarget_0_8.createGetNoAckTarget(channel,
                                                           AMQShortString.EMPTY_STRING, null,
                                                           singleMessageCredit, getDeliveryMethod, getRecordMethod);
         }
@@ -175,10 +159,48 @@ public class BasicGetMethodHandler imple
         Consumer sub = queue.addConsumer(target, null, AMQMessage.class, "", options);
         sub.flush();
         sub.close();
-        return(!singleMessageCredit.hasCredit());
+        return(getDeliveryMethod.hasDeliveredMessage());
 
 
     }
 
 
+    private static class GetDeliveryMethod implements ClientDeliveryMethod
+    {
+
+        private final FlowCreditManager _singleMessageCredit;
+        private final AMQProtocolSession _session;
+        private final AMQChannel _channel;
+        private final AMQQueue _queue;
+        private boolean _deliveredMessage;
+
+        public GetDeliveryMethod(final FlowCreditManager singleMessageCredit,
+                                 final AMQProtocolSession session,
+                                 final AMQChannel channel, final AMQQueue queue)
+        {
+            _singleMessageCredit = singleMessageCredit;
+            _session = session;
+            _channel = channel;
+            _queue = queue;
+        }
+
+        @Override
+        public void deliverToClient(final Consumer sub, final ServerMessage message,
+                                    final InstanceProperties props, final long deliveryTag) throws AMQException
+        {
+            _singleMessageCredit.useCreditForMessage(message.getSize());
+            _session.getProtocolOutputConverter().writeGetOk(message,
+                                                            props,
+                                                            _channel.getChannelId(),
+                                                            deliveryTag,
+                                                            _queue.getMessageCount());
+
+            _deliveredMessage = true;
+        }
+
+        public boolean hasDeliveredMessage()
+        {
+            return _deliveredMessage;
+        }
+    }
 }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java?rev=1566585&r1=1566584&r2=1566585&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java Mon Feb 10 11:32:18 2014
@@ -140,7 +140,7 @@ public class AcknowledgeTest extends Qpi
         assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size());
 
         //Subscribe to the queue
-        AMQShortString subscriber = _channel.consumeFromSource(null, _queue, true, null, true);
+        AMQShortString subscriber = _channel.consumeFromSource(null, _queue, true, null, true, false);
 
         getQueue().deliverAsync();
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java?rev=1566585&r1=1566584&r2=1566585&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java Mon Feb 10 11:32:18 2014
@@ -141,6 +141,6 @@ public class QueueBrowserUsesNoAckTest e
         FieldTable filters = new FieldTable();
         filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true);
 
-        return channel.consumeFromSource(null, queue, true, filters, true);
+        return channel.consumeFromSource(null, queue, true, filters, true, false);
     }
 }

Propchange: qpid/branches/java-broker-amqp-1-0-management/java/test-profiles/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/test-profiles:r1566579

Modified: qpid/branches/java-broker-amqp-1-0-management/java/test-profiles/python_tests/Java010PythonExcludes
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/test-profiles/python_tests/Java010PythonExcludes?rev=1566585&r1=1566584&r2=1566585&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/test-profiles/python_tests/Java010PythonExcludes (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/test-profiles/python_tests/Java010PythonExcludes Mon Feb 10 11:32:18 2014
@@ -50,6 +50,9 @@ qpid_tests.broker_0_10.priority.Priority
 #The broker does not support the autodelete property on exchanges
 qpid_tests.broker_0_10.exchange.AutodeleteTests.testAutodelete*
 
+# QPID-5531 : Changes to the C++ behaviour in having a default timeout for every transaction not implemented in Java Broker
+qpid_tests.broker_0_10.dtx.DtxTests.test_get_timeout
+
 ###### Behavioural differences between Java & CPP Broker ######
 
 #Tests changed/added in QPID-5280 and QPID-5283



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org