You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/08/28 13:37:38 UTC

svn commit: r1698311 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/message/ broker-core/src/main/java/org/apache/qpid/server/queue/ broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ ...

Author: rgodfrey
Date: Fri Aug 28 11:37:38 2015
New Revision: 1698311

URL: http://svn.apache.org/r1698311
Log:
QPID-6713 : Materialize message content / properties when visiting messages on the queue, rather than retaining references to the entry / object

Added:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java
      - copied, changed from r1698286, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoFacade.java
Removed:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoFacade.java
Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
    qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
    qpid/java/trunk/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/ReportRunnerTest.java

Copied: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java (from r1698286, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoFacade.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java?p2=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoFacade.java&r1=1698286&r2=1698311&rev=1698311&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoFacade.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java Fri Aug 28 11:37:38 2015
@@ -20,217 +20,199 @@
  */
 package org.apache.qpid.server.message;
 
-import java.util.AbstractMap;
-import java.util.AbstractSet;
-import java.util.Collection;
-import java.util.Iterator;
+import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Set;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
-
-public class MessageInfoFacade implements MessageInfo
+public class MessageInfoImpl implements MessageInfo
 {
-    private final MessageInstance _instance;
-    private final boolean _includeHeaders;
+    private final String _deliveredTo;
+    private final long _arrivalTime;
+    private final boolean _persistent;
+    private final String _messageId;
+    private final Long _expirationTime;
+    private final String _applicationId;
+    private final String _correlationId;
+    private final String _encoding;
+    private final String _mimeType;
+    private final byte _priority;
+    private final String _replyTo;
+    private final Long _timestamp;
+    private final String _type;
+    private final String _userId;
+    private final String _state;
+    private final int _deliveryCount;
+    private final long _size;
+    private final long _id;
+    private final Map<String, Object> _headers;
+    private final String _initialRoutingAddress;
+
+    public MessageInfoImpl(final MessageInstance instance, final boolean includeHeaders)
+    {
+        final ServerMessage message = instance.getMessage();
+        final AMQMessageHeader messageHeader = message.getMessageHeader();
+
+        _deliveredTo = instance.getDeliveredConsumer() == null ? null : String.valueOf(instance.getDeliveredConsumer()
+                                                                                                .getConsumerNumber());
+        _arrivalTime = message.getArrivalTime();
+        _persistent = message.isPersistent();
+        _messageId = messageHeader.getMessageId();
+        _expirationTime = messageHeader.getExpiration() == 0l
+                ? null
+                : messageHeader.getExpiration();
+        _applicationId = messageHeader.getAppId();
+        _correlationId = messageHeader.getCorrelationId();
+        _encoding = messageHeader.getEncoding();
+        _mimeType = messageHeader.getMimeType();
+        _priority = messageHeader.getPriority();
+        _replyTo = messageHeader.getReplyTo();
+        _timestamp = messageHeader.getTimestamp() == 0l
+                ? null
+                : messageHeader.getTimestamp();
+        _type = messageHeader.getType();
+        _userId = messageHeader.getUserId();
+        _state = instance.isAvailable()
+                ? "Available"
+                : instance.isAcquired()
+                        ? "Acquired"
+                        : "";
+        _deliveryCount = instance.getDeliveryCount();
+        _size = message.getSize();
+        _id = message.getMessageNumber();
+        _initialRoutingAddress = message.getInitialRoutingAddress();
 
-    public MessageInfoFacade(final MessageInstance instance, final boolean includeHeaders)
-    {
-        _instance = instance;
-        _includeHeaders = includeHeaders;
+        if(includeHeaders)
+        {
+            Map<String,Object> headers = new LinkedHashMap<>();
+            for(String headerName : messageHeader.getHeaderNames())
+            {
+                headers.put(headerName, messageHeader.getHeader(headerName));
+            }
+            _headers = Collections.unmodifiableMap(headers);
+        }
+        else
+        {
+            _headers = null;
+        }
     }
 
 
     @Override
     public long getId()
     {
-        return _instance.getMessage().getMessageNumber();
+        return _id;
     }
 
     @Override
     public long getSize()
     {
-        return _instance.getMessage().getSize();
+        return _size;
     }
 
     @Override
     public int getDeliveryCount()
     {
-        return _instance.getDeliveryCount();
+        return _deliveryCount;
     }
 
     @Override
     public String getState()
     {
-        return _instance.isAvailable()
-                ? "Available"
-                : _instance.isAcquired()
-                        ? "Acquired"
-                        : "";
+        return _state;
     }
 
     @Override
     public String getDeliveredTo()
     {
-        final ConsumerImpl deliveredConsumer = _instance.getDeliveredConsumer();
-        return deliveredConsumer == null ? null : String.valueOf(deliveredConsumer.getConsumerNumber());
+        return _deliveredTo;
     }
 
     @Override
     public long getArrivalTime()
     {
-        return _instance.getMessage().getArrivalTime();
+        return _arrivalTime;
     }
 
     @Override
     public boolean isPersistent()
     {
-        return _instance.getMessage().isPersistent();
+        return _persistent;
     }
 
     @Override
     public String getMessageId()
     {
-        return _instance.getMessage().getMessageHeader().getMessageId();
+        return _messageId;
     }
 
     @Override
     public Long getExpirationTime()
     {
-        long expiration = _instance.getMessage().getMessageHeader().getExpiration();
-        return expiration == 0l ? null : expiration;
+        return _expirationTime;
     }
 
     @Override
     public String getApplicationId()
     {
-        return _instance.getMessage().getMessageHeader().getAppId();
+        return _applicationId;
     }
 
     @Override
     public String getCorrelationId()
     {
-        return _instance.getMessage().getMessageHeader().getCorrelationId();
+        return _correlationId;
     }
 
     @Override
     public String getEncoding()
     {
-        return _instance.getMessage().getMessageHeader().getEncoding();
+        return _encoding;
     }
 
     @Override
     public String getMimeType()
     {
-        return _instance.getMessage().getMessageHeader().getMimeType();
+        return _mimeType;
     }
 
     @Override
     public int getPriority()
     {
-        return _instance.getMessage().getMessageHeader().getPriority();
+        return _priority;
     }
 
     @Override
     public String getReplyTo()
     {
-        return _instance.getMessage().getMessageHeader().getReplyTo();
+        return _replyTo;
     }
 
     @Override
     public Long getTimestamp()
     {
-        long timestamp = _instance.getMessage().getMessageHeader().getTimestamp();
-        return timestamp == 0l ? null : timestamp;
+        return _timestamp;
     }
 
     @Override
     public String getType()
     {
-        return _instance.getMessage().getMessageHeader().getType();
+        return _type;
     }
 
     @Override
     public String getUserId()
     {
-        return _instance.getMessage().getMessageHeader().getUserId();
+        return _userId;
     }
 
     @Override
     public Map<String, Object> getHeaders()
     {
-        if(_includeHeaders)
-        {
-            final Collection<String> headerNames =
-                    _instance.getMessage().getMessageHeader().getHeaderNames();
-            return new AbstractMap<String, Object>()
-            {
-                @Override
-                public Set<Entry<String, Object>> entrySet()
-                {
-                    return new AbstractSet<Entry<String, Object>>()
-                    {
-                        @Override
-                        public Iterator<Entry<String, Object>> iterator()
-                        {
-                            final Iterator<String> nameIterator = headerNames.iterator();
-
-                            return new Iterator<Entry<String, Object>>()
-                            {
-                                @Override
-                                public boolean hasNext()
-                                {
-                                    return nameIterator.hasNext();
-                                }
-
-                                @Override
-                                public Entry<String, Object> next()
-                                {
-                                    final String headerName = nameIterator.next();
-                                    final Object value =
-                                            _instance.getMessage().getMessageHeader().getHeader(headerName);
-                                    return new Entry<String, Object>()
-                                    {
-                                        @Override
-                                        public String getKey()
-                                        {
-                                            return headerName;
-                                        }
-
-                                        @Override
-                                        public Object getValue()
-                                        {
-                                            return value;
-                                        }
-
-                                        @Override
-                                        public Object setValue(final Object value)
-                                        {
-                                            throw new UnsupportedOperationException();
-                                        }
-                                    };
-                                }
-
-                                @Override
-                                public void remove()
-                                {
-                                    nameIterator.remove();
-                                }
-                            };
-                        }
-
-                        @Override
-                        public int size()
-                        {
-                            return headerNames.size();
-                        }
-                    };
-                }
-            };
-        }
-        else
-        {
-            return null;
-        }
+        return _headers;
+    }
+
+    public String getInitialRoutingAddress()
+    {
+        return _initialRoutingAddress;
     }
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1698311&r1=1698310&r2=1698311&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Fri Aug 28 11:37:38 2015
@@ -50,7 +50,7 @@ import com.google.common.util.concurrent
 import com.google.common.util.concurrent.ListenableFuture;
 
 import org.apache.qpid.server.message.MessageInfo;
-import org.apache.qpid.server.message.MessageInfoFacade;
+import org.apache.qpid.server.message.MessageInfoImpl;
 import org.apache.qpid.server.model.CustomRestHeaders;
 import org.apache.qpid.server.model.RestContentHeader;
 
@@ -3419,7 +3419,7 @@ public abstract class AbstractQueue<X ex
             {
                 if (_messageNumber == message.getMessageNumber())
                 {
-                    _messageInfo = new MessageInfoFacade(entry, true);
+                    _messageInfo = new MessageInfoImpl(entry, true);
                 }
             }
             return false;
@@ -3536,7 +3536,7 @@ public abstract class AbstractQueue<X ex
             _position++;
             if((_first == -1 || _position >= _first) && (_last == -1 || _position <= _last))
             {
-                _messages.add(new MessageInfoFacade(entry, _includeHeaders));
+                _messages.add(new MessageInfoImpl(entry, _includeHeaders));
             }
             return _last != -1 && _position > _last;
         }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1698311&r1=1698310&r2=1698311&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Fri Aug 28 11:37:38 2015
@@ -46,6 +46,7 @@ import org.apache.qpid.server.logging.Lo
 import org.apache.qpid.server.logging.messages.SubscriptionMessages;
 import org.apache.qpid.server.logging.subjects.QueueLogSubject;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
@@ -421,16 +422,32 @@ class QueueConsumerImpl
         }
         else
         {
-            Filterable msg = entry.asFilterable();
-            try
+            MessageReference ref = entry.newMessageReference();
+            if(ref != null)
             {
-                return _filters.allAllow(msg);
+                try
+                {
+
+                    Filterable msg = entry.asFilterable();
+                    try
+                    {
+                        return _filters.allAllow(msg);
+                    }
+                    catch (SelectorParsingException e)
+                    {
+                        LOGGER.info(this + " could not evaluate filter [" + _filters
+                                    + "]  against message " + msg
+                                    + ". Error was : " + e.getMessage());
+                        return false;
+                    }
+                }
+                finally
+                {
+                    ref.release();
+                }
             }
-            catch (SelectorParsingException e)
+            else
             {
-                LOGGER.info(this + " could not evaluate filter [" + _filters
-                             + "]  against message " + msg
-                             + ". Error was : " + e.getMessage());
                 return false;
             }
         }

Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java?rev=1698311&r1=1698310&r2=1698311&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java Fri Aug 28 11:37:38 2015
@@ -39,7 +39,10 @@ import java.util.UUID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.MessageInfo;
+import org.apache.qpid.server.message.MessageInfoImpl;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.queue.QueueEntry;
@@ -204,7 +207,7 @@ public class ReportRunner<T>
         @Override
         public boolean visit(final QueueEntry entry)
         {
-            _report.addMessage(convertMessage(entry.getMessage()));
+            _report.addMessage(convertMessage(entry));
             return _report.isComplete();
         }
 
@@ -212,154 +215,160 @@ public class ReportRunner<T>
     }
 
 
-    private static ReportableMessage convertMessage(final ServerMessage<?> message)
+    private static ReportableMessage convertMessage(QueueEntry entry)
     {
+        final MessageInfoImpl messageInfo = new MessageInfoImpl(entry, true);
+        final Collection<QpidByteBuffer> contentBuffers = entry.getMessage().getContent(0, (int) entry.getSize());
+        final ByteBuffer content = ByteBufferUtils.combine(contentBuffers);
+        for(QpidByteBuffer buf : contentBuffers)
+        {
+            buf.dispose();
+        }
+
         return new ReportableMessage()
         {
             @Override
             public String getInitialRoutingAddress()
             {
-                return message.getInitialRoutingAddress();
+                return messageInfo.getInitialRoutingAddress();
             }
 
             @Override
             public ReportableMessageHeader getMessageHeader()
             {
-                return convertMessageHeader(message.getMessageHeader());
+                return convertMessageHeader(messageInfo);
             }
 
             @Override
             public ByteBuffer getContent()
             {
-                ByteBuffer content = ByteBufferUtils.combine(message.getContent(0, (int) getSize()));
-
                 return content.asReadOnlyBuffer();
             }
 
             @Override
             public boolean isPersistent()
             {
-                return message.isPersistent();
+                return messageInfo.isPersistent();
             }
 
             @Override
             public long getSize()
             {
-                return message.getSize();
+                return messageInfo.getSize();
             }
 
             @Override
             public long getExpiration()
             {
-                return message.getExpiration();
+                return messageInfo.getExpirationTime() == null ? 0l : messageInfo.getExpirationTime();
             }
 
             @Override
             public long getMessageNumber()
             {
-                return message.getMessageNumber();
+                return messageInfo.getId();
             }
 
             @Override
             public long getArrivalTime()
             {
-                return message.getArrivalTime();
+                return messageInfo.getArrivalTime();
             }
         };
     }
 
-    private static ReportableMessageHeader convertMessageHeader(final AMQMessageHeader messageHeader)
+    private static ReportableMessageHeader convertMessageHeader(final MessageInfoImpl messageInfo)
     {
         return new ReportableMessageHeader()
         {
             @Override
             public String getCorrelationId()
             {
-                return messageHeader.getCorrelationId();
+                return messageInfo.getCorrelationId();
             }
 
             @Override
             public long getExpiration()
             {
-                return messageHeader.getExpiration();
+                return messageInfo.getExpirationTime() == null ? 0l : messageInfo.getExpirationTime();
             }
 
             @Override
             public String getUserId()
             {
-                return messageHeader.getUserId();
+                return messageInfo.getUserId();
             }
 
             @Override
             public String getAppId()
             {
-                return messageHeader.getAppId();
+                return messageInfo.getApplicationId();
             }
 
             @Override
             public String getMessageId()
             {
-                return messageHeader.getMessageId();
+                return messageInfo.getMessageId();
             }
 
             @Override
             public String getMimeType()
             {
-                return messageHeader.getMimeType();
+                return messageInfo.getMimeType();
             }
 
             @Override
             public String getEncoding()
             {
-                return messageHeader.getEncoding();
+                return messageInfo.getEncoding();
             }
 
             @Override
             public byte getPriority()
             {
-                return messageHeader.getPriority();
+                return (byte) messageInfo.getPriority();
             }
 
             @Override
             public long getTimestamp()
             {
-                return messageHeader.getTimestamp();
+                return messageInfo.getTimestamp() == null ? 0L : messageInfo.getTimestamp(); // MessageInfoImpl keeps a zero timestamp as null; unboxing it directly would throw NPE
             }
 
             @Override
             public String getType()
             {
-                return messageHeader.getType();
+                return messageInfo.getType();
             }
 
             @Override
             public String getReplyTo()
             {
-                return messageHeader.getReplyTo();
+                return messageInfo.getReplyTo();
             }
 
             @Override
             public Object getHeader(final String name)
             {
-                return makeImmutable(messageHeader.getHeader(name));
+                return makeImmutable(messageInfo.getHeaders().get(name));
             }
 
             @Override
             public boolean containsHeaders(final Set<String> names)
             {
-                return messageHeader.containsHeaders(names);
+                return messageInfo.getHeaders().keySet().containsAll(names); // every requested name must be present; contains(names) would look for the Set itself as a key
             }
 
             @Override
             public boolean containsHeader(final String name)
             {
-                return messageHeader.containsHeader(name);
+                return messageInfo.getHeaders().containsKey(name);
             }
 
             @Override
             public Collection<String> getHeaderNames()
             {
-                return Collections.unmodifiableCollection(messageHeader.getHeaderNames());
+                return Collections.unmodifiableCollection(messageInfo.getHeaders().keySet());
             }
         };
     }

Modified: qpid/java/trunk/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/ReportRunnerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/ReportRunnerTest.java?rev=1698311&r1=1698310&r2=1698311&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/ReportRunnerTest.java (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/ReportRunnerTest.java Fri Aug 28 11:37:38 2015
@@ -54,18 +54,27 @@ public class ReportRunnerTest extends Qp
 
         runner = (ReportRunner<String>) ReportRunner.createRunner(TestTextReport.NAME,
                                                                   Collections.<String, String[]>emptyMap());
-        Queue queue1 = createMockQueue(mock(ServerMessage.class));
+        Queue queue1 = createMockQueue(createMockMessageForQueue());
         assertEquals("There are 1 messages on the queue.", runner.runReport(queue1));
 
         runner = (ReportRunner<String>) ReportRunner.createRunner(TestTextReport.NAME,
                                                                   Collections.<String, String[]>emptyMap());
-        Queue queue2 = createMockQueue(mock(ServerMessage.class), mock(ServerMessage.class));
+        Queue queue2 = createMockQueue(createMockMessageForQueue(), createMockMessageForQueue());
         assertEquals("There are 2 messages on the queue.", runner.runReport(queue2));
     }
 
+    protected ServerMessage createMockMessageForQueue()
+    {
+        final ServerMessage message = mock(ServerMessage.class);
+        final AMQMessageHeader header = mock(AMQMessageHeader.class);
+        when(message.getMessageHeader()).thenReturn(header);
+
+        return message;
+    }
+
     public void testTextReportSingleStringParam()
     {
-        Queue queue2 = createMockQueue(mock(ServerMessage.class), mock(ServerMessage.class));
+        Queue queue2 = createMockQueue(createMockMessageForQueue(), createMockMessageForQueue());
 
         Map<String, String[]> parameterMap = new HashMap<>();
         parameterMap.put("stringParam", new String[]{"hello world"});



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org