You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/05/05 15:02:46 UTC
qpid-broker-j git commit: QPID-7753: Reallocate direct memory buffers based on sparsity
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 75be65b7b -> 1424c408f
QPID-7753: Reallocate direct memory buffers based on sparsity
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/1424c408
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/1424c408
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/1424c408
Branch: refs/heads/master
Commit: 1424c408ff97b600d695d3dfd36c895629c57419
Parents: 75be65b
Author: Alex Rudyy <or...@apache.org>
Authored: Fri May 5 15:58:23 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Fri May 5 15:58:23 2017 +0100
----------------------------------------------------------------------
.../berkeleydb/AbstractBDBMessageStore.java | 18 ++--
.../qpid/server/bytebuffer/ByteBufferRef.java | 8 +-
.../bytebuffer/NonPooledByteBufferRef.java | 14 +--
.../server/bytebuffer/PooledByteBufferRef.java | 41 ++++----
.../qpid/server/bytebuffer/QpidByteBuffer.java | 43 ++++----
.../messages/Queue_logmessages.properties | 4 +-
.../message/internal/InternalMessage.java | 2 +-
.../internal/InternalMessageMetaData.java | 2 +-
.../org/apache/qpid/server/model/Broker.java | 23 ++---
.../apache/qpid/server/model/BrokerImpl.java | 100 ++++++-------------
.../org/apache/qpid/server/model/Queue.java | 2 +-
.../qpid/server/protocol/v0_8/FieldTable.java | 4 +-
.../apache/qpid/server/queue/AbstractQueue.java | 12 +--
.../server/store/StorableMessageMetaData.java | 2 +-
.../qpid/server/store/StoredMemoryMessage.java | 6 +-
.../apache/qpid/server/store/StoredMessage.java | 2 +-
.../server/virtualhost/AbstractVirtualHost.java | 5 +-
.../virtualhost/QueueManagingVirtualHost.java | 2 +-
.../server/bytebuffer/QpidByteBufferTest.java | 20 +++-
.../server/protocol/v0_8/EncodingUtilsTest.java | 3 +-
.../qpid/server/store/TestMessageMetaData.java | 2 +-
.../MessageConverter_Internal_to_v0_10.java | 2 +-
.../protocol/v0_10/MessageConverter_v0_10.java | 2 +-
.../protocol/v0_10/MessageMetaData_0_10.java | 4 +-
.../v0_8/MessageConverter_Internal_to_v0_8.java | 2 +-
.../server/protocol/v0_8/MessageMetaData.java | 4 +-
.../transport/BasicContentHeaderProperties.java | 6 +-
.../v0_8/transport/ContentHeaderBody.java | 4 +-
.../protocol/v1_0/MessageConverter_to_1_0.java | 2 +-
.../protocol/v1_0/MessageMetaData_1_0.java | 14 +--
.../v1_0/type/messaging/AbstractSection.java | 4 +-
.../type/messaging/codec/EncodingRetaining.java | 2 +-
.../MessageConverter_1_0_to_v0_10.java | 2 +-
.../MessageConverter_0_10_to_0_8.java | 2 +-
.../MessageConverter_0_8_to_0_10.java | 2 +-
.../v0_8_v1_0/MessageConverter_1_0_to_v0_8.java | 2 +-
.../store/jdbc/AbstractJDBCMessageStore.java | 18 ++--
37 files changed, 167 insertions(+), 220 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 732d291..ce25a8f 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -901,7 +901,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
Collection<QpidByteBuffer> getData();
void setData(Collection<QpidByteBuffer> data);
boolean isHardRef();
- void reallocate(final long smallestAllowedBufferId);
+ void reallocate();
}
private static final class MessageDataHardRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
@@ -939,13 +939,13 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
@Override
- public void reallocate(final long smallestAllowedBufferId)
+ public void reallocate()
{
if(_metaData != null)
{
- _metaData.reallocate(smallestAllowedBufferId);
+ _metaData.reallocate();
}
- _data = QpidByteBuffer.reallocateIfNecessary(smallestAllowedBufferId, _data);
+ _data = QpidByteBuffer.reallocateIfNecessary(_data);
}
}
@@ -1003,13 +1003,13 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
@Override
- public void reallocate(final long smallestAllowedBufferId)
+ public void reallocate()
{
if(_metaData != null)
{
- _metaData.reallocate(smallestAllowedBufferId);
+ _metaData.reallocate();
}
- _data = QpidByteBuffer.reallocateIfNecessary(smallestAllowedBufferId, _data);
+ _data = QpidByteBuffer.reallocateIfNecessary(_data);
}
}
@@ -1272,11 +1272,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
@Override
- public synchronized void reallocate(final long smallestAllowedBufferId)
+ public synchronized void reallocate()
{
if(_messageDataRef != null)
{
- _messageDataRef.reallocate(smallestAllowedBufferId);
+ _messageDataRef.reallocate();
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/ByteBufferRef.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/ByteBufferRef.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/ByteBufferRef.java
index d5fadc7..b950f47 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/ByteBufferRef.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/ByteBufferRef.java
@@ -24,13 +24,11 @@ import java.nio.ByteBuffer;
public interface ByteBufferRef
{
- void incrementRef();
+ void incrementRef(final int capacity);
- void decrementRef();
+ void decrementRef(final int capacity);
ByteBuffer getBuffer();
- void removeFromPool();
-
- long getPooledBufferId();
+ boolean isSparse(double minimumSparsityFraction);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/NonPooledByteBufferRef.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/NonPooledByteBufferRef.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/NonPooledByteBufferRef.java
index 03a6284..2d0d668 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/NonPooledByteBufferRef.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/NonPooledByteBufferRef.java
@@ -32,13 +32,13 @@ class NonPooledByteBufferRef implements ByteBufferRef
}
@Override
- public void incrementRef()
+ public void incrementRef(final int capacity)
{
}
@Override
- public void decrementRef()
+ public void decrementRef(final int capacity)
{
}
@@ -50,14 +50,8 @@ class NonPooledByteBufferRef implements ByteBufferRef
}
@Override
- public void removeFromPool()
+ public boolean isSparse(final double minimumSparsityFraction)
{
-
- }
-
- @Override
- public long getPooledBufferId()
- {
- return Long.MAX_VALUE;
+ return false;
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java
index cb7a4f8..1e0af48 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java
@@ -23,38 +23,43 @@ package org.apache.qpid.server.bytebuffer;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.atomic.AtomicLong;
class PooledByteBufferRef implements ByteBufferRef
{
- private static final AtomicIntegerFieldUpdater<PooledByteBufferRef> REF_COUNT = AtomicIntegerFieldUpdater.newUpdater(PooledByteBufferRef.class, "_refCount");
- private static final AtomicLong BUFFER_ID = new AtomicLong(Long.MIN_VALUE);
+ private static final AtomicIntegerFieldUpdater<PooledByteBufferRef> REF_COUNT_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(PooledByteBufferRef.class, "_refCount");
+ private static final AtomicIntegerFieldUpdater<PooledByteBufferRef> CLAIMED_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(PooledByteBufferRef.class, "_claimed");
private static final AtomicInteger ACTIVE_BUFFERS = new AtomicInteger();
private final ByteBuffer _buffer;
- private final long _id;
+
@SuppressWarnings("unused")
private volatile int _refCount;
+ @SuppressWarnings("unused")
+ private volatile int _claimed;
+
PooledByteBufferRef(final ByteBuffer buffer)
{
_buffer = buffer;
- _id = BUFFER_ID.getAndIncrement();
ACTIVE_BUFFERS.incrementAndGet();
}
@Override
- public void incrementRef()
+ public void incrementRef(final int capacity)
{
- if(REF_COUNT.get(this) >= 0)
+ if(REF_COUNT_UPDATER.get(this) >= 0)
{
- REF_COUNT.incrementAndGet(this);
+ CLAIMED_UPDATER.addAndGet(this, capacity);
+ REF_COUNT_UPDATER.incrementAndGet(this);
}
}
@Override
- public void decrementRef()
+ public void decrementRef(final int capacity)
{
- if(REF_COUNT.get(this) > 0 && REF_COUNT.decrementAndGet(this) == 0)
+ CLAIMED_UPDATER.addAndGet(this, -capacity);
+ if(REF_COUNT_UPDATER.get(this) > 0 && REF_COUNT_UPDATER.decrementAndGet(this) == 0)
{
QpidByteBuffer.returnToPool(_buffer);
ACTIVE_BUFFERS.decrementAndGet();
@@ -68,24 +73,14 @@ class PooledByteBufferRef implements ByteBufferRef
}
@Override
- public void removeFromPool()
+ public boolean isSparse(final double minimumSparsityFraction)
{
- REF_COUNT.set(this, Integer.MIN_VALUE/2);
+ return minimumSparsityFraction > (double) CLAIMED_UPDATER.get(this) / (double) _buffer.capacity();
}
- @Override
- public long getPooledBufferId()
- {
- return _id;
- }
-
- public static int getActiveBufferCount()
+ static int getActiveBufferCount()
{
return ACTIVE_BUFFERS.get();
}
- public static long getLargestBufferId()
- {
- return BUFFER_ID.get();
- }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
index 203b009..dd2daa3 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
@@ -58,6 +58,7 @@ public class QpidByteBuffer
private volatile static BufferPool _bufferPool;
private volatile static int _pooledBufferSize;
private volatile static ByteBuffer _zeroed;
+ private volatile static double _sparsityFraction;
private final int _offset;
final ByteBufferRef _ref;
@@ -76,7 +77,7 @@ public class QpidByteBuffer
_ref = ref;
_buffer = buffer;
_offset = offset;
- _ref.incrementRef();
+ _ref.incrementRef(capacity());
}
public final boolean isDirect()
@@ -127,7 +128,7 @@ public class QpidByteBuffer
{
if (DISPOSED_UPDATER.compareAndSet(this, 0, 1))
{
- _ref.decrementRef();
+ _ref.decrementRef(capacity());
_buffer = null;
}
}
@@ -542,11 +543,6 @@ public class QpidByteBuffer
return this;
}
- private long getPooledBufferId()
- {
- return _ref.getPooledBufferId();
- }
-
ByteBuffer getUnderlyingBuffer()
{
return _buffer;
@@ -593,11 +589,11 @@ public class QpidByteBuffer
buf.dispose();
}
buf = allocateDirect(_pooledBufferSize);
+ _cachedBuffer.set(buf);
}
QpidByteBuffer rVal = buf.view(0, size);
buf.position(buf.position() + size);
- _cachedBuffer.set(buf);
return rVal;
}
}
@@ -801,15 +797,16 @@ public class QpidByteBuffer
_bufferPool.returnBuffer(buffer);
}
- public synchronized static void initialisePool(int bufferSize, int maxPoolSize)
+ public synchronized static void initialisePool(int bufferSize, int maxPoolSize, final double sparsityFraction)
{
- if (_isPoolInitialized && (bufferSize != _pooledBufferSize || maxPoolSize != _bufferPool.getMaxSize()))
+ if (_isPoolInitialized && (bufferSize != _pooledBufferSize || maxPoolSize != _bufferPool.getMaxSize() || sparsityFraction != _sparsityFraction))
{
final String errorMessage = String.format(
- "QpidByteBuffer pool has already been initialised with bufferSize=%d and maxPoolSize=%d." +
+ "QpidByteBuffer pool has already been initialised with bufferSize=%d, maxPoolSize=%d, and sparsityFraction=%f." +
"Re-initialisation with different bufferSize=%d and maxPoolSize=%d is not allowed.",
_pooledBufferSize,
_bufferPool.getMaxSize(),
+ _sparsityFraction,
bufferSize,
maxPoolSize);
throw new IllegalStateException(errorMessage);
@@ -822,6 +819,7 @@ public class QpidByteBuffer
_bufferPool = new BufferPool(maxPoolSize);
_pooledBufferSize = bufferSize;
_zeroed = ByteBuffer.allocateDirect(_pooledBufferSize);
+ _sparsityFraction = sparsityFraction;
_isPoolInitialized = true;
}
@@ -834,6 +832,7 @@ public class QpidByteBuffer
_pooledBufferSize = -1;
_zeroed = null;
_isPoolInitialized = false;
+ _sparsityFraction = 1.0;
}
}
@@ -857,19 +856,14 @@ public class QpidByteBuffer
return _bufferPool.size();
}
- public static long getLargestPooledBufferId()
- {
- return PooledByteBufferRef.getLargestBufferId();
- }
-
- public static List<QpidByteBuffer> reallocateIfNecessary(final long smallestAllowedBufferId, Collection<QpidByteBuffer> data)
+ public static List<QpidByteBuffer> reallocateIfNecessary(Collection<QpidByteBuffer> data)
{
if (data != null)
{
List<QpidByteBuffer> newCopy = new ArrayList<>(data.size());
for (QpidByteBuffer buf : data)
{
- newCopy.add(reallocateIfNecessary(smallestAllowedBufferId, buf));
+ newCopy.add(reallocateIfNecessary(buf));
}
return newCopy;
}
@@ -879,13 +873,9 @@ public class QpidByteBuffer
}
}
- public static QpidByteBuffer reallocateIfNecessary(final long smallestAllowedBufferId, final QpidByteBuffer data)
+ public static QpidByteBuffer reallocateIfNecessary(final QpidByteBuffer data)
{
- double capacityThreshold = QpidByteBuffer.getPooledBufferSize() * REALLOCATION_CAPACITY_THRESHOLD_FRACTION;
- if (data != null
- && data.isDirect()
- && data.getPooledBufferId() < smallestAllowedBufferId
- && data.remaining() < capacityThreshold)
+ if (data != null && data.isDirect() && data.isSparse())
{
QpidByteBuffer newBuf = allocateDirect(data.remaining());
newBuf.put(data);
@@ -899,6 +889,11 @@ public class QpidByteBuffer
}
}
+ boolean isSparse()
+ {
+ return _ref.isSparse(_sparsityFraction);
+ }
+
private static final class BufferInputStream extends InputStream
{
private final QpidByteBuffer _qpidByteBuffer;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
index fe2fb29..abf4bc0 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
@@ -28,8 +28,8 @@ UNDERFULL = QUE-1004 : Underfull : Size : {0,number} bytes, Resume Capacity : {1
DROPPED = QUE-1005 : Dropped : {0,number} messages, Depth : {1,number} bytes, {2,number} messages, Capacity : {3,number} bytes, {4,number} messages
# use similar number to the broker for similar topic
-FLOW_TO_DISK_ACTIVE = QUE-1014 : Message flow to disk active : Queue direct memory : {0,number,#} kB / {1,number,#.##} kB, Broker direct memory : {2,number,#} kB / {3,number,#.##} kB
-FLOW_TO_DISK_INACTIVE = QUE-1015 : Message flow to disk inactive : Queue direct memory : {0,number,#} kB / {1,number,#.##} kB, Broker direct memory : {2,number,#} kB / {3,number,#.##} kB
+FLOW_TO_DISK_ACTIVE = QUE-1014 : Message flow to disk active : Queue : depth {0,number,#} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#} MB, threshold {3,number,#.##} MB
+FLOW_TO_DISK_INACTIVE = QUE-1015 : Message flow to disk inactive : Queue : depth {0,number,#} kB, threshold {1,number,#.##} kB / Broker : direct memory used {2,number,#} MB, threshold {3,number,#.##} MB
# 0 - operation name
OPERATION = QUE-1016 : Operation : {0}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
index 2f583e1..70ecd61 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
@@ -259,7 +259,7 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
}
@Override
- public void reallocate(final long smallestAllowedBufferId)
+ public void reallocate()
{
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java
index 7e22ef9..0d8728b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java
@@ -105,7 +105,7 @@ public class InternalMessageMetaData implements StorableMessageMetaData
}
@Override
- public void reallocate(final long smallestAllowedBufferId)
+ public void reallocate()
{
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index 533d6e2..19e6de0 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -95,16 +95,6 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
@ManagedContextDefault(name = COMPACT_MEMORY_INTERVAL)
long DEFAULT_COMPACT_MEMORY_INTERVAL = 1000L;
- String MEMORY_OCCUPANCY_THRESHOLD = "qpid.memory_occupancy_threshold";
- @SuppressWarnings("unused")
- @ManagedContextDefault(name = MEMORY_OCCUPANCY_THRESHOLD)
- double DEFAULT_MEMORY_OCCUPANCY_THRESHOLD = 0.5;
-
- String MEMORY_COMPACTION_INCREMENT = "qpid.memory_compaction_increment";
- @SuppressWarnings("unused")
- @ManagedContextDefault(name = MEMORY_COMPACTION_INCREMENT)
- long DEFAULT_MEMORY_COMPACTION_INCREMENT = 100;
-
@ManagedContextDefault(name = CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT)
long DEFAULT_CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT = 5000l;
@@ -196,6 +186,10 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
@ManagedContextDefault(name = BROKER_DIRECT_BYTE_BUFFER_POOL_SIZE)
int DEFAULT_BROKER_DIRECT_BYTE_BUFFER_POOL_SIZE = 1024;
+ String BROKER_DIRECT_BYTE_BUFFER_POOL_SPARSITY_REALLOCATION_FRACTION = "broker.directByteBufferPoolSparsityReallocationFraction";
+ @ManagedContextDefault(name = BROKER_DIRECT_BYTE_BUFFER_POOL_SPARSITY_REALLOCATION_FRACTION)
+ double DEFAULT_BROKER_DIRECT_BYTE_BUFFER_POOL_SPARSITY_REALLOCATION_FRACTION = 0.5;
+
@ManagedAttribute(validValues = {"org.apache.qpid.server.model.BrokerImpl#getAvailableConfigurationEncrypters()"})
String getConfidentialConfigurationEncryptionProvider();
@@ -374,17 +368,14 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
@DerivedAttribute(description = "Threshold direct memory size (in bytes) at which the Broker will start flowing incoming messages to disk.")
long getFlowToDiskThreshold();
- @DerivedAttribute(description = "Threshold direct memory size (in bytes) at which the Broker will start considering to compact sparse buffers. Set to -1 to disable. See also " + MEMORY_OCCUPANCY_THRESHOLD)
+ @DerivedAttribute(description = "Threshold direct memory size (in bytes) at which the Broker will start considering to compact sparse buffers. Set to -1 to disable.")
long getCompactMemoryThreshold();
@DerivedAttribute(description = "Time interval (in milliseconds) between runs of the memory compactor check. See also " + COMPACT_MEMORY_THRESHOLD)
long getCompactMemoryInterval();
- @DerivedAttribute(description = "Occupancy threshold (fraction) at which point buffers will be compacted. See also " + COMPACT_MEMORY_THRESHOLD)
- double getMemoryOccupancyThreshold();
-
- @DerivedAttribute(description = "Approximate number of buffers that will be compacted on each compaction run. See also " + COMPACT_MEMORY_THRESHOLD)
- long getMemoryCompactionIncrement();
+ @DerivedAttribute(description = "Minimum fraction of direct memory buffer that can be occupied before the buffer is considered for compaction")
+ double getSparsityFraction();
@ManagedOperation(changesConfiguredObjectState = false, nonModifying = true,
description = "Force direct memory buffer compaction.")
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
index 65d28f8..58587bd 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
@@ -144,12 +144,10 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
private final AccessControl _accessControl;
private TaskExecutor _preferenceTaskExecutor;
private String _documentationUrl;
- private volatile long _smallestAllowedBufferId = QpidByteBuffer.getLargestPooledBufferId();
private long _compactMemoryThreshold;
private long _compactMemoryInterval;
- private double _memoryOccupancyThreshold;
- private long _memoryCompactionIncrement;
private long _flowToDiskThreshold;
+ private double _sparsityFraction;
@ManagedObjectFactoryConstructor
public BrokerImpl(Map<String, Object> attributes,
@@ -207,9 +205,10 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
}
_networkBufferSize = networkBufferSize;
+ _sparsityFraction = getContextValue(Double.class, BROKER_DIRECT_BYTE_BUFFER_POOL_SPARSITY_REALLOCATION_FRACTION);
int poolSize = getContextValue(Integer.class, BROKER_DIRECT_BYTE_BUFFER_POOL_SIZE);
- QpidByteBuffer.initialisePool(_networkBufferSize, poolSize);
+ QpidByteBuffer.initialisePool(_networkBufferSize, poolSize, _sparsityFraction);
}
@Override
@@ -438,7 +437,7 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
private void checkDirectMemoryUsage()
{
- if (_compactMemoryThreshold >= 0 && getUsedDirectMemorySize() > _compactMemoryThreshold)
+ if (_compactMemoryThreshold >= 0 && QpidByteBuffer.getAllocatedDirectMemorySize() > _compactMemoryThreshold)
{
compactMemory();
}
@@ -581,8 +580,6 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
_flowToDiskThreshold = getContextValue(Long.class, BROKER_FLOW_TO_DISK_THRESHOLD);
_compactMemoryThreshold = getContextValue(Long.class, Broker.COMPACT_MEMORY_THRESHOLD);
_compactMemoryInterval = getContextValue(Long.class, Broker.COMPACT_MEMORY_INTERVAL);
- _memoryOccupancyThreshold = getContextValue(Double.class, Broker.MEMORY_OCCUPANCY_THRESHOLD);
- _memoryCompactionIncrement = getContextValue(Long.class, Broker.MEMORY_COMPACTION_INCREMENT);
if (SystemUtils.getProcessPid() != null)
{
@@ -1175,86 +1172,47 @@ public class BrokerImpl extends AbstractContainer<BrokerImpl> implements Broker<
}
@Override
- public double getMemoryOccupancyThreshold()
+ public double getSparsityFraction()
{
- return _memoryOccupancyThreshold;
- }
-
- @Override
- public long getMemoryCompactionIncrement()
- {
- return _memoryCompactionIncrement;
+ return _sparsityFraction;
}
@Override
public void compactMemory()
{
- long memOccupiedByMessages = getMemOccupiedByMessages();
- double ratio = memOccupiedByMessages / (double) QpidByteBuffer.getAllocatedDirectMemorySize();
+ LOGGER.debug("Compacting direct memory buffers: numberOfActivePooledBuffers: {}",
+ QpidByteBuffer.getNumberOfActivePooledBuffers());
- if (ratio < _memoryOccupancyThreshold)
+ final Collection<VirtualHostNode<?>> vhns = getVirtualHostNodes();
+ List<ListenableFuture<Void>> futures = new ArrayList<>(vhns.size());
+ for (VirtualHostNode<?> vhn : vhns)
{
- int numberOfActivePooledBuffers = QpidByteBuffer.getNumberOfActivePooledBuffers();
- LOGGER.debug("Compacting direct memory buffers: "
- + "memOccupiedByMessages: {}, numberOfActivePooledBuffers: {}, ratio: {}",
- memOccupiedByMessages, numberOfActivePooledBuffers, ratio);
-
- long largestBufferId = QpidByteBuffer.getLargestPooledBufferId();
- _smallestAllowedBufferId = Math.min(largestBufferId,
- _smallestAllowedBufferId + _memoryCompactionIncrement);
-
- final Collection<VirtualHostNode<?>> vhns = getVirtualHostNodes();
- List<ListenableFuture<Void>> futures = new ArrayList<>(vhns.size());
- for (VirtualHostNode<?> vhn : vhns)
+ VirtualHost<?> vh = vhn.getVirtualHost();
+ if (vh instanceof QueueManagingVirtualHost)
{
- VirtualHost<?> vh = vhn.getVirtualHost();
- if (vh instanceof QueueManagingVirtualHost)
- {
- ListenableFuture<Void> future = ((QueueManagingVirtualHost) vh).reallocateMessages(
- _smallestAllowedBufferId);
- futures.add(future);
- }
+ ListenableFuture<Void> future = ((QueueManagingVirtualHost) vh).reallocateMessages();
+ futures.add(future);
}
- final ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(futures);
- addFutureCallback(combinedFuture, new FutureCallback<List<Void>>()
+ }
+ final ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(futures);
+ addFutureCallback(combinedFuture, new FutureCallback<List<Void>>()
+ {
+ @Override
+ public void onSuccess(final List<Void> result)
{
- @Override
- public void onSuccess(final List<Void> result)
- {
- if (LOGGER.isDebugEnabled())
- {
- long memOccupiedByMessages = getMemOccupiedByMessages();
- double ratio = memOccupiedByMessages / (double) QpidByteBuffer.getAllocatedDirectMemorySize();
- LOGGER.debug("After compact direct memory buffers: numberOfActivePooledBuffers: {}, ratio: {}",
- QpidByteBuffer.getNumberOfActivePooledBuffers(),
- ratio);
- }
- }
-
- @Override
- public void onFailure(final Throwable t)
+ if (LOGGER.isDebugEnabled())
{
- LOGGER.warn("Unexpected error during direct memory compaction.", t);
+ LOGGER.debug("After compact direct memory buffers: numberOfActivePooledBuffers: {}",
+ QpidByteBuffer.getNumberOfActivePooledBuffers());
}
- }, _houseKeepingTaskExecutor);
- }
- }
-
- private long getMemOccupiedByMessages()
- {
- final Collection<VirtualHostNode<?>> vhns = getVirtualHostNodes();
+ }
- long memOccupiedByMessages = 0;
- for (VirtualHostNode<?> vhn : vhns)
- {
- VirtualHost<?> vh = vhn.getVirtualHost();
- if (vh instanceof QueueManagingVirtualHost)
+ @Override
+ public void onFailure(final Throwable t)
{
- QueueManagingVirtualHost<?> host = (QueueManagingVirtualHost<?>) vh;
- memOccupiedByMessages += host.getTotalDepthOfQueuesBytesIncludingHeader();
+ LOGGER.warn("Unexpected error during direct memory compaction.", t);
}
- }
- return memOccupiedByMessages;
+ }, _houseKeepingTaskExecutor);
}
private class AddressSpaceRegistry implements SystemAddressSpaceCreator.AddressSpaceRegistry
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index 89a687e..f2cbc25 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -461,7 +461,7 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
*/
void checkMessageStatus();
- void reallocateMessages(long smallestAllowedBufferId);
+ void reallocateMessages();
Set<NotificationCheck> getNotificationChecks();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java b/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java
index 63c4b10..c8ba379 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java
@@ -962,9 +962,9 @@ public class FieldTable
}
}
- public synchronized void reallocate(final long smallestAllowedBufferId)
+ public synchronized void reallocate()
{
- _encodedForm = QpidByteBuffer.reallocateIfNecessary(smallestAllowedBufferId, _encodedForm);
+ _encodedForm = QpidByteBuffer.reallocateIfNecessary(_encodedForm);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index bf5d11b..a824438 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -2112,7 +2112,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
@Override
- public void reallocateMessages(final long smallestAllowedBufferId)
+ public void reallocateMessages()
{
QueueEntryIterator queueListIterator = getEntries().iterator();
@@ -2127,7 +2127,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
final MessageReference messageReference = message.newReference();
try
{
- message.getStoredMessage().reallocate(smallestAllowedBufferId);
+ message.getStoredMessage().reallocate();
}
finally
{
@@ -3374,8 +3374,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_ACTIVE(estimatedQueueSize / 1024,
targetQueueSize / 1024,
- allocatedDirectMemorySize / 1024,
- flowToDiskThreshold / 1024));
+ allocatedDirectMemorySize / 1024 / 1024,
+ flowToDiskThreshold / 1024 / 1024));
}
}
@@ -3388,8 +3388,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
getEventLogger().message(_logSubject, QueueMessages.FLOW_TO_DISK_INACTIVE(estimatedQueueSize / 1024,
targetQueueSize / 1024,
- allocatedDirectMemorySize / 1024,
- flowToDiskThreshold / 1024));
+ allocatedDirectMemorySize / 1024 / 1024,
+ flowToDiskThreshold / 1024 / 1024));
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java b/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
index 83747fd..4c4071d 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
@@ -39,6 +39,6 @@ public interface StorableMessageMetaData
void clearEncodedForm();
- void reallocate(final long smallestAllowedBufferId);
+ void reallocate();
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
index dc54fbc..d0fd927 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -142,14 +142,14 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S
}
@Override
- public synchronized void reallocate(final long smallestAllowedBufferId)
+ public synchronized void reallocate()
{
- _metaData.reallocate(smallestAllowedBufferId);
+ _metaData.reallocate();
List<QpidByteBuffer> newContent = new ArrayList<>(_content.size());
for (Iterator<QpidByteBuffer> iterator = _content.iterator(); iterator.hasNext(); )
{
final QpidByteBuffer buffer = iterator.next();
- newContent.add(QpidByteBuffer.reallocateIfNecessary(smallestAllowedBufferId, buffer));
+ newContent.add(QpidByteBuffer.reallocateIfNecessary(buffer));
iterator.remove();
}
_content.addAll(newContent);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
index 1a23edd..fbaf0c0 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
@@ -40,5 +40,5 @@ public interface StoredMessage<M extends StorableMessageMetaData>
boolean flowToDisk();
- void reallocate(final long smallestAllowedBufferId);
+ void reallocate();
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 394cd32..3e271d9 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -1402,7 +1402,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
@Override
- public ListenableFuture<Void> reallocateMessages(final long smallestAllowedBufferId)
+ public ListenableFuture<Void> reallocateMessages()
{
final Future future = _houseKeepingTaskExecutor.submit(() ->
{
@@ -1412,8 +1412,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
{
if (q.getState() == State.ACTIVE)
{
- q.reallocateMessages(
- smallestAllowedBufferId);
+ q.reallocateMessages();
}
}
});
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
index 8b30d2e..a47ab37 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
@@ -283,7 +283,7 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>>
MessageDestination getSystemDestination(String name);
- ListenableFuture<Void> reallocateMessages(long smallestAllowedBufferId);
+ ListenableFuture<Void> reallocateMessages();
interface Transaction
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-core/src/test/java/org/apache/qpid/server/bytebuffer/QpidByteBufferTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/bytebuffer/QpidByteBufferTest.java b/broker-core/src/test/java/org/apache/qpid/server/bytebuffer/QpidByteBufferTest.java
index 6377a3b..139a4e9 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/bytebuffer/QpidByteBufferTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/bytebuffer/QpidByteBufferTest.java
@@ -45,6 +45,7 @@ public class QpidByteBufferTest extends QpidTestCase
{
private static final int BUFFER_SIZE = 10;
private static final int POOL_SIZE = 20;
+ private static final double SPARSITY_FRACTION = 0.5;
private QpidByteBuffer _slicedBuffer;
@@ -55,7 +56,7 @@ public class QpidByteBufferTest extends QpidTestCase
{
super.setUp();
QpidByteBuffer.deinitialisePool();
- QpidByteBuffer.initialisePool(BUFFER_SIZE, POOL_SIZE);
+ QpidByteBuffer.initialisePool(BUFFER_SIZE, POOL_SIZE, SPARSITY_FRACTION);
_parent = QpidByteBuffer.allocateDirect(BUFFER_SIZE);
}
@@ -784,7 +785,7 @@ public class QpidByteBufferTest extends QpidTestCase
{
try
{
- QpidByteBuffer.initialisePool(BUFFER_SIZE + 1, POOL_SIZE + 1);
+ QpidByteBuffer.initialisePool(BUFFER_SIZE + 1, POOL_SIZE + 1, SPARSITY_FRACTION);
fail("It is not legal to initialize buffer twice with different settings.");
}
catch (IllegalStateException e)
@@ -883,6 +884,21 @@ public class QpidByteBufferTest extends QpidTestCase
viewWithOffset.dispose();
}
+ public void testSparsity()
+ {
+ assertFalse("Unexpected sparsity after creation", _parent.isSparse());
+ QpidByteBuffer child = _parent.view(0, 6);
+ QpidByteBuffer grandChild = child.view(0, 2);
+
+ assertFalse("Unexpected sparsity after child creation", _parent.isSparse());
+ _parent.dispose();
+
+ assertFalse("Unexpected sparsity after parent disposal", child.isSparse());
+
+ child.dispose();
+ assertTrue("Buffer should be sparse", grandChild.isSparse());
+ }
+
private void doDeflateInflate(byte[] input,
Collection<QpidByteBuffer> inputBufs,
boolean direct) throws IOException
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/EncodingUtilsTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/EncodingUtilsTest.java b/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/EncodingUtilsTest.java
index 67669c3..c87f929 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/EncodingUtilsTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/protocol/v0_8/EncodingUtilsTest.java
@@ -28,6 +28,7 @@ public class EncodingUtilsTest extends QpidTestCase
{
private static final int BUFFER_SIZE = 10;
private static final int POOL_SIZE = 20;
+ private static final double SPARSITY_FRACTION = 1.0;
private QpidByteBuffer _buffer;
@@ -36,7 +37,7 @@ public class EncodingUtilsTest extends QpidTestCase
{
super.setUp();
QpidByteBuffer.deinitialisePool();
- QpidByteBuffer.initialisePool(BUFFER_SIZE, POOL_SIZE);
+ QpidByteBuffer.initialisePool(BUFFER_SIZE, POOL_SIZE, SPARSITY_FRACTION);
_buffer = QpidByteBuffer.allocateDirect(BUFFER_SIZE);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
index 964f459..bf5cdb8 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
@@ -85,7 +85,7 @@ public class TestMessageMetaData implements StorableMessageMetaData
}
@Override
- public void reallocate(final long smallestAllowedBufferId)
+ public void reallocate()
{
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
index 8e5867b..05ff4cd 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
@@ -122,7 +122,7 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte
}
@Override
- public void reallocate(final long smallestAllowedBufferId)
+ public void reallocate()
{
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
index 695ea03..4e9926c 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
@@ -110,7 +110,7 @@ public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, M
}
@Override
- public void reallocate(final long smallestAllowedBufferId)
+ public void reallocate()
{
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
index d756173..031ae17 100755
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
@@ -195,9 +195,9 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData
}
@Override
- public synchronized void reallocate(final long smallestAllowedBufferId)
+ public synchronized void reallocate()
{
- _encoded = QpidByteBuffer.reallocateIfNecessary(smallestAllowedBufferId, _encoded);
+ _encoded = QpidByteBuffer.reallocateIfNecessary(_encoded);
}
public String getRoutingKey()
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
index c9faf8e..dcf2f4b 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
@@ -123,7 +123,7 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
}
@Override
- public void reallocate(final long smallestAllowedBufferId)
+ public void reallocate()
{
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
index ce17beb..108f52c 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
@@ -141,9 +141,9 @@ public class MessageMetaData implements StorableMessageMetaData
}
@Override
- public synchronized void reallocate(final long smallestAllowedBufferId)
+ public synchronized void reallocate()
{
- _contentHeaderBody.reallocate(smallestAllowedBufferId);
+ _contentHeaderBody.reallocate();
}
private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
index 00f9ac1..59e65b7 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
@@ -969,12 +969,12 @@ public class BasicContentHeaderProperties
}
}
- synchronized void reallocate(final long smallestAllowedBufferId)
+ synchronized void reallocate()
{
- _encodedForm = QpidByteBuffer.reallocateIfNecessary(smallestAllowedBufferId, _encodedForm);
+ _encodedForm = QpidByteBuffer.reallocateIfNecessary(_encodedForm);
if (_headers != null)
{
- _headers.reallocate(smallestAllowedBufferId);
+ _headers.reallocate();
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ContentHeaderBody.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ContentHeaderBody.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ContentHeaderBody.java
index 08f5d4e..4cef06c 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ContentHeaderBody.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/ContentHeaderBody.java
@@ -195,8 +195,8 @@ public class ContentHeaderBody implements AMQBody
_properties.clearEncodedForm();
}
- public void reallocate(final long smallestAllowedBufferId)
+ public void reallocate()
{
- _properties.reallocate(smallestAllowedBufferId);
+ _properties.reallocate();
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
index 6fd1417..e2fde5a 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
@@ -272,7 +272,7 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
}
@Override
- public void reallocate(final long smallestAllowedBufferId)
+ public void reallocate()
{
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
index 9f2ad67..2055b0d 100755
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
@@ -306,31 +306,31 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData
}
@Override
- public void reallocate(final long smallestAllowedBufferId)
+ public void reallocate()
{
if (_headerSection != null)
{
- _headerSection.reallocate(smallestAllowedBufferId);
+ _headerSection.reallocate();
}
if (_deliveryAnnotationsSection != null)
{
- _deliveryAnnotationsSection.reallocate(smallestAllowedBufferId);
+ _deliveryAnnotationsSection.reallocate();
}
if (_messageAnnotationsSection != null)
{
- _messageAnnotationsSection.reallocate(smallestAllowedBufferId);
+ _messageAnnotationsSection.reallocate();
}
if (_propertiesSection != null)
{
- _propertiesSection.reallocate(smallestAllowedBufferId);
+ _propertiesSection.reallocate();
}
if (_applicationPropertiesSection != null)
{
- _applicationPropertiesSection.reallocate(smallestAllowedBufferId);
+ _applicationPropertiesSection.reallocate();
}
if (_footerSection != null)
{
- _footerSection.reallocate(smallestAllowedBufferId);
+ _footerSection.reallocate();
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AbstractSection.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AbstractSection.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AbstractSection.java
index 768d500..81a0853 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AbstractSection.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AbstractSection.java
@@ -95,9 +95,9 @@ public abstract class AbstractSection<T, S extends NonEncodingRetainingSection<T
}
@Override
- public synchronized final void reallocate(final long smallestAllowedBufferId)
+ public synchronized final void reallocate()
{
- _encodedForm = QpidByteBuffer.reallocateIfNecessary(smallestAllowedBufferId, _encodedForm);
+ _encodedForm = QpidByteBuffer.reallocateIfNecessary(_encodedForm);
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/EncodingRetaining.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/EncodingRetaining.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/EncodingRetaining.java
index 23c67b2..2b7c918 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/EncodingRetaining.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/EncodingRetaining.java
@@ -29,7 +29,7 @@ public interface EncodingRetaining
void setEncodedForm(List<QpidByteBuffer> encodedForm);
List<QpidByteBuffer> getEncodedForm();
void dispose();
- void reallocate(final long smallestAllowedBufferId);
+ void reallocate();
long getEncodedSize();
void writeTo(QpidByteBuffer dest);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
index b58b821..602aab5 100644
--- a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
+++ b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
@@ -130,7 +130,7 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1
}
@Override
- public void reallocate(final long smallestAllowedBufferId)
+ public void reallocate()
{
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
index 1097013..1558caa 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
@@ -232,7 +232,7 @@ public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTra
}
@Override
- public void reallocate(final long smallestAllowedBufferId)
+ public void reallocate()
{
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
index 203a9d9..b298e9f 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
@@ -118,7 +118,7 @@ public class MessageConverter_0_8_to_0_10 implements MessageConverter<AMQMessag
}
@Override
- public void reallocate(final long smallestAllowedBufferId)
+ public void reallocate()
{
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
index 25fe5de..2b2f6fa 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
@@ -129,7 +129,7 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_
}
@Override
- public void reallocate(final long smallestAllowedBufferId)
+ public void reallocate()
{
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/1424c408/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index ff7e2ad..918e193 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -1330,7 +1330,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
Collection<QpidByteBuffer> getData();
void setData(Collection<QpidByteBuffer> data);
boolean isHardRef();
- void reallocate(final long smallestAllowedBufferId);
+ void reallocate();
}
private static final class MessageDataHardRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
@@ -1368,13 +1368,13 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
@Override
- public void reallocate(final long smallestAllowedBufferId)
+ public void reallocate()
{
if(_metaData != null)
{
- _metaData.reallocate(smallestAllowedBufferId);
+ _metaData.reallocate();
}
- _data = QpidByteBuffer.reallocateIfNecessary(smallestAllowedBufferId, _data);
+ _data = QpidByteBuffer.reallocateIfNecessary(_data);
}
}
@@ -1432,14 +1432,14 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
@Override
- public void reallocate(final long smallestAllowedBufferId)
+ public void reallocate()
{
if(_metaData != null)
{
- _metaData.reallocate(smallestAllowedBufferId);
+ _metaData.reallocate();
}
- _data = QpidByteBuffer.reallocateIfNecessary(smallestAllowedBufferId, _data);
+ _data = QpidByteBuffer.reallocateIfNecessary(_data);
}
}
@@ -1709,11 +1709,11 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
@Override
- public synchronized void reallocate(final long smallestAllowedBufferId)
+ public synchronized void reallocate()
{
if(_messageDataRef != null)
{
- _messageDataRef.reallocate(smallestAllowedBufferId);
+ _messageDataRef.reallocate();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org