You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/05/03 15:59:09 UTC
[3/3] qpid-broker-j git commit: QPID-7763: [Java Broker] Avoid allocating large non-pooled direct byte buffers in WebSocketProvider, MemoryMessageStore, and message inflation
QPID-7763: [Java Broker] Avoid allocating large non-pooled direct byte buffers in WebSocketProvider, MemoryMessageStore, and message inflation
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/ed320c22
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/ed320c22
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/ed320c22
Branch: refs/heads/6.1.x
Commit: ed320c2242d4584bc6b2eaf253c91856f546e318
Parents: 453e5a7
Author: Lorenz Quack <lq...@apache.org>
Authored: Fri Apr 28 14:33:42 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed May 3 16:34:11 2017 +0100
----------------------------------------------------------------------
.../qpid/server/store/StoredMemoryMessage.java | 103 +++++++++++--------
.../transport/websocket/WebSocketProvider.java | 10 +-
.../apache/qpid/bytebuffer/QpidByteBuffer.java | 23 ++++-
3 files changed, 85 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ed320c22/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
index 18e74e2..7b8eeff 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -21,21 +21,28 @@
package org.apache.qpid.server.store;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class StoredMemoryMessage<T extends StorableMessageMetaData> implements StoredMessage<T>, MessageHandle<T>
{
private final long _messageNumber;
- private QpidByteBuffer _content;
- private T _metaData;
+
+ private final int _contentSize;
+ private final Queue<QpidByteBuffer> _content = new LinkedList<>();
+ private volatile T _metaData;
public StoredMemoryMessage(long messageNumber, T metaData)
{
_messageNumber = messageNumber;
_metaData = metaData;
+ _contentSize = _metaData.getContentSize();
}
public long getMessageNumber()
@@ -43,55 +50,64 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S
return _messageNumber;
}
- public void addContent(QpidByteBuffer src)
+ public synchronized void addContent(QpidByteBuffer src)
{
- if(_content == null)
- {
- _content = src.slice();
- _content.position(_content.limit());
- }
- else
- {
- if(_content.remaining() >= src.remaining())
- {
- _content.putCopyOf(src);
- }
- else
- {
- final int contentSize = _metaData.getContentSize();
- int size = (contentSize < _content.position() + src.remaining())
- ? _content.position() + src.remaining()
- : contentSize;
- QpidByteBuffer oldContent = _content;
- oldContent.flip();
- _content = QpidByteBuffer.allocateDirect(size);
- _content.put(oldContent);
- _content.putCopyOf(src);
- oldContent.dispose();
- }
-
- }
+ _content.add(src.slice());
}
@Override
- public StoredMessage<T> allContentAdded()
+ public synchronized StoredMessage<T> allContentAdded()
{
- if(_content != null)
- {
- _content.flip();
- }
return this;
}
@Override
- public Collection<QpidByteBuffer> getContent(int offset, int length)
+ public synchronized Collection<QpidByteBuffer> getContent(int offset, int length)
{
- if(_content == null)
+ Collection<QpidByteBuffer> content = new ArrayList<>(_content.size());
+ int pos = 0;
+ for (QpidByteBuffer buf : _content)
{
- return null;
+ if (length > 0)
+ {
+ int bufRemaining = buf.remaining();
+ if (pos + bufRemaining <= offset)
+ {
+ pos += bufRemaining;
+ }
+ else if (pos >= offset)
+ {
+ buf = buf.duplicate();
+ if (bufRemaining <= length)
+ {
+ length -= bufRemaining;
+ }
+ else
+ {
+ buf.limit(length);
+ length = 0;
+ }
+ content.add(buf);
+ pos += buf.remaining();
+ }
+ else
+ {
+ int offsetInBuf = offset - pos;
+ int limit = length < bufRemaining - offsetInBuf ? length : bufRemaining - offsetInBuf;
+ final QpidByteBuffer bufView = buf.view(offsetInBuf, limit);
+ content.add(bufView);
+ length -= limit;
+ pos += limit + offsetInBuf;
+ }
+ }
}
- return Collections.singleton(_content.view(offset, length));
+ return content;
+ }
+
+ public int getContentSize()
+ {
+ return _contentSize;
}
public T getMetaData()
@@ -99,14 +115,17 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S
return _metaData;
}
- public void remove()
+ public synchronized void remove()
{
_metaData.dispose();
_metaData = null;
if (_content != null)
{
- _content.dispose();
- _content = null;
+ for (QpidByteBuffer content : _content)
+ {
+ content.dispose();
+ }
+ _content.clear();
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ed320c22/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
----------------------------------------------------------------------
diff --git a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
index e764805..5c18719 100644
--- a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
+++ b/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
@@ -300,11 +300,11 @@ class WebSocketProvider implements AcceptingTransport
iter.next().run();
}
- QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(length);
- buffer.put(data, offset, length);
- buffer.flip();
- _protocolEngine.received(buffer);
- buffer.dispose();
+ for (QpidByteBuffer qpidByteBuffer : QpidByteBuffer.asQpidByteBuffers(data, offset, length))
+ {
+ _protocolEngine.received(qpidByteBuffer);
+ qpidByteBuffer.dispose();
+ }
_connectionWrapper.doWrite();
_protocolEngine.setMessageAssignmentSuspended(false, true);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ed320c22/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java b/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
index 87d865b..842dc48 100644
--- a/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
+++ b/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
@@ -649,6 +649,24 @@ public class QpidByteBuffer
}
}
+ public static Collection<QpidByteBuffer> asQpidByteBuffers(final byte[] data)
+ {
+ return asQpidByteBuffers(data, 0, data.length);
+ }
+
+ public static Collection<QpidByteBuffer> asQpidByteBuffers(final byte[] data, final int offset, final int length)
+ {
+ try (QpidByteBufferOutputStream outputStream = new QpidByteBufferOutputStream(true, getPooledBufferSize()))
+ {
+ outputStream.write(data, offset, length);
+ return outputStream.fetchAccumulatedBuffers();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("unexpected Error converting array to QpidByteBuffers", e);
+ }
+ }
+
public static SSLEngineResult encryptSSL(SSLEngine engine,
final Collection<QpidByteBuffer> buffers,
QpidByteBuffer dest) throws SSLException
@@ -694,10 +712,7 @@ public class QpidByteBuffer
int read;
while ((read = gzipInputStream.read(buf)) != -1)
{
- QpidByteBuffer output = isDirect ? allocateDirect(read) : allocate(read);
- output.put(buf, 0, read);
- output.flip();
- uncompressedBuffers.add(output);
+ uncompressedBuffers.addAll(asQpidByteBuffers(buf, 0, read));
}
return uncompressedBuffers;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org