You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/01/27 21:39:01 UTC
svn commit: r1655154 -
/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
Author: rgodfrey
Date: Tue Jan 27 20:39:01 2015
New Revision: 1655154
URL: http://svn.apache.org/r1655154
Log:
QPID-6331 : Allow AMQP 1.0 message content to be evicted to disk
Modified:
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1655154&r1=1655153&r2=1655154&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Tue Jan 27 20:39:01 2015
@@ -21,23 +21,42 @@
package org.apache.qpid.server.protocol.v1_0;
+import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+
import org.apache.qpid.server.message.AbstractServerMessageImpl;
import org.apache.qpid.server.store.StoredMessage;
public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0>
{
- private List<ByteBuffer> _fragments;
+ private volatile SoftReference<List<ByteBuffer>> _fragmentsRef;
private long _arrivalTime;
+ private final long _size;
public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage)
{
super(storedMessage, null);
- _fragments = restoreFragments(storedMessage);
+ final List<ByteBuffer> fragments = restoreFragments(getStoredMessage());
+ _fragmentsRef = new SoftReference<>(fragments);
+ _size = calculateSize(fragments);
+ }
+
+ private long calculateSize(final List<ByteBuffer> fragments)
+ {
+
+ long size = 0l;
+ if(fragments != null)
+ {
+ for(ByteBuffer buf : fragments)
+ {
+ size += buf.remaining();
+ }
+ }
+ return size;
}
private static List<ByteBuffer> restoreFragments(StoredMessage<MessageMetaData_1_0> storedMessage)
@@ -65,7 +84,8 @@ public class Message_1_0 extends Abstrac
final Object connectionReference)
{
super(storedMessage, connectionReference);
- _fragments = fragments;
+ _fragmentsRef = new SoftReference<>(fragments);
+ _size = calculateSize(fragments);
_arrivalTime = System.currentTimeMillis();
}
@@ -94,16 +114,7 @@ public class Message_1_0 extends Abstrac
public long getSize()
{
- long size = 0l;
- if(_fragments != null)
- {
- for(ByteBuffer buf : _fragments)
- {
- size += buf.remaining();
- }
- }
-
- return size;
+ return _size;
}
public long getExpiration()
@@ -118,7 +129,14 @@ public class Message_1_0 extends Abstrac
public List<ByteBuffer> getFragments()
{
- return _fragments;
+
+ List<ByteBuffer> fragments = _fragmentsRef.get();
+ if(fragments == null)
+ {
+ fragments = restoreFragments(getStoredMessage());
+ _fragmentsRef = new SoftReference<>(fragments);
+ }
+ return fragments;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org