You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/08/28 13:37:38 UTC
svn commit: r1698311 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/message/
broker-core/src/main/java/org/apache/qpid/server/queue/
broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/
...
Author: rgodfrey
Date: Fri Aug 28 11:37:38 2015
New Revision: 1698311
URL: http://svn.apache.org/r1698311
Log:
QPID-6713 : Materialize message content / properties when visiting messages on the queue, rather than retaining references to the entry / object
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java
- copied, changed from r1698286, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoFacade.java
Removed:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoFacade.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
qpid/java/trunk/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/ReportRunnerTest.java
Copied: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java (from r1698286, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoFacade.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java?p2=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoFacade.java&r1=1698286&r2=1698311&rev=1698311&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoFacade.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java Fri Aug 28 11:37:38 2015
@@ -20,217 +20,199 @@
*/
package org.apache.qpid.server.message;
-import java.util.AbstractMap;
-import java.util.AbstractSet;
-import java.util.Collection;
-import java.util.Iterator;
+import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.Set;
-import org.apache.qpid.server.consumer.ConsumerImpl;
-
-public class MessageInfoFacade implements MessageInfo
+public class MessageInfoImpl implements MessageInfo
{
- private final MessageInstance _instance;
- private final boolean _includeHeaders;
+ private final String _deliveredTo;
+ private final long _arrivalTime;
+ private final boolean _persistent;
+ private final String _messageId;
+ private final Long _expirationTime;
+ private final String _applicationId;
+ private final String _correlationId;
+ private final String _encoding;
+ private final String _mimeType;
+ private final byte _priority;
+ private final String _replyTo;
+ private final Long _timestamp;
+ private final String _type;
+ private final String _userId;
+ private final String _state;
+ private final int _deliveryCount;
+ private final long _size;
+ private final long _id;
+ private final Map<String, Object> _headers;
+ private final String _initialRoutingAddress;
+
+ /**
+ * Takes a snapshot of the given message instance so that no reference to the
+ * instance or its message is retained after construction.
+ *
+ * A header expiration or timestamp of zero is stored as null.
+ *
+ * @param instance the message instance whose state and header values are copied
+ * @param includeHeaders if true the header names/values are copied into an unmodifiable,
+ * insertion-ordered map; if false getHeaders() returns null
+ */
+ public MessageInfoImpl(final MessageInstance instance, final boolean includeHeaders)
+ {
+ final ServerMessage message = instance.getMessage();
+ final AMQMessageHeader messageHeader = message.getMessageHeader();
+
+ // Read the delivered consumer exactly once: a second call after the null check could
+ // observe null if the instance's consumer changes in between.
+ final org.apache.qpid.server.consumer.ConsumerImpl deliveredConsumer = instance.getDeliveredConsumer();
+ _deliveredTo = deliveredConsumer == null ? null : String.valueOf(deliveredConsumer.getConsumerNumber());
+ _arrivalTime = message.getArrivalTime();
+ _persistent = message.isPersistent();
+ _messageId = messageHeader.getMessageId();
+ _expirationTime = messageHeader.getExpiration() == 0l
+ ? null
+ : messageHeader.getExpiration();
+ _applicationId = messageHeader.getAppId();
+ _correlationId = messageHeader.getCorrelationId();
+ _encoding = messageHeader.getEncoding();
+ _mimeType = messageHeader.getMimeType();
+ _priority = messageHeader.getPriority();
+ _replyTo = messageHeader.getReplyTo();
+ _timestamp = messageHeader.getTimestamp() == 0l
+ ? null
+ : messageHeader.getTimestamp();
+ _type = messageHeader.getType();
+ _userId = messageHeader.getUserId();
+ _state = instance.isAvailable()
+ ? "Available"
+ : instance.isAcquired()
+ ? "Acquired"
+ : "";
+ _deliveryCount = instance.getDeliveryCount();
+ _size = message.getSize();
+ _id = message.getMessageNumber();
+ _initialRoutingAddress = message.getInitialRoutingAddress();
- public MessageInfoFacade(final MessageInstance instance, final boolean includeHeaders)
- {
- _instance = instance;
- _includeHeaders = includeHeaders;
+ if(includeHeaders)
+ {
+ // LinkedHashMap keeps the headers in the order getHeaderNames() supplies them
+ Map<String,Object> headers = new LinkedHashMap<>();
+ for(String headerName : messageHeader.getHeaderNames())
+ {
+ headers.put(headerName, messageHeader.getHeader(headerName));
+ }
+ _headers = Collections.unmodifiableMap(headers);
+ }
+ else
+ {
+ _headers = null;
+ }
}
@Override
public long getId()
{
- return _instance.getMessage().getMessageNumber();
+ return _id;
}
@Override
public long getSize()
{
- return _instance.getMessage().getSize();
+ return _size;
}
@Override
public int getDeliveryCount()
{
- return _instance.getDeliveryCount();
+ return _deliveryCount;
}
@Override
public String getState()
{
- return _instance.isAvailable()
- ? "Available"
- : _instance.isAcquired()
- ? "Acquired"
- : "";
+ return _state;
}
@Override
public String getDeliveredTo()
{
- final ConsumerImpl deliveredConsumer = _instance.getDeliveredConsumer();
- return deliveredConsumer == null ? null : String.valueOf(deliveredConsumer.getConsumerNumber());
+ return _deliveredTo;
}
@Override
public long getArrivalTime()
{
- return _instance.getMessage().getArrivalTime();
+ return _arrivalTime;
}
@Override
public boolean isPersistent()
{
- return _instance.getMessage().isPersistent();
+ return _persistent;
}
@Override
public String getMessageId()
{
- return _instance.getMessage().getMessageHeader().getMessageId();
+ return _messageId;
}
@Override
public Long getExpirationTime()
{
- long expiration = _instance.getMessage().getMessageHeader().getExpiration();
- return expiration == 0l ? null : expiration;
+ return _expirationTime;
}
@Override
public String getApplicationId()
{
- return _instance.getMessage().getMessageHeader().getAppId();
+ return _applicationId;
}
@Override
public String getCorrelationId()
{
- return _instance.getMessage().getMessageHeader().getCorrelationId();
+ return _correlationId;
}
@Override
public String getEncoding()
{
- return _instance.getMessage().getMessageHeader().getEncoding();
+ return _encoding;
}
@Override
public String getMimeType()
{
- return _instance.getMessage().getMessageHeader().getMimeType();
+ return _mimeType;
}
@Override
public int getPriority()
{
- return _instance.getMessage().getMessageHeader().getPriority();
+ return _priority;
}
@Override
public String getReplyTo()
{
- return _instance.getMessage().getMessageHeader().getReplyTo();
+ return _replyTo;
}
@Override
public Long getTimestamp()
{
- long timestamp = _instance.getMessage().getMessageHeader().getTimestamp();
- return timestamp == 0l ? null : timestamp;
+ return _timestamp;
}
@Override
public String getType()
{
- return _instance.getMessage().getMessageHeader().getType();
+ return _type;
}
@Override
public String getUserId()
{
- return _instance.getMessage().getMessageHeader().getUserId();
+ return _userId;
}
@Override
public Map<String, Object> getHeaders()
{
- if(_includeHeaders)
- {
- final Collection<String> headerNames =
- _instance.getMessage().getMessageHeader().getHeaderNames();
- return new AbstractMap<String, Object>()
- {
- @Override
- public Set<Entry<String, Object>> entrySet()
- {
- return new AbstractSet<Entry<String, Object>>()
- {
- @Override
- public Iterator<Entry<String, Object>> iterator()
- {
- final Iterator<String> nameIterator = headerNames.iterator();
-
- return new Iterator<Entry<String, Object>>()
- {
- @Override
- public boolean hasNext()
- {
- return nameIterator.hasNext();
- }
-
- @Override
- public Entry<String, Object> next()
- {
- final String headerName = nameIterator.next();
- final Object value =
- _instance.getMessage().getMessageHeader().getHeader(headerName);
- return new Entry<String, Object>()
- {
- @Override
- public String getKey()
- {
- return headerName;
- }
-
- @Override
- public Object getValue()
- {
- return value;
- }
-
- @Override
- public Object setValue(final Object value)
- {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- @Override
- public void remove()
- {
- nameIterator.remove();
- }
- };
- }
-
- @Override
- public int size()
- {
- return headerNames.size();
- }
- };
- }
- };
- }
- else
- {
- return null;
- }
+ return _headers;
+ }
+
+ public String getInitialRoutingAddress()
+ {
+ return _initialRoutingAddress;
}
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1698311&r1=1698310&r2=1698311&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Fri Aug 28 11:37:38 2015
@@ -50,7 +50,7 @@ import com.google.common.util.concurrent
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.qpid.server.message.MessageInfo;
-import org.apache.qpid.server.message.MessageInfoFacade;
+import org.apache.qpid.server.message.MessageInfoImpl;
import org.apache.qpid.server.model.CustomRestHeaders;
import org.apache.qpid.server.model.RestContentHeader;
@@ -3419,7 +3419,7 @@ public abstract class AbstractQueue<X ex
{
if (_messageNumber == message.getMessageNumber())
{
- _messageInfo = new MessageInfoFacade(entry, true);
+ _messageInfo = new MessageInfoImpl(entry, true);
}
}
return false;
@@ -3536,7 +3536,7 @@ public abstract class AbstractQueue<X ex
_position++;
if((_first == -1 || _position >= _first) && (_last == -1 || _position <= _last))
{
- _messages.add(new MessageInfoFacade(entry, _includeHeaders));
+ _messages.add(new MessageInfoImpl(entry, _includeHeaders));
}
return _last != -1 && _position > _last;
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1698311&r1=1698310&r2=1698311&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Fri Aug 28 11:37:38 2015
@@ -46,6 +46,7 @@ import org.apache.qpid.server.logging.Lo
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.AbstractConfiguredObject;
@@ -421,16 +422,32 @@ class QueueConsumerImpl
}
else
{
- Filterable msg = entry.asFilterable();
- try
+ MessageReference ref = entry.newMessageReference();
+ if(ref != null)
{
- return _filters.allAllow(msg);
+ try
+ {
+
+ Filterable msg = entry.asFilterable();
+ try
+ {
+ return _filters.allAllow(msg);
+ }
+ catch (SelectorParsingException e)
+ {
+ LOGGER.info(this + " could not evaluate filter [" + _filters
+ + "] against message " + msg
+ + ". Error was : " + e.getMessage());
+ return false;
+ }
+ }
+ finally
+ {
+ ref.release();
+ }
}
- catch (SelectorParsingException e)
+ else
{
- LOGGER.info(this + " could not evaluate filter [" + _filters
- + "] against message " + msg
- + ". Error was : " + e.getMessage());
return false;
}
}
Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java?rev=1698311&r1=1698310&r2=1698311&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java Fri Aug 28 11:37:38 2015
@@ -39,7 +39,10 @@ import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.MessageInfo;
+import org.apache.qpid.server.message.MessageInfoImpl;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.QueueEntry;
@@ -204,7 +207,7 @@ public class ReportRunner<T>
@Override
public boolean visit(final QueueEntry entry)
{
- _report.addMessage(convertMessage(entry.getMessage()));
+ _report.addMessage(convertMessage(entry));
return _report.isComplete();
}
@@ -212,154 +215,160 @@ public class ReportRunner<T>
}
- private static ReportableMessage convertMessage(final ServerMessage<?> message)
+ private static ReportableMessage convertMessage(QueueEntry entry)
{
+ final MessageInfoImpl messageInfo = new MessageInfoImpl(entry, true);
+ final Collection<QpidByteBuffer> contentBuffers = entry.getMessage().getContent(0, (int) entry.getSize());
+ final ByteBuffer content = ByteBufferUtils.combine(contentBuffers);
+ for(QpidByteBuffer buf : contentBuffers)
+ {
+ buf.dispose();
+ }
+
return new ReportableMessage()
{
@Override
public String getInitialRoutingAddress()
{
- return message.getInitialRoutingAddress();
+ return messageInfo.getInitialRoutingAddress();
}
@Override
public ReportableMessageHeader getMessageHeader()
{
- return convertMessageHeader(message.getMessageHeader());
+ return convertMessageHeader(messageInfo);
}
@Override
public ByteBuffer getContent()
{
- ByteBuffer content = ByteBufferUtils.combine(message.getContent(0, (int) getSize()));
-
return content.asReadOnlyBuffer();
}
@Override
public boolean isPersistent()
{
- return message.isPersistent();
+ return messageInfo.isPersistent();
}
@Override
public long getSize()
{
- return message.getSize();
+ return messageInfo.getSize();
}
@Override
public long getExpiration()
{
- return message.getExpiration();
+ return messageInfo.getExpirationTime() == null ? 0l : messageInfo.getExpirationTime();
}
@Override
public long getMessageNumber()
{
- return message.getMessageNumber();
+ return messageInfo.getId();
}
@Override
public long getArrivalTime()
{
- return message.getArrivalTime();
+ return messageInfo.getArrivalTime();
}
};
}
- private static ReportableMessageHeader convertMessageHeader(final AMQMessageHeader messageHeader)
+ private static ReportableMessageHeader convertMessageHeader(final MessageInfoImpl messageInfo)
{
return new ReportableMessageHeader()
{
@Override
public String getCorrelationId()
{
- return messageHeader.getCorrelationId();
+ return messageInfo.getCorrelationId();
}
@Override
public long getExpiration()
{
- return messageHeader.getExpiration();
+ return messageInfo.getExpirationTime() == null ? 0l : messageInfo.getExpirationTime();
}
@Override
public String getUserId()
{
- return messageHeader.getUserId();
+ return messageInfo.getUserId();
}
@Override
public String getAppId()
{
- return messageHeader.getAppId();
+ return messageInfo.getApplicationId();
}
@Override
public String getMessageId()
{
- return messageHeader.getMessageId();
+ return messageInfo.getMessageId();
}
@Override
public String getMimeType()
{
- return messageHeader.getMimeType();
+ return messageInfo.getMimeType();
}
@Override
public String getEncoding()
{
- return messageHeader.getEncoding();
+ return messageInfo.getEncoding();
}
@Override
public byte getPriority()
{
- return messageHeader.getPriority();
+ return (byte) messageInfo.getPriority();
}
@Override
public long getTimestamp()
{
- return messageHeader.getTimestamp();
+ // MessageInfoImpl.getTimestamp() returns a Long that is null for a zero timestamp;
+ // map null back to 0 (as getExpiration() does) instead of unboxing it into an NPE.
+ return messageInfo.getTimestamp() == null ? 0l : messageInfo.getTimestamp();
}
@Override
public String getType()
{
- return messageHeader.getType();
+ return messageInfo.getType();
}
@Override
public String getReplyTo()
{
- return messageHeader.getReplyTo();
+ return messageInfo.getReplyTo();
}
@Override
public Object getHeader(final String name)
{
- return makeImmutable(messageHeader.getHeader(name));
+ return makeImmutable(messageInfo.getHeaders().get(name));
}
@Override
public boolean containsHeaders(final Set<String> names)
{
- return messageHeader.containsHeaders(names);
+ // containsAll checks every requested name; contains(names) would look for the Set
+ // object itself among the String keys and so could never match.
+ return messageInfo.getHeaders().keySet().containsAll(names);
}
@Override
public boolean containsHeader(final String name)
{
- return messageHeader.containsHeader(name);
+ return messageInfo.getHeaders().containsKey(name);
}
@Override
public Collection<String> getHeaderNames()
{
- return Collections.unmodifiableCollection(messageHeader.getHeaderNames());
+ return Collections.unmodifiableCollection(messageInfo.getHeaders().keySet());
}
};
}
Modified: qpid/java/trunk/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/ReportRunnerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/ReportRunnerTest.java?rev=1698311&r1=1698310&r2=1698311&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/ReportRunnerTest.java (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/ReportRunnerTest.java Fri Aug 28 11:37:38 2015
@@ -54,18 +54,27 @@ public class ReportRunnerTest extends Qp
runner = (ReportRunner<String>) ReportRunner.createRunner(TestTextReport.NAME,
Collections.<String, String[]>emptyMap());
- Queue queue1 = createMockQueue(mock(ServerMessage.class));
+ Queue queue1 = createMockQueue(createMockMessageForQueue());
assertEquals("There are 1 messages on the queue.", runner.runReport(queue1));
runner = (ReportRunner<String>) ReportRunner.createRunner(TestTextReport.NAME,
Collections.<String, String[]>emptyMap());
- Queue queue2 = createMockQueue(mock(ServerMessage.class), mock(ServerMessage.class));
+ Queue queue2 = createMockQueue(createMockMessageForQueue(), createMockMessageForQueue());
assertEquals("There are 2 messages on the queue.", runner.runReport(queue2));
}
+ protected ServerMessage createMockMessageForQueue()
+ {
+ final ServerMessage message = mock(ServerMessage.class);
+ final AMQMessageHeader header = mock(AMQMessageHeader.class);
+ when(message.getMessageHeader()).thenReturn(header);
+
+ return message;
+ }
+
public void testTextReportSingleStringParam()
{
- Queue queue2 = createMockQueue(mock(ServerMessage.class), mock(ServerMessage.class));
+ Queue queue2 = createMockQueue(createMockMessageForQueue(), createMockMessageForQueue());
Map<String, String[]> parameterMap = new HashMap<>();
parameterMap.put("stringParam", new String[]{"hello world"});
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org