You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/05/16 17:06:01 UTC
[1/2] qpid-broker-j git commit: QPID-7784: [Java Broker] Dispose
QpidByteBuffers associated with pooled threads when shutting down executors.
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 9345d2d79 -> b63815ceb
QPID-7784: [Java Broker] Dispose QpidByteBuffers associated with pooled threads when shutting down executors.
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/d9af2660
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/d9af2660
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/d9af2660
Branch: refs/heads/master
Commit: d9af2660089139e2f4fdad8c0aa0e0c8e6529ff5
Parents: 9345d2d
Author: Lorenz Quack <lq...@apache.org>
Authored: Tue May 16 16:45:50 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Tue May 16 17:28:39 2017 +0100
----------------------------------------------------------------------
.../qpid/server/bytebuffer/QpidByteBuffer.java | 9 ++++++
.../transport/NetworkConnectionScheduler.java | 34 +++++++++++++++++++-
.../qpid/server/util/HousekeepingExecutor.java | 25 ++++++++++++++
3 files changed, 67 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9af2660/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
index f7a1905..8f1fd5a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
@@ -843,6 +843,15 @@ public class QpidByteBuffer implements AutoCloseable
}
}
+ /**
+ * Not for general use!
+ * Used to clear threadlocal buffer when shutting down thread pools.
+ */
+ public static QpidByteBuffer getCachedThreadLocalBuffer()
+ {
+ return _cachedBuffer.get();
+ }
+
public static int getPooledBufferSize()
{
return _pooledBufferSize;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9af2660/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java b/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
index f419a83..729750f 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.transport;
import java.io.IOException;
import java.nio.channels.ServerSocketChannel;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
@@ -32,6 +34,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+
public class NetworkConnectionScheduler
{
private static final Logger LOGGER = LoggerFactory.getLogger(NetworkConnectionScheduler.class);
@@ -99,7 +103,35 @@ public class NetworkConnectionScheduler
_selectorThread = new SelectorThread(this, _numberOfSelectors);
_executor = new ThreadPoolExecutor(_poolSize, _poolSize,
_threadKeepAliveTimeout, TimeUnit.MINUTES,
- new LinkedBlockingQueue<Runnable>(), _factory);
+ new LinkedBlockingQueue<Runnable>(), _factory)
+ {
+ private final Map<Thread, QpidByteBuffer> _cachedBufferMap = new ConcurrentHashMap<>();
+
+ @Override
+ protected void afterExecute(final Runnable r, final Throwable t)
+ {
+ super.afterExecute(r, t);
+ final QpidByteBuffer cachedThreadLocalBuffer = QpidByteBuffer.getCachedThreadLocalBuffer();
+ if (cachedThreadLocalBuffer != null)
+ {
+ _cachedBufferMap.put(Thread.currentThread(), cachedThreadLocalBuffer);
+ }
+ else
+ {
+ _cachedBufferMap.remove(Thread.currentThread());
+ }
+ }
+
+ @Override
+ protected void terminated()
+ {
+ super.terminated();
+ for (QpidByteBuffer qpidByteBuffer : _cachedBufferMap.values())
+ {
+ qpidByteBuffer.dispose();
+ }
+ }
+ };
_executor.prestartAllCoreThreads();
_executor.allowCoreThreadTimeOut(true);
for(int i = 0 ; i < _poolSize; i++)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9af2660/broker-core/src/main/java/org/apache/qpid/server/util/HousekeepingExecutor.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/util/HousekeepingExecutor.java b/broker-core/src/main/java/org/apache/qpid/server/util/HousekeepingExecutor.java
index 75d1179..a9360d2 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/util/HousekeepingExecutor.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/util/HousekeepingExecutor.java
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.server.util;
+import java.util.Map;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -31,12 +33,14 @@ import com.google.common.util.concurrent.UncheckedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.pool.SuppressingInheritedAccessControlContextThreadFactory;
public class HousekeepingExecutor extends ScheduledThreadPoolExecutor
{
private static final Logger LOGGER = LoggerFactory.getLogger(HousekeepingExecutor.class);
+ private final Map<Thread, QpidByteBuffer> _cachedBufferMap = new ConcurrentHashMap<>();
public HousekeepingExecutor(final String threadPrefix, final int threadCount, final Subject subject)
{
@@ -53,6 +57,17 @@ public class HousekeepingExecutor extends ScheduledThreadPoolExecutor
protected void afterExecute(Runnable r, Throwable t)
{
super.afterExecute(r, t);
+
+ final QpidByteBuffer cachedThreadLocalBuffer = QpidByteBuffer.getCachedThreadLocalBuffer();
+ if (cachedThreadLocalBuffer != null)
+ {
+ _cachedBufferMap.put(Thread.currentThread(), cachedThreadLocalBuffer);
+ }
+ else
+ {
+ _cachedBufferMap.remove(Thread.currentThread());
+ }
+
if (t == null && r instanceof Future<?>)
{
Future future = (Future<?>) r;
@@ -96,4 +111,14 @@ public class HousekeepingExecutor extends ScheduledThreadPoolExecutor
}
}
}
+
+ @Override
+ protected void terminated()
+ {
+ super.terminated();
+ for (QpidByteBuffer qpidByteBuffer : _cachedBufferMap.values())
+ {
+ qpidByteBuffer.dispose();
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-broker-j git commit: QPID-7783: [Java Broker] Dispose of
QpidByteBuffers associated with message content/headers when stopping/closing
a VirtualHost
Posted by kw...@apache.org.
QPID-7783: [Java Broker] Dispose of QpidByteBuffers associated with message content/headers when stopping/closing a VirtualHost
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/b63815ce
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/b63815ce
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/b63815ce
Branch: refs/heads/master
Commit: b63815ceb13127a1b84266462aefe2c103adc34c
Parents: d9af266
Author: Lorenz Quack <lq...@apache.org>
Authored: Tue May 16 17:04:40 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Tue May 16 17:32:08 2017 +0100
----------------------------------------------------------------------
.../berkeleydb/AbstractBDBMessageStore.java | 144 +++++++------------
.../qpid/server/store/MemoryMessageStore.java | 4 +
.../qpid/server/store/StoredMemoryMessage.java | 5 +
.../store/jdbc/AbstractJDBCMessageStore.java | 144 +++++++------------
4 files changed, 108 insertions(+), 189 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b63815ce/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 6f1326e..cef6549 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -29,7 +29,9 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.util.concurrent.ListenableFuture;
@@ -103,6 +105,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
private final Random _lockConflictRandom = new Random();
private final AtomicLong _inMemorySize = new AtomicLong();
private final AtomicLong _bytesEvacuatedFromMemory = new AtomicLong();
+ private final Set<StoredBDBMessage<?>> _messages = Collections.newSetFromMap(new ConcurrentHashMap<>());
@Override
public void upgradeStoreStructure() throws StoreException
@@ -126,7 +129,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore
long newMessageId = getNextMessageId();
- return new StoredBDBMessage<T>(newMessageId, metaData);
+ return createStoredBDBMessage(newMessageId, metaData, false);
+ }
+
+ public <T extends StorableMessageMetaData> StoredBDBMessage<T> createStoredBDBMessage(final long newMessageId,
+ final T metaData,
+ final boolean recovered)
+ {
+ final StoredBDBMessage<T> message = new StoredBDBMessage<>(newMessageId, metaData, recovered);
+ _messages.add(message);
+ return message;
}
public long getNextMessageId()
@@ -184,6 +196,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore
@Override
public void closeMessageStore()
{
+ for (StoredBDBMessage<?> message : _messages)
+ {
+ message.clear();
+ }
+ _messages.clear();
_inMemorySize.set(0);
_bytesEvacuatedFromMemory.set(0);
}
@@ -444,7 +461,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
long messageId = LongBinding.entryToLong(key);
StorableMessageMetaData metaData = valueBinding.entryToObject(value);
- StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
+ StoredBDBMessage message = createStoredBDBMessage(messageId, metaData, true);
if (!handler.handle(message))
{
break;
@@ -490,7 +507,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
if(getMessageMetaDataDb().get(null, key, value, LockMode.READ_COMMITTED) == OperationStatus.SUCCESS)
{
StorableMessageMetaData metaData = valueBinding.entryToObject(value);
- StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
+ StoredBDBMessage message = createStoredBDBMessage(messageId, metaData, true);
return message;
}
else
@@ -917,50 +934,49 @@ public abstract class AbstractBDBMessageStore implements MessageStore
protected abstract Logger getLogger();
- interface MessageDataRef<T extends StorableMessageMetaData>
+ private static class MessageDataRef<T extends StorableMessageMetaData>
{
- T getMetaData();
- Collection<QpidByteBuffer> getData();
- void setData(Collection<QpidByteBuffer> data);
- boolean isHardRef();
- void reallocate();
- }
-
- private static final class MessageDataHardRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
- {
- private final T _metaData;
+ private volatile T _metaData;
private volatile Collection<QpidByteBuffer> _data;
+ private volatile boolean _isHardRef;
+
+ private MessageDataRef(final T metaData, boolean isHardRef)
+ {
+ this(metaData, null, isHardRef);
+ }
- private MessageDataHardRef(final T metaData)
+ private MessageDataRef(final T metaData, Collection<QpidByteBuffer> data, boolean isHardRef)
{
_metaData = metaData;
+ _data = data;
+ _isHardRef = isHardRef;
}
- @Override
public T getMetaData()
{
return _metaData;
}
- @Override
public Collection<QpidByteBuffer> getData()
{
return _data;
}
- @Override
public void setData(final Collection<QpidByteBuffer> data)
{
_data = data;
}
- @Override
public boolean isHardRef()
{
- return true;
+ return _isHardRef;
+ }
+
+ public void setSoft()
+ {
+ _isHardRef = false;
}
- @Override
public void reallocate()
{
if(_metaData != null)
@@ -969,37 +985,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
_data = QpidByteBuffer.reallocateIfNecessary(_data);
}
- }
-
- private static final class MessageDataSoftRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
- {
-
- private T _metaData;
- private volatile Collection<QpidByteBuffer> _data;
-
- private MessageDataSoftRef(final T metaData, Collection<QpidByteBuffer> data)
- {
- _metaData = metaData;
- _data = data;
- }
-
- @Override
- public T getMetaData()
- {
- return _metaData;
- }
-
- @Override
- public Collection<QpidByteBuffer> getData()
- {
- return _data;
- }
-
- @Override
- public void setData(final Collection<QpidByteBuffer> data)
- {
- _data = data;
- }
public long clear()
{
@@ -1021,22 +1006,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
return bytesCleared;
}
-
- @Override
- public boolean isHardRef()
- {
- return false;
- }
-
- @Override
- public void reallocate()
- {
- if(_metaData != null)
- {
- _metaData.reallocate();
- }
- _data = QpidByteBuffer.reallocateIfNecessary(_data);
- }
}
final class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T>, MessageHandle<T>
@@ -1046,23 +1015,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore
private final int _contentSize;
private MessageDataRef<T> _messageDataRef;
- StoredBDBMessage(long messageId, T metaData)
- {
- this(messageId, metaData, false);
- }
-
StoredBDBMessage(long messageId, T metaData, boolean isRecovered)
{
_messageId = messageId;
- if(!isRecovered)
- {
- _messageDataRef = new MessageDataHardRef<>(metaData);
- }
- else
- {
- _messageDataRef = new MessageDataSoftRef<>(metaData, null);
- }
+ _messageDataRef = new MessageDataRef<>(metaData, !isRecovered);
_contentSize = metaData.getContentSize();
_inMemorySize.addAndGet(metaData.getStorableSize());
@@ -1083,7 +1040,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
checkMessageStoreOpen();
metaData = (T) getMessageMetaData(_messageId);
- _messageDataRef = new MessageDataSoftRef<>(metaData, _messageDataRef.getData());
+ _messageDataRef = new MessageDataRef<>(metaData, _messageDataRef.getData(), false);
}
return metaData;
}
@@ -1200,22 +1157,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
if (!stored())
{
-
AbstractBDBMessageStore.this.storeMetaData(txn, _messageId, _messageDataRef.getMetaData());
AbstractBDBMessageStore.this.addContent(txn, _messageId,
_messageDataRef.getData() == null
? Collections.<QpidByteBuffer>emptySet()
: _messageDataRef.getData());
-
-
- MessageDataRef<T> hardRef = _messageDataRef;
- MessageDataSoftRef<T> messageDataSoftRef;
-
- messageDataSoftRef = new MessageDataSoftRef<>(hardRef.getMetaData(), hardRef.getData());
-
- _messageDataRef = messageDataSoftRef;
-
-
+ _messageDataRef.setSoft();
}
}
@@ -1247,6 +1194,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
public synchronized void remove()
{
checkMessageStoreOpen();
+ _messages.remove(this);
if(stored())
{
removeMessage(_messageId, false);
@@ -1293,7 +1241,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
flushToStore();
if(_messageDataRef != null && !_messageDataRef.isHardRef())
{
- final long bytesCleared = ((MessageDataSoftRef) _messageDataRef).clear();
+ final long bytesCleared = _messageDataRef.clear();
_inMemorySize.addAndGet(-bytesCleared);
_bytesEvacuatedFromMemory.addAndGet(bytesCleared);
}
@@ -1314,6 +1262,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore
_messageDataRef.reallocate();
}
}
+
+ public synchronized void clear()
+ {
+ if (_messageDataRef != null)
+ {
+ _messageDataRef.clear();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b63815ce/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 22020c0..0da7db7 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -345,6 +345,10 @@ public class MemoryMessageStore implements MessageStore
@Override
public void closeMessageStore()
{
+ for (StoredMemoryMessage storedMemoryMessage : _messages.values())
+ {
+ storedMemoryMessage.clear();
+ }
_messages.clear();
_inMemorySize.set(0);
synchronized (_transactionLock)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b63815ce/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
index d0fd927..6d6921c 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -154,4 +154,9 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S
}
_content.addAll(newContent);
}
+
+ public void clear()
+ {
+ remove();
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b63815ce/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index 277810d..921f1fb 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -33,7 +33,9 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
@@ -86,6 +88,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
private String _tablePrefix = "";
private final AtomicLong _inMemorySize = new AtomicLong();
private final AtomicLong _bytesEvacuatedFromMemory = new AtomicLong();
+ private final Set<StoredJDBCMessage<?>> _messages = Collections.newSetFromMap(new ConcurrentHashMap<>());
protected abstract boolean isMessageStoreOpen();
@@ -247,6 +250,11 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
@Override
public void closeMessageStore()
{
+ for (StoredJDBCMessage<?> message : _messages)
+ {
+ message.clear();
+ }
+ _messages.clear();
_inMemorySize.set(0);
_bytesEvacuatedFromMemory.set(0);
if(_executor != null)
@@ -415,8 +423,16 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
{
checkMessageStoreOpen();
- return new StoredJDBCMessage<T>(getNextMessageId(), metaData);
+ return createStoredJDBCMessage(getNextMessageId(), metaData, false);
+ }
+ public <T extends StorableMessageMetaData> StoredJDBCMessage<T> createStoredJDBCMessage(final long newMessageId,
+ final T metaData,
+ final boolean recovered)
+ {
+ final StoredJDBCMessage<T> message = new StoredJDBCMessage<>(newMessageId, metaData, recovered);
+ _messages.add(message);
+ return message;
}
@Override
@@ -1339,50 +1355,49 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
}
- static interface MessageDataRef<T extends StorableMessageMetaData>
+ private static class MessageDataRef<T extends StorableMessageMetaData>
{
- T getMetaData();
- Collection<QpidByteBuffer> getData();
- void setData(Collection<QpidByteBuffer> data);
- boolean isHardRef();
- void reallocate();
- }
-
- private static final class MessageDataHardRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
- {
- private final T _metaData;
+ private volatile T _metaData;
private volatile Collection<QpidByteBuffer> _data;
+ private volatile boolean _isHardRef;
- private MessageDataHardRef(final T metaData)
+ private MessageDataRef(final T metaData, boolean isHardRef)
+ {
+ this(metaData, null, isHardRef);
+ }
+
+ private MessageDataRef(final T metaData, Collection<QpidByteBuffer> data, boolean isHardRef)
{
_metaData = metaData;
+ _data = data;
+ _isHardRef = isHardRef;
}
- @Override
public T getMetaData()
{
return _metaData;
}
- @Override
public Collection<QpidByteBuffer> getData()
{
return _data;
}
- @Override
public void setData(final Collection<QpidByteBuffer> data)
{
_data = data;
}
- @Override
public boolean isHardRef()
{
- return true;
+ return _isHardRef;
+ }
+
+ public void setSoft()
+ {
+ _isHardRef = false;
}
- @Override
public void reallocate()
{
if(_metaData != null)
@@ -1391,37 +1406,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
_data = QpidByteBuffer.reallocateIfNecessary(_data);
}
- }
-
- private static final class MessageDataSoftRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
- {
-
- private T _metaData;
- private volatile Collection<QpidByteBuffer> _data;
-
- private MessageDataSoftRef(final T metaData, Collection<QpidByteBuffer> data)
- {
- _metaData = metaData;
- _data = data;
- }
-
- @Override
- public T getMetaData()
- {
- return _metaData;
- }
-
- @Override
- public Collection<QpidByteBuffer> getData()
- {
- return _data;
- }
-
- @Override
- public void setData(final Collection<QpidByteBuffer> data)
- {
- _data = data;
- }
public long clear()
{
@@ -1443,23 +1427,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
return bytesCleared;
}
-
- @Override
- public boolean isHardRef()
- {
- return false;
- }
-
- @Override
- public void reallocate()
- {
- if(_metaData != null)
- {
- _metaData.reallocate();
- }
-
- _data = QpidByteBuffer.reallocateIfNecessary(_data);
- }
}
private class StoredJDBCMessage<T extends StorableMessageMetaData> implements StoredMessage<T>, MessageHandle<T>
@@ -1470,26 +1437,12 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
private MessageDataRef<T> _messageDataRef;
-
- StoredJDBCMessage(long messageId, T metaData)
- {
- this(messageId, metaData, false);
- }
-
-
StoredJDBCMessage(long messageId,
T metaData, boolean isRecovered)
{
_messageId = messageId;
- if(!isRecovered)
- {
- _messageDataRef = new MessageDataHardRef<>(metaData);
- }
- else
- {
- _messageDataRef = new MessageDataSoftRef<>(metaData, null);
- }
+ _messageDataRef = new MessageDataRef<>(metaData, !isRecovered);
_contentSize = metaData.getContentSize();
_inMemorySize.addAndGet(metaData.getStorableSize());
@@ -1513,7 +1466,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
try
{
metaData = (T) AbstractJDBCMessageStore.this.getMetaData(_messageId);
- _messageDataRef = new MessageDataSoftRef<>(metaData, _messageDataRef.getData());
+ _messageDataRef = new MessageDataRef<>(metaData, _messageDataRef.getData(), false);
}
catch (SQLException e)
{
@@ -1635,7 +1588,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
{
if (!stored())
{
-
AbstractJDBCMessageStore.this.storeMetaData(conn, _messageId, _messageDataRef.getMetaData());
AbstractJDBCMessageStore.this.addContent(conn, _messageId,
_messageDataRef.getData() == null
@@ -1644,14 +1596,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
getLogger().debug("Storing message {} to store", _messageId);
- MessageDataRef<T> hardRef = _messageDataRef;
- MessageDataSoftRef<T> messageDataSoftRef;
-
- messageDataSoftRef = new MessageDataSoftRef<>(hardRef.getMetaData(), hardRef.getData());
-
- _messageDataRef = messageDataSoftRef;
-
-
+ _messageDataRef.setSoft();
}
}
@@ -1683,6 +1628,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
getLogger().debug("REMOVE called on message: {}", _messageId);
checkMessageStoreOpen();
+ _messages.remove(this);
if(stored())
{
AbstractJDBCMessageStore.this.removeMessage(_messageId);
@@ -1729,7 +1675,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
flushToStore();
if(_messageDataRef != null && !_messageDataRef.isHardRef())
{
- final long bytesCleared = ((MessageDataSoftRef) _messageDataRef).clear();
+ final long bytesCleared = _messageDataRef.clear();
_inMemorySize.addAndGet(-bytesCleared);
_bytesEvacuatedFromMemory.addAndGet(bytesCleared);
}
@@ -1745,6 +1691,14 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
}
+ public synchronized void clear()
+ {
+ if (_messageDataRef != null)
+ {
+ _messageDataRef.clear();
+ }
+ }
+
@Override
public String toString()
{
@@ -1792,7 +1746,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
StorableMessageMetaData metaData = type.createMetaData(buf);
buf.dispose();
- message = new StoredJDBCMessage(messageId, metaData, true);
+ message = createStoredJDBCMessage(messageId, metaData, true);
}
else
@@ -1845,7 +1799,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(((int)dataAsBytes[0]) &0xff);
StorableMessageMetaData metaData = type.createMetaData(buf);
buf.dispose();
- StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true);
+ StoredJDBCMessage message = createStoredJDBCMessage(messageId, metaData, true);
if (!handler.handle(message))
{
break;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org