You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2011/03/30 18:02:46 UTC

svn commit: r1087001 - in /qpid/branches/0.5.x-dev/qpid/java: broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/exchange/ broker/src/main/j...

Author: robbie
Date: Wed Mar 30 16:02:45 2011
New Revision: 1087001

URL: http://svn.apache.org/viewvc?rev=1087001&view=rev
Log:
QPID-3167: add a unit test of SimpleAMQQueue#processQueue to check delivery when subscriptions with unique selectors are in use

Added:
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
Modified:
    qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
    qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
    qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
    qpid/branches/0.5.x-dev/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java
    qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java

Modified: qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java Wed Mar 30 16:02:45 2011
@@ -210,9 +210,9 @@ public class DiagnosticExchange extends 
         Long value = new Long(SizeOf.getUsedMemory());
         AMQShortString key = new AMQShortString("memory");
         
-        FieldTable headers = ((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).getHeaders();
+        FieldTable headers = ((BasicContentHeaderProperties)payload.getContentHeaderBody().getProperties()).getHeaders();
         headers.put(key, value);
-        ((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).setHeaders(headers);
+        ((BasicContentHeaderProperties)payload.getContentHeaderBody().getProperties()).setHeaders(headers);
         AMQQueue q = getQueueRegistry().getQueue(new AMQShortString("diagnosticqueue"));
 
         ArrayList<AMQQueue> queues =  new ArrayList<AMQQueue>();

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Wed Mar 30 16:02:45 2011
@@ -224,7 +224,7 @@ public class AMQChannel
             finally
             {
 	            long bodySize = _currentMessage.getContentHeaderBody().bodySize;
-	            long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeaderBody().properties).getTimestamp();
+	            long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeaderBody().getProperties()).getTimestamp();
 	            _session.registerMessageReceived(bodySize, timestamp);
                 // callback to allow the context to do any post message processing
                 // primary use is to allow message return processing in the non-tx case

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Wed Mar 30 16:02:45 2011
@@ -351,7 +351,7 @@ public class HeadersExchange extends Abs
     {
         //what if the content type is not 'basic'? 'file' and 'stream' content classes also define headers,
         //but these are not yet implemented.
-        return ((BasicContentHeaderProperties) contentHeaderFrame.properties).getHeaders();
+        return ((BasicContentHeaderProperties) contentHeaderFrame.getProperties()).getHeaders();
     }
 
     protected ExchangeMBean createMBean() throws AMQException

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java Wed Mar 30 16:02:45 2011
@@ -110,7 +110,7 @@ public class PropertyExpression<E extend
         {
 
             CommonContentHeaderProperties _properties =
-                (CommonContentHeaderProperties) message.getContentHeaderBody().properties;
+                (CommonContentHeaderProperties) message.getContentHeaderBody().getProperties();
 
             if (_logger.isDebugEnabled())
             {
@@ -165,7 +165,7 @@ public class PropertyExpression<E extend
 
             CommonContentHeaderProperties _properties =
                 (CommonContentHeaderProperties)
-                    message.getContentHeaderBody().properties;
+                    message.getContentHeaderBody().getProperties();
             AMQShortString replyTo = _properties.getReplyTo();
 
             return (replyTo == null) ? null : replyTo.toString();
@@ -180,7 +180,7 @@ public class PropertyExpression<E extend
         {
                 CommonContentHeaderProperties _properties =
                     (CommonContentHeaderProperties)
-                        message.getContentHeaderBody().properties;
+                        message.getContentHeaderBody().getProperties();
                 AMQShortString type = _properties.getType();
 
                 return (type == null) ? null : type.toString();
@@ -208,7 +208,7 @@ public class PropertyExpression<E extend
         {
             CommonContentHeaderProperties _properties =
                 (CommonContentHeaderProperties)
-                    message.getContentHeaderBody().properties;
+                    message.getContentHeaderBody().getProperties();
 
             return (int) _properties.getPriority();
         }
@@ -221,7 +221,7 @@ public class PropertyExpression<E extend
 
             CommonContentHeaderProperties _properties =
                 (CommonContentHeaderProperties)
-                    message.getContentHeaderBody().properties;
+                    message.getContentHeaderBody().getProperties();
             AMQShortString messageId = _properties.getMessageId();
 
             return (messageId == null) ? null : messageId;
@@ -235,7 +235,7 @@ public class PropertyExpression<E extend
         {
             CommonContentHeaderProperties _properties =
                 (CommonContentHeaderProperties)
-                    message.getContentHeaderBody().properties;
+                    message.getContentHeaderBody().getProperties();
 
             return _properties.getTimestamp();
         }
@@ -247,7 +247,7 @@ public class PropertyExpression<E extend
         {
             CommonContentHeaderProperties _properties =
                 (CommonContentHeaderProperties)
-                    message.getContentHeaderBody().properties;
+                    message.getContentHeaderBody().getProperties();
             AMQShortString correlationId = _properties.getCorrelationId();
 
             return (correlationId == null) ? null : correlationId.toString();
@@ -261,7 +261,7 @@ public class PropertyExpression<E extend
 
             CommonContentHeaderProperties _properties =
                 (CommonContentHeaderProperties)
-                    message.getContentHeaderBody().properties;
+                    message.getContentHeaderBody().getProperties();
 
             return _properties.getExpiration();
 

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Wed Mar 30 16:02:45 2011
@@ -410,7 +410,7 @@ public class AMQQueueMBean extends AMQMa
         {
             // Create header attributes list
             CommonContentHeaderProperties headerProperties =
-                (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
+                (CommonContentHeaderProperties) msg.getContentHeaderBody().getProperties();
             String mimeType = null, encoding = null;
             if (headerProperties != null)
             {
@@ -493,7 +493,7 @@ public class AMQQueueMBean extends AMQMa
     private String[] getMessageHeaderProperties(ContentHeaderBody headerBody)
     {
         List<String> list = new ArrayList<String>();
-        BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties;
+        BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.getProperties();
         list.add("reply-to = " + headerProperties.getReplyToAsString());
         list.add("propertyFlags = " + headerProperties.getPropertyFlags());
         list.add("ApplicationID = " + headerProperties.getAppIdAsString());

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java Wed Mar 30 16:02:45 2011
@@ -59,7 +59,7 @@ public class ConflationQueueList extends
 
         try
         {
-            Object value = ((BasicContentHeaderProperties)message.getContentHeaderBody().properties).getHeaders().get(_conflationKey);            
+            Object value = ((BasicContentHeaderProperties)message.getContentHeaderBody().getProperties()).getHeaders().get(_conflationKey);
             if(value != null)
             {
                 latestValueReference = _latestValuesMap.get(value);

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Wed Mar 30 16:02:45 2011
@@ -94,9 +94,9 @@ public class IncomingMessage implements 
     public void setExpiration()
     {
             long expiration =
-                    ((BasicContentHeaderProperties) _contentHeaderBody.properties).getExpiration();
+                    ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getExpiration();
             long timestamp =
-                    ((BasicContentHeaderProperties) _contentHeaderBody.properties).getTimestamp();
+                    ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getTimestamp();
 
             if (SYNCHED_CLOCKS)
             {
@@ -176,8 +176,8 @@ public class IncomingMessage implements 
             // now that it has all been received, before we attempt delivery
             _txnContext.messageFullyReceived(isPersistent());
             
-            AMQShortString userID = getContentHeaderBody().properties instanceof BasicContentHeaderProperties ?
-                     ((BasicContentHeaderProperties) getContentHeaderBody().properties).getUserId() : null; 
+            AMQShortString userID = getContentHeaderBody().getProperties() instanceof BasicContentHeaderProperties ?
+                     ((BasicContentHeaderProperties) getContentHeaderBody().getProperties()).getUserId() : null;
             
             if (MSG_AUTH && !_publisher.getAuthorizedID().getName().equals(userID == null? "" : userID.toString()))
             {
@@ -280,8 +280,8 @@ public class IncomingMessage implements 
 
     public boolean isPersistent()
     {
-        return getContentHeaderBody().properties instanceof BasicContentHeaderProperties &&
-             ((BasicContentHeaderProperties) getContentHeaderBody().properties).getDeliveryMode() == 
+        return getContentHeaderBody().getProperties() instanceof BasicContentHeaderProperties &&
+             ((BasicContentHeaderProperties) getContentHeaderBody().getProperties()).getDeliveryMode() ==
                                                              BasicContentHeaderProperties.PERSISTENT;
     }
     

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Wed Mar 30 16:02:45 2011
@@ -57,7 +57,7 @@ public class PriorityQueueList implement
     {
         try
         {
-            int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
+            int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().getProperties()))).getPriority() - _priorityOffset;
             if(index >= _priorities)
             {
                 index = _priorities-1;

Added: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java?rev=1087001&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java (added)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java Wed Mar 30 16:02:45 2011
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.pool.ReadWriteRunnable;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+
+public class QueueRunner implements ReadWriteRunnable
+{
+    private static final Logger _logger = Logger.getLogger(QueueRunner.class);
+
+    private String _name;
+    private SimpleAMQQueue _queue;
+
+    public QueueRunner(SimpleAMQQueue queue, long count)
+    {
+        _queue = queue;
+        _name = "QueueRunner-" + count + "-" + _queue.getLogActor();
+    }
+
+    public void run()
+    {
+        String originalName = Thread.currentThread().getName();
+        try
+        {
+            Thread.currentThread().setName(_name);
+            CurrentActor.set(_queue.getLogActor());
+
+            _queue.processQueue(this);
+        }
+        catch (AMQException e)
+        {
+            _logger.error(e);
+        }
+        finally
+        {
+            CurrentActor.remove();
+            Thread.currentThread().setName(originalName);
+        }
+    }
+
+    public boolean isRead()
+    {
+        return false;
+    }
+
+    public boolean isWrite()
+    {
+        return true;
+    }
+
+    public String toString()
+    {
+        return _name;
+    }
+}
\ No newline at end of file

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Wed Mar 30 16:02:45 2011
@@ -1230,10 +1230,9 @@ public class SimpleAMQQueue implements A
         }
     }
 
-
     public void deliverAsync()
     {
-        Runner runner = new Runner(_stateChangeCount.incrementAndGet());
+        QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet());
 
         if (_asynchronousRunner.compareAndSet(null, runner))
         {
@@ -1246,52 +1245,6 @@ public class SimpleAMQQueue implements A
         _asyncDelivery.execute(new SubFlushRunner(sub));
     }
 
-
-    private class Runner implements ReadWriteRunnable
-    {
-        String _name;
-        public Runner(long count)
-        {
-            _name = "QueueRunner-" + count + "-" + _logActor;
-        }
-
-        public void run()
-        {
-            String originalName = Thread.currentThread().getName();
-            try
-            {
-                Thread.currentThread().setName(_name);
-                CurrentActor.set(_logActor);
-
-                processQueue(this);
-            }
-            catch (AMQException e)
-            {
-                _logger.error(e);
-            }
-            finally
-            {
-                CurrentActor.remove();
-                Thread.currentThread().setName(originalName);
-            }
-        }
-
-        public boolean isRead()
-        {
-            return false;
-        }
-
-        public boolean isWrite()
-        {
-            return true;
-        }
-
-        public String toString()
-        {
-            return _name;
-        }
-    }
-
     private class SubFlushRunner implements ReadWriteRunnable
     {
         private final Subscription _sub;
@@ -1529,7 +1482,7 @@ public class SimpleAMQQueue implements A
      * @param runner the Runner to schedule
      * @throws AMQException
      */
-    private void processQueue(Runnable runner) throws AMQException
+    public void processQueue(QueueRunner runner) throws AMQException
     {
         long stateChangeCount;
         long previousStateChangeCount = Long.MIN_VALUE;
@@ -1897,4 +1850,9 @@ public class SimpleAMQQueue implements A
             throw new RuntimeException("Exchange '" + exchangeName + "' is not registered with the VirtualHost.");
         }
     }
+
+    public LogActor getLogActor()
+    {
+        return _logActor;
+    }
 }

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java Wed Mar 30 16:02:45 2011
@@ -120,8 +120,8 @@ public class TransientMessageData
 
     public boolean isPersistent()
     {
-        return _contentHeaderBody.properties instanceof BasicContentHeaderProperties &&
-             ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 
+        return _contentHeaderBody.getProperties() instanceof BasicContentHeaderProperties &&
+             ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getDeliveryMode() ==
                                                          BasicContentHeaderProperties.PERSISTENT;
     }
 }

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java Wed Mar 30 16:02:45 2011
@@ -361,7 +361,7 @@ public class Show extends AbstractComman
 
             try
             {
-                headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().properties);
+                headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().getProperties());
             }
             catch (AMQException e)
             {

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Wed Mar 30 16:02:45 2011
@@ -227,7 +227,7 @@ public class AbstractHeadersExchangeTest
     static ContentHeaderBody getContentHeader(FieldTable headers)
     {
         ContentHeaderBody header = new ContentHeaderBody();
-        header.properties = getProperties(headers);
+        header.setProperties(getProperties(headers));
         return header;
     }
 

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java Wed Mar 30 16:02:45 2011
@@ -95,7 +95,7 @@ public class AMQPriorityQueueTest extend
         AMQMessage msg = super.createMessage(id);
         BasicContentHeaderProperties props = new BasicContentHeaderProperties();
         props.setPriority(i);
-        msg.getContentHeaderBody().properties = props;
+        msg.getContentHeaderBody().setProperties(props);
         return msg;
     }
     

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Wed Mar 30 16:02:45 2011
@@ -448,8 +448,8 @@ public class AMQQueueMBeanTest extends T
 
         ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
         contentHeaderBody.bodySize = MESSAGE_SIZE;   // in bytes
-        contentHeaderBody.properties = new BasicContentHeaderProperties();
-        ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
+        contentHeaderBody.setProperties(new BasicContentHeaderProperties());
+        ((BasicContentHeaderProperties) contentHeaderBody.getProperties()).setDeliveryMode((byte) (persistent ? 2 : 1));
         IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publish, _transactionalContext,  _protocolSession);
         msg.setContentHeaderBody(contentHeaderBody);
         return msg;

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java Wed Mar 30 16:02:45 2011
@@ -144,7 +144,7 @@ public class AckTest extends TestCase
                 //This is DeliveryMode.PERSISTENT
                 b.setDeliveryMode((byte) 2);
                 ContentHeaderBody cb = new ContentHeaderBody();
-                cb.properties = b;
+                cb.setProperties(b);
                 msg.setContentHeaderBody(cb);
             }
             else

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Wed Mar 30 16:02:45 2011
@@ -31,19 +31,15 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ContentHeaderProperties;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.exchange.DirectExchange;
-import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.qpid.server.subscription.MockSubscription;
 import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionImpl;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
@@ -405,8 +401,8 @@ public class SimpleAMQQueueTest extends 
         NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null);
         IncomingMessage msg = new IncomingMessage(1L, info, txnContext, null);
         ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
-        contentHeaderBody.properties = new BasicContentHeaderProperties();
-        ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
+        contentHeaderBody.setProperties(new BasicContentHeaderProperties());
+        ((BasicContentHeaderProperties) contentHeaderBody.getProperties()).setDeliveryMode((byte) 2);
         msg.setContentHeaderBody(contentHeaderBody);
         ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
         
@@ -432,6 +428,112 @@ public class SimpleAMQQueueTest extends 
         assertNull(data);
     }
 
+    /**
+     * processQueue() is used when asynchronously delivering messages to
+     * subscriptions which could not be delivered immediately during the
+     * enqueue() operation.
+     *
+     * A defect within the method would mean that delivery of these messages may
+     * not occur should the Runner stop before all messages have been processed.
+     * Such a defect was discovered when Selectors were used such that one and
+     * only one subscription can/will accept any given message, but multiple
+     * subscriptions are present, and one of the earlier subscriptions receives
+     * more messages than the others.
+     *
+     * This test is to validate that the processQueue() method is able to
+     * correctly deliver all of the messages present for asynchronous delivery
+     * to subscriptions in such a scenario.
+     */
+    public void testProcessQueueWithUniqueSelectors() throws Exception
+    {
+        StoreContext testContext = new StoreContext();
+        TestSimpleQueueEntryListFactory factory = new TestSimpleQueueEntryListFactory();
+        SimpleAMQQueue testQueue = new SimpleAMQQueue(new AMQShortString(
+                "testQueue"), false, new AMQShortString("testOwner"), false,
+                _virtualHost, factory)
+        {
+            @Override
+            public void deliverAsync(Subscription sub)
+            {
+                // do nothing, i.e. prevent deliveries by the SubFlushRunner
+                // when registering the new subscriptions
+            }
+        };
+
+        // retrieve the QueueEntryList the queue creates and insert the test
+        // messages, thus avoiding straight-through delivery attempts during
+        // the enqueue() process.
+        QueueEntryList list = factory.getQueueEntryList();
+        assertNotNull("QueueEntryList should have been created", list);
+
+        QueueEntry msg1 = list.add(createMessage(1L), testContext);
+        QueueEntry msg2 = list.add(createMessage(2L), testContext);
+        QueueEntry msg3 = list.add(createMessage(3L), testContext);
+        QueueEntry msg4 = list.add(createMessage(4L), testContext);
+        QueueEntry msg5 = list.add(createMessage(5L), testContext);
+
+        // Create lists of the entries each subscription should be interested
+        // in. Bias over 50% of the messages to the first subscription so that
+        // the later subscriptions reject them and report being done before
+        // the first subscription as the processQueue method proceeds.
+        List<QueueEntry> msgListSub1 = createEntriesList(msg1, msg2, msg3);
+        List<QueueEntry> msgListSub2 = createEntriesList(msg4);
+        List<QueueEntry> msgListSub3 = createEntriesList(msg5);
+
+        MockSubscription sub1 = new MockSubscription(msgListSub1);
+        MockSubscription sub2 = new MockSubscription(msgListSub2);
+        MockSubscription sub3 = new MockSubscription(msgListSub3);
+
+        // register the subscriptions
+        testQueue.registerSubscription(sub1, false);
+        testQueue.registerSubscription(sub2, false);
+        testQueue.registerSubscription(sub3, false);
+
+        //check that no messages have been delivered to the
+        //subscriptions during registration
+        assertEquals("No messages should have been delivered yet", 0, sub1.getMessages().size());
+        assertEquals("No messages should have been delivered yet", 0, sub2.getMessages().size());
+        assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size());
+
+        // call processQueue to deliver the messages
+        testQueue.processQueue(new QueueRunner(testQueue, 1)
+        {
+            @Override
+            public void run()
+            {
+                // we don't actually want/need this runner to do any work
+                // because we are already doing it!
+            }
+        });
+
+        // check expected messages delivered to correct consumers
+        verifyRecievedMessages(msgListSub1, sub1.getMessages());
+        verifyRecievedMessages(msgListSub2, sub2.getMessages());
+        verifyRecievedMessages(msgListSub3, sub3.getMessages());
+    }
+
+    private List<QueueEntry> createEntriesList(QueueEntry... entries)
+    {
+        ArrayList<QueueEntry> entriesList = new ArrayList<QueueEntry>();
+        for (QueueEntry entry : entries)
+        {
+            entriesList.add(entry);
+        }
+        return entriesList;
+    }
+
+    private void verifyRecievedMessages(List<QueueEntry> expected,
+            List<QueueEntry> delivered)
+    {
+        assertEquals("Consumer did not receive the expected number of messages",
+                    expected.size(), delivered.size());
+
+        for (QueueEntry msg : expected)
+        {
+            assertTrue("Consumer did not recieve msg: "
+                    + msg.getMessage().getMessageId(), delivered.contains(msg));
+        }
+    }
 
     // FIXME: move this to somewhere useful
     private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody)
@@ -495,4 +597,20 @@ public class SimpleAMQQueueTest extends 
         AMQMessage messageA = new TestMessage(id, id, info, new StoreContext());
         return messageA;
     }
+
+    class TestSimpleQueueEntryListFactory implements QueueEntryListFactory
+    {
+        QueueEntryList _list;
+
+        public QueueEntryList createQueueEntryList(AMQQueue queue)
+        {
+            _list = new SimpleQueueEntryList(queue);
+            return _list;
+        }
+
+        public QueueEntryList getQueueEntryList()
+        {
+            return _list;
+        }
+    }
 }

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java Wed Mar 30 16:02:45 2011
@@ -366,7 +366,7 @@ public class MessageStoreTest extends Te
         headerBody.classId = BasicConsumeBodyImpl.CLASS_ID;
         headerBody.bodySize = 0;
 
-        headerBody.properties = properties;
+        headerBody.setProperties(properties);
 
         try
         {

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java Wed Mar 30 16:02:45 2011
@@ -105,7 +105,7 @@ public class TestReferenceCounting exten
         ContentHeaderBody chb = new ContentHeaderBody();
         BasicContentHeaderProperties bchp = new BasicContentHeaderProperties();
         bchp.setDeliveryMode((byte)2);
-        chb.properties = bchp;
+        chb.setProperties(bchp);
         return chb;
     }
 

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Wed Mar 30 16:02:45 2011
@@ -25,12 +25,12 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState;
 
 import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -46,11 +46,21 @@ public class MockSubscription implements
     private State _state = State.ACTIVE;
     private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
     private final Lock _stateChangeLock = new ReentrantLock();
+    private List<QueueEntry> _acceptEntries = null;
 
     private static final AtomicLong idGenerator = new AtomicLong(0);
     // Create a simple ID that increments for ever new Subscription
     private final long _subscriptionID = idGenerator.getAndIncrement();
 
+    public MockSubscription()
+    {
+    }
+
+    public MockSubscription(List<QueueEntry> acceptEntries)
+    {
+        _acceptEntries = acceptEntries;
+    }
+
     public void close()
     {
         _closed = true;
@@ -101,8 +111,15 @@ public class MockSubscription implements
         _stateChangeLock.lock();
     }
 
-    public boolean hasInterest(QueueEntry msg)
+    public boolean hasInterest(QueueEntry entry)
     {
+        if(_acceptEntries != null)
+        {
+            //simulate selector behaviour, only signal
+            //interest in the dictated queue entries
+            return _acceptEntries.contains(entry);
+        }
+
         return true;
     }
 

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java Wed Mar 30 16:02:45 2011
@@ -193,7 +193,7 @@ public class InternalBrokerBaseCase exte
             //Make Message Persistent
             properties.setDeliveryMode((byte) 2);
 
-            _headerBody.properties = properties;
+            _headerBody.setProperties(properties);
 
             channel.publishContentHeader(_headerBody);
         }

Modified: qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Wed Mar 30 16:02:45 2011
@@ -99,7 +99,7 @@ public abstract class AbstractJMSMessage
         }
 
         AMQMessageDelegate delegate = new AMQMessageDelegate_0_8(messageNbr,
-                                                                 (BasicContentHeaderProperties) contentHeader.properties,
+                                                                 (BasicContentHeaderProperties) contentHeader.getProperties(),
                                                                  exchange, routingKey);
 
         return createMessage(delegate, data);

Modified: qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Wed Mar 30 16:02:45 2011
@@ -103,7 +103,7 @@ public class MessageFactoryRegistry
                                             AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies)
             throws AMQException, JMSException
     {
-        BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties;
+        BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.getProperties();
 
         // Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over
         // AMQP. When the type is null, it can only be assumed that the message is a byte message.

Modified: qpid/branches/0.5.x-dev/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java Wed Mar 30 16:02:45 2011
@@ -258,7 +258,7 @@ public class BasicDeliverTest
     static ContentHeaderBody createContentHeaderBody()
     {
         ContentHeaderBody body = new ContentHeaderBody();
-        body.properties = new BasicContentHeaderProperties();
+        body.setProperties(new BasicContentHeaderProperties());
         body.weight = 1;
         body.classId = 6;
         return body;

Modified: qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Wed Mar 30 16:02:45 2011
@@ -36,7 +36,7 @@ public class ContentHeaderBody implement
     public long bodySize;
 
     /** must never be null */
-    public ContentHeaderProperties properties;
+    private ContentHeaderProperties properties;
 
     public ContentHeaderBody()
     {
@@ -128,4 +128,14 @@ public class ContentHeaderBody implement
     {
         return new AMQFrame(channelId, body);
     }
+
+    public ContentHeaderProperties getProperties()
+    {
+        return properties;
+    }
+
+    public void setProperties(ContentHeaderProperties props)
+    {
+        properties = props;
+    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org