You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/01/27 21:39:01 UTC

svn commit: r1655154 - /qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java

Author: rgodfrey
Date: Tue Jan 27 20:39:01 2015
New Revision: 1655154

URL: http://svn.apache.org/r1655154
Log:
QPID-6331 : Allow AMQP 1.0 message content to be evicted to disk

Modified:
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1655154&r1=1655153&r2=1655154&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Tue Jan 27 20:39:01 2015
@@ -21,23 +21,42 @@
 package org.apache.qpid.server.protocol.v1_0;
 
 
+import java.lang.ref.SoftReference;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+
 import org.apache.qpid.server.message.AbstractServerMessageImpl;
 import org.apache.qpid.server.store.StoredMessage;
 
 public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0>
 {
 
-    private List<ByteBuffer> _fragments;
+    private volatile SoftReference<List<ByteBuffer>> _fragmentsRef;
     private long _arrivalTime;
+    private final long _size;
 
 
     public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage)
     {
         super(storedMessage, null);
-        _fragments = restoreFragments(storedMessage);
+        final List<ByteBuffer> fragments = restoreFragments(getStoredMessage());
+        _fragmentsRef = new SoftReference<>(fragments);
+        _size = calculateSize(fragments);
+    }
+
+    private long calculateSize(final List<ByteBuffer> fragments)
+    {
+
+        long size = 0l;
+        if(fragments != null)
+        {
+            for(ByteBuffer buf : fragments)
+            {
+                size += buf.remaining();
+            }
+        }
+        return size;
     }
 
     private static List<ByteBuffer> restoreFragments(StoredMessage<MessageMetaData_1_0> storedMessage)
@@ -65,7 +84,8 @@ public class Message_1_0 extends Abstrac
                        final Object connectionReference)
     {
         super(storedMessage, connectionReference);
-        _fragments = fragments;
+        _fragmentsRef = new SoftReference<>(fragments);
+        _size = calculateSize(fragments);
         _arrivalTime = System.currentTimeMillis();
     }
 
@@ -94,16 +114,7 @@ public class Message_1_0 extends Abstrac
 
     public long getSize()
     {
-        long size = 0l;
-        if(_fragments != null)
-        {
-            for(ByteBuffer buf : _fragments)
-            {
-                size += buf.remaining();
-            }
-        }
-
-        return size;
+        return _size;
     }
 
     public long getExpiration()
@@ -118,7 +129,14 @@ public class Message_1_0 extends Abstrac
 
     public List<ByteBuffer> getFragments()
     {
-        return _fragments;
+
+        List<ByteBuffer> fragments = _fragmentsRef.get();
+        if(fragments == null)
+        {
+            fragments = restoreFragments(getStoredMessage());
+            _fragmentsRef = new SoftReference<>(fragments);
+        }
+        return fragments;
     }
 
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org