You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/05/03 15:59:09 UTC

[3/3] qpid-broker-j git commit: QPID-7763: [Java Broker] Avoid allocating large non-pooled direct byte buffers in WebSocketProvider, MemoryMessageStore, and message inflation

QPID-7763: [Java Broker] Avoid allocating large non-pooled direct byte buffers in WebSocketProvider, MemoryMessageStore, and message inflation


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/ed320c22
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/ed320c22
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/ed320c22

Branch: refs/heads/6.1.x
Commit: ed320c2242d4584bc6b2eaf253c91856f546e318
Parents: 453e5a7
Author: Lorenz Quack <lq...@apache.org>
Authored: Fri Apr 28 14:33:42 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed May 3 16:34:11 2017 +0100

----------------------------------------------------------------------
 .../qpid/server/store/StoredMemoryMessage.java  | 103 +++++++++++--------
 .../transport/websocket/WebSocketProvider.java  |  10 +-
 .../apache/qpid/bytebuffer/QpidByteBuffer.java  |  23 ++++-
 3 files changed, 85 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ed320c22/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
index 18e74e2..7b8eeff 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -21,21 +21,28 @@
 
 package org.apache.qpid.server.store;
 
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 
 public class StoredMemoryMessage<T extends StorableMessageMetaData> implements StoredMessage<T>, MessageHandle<T>
 {
     private final long _messageNumber;
-    private QpidByteBuffer _content;
-    private T _metaData;
+
+    private final int _contentSize;
+    private final Queue<QpidByteBuffer> _content = new LinkedList<>();
+    private volatile T _metaData;
 
     public StoredMemoryMessage(long messageNumber, T metaData)
     {
         _messageNumber = messageNumber;
         _metaData = metaData;
+        _contentSize = _metaData.getContentSize();
     }
 
     public long getMessageNumber()
@@ -43,55 +50,64 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S
         return _messageNumber;
     }
 
-    public void addContent(QpidByteBuffer src)
+    public synchronized void addContent(QpidByteBuffer src)
     {
-        if(_content == null)
-        {
-            _content = src.slice();
-            _content.position(_content.limit());
-        }
-        else
-        {
-            if(_content.remaining() >= src.remaining())
-            {
-                _content.putCopyOf(src);
-            }
-            else
-            {
-                final int contentSize = _metaData.getContentSize();
-                int size = (contentSize < _content.position() + src.remaining())
-                        ? _content.position() + src.remaining()
-                        : contentSize;
-                QpidByteBuffer oldContent = _content;
-                oldContent.flip();
-                _content = QpidByteBuffer.allocateDirect(size);
-                _content.put(oldContent);
-                _content.putCopyOf(src);
-                oldContent.dispose();
-            }
-
-        }
+        _content.add(src.slice());
     }
 
     @Override
-    public StoredMessage<T> allContentAdded()
+    public synchronized StoredMessage<T> allContentAdded()
     {
-        if(_content != null)
-        {
-            _content.flip();
-        }
         return this;
     }
 
 
     @Override
-    public Collection<QpidByteBuffer> getContent(int offset, int length)
+    public synchronized Collection<QpidByteBuffer> getContent(int offset, int length)
     {
-        if(_content == null)
+        Collection<QpidByteBuffer> content = new ArrayList<>(_content.size());
+        int pos = 0;
+        for (QpidByteBuffer buf : _content)
         {
-            return null;
+            if (length > 0)
+            {
+                int bufRemaining = buf.remaining();
+                if (pos + bufRemaining <= offset)
+                {
+                    pos += bufRemaining;
+                }
+                else if (pos >= offset)
+                {
+                    buf = buf.duplicate();
+                    if (bufRemaining <= length)
+                    {
+                        length -= bufRemaining;
+                    }
+                    else
+                    {
+                        buf.limit(length);
+                        length = 0;
+                    }
+                    content.add(buf);
+                    pos += buf.remaining();
+                }
+                else
+                {
+                    int offsetInBuf = offset - pos;
+                    int limit = length < bufRemaining - offsetInBuf ? length : bufRemaining - offsetInBuf;
+                    final QpidByteBuffer bufView = buf.view(offsetInBuf, limit);
+                    content.add(bufView);
+                    length -= limit;
+                    pos += limit + offsetInBuf;
+                }
+            }
         }
-        return Collections.singleton(_content.view(offset, length));
+        return content;
+    }
+
+    public int getContentSize()
+    {
+        return _contentSize;
     }
 
     public T getMetaData()
@@ -99,14 +115,17 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S
         return _metaData;
     }
 
-    public void remove()
+    public synchronized void remove()
     {
         _metaData.dispose();
         _metaData = null;
         if (_content != null)
         {
-            _content.dispose();
-            _content = null;
+            for (QpidByteBuffer content : _content)
+            {
+                content.dispose();
+            }
+            _content.clear();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ed320c22/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
----------------------------------------------------------------------
diff --git a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
index e764805..5c18719 100644
--- a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
+++ b/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
@@ -300,11 +300,11 @@ class WebSocketProvider implements AcceptingTransport
                         iter.next().run();
                     }
 
-                    QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(length);
-                    buffer.put(data, offset, length);
-                    buffer.flip();
-                    _protocolEngine.received(buffer);
-                    buffer.dispose();
+                    for (QpidByteBuffer qpidByteBuffer : QpidByteBuffer.asQpidByteBuffers(data, offset, length))
+                    {
+                        _protocolEngine.received(qpidByteBuffer);
+                        qpidByteBuffer.dispose();
+                    }
 
                     _connectionWrapper.doWrite();
                     _protocolEngine.setMessageAssignmentSuspended(false, true);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ed320c22/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java b/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
index 87d865b..842dc48 100644
--- a/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
+++ b/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
@@ -649,6 +649,24 @@ public class QpidByteBuffer
         }
     }
 
+    public static Collection<QpidByteBuffer> asQpidByteBuffers(final byte[] data)
+    {
+        return asQpidByteBuffers(data, 0, data.length);
+    }
+
+    public static Collection<QpidByteBuffer> asQpidByteBuffers(final byte[] data, final int offset, final int length)
+    {
+        try (QpidByteBufferOutputStream outputStream = new QpidByteBufferOutputStream(true, getPooledBufferSize()))
+        {
+            outputStream.write(data, offset, length);
+            return outputStream.fetchAccumulatedBuffers();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("unexpected Error converting array to QpidByteBuffers", e);
+        }
+    }
+
     public static SSLEngineResult encryptSSL(SSLEngine engine,
                                              final Collection<QpidByteBuffer> buffers,
                                              QpidByteBuffer dest) throws SSLException
@@ -694,10 +712,7 @@ public class QpidByteBuffer
             int read;
             while ((read = gzipInputStream.read(buf)) != -1)
             {
-                QpidByteBuffer output = isDirect ? allocateDirect(read) : allocate(read);
-                output.put(buf, 0, read);
-                output.flip();
-                uncompressedBuffers.add(output);
+                uncompressedBuffers.addAll(asQpidByteBuffers(buf, 0, read));
             }
             return uncompressedBuffers;
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org