You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/05/16 17:06:01 UTC

[1/2] qpid-broker-j git commit: QPID-7784: [Java Broker] Dispose QpidByteBuffers associated with pooled threads when shutting down executors.

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 9345d2d79 -> b63815ceb


QPID-7784: [Java Broker] Dispose QpidByteBuffers associated with pooled threads when shutting down executors.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/d9af2660
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/d9af2660
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/d9af2660

Branch: refs/heads/master
Commit: d9af2660089139e2f4fdad8c0aa0e0c8e6529ff5
Parents: 9345d2d
Author: Lorenz Quack <lq...@apache.org>
Authored: Tue May 16 16:45:50 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Tue May 16 17:28:39 2017 +0100

----------------------------------------------------------------------
 .../qpid/server/bytebuffer/QpidByteBuffer.java  |  9 ++++++
 .../transport/NetworkConnectionScheduler.java   | 34 +++++++++++++++++++-
 .../qpid/server/util/HousekeepingExecutor.java  | 25 ++++++++++++++
 3 files changed, 67 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9af2660/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
index f7a1905..8f1fd5a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/QpidByteBuffer.java
@@ -843,6 +843,15 @@ public class QpidByteBuffer implements AutoCloseable
         }
     }
 
+    /**
+     * Not for general use!
+     * Used to clear threadlocal buffer when shutting down thread pools.
+     */
+    public static QpidByteBuffer getCachedThreadLocalBuffer()
+    {
+        return _cachedBuffer.get();
+    }
+
     public static int getPooledBufferSize()
     {
         return _pooledBufferSize;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9af2660/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java b/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
index f419a83..729750f 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.transport;
 
 import java.io.IOException;
 import java.nio.channels.ServerSocketChannel;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
@@ -32,6 +34,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+
 public class NetworkConnectionScheduler
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(NetworkConnectionScheduler.class);
@@ -99,7 +103,35 @@ public class NetworkConnectionScheduler
             _selectorThread = new SelectorThread(this, _numberOfSelectors);
             _executor = new ThreadPoolExecutor(_poolSize, _poolSize,
                                                _threadKeepAliveTimeout, TimeUnit.MINUTES,
-                                               new LinkedBlockingQueue<Runnable>(), _factory);
+                                               new LinkedBlockingQueue<Runnable>(), _factory)
+            {
+                private final Map<Thread, QpidByteBuffer> _cachedBufferMap = new ConcurrentHashMap<>();
+
+                @Override
+                protected void afterExecute(final Runnable r, final Throwable t)
+                {
+                    super.afterExecute(r, t);
+                    final QpidByteBuffer cachedThreadLocalBuffer = QpidByteBuffer.getCachedThreadLocalBuffer();
+                    if (cachedThreadLocalBuffer != null)
+                    {
+                        _cachedBufferMap.put(Thread.currentThread(), cachedThreadLocalBuffer);
+                    }
+                    else
+                    {
+                        _cachedBufferMap.remove(Thread.currentThread());
+                    }
+                }
+
+                @Override
+                protected void terminated()
+                {
+                    super.terminated();
+                    for (QpidByteBuffer qpidByteBuffer : _cachedBufferMap.values())
+                    {
+                        qpidByteBuffer.dispose();
+                    }
+                }
+            };
             _executor.prestartAllCoreThreads();
             _executor.allowCoreThreadTimeOut(true);
             for(int i = 0 ; i < _poolSize; i++)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d9af2660/broker-core/src/main/java/org/apache/qpid/server/util/HousekeepingExecutor.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/util/HousekeepingExecutor.java b/broker-core/src/main/java/org/apache/qpid/server/util/HousekeepingExecutor.java
index 75d1179..a9360d2 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/util/HousekeepingExecutor.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/util/HousekeepingExecutor.java
@@ -20,7 +20,9 @@
  */
 package org.apache.qpid.server.util;
 
+import java.util.Map;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -31,12 +33,14 @@ import com.google.common.util.concurrent.UncheckedExecutionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.pool.SuppressingInheritedAccessControlContextThreadFactory;
 
 public class HousekeepingExecutor extends ScheduledThreadPoolExecutor
 {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(HousekeepingExecutor.class);
+    private final Map<Thread, QpidByteBuffer> _cachedBufferMap = new ConcurrentHashMap<>();
 
     public HousekeepingExecutor(final String threadPrefix, final int threadCount, final Subject subject)
     {
@@ -53,6 +57,17 @@ public class HousekeepingExecutor extends ScheduledThreadPoolExecutor
     protected void afterExecute(Runnable r, Throwable t)
     {
         super.afterExecute(r, t);
+
+        final QpidByteBuffer cachedThreadLocalBuffer = QpidByteBuffer.getCachedThreadLocalBuffer();
+        if (cachedThreadLocalBuffer != null)
+        {
+            _cachedBufferMap.put(Thread.currentThread(), cachedThreadLocalBuffer);
+        }
+        else
+        {
+            _cachedBufferMap.remove(Thread.currentThread());
+        }
+
         if (t == null && r instanceof Future<?>)
         {
             Future future = (Future<?>) r;
@@ -96,4 +111,14 @@ public class HousekeepingExecutor extends ScheduledThreadPoolExecutor
             }
         }
     }
+
+    @Override
+    protected void terminated()
+    {
+        super.terminated();
+        for (QpidByteBuffer qpidByteBuffer : _cachedBufferMap.values())
+        {
+            qpidByteBuffer.dispose();
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-broker-j git commit: QPID-7783: [Java Broker] Dispose of QpidByteBuffers associated with message content/headers when stopping/closing a VirtualHost

Posted by kw...@apache.org.
QPID-7783: [Java Broker] Dispose of QpidByteBuffers associated with message content/headers when stopping/closing a VirtualHost


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/b63815ce
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/b63815ce
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/b63815ce

Branch: refs/heads/master
Commit: b63815ceb13127a1b84266462aefe2c103adc34c
Parents: d9af266
Author: Lorenz Quack <lq...@apache.org>
Authored: Tue May 16 17:04:40 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Tue May 16 17:32:08 2017 +0100

----------------------------------------------------------------------
 .../berkeleydb/AbstractBDBMessageStore.java     | 144 +++++++------------
 .../qpid/server/store/MemoryMessageStore.java   |   4 +
 .../qpid/server/store/StoredMemoryMessage.java  |   5 +
 .../store/jdbc/AbstractJDBCMessageStore.java    | 144 +++++++------------
 4 files changed, 108 insertions(+), 189 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b63815ce/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 6f1326e..cef6549 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -29,7 +29,9 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.util.concurrent.ListenableFuture;
@@ -103,6 +105,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
     private final Random _lockConflictRandom = new Random();
     private final AtomicLong _inMemorySize = new AtomicLong();
     private final AtomicLong _bytesEvacuatedFromMemory = new AtomicLong();
+    private final Set<StoredBDBMessage<?>> _messages = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
     @Override
     public void upgradeStoreStructure() throws StoreException
@@ -126,7 +129,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore
 
         long newMessageId = getNextMessageId();
 
-        return new StoredBDBMessage<T>(newMessageId, metaData);
+        return createStoredBDBMessage(newMessageId, metaData, false);
+    }
+
+    public <T extends StorableMessageMetaData> StoredBDBMessage<T> createStoredBDBMessage(final long newMessageId,
+                                                                                          final T metaData,
+                                                                                          final boolean recovered)
+    {
+        final StoredBDBMessage<T> message = new StoredBDBMessage<>(newMessageId, metaData, recovered);
+        _messages.add(message);
+        return message;
     }
 
     public long getNextMessageId()
@@ -184,6 +196,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore
     @Override
     public void closeMessageStore()
     {
+        for (StoredBDBMessage<?> message : _messages)
+        {
+            message.clear();
+        }
+        _messages.clear();
         _inMemorySize.set(0);
         _bytesEvacuatedFromMemory.set(0);
     }
@@ -444,7 +461,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             {
                 long messageId = LongBinding.entryToLong(key);
                 StorableMessageMetaData metaData = valueBinding.entryToObject(value);
-                StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
+                StoredBDBMessage message = createStoredBDBMessage(messageId, metaData, true);
                 if (!handler.handle(message))
                 {
                     break;
@@ -490,7 +507,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             if(getMessageMetaDataDb().get(null, key, value, LockMode.READ_COMMITTED) == OperationStatus.SUCCESS)
             {
                 StorableMessageMetaData metaData = valueBinding.entryToObject(value);
-                StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
+                StoredBDBMessage message = createStoredBDBMessage(messageId, metaData, true);
                 return message;
             }
             else
@@ -917,50 +934,49 @@ public abstract class AbstractBDBMessageStore implements MessageStore
 
     protected abstract Logger getLogger();
 
-    interface MessageDataRef<T extends StorableMessageMetaData>
+    private static class MessageDataRef<T extends StorableMessageMetaData>
     {
-        T getMetaData();
-        Collection<QpidByteBuffer> getData();
-        void setData(Collection<QpidByteBuffer> data);
-        boolean isHardRef();
-        void reallocate();
-    }
-
-    private static final class MessageDataHardRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
-    {
-        private final T _metaData;
+        private volatile T _metaData;
         private volatile Collection<QpidByteBuffer> _data;
+        private volatile boolean _isHardRef;
+
+        private MessageDataRef(final T metaData, boolean isHardRef)
+        {
+            this(metaData, null, isHardRef);
+        }
 
-        private MessageDataHardRef(final T metaData)
+        private MessageDataRef(final T metaData, Collection<QpidByteBuffer> data, boolean isHardRef)
         {
             _metaData = metaData;
+            _data = data;
+            _isHardRef = isHardRef;
         }
 
-        @Override
         public T getMetaData()
         {
             return _metaData;
         }
 
-        @Override
         public Collection<QpidByteBuffer> getData()
         {
             return _data;
         }
 
-        @Override
         public void setData(final Collection<QpidByteBuffer> data)
         {
             _data = data;
         }
 
-        @Override
         public boolean isHardRef()
         {
-            return true;
+            return _isHardRef;
+        }
+
+        public void setSoft()
+        {
+            _isHardRef = false;
         }
 
-        @Override
         public void reallocate()
         {
             if(_metaData != null)
@@ -969,37 +985,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             }
             _data = QpidByteBuffer.reallocateIfNecessary(_data);
         }
-    }
-
-    private static final class MessageDataSoftRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
-    {
-
-        private T _metaData;
-        private volatile Collection<QpidByteBuffer> _data;
-
-        private MessageDataSoftRef(final T metaData, Collection<QpidByteBuffer> data)
-        {
-            _metaData = metaData;
-            _data = data;
-        }
-
-        @Override
-        public T getMetaData()
-        {
-            return _metaData;
-        }
-
-        @Override
-        public Collection<QpidByteBuffer> getData()
-        {
-            return _data;
-        }
-
-        @Override
-        public void setData(final Collection<QpidByteBuffer> data)
-        {
-            _data = data;
-        }
 
         public long clear()
         {
@@ -1021,22 +1006,6 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             }
             return bytesCleared;
         }
-
-        @Override
-        public boolean isHardRef()
-        {
-            return false;
-        }
-
-        @Override
-        public void reallocate()
-        {
-            if(_metaData != null)
-            {
-                _metaData.reallocate();
-            }
-            _data = QpidByteBuffer.reallocateIfNecessary(_data);
-        }
     }
 
     final class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T>, MessageHandle<T>
@@ -1046,23 +1015,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore
         private final int _contentSize;
         private MessageDataRef<T> _messageDataRef;
 
-        StoredBDBMessage(long messageId, T metaData)
-        {
-            this(messageId, metaData, false);
-        }
-
         StoredBDBMessage(long messageId, T metaData, boolean isRecovered)
         {
             _messageId = messageId;
 
-            if(!isRecovered)
-            {
-                _messageDataRef = new MessageDataHardRef<>(metaData);
-            }
-            else
-            {
-                _messageDataRef = new MessageDataSoftRef<>(metaData, null);
-            }
+            _messageDataRef = new MessageDataRef<>(metaData, !isRecovered);
 
             _contentSize = metaData.getContentSize();
             _inMemorySize.addAndGet(metaData.getStorableSize());
@@ -1083,7 +1040,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
                 {
                     checkMessageStoreOpen();
                     metaData = (T) getMessageMetaData(_messageId);
-                    _messageDataRef = new MessageDataSoftRef<>(metaData, _messageDataRef.getData());
+                    _messageDataRef = new MessageDataRef<>(metaData, _messageDataRef.getData(), false);
                 }
                 return metaData;
             }
@@ -1200,22 +1157,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
         {
             if (!stored())
             {
-
                 AbstractBDBMessageStore.this.storeMetaData(txn, _messageId, _messageDataRef.getMetaData());
                 AbstractBDBMessageStore.this.addContent(txn, _messageId,
                                                         _messageDataRef.getData() == null
                                                                 ? Collections.<QpidByteBuffer>emptySet()
                                                                 : _messageDataRef.getData());
-
-
-                MessageDataRef<T> hardRef = _messageDataRef;
-                MessageDataSoftRef<T> messageDataSoftRef;
-
-                messageDataSoftRef = new MessageDataSoftRef<>(hardRef.getMetaData(), hardRef.getData());
-
-                _messageDataRef = messageDataSoftRef;
-
-
+                _messageDataRef.setSoft();
             }
         }
 
@@ -1247,6 +1194,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
         public synchronized void remove()
         {
             checkMessageStoreOpen();
+            _messages.remove(this);
             if(stored())
             {
                 removeMessage(_messageId, false);
@@ -1293,7 +1241,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             flushToStore();
             if(_messageDataRef != null && !_messageDataRef.isHardRef())
             {
-                final long bytesCleared = ((MessageDataSoftRef) _messageDataRef).clear();
+                final long bytesCleared = _messageDataRef.clear();
                 _inMemorySize.addAndGet(-bytesCleared);
                 _bytesEvacuatedFromMemory.addAndGet(bytesCleared);
             }
@@ -1314,6 +1262,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore
                 _messageDataRef.reallocate();
             }
         }
+
+        public synchronized void clear()
+        {
+            if (_messageDataRef != null)
+            {
+                _messageDataRef.clear();
+            }
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b63815ce/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 22020c0..0da7db7 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -345,6 +345,10 @@ public class MemoryMessageStore implements MessageStore
     @Override
     public void closeMessageStore()
     {
+        for (StoredMemoryMessage storedMemoryMessage : _messages.values())
+        {
+            storedMemoryMessage.clear();
+        }
         _messages.clear();
         _inMemorySize.set(0);
         synchronized (_transactionLock)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b63815ce/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
index d0fd927..6d6921c 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -154,4 +154,9 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S
         }
         _content.addAll(newContent);
     }
+
+    public void clear()
+    {
+        remove();
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b63815ce/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index 277810d..921f1fb 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -33,7 +33,9 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
@@ -86,6 +88,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
     private String _tablePrefix = "";
     private final AtomicLong _inMemorySize = new AtomicLong();
     private final AtomicLong _bytesEvacuatedFromMemory = new AtomicLong();
+    private final Set<StoredJDBCMessage<?>> _messages = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
     protected abstract boolean isMessageStoreOpen();
 
@@ -247,6 +250,11 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
     @Override
     public void closeMessageStore()
     {
+        for (StoredJDBCMessage<?> message : _messages)
+        {
+            message.clear();
+        }
+        _messages.clear();
         _inMemorySize.set(0);
         _bytesEvacuatedFromMemory.set(0);
         if(_executor != null)
@@ -415,8 +423,16 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
     {
         checkMessageStoreOpen();
 
-        return new StoredJDBCMessage<T>(getNextMessageId(), metaData);
+        return createStoredJDBCMessage(getNextMessageId(), metaData, false);
+    }
 
+    public <T extends StorableMessageMetaData> StoredJDBCMessage<T> createStoredJDBCMessage(final long newMessageId,
+                                                                                          final T metaData,
+                                                                                          final boolean recovered)
+    {
+        final StoredJDBCMessage<T> message = new StoredJDBCMessage<>(newMessageId, metaData, recovered);
+        _messages.add(message);
+        return message;
     }
 
     @Override
@@ -1339,50 +1355,49 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         }
     }
 
-    static interface MessageDataRef<T extends StorableMessageMetaData>
+    private static class MessageDataRef<T extends StorableMessageMetaData>
     {
-        T getMetaData();
-        Collection<QpidByteBuffer> getData();
-        void setData(Collection<QpidByteBuffer> data);
-        boolean isHardRef();
-        void reallocate();
-    }
-
-    private static final class MessageDataHardRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
-    {
-        private final T _metaData;
+        private volatile T _metaData;
         private volatile Collection<QpidByteBuffer> _data;
+        private volatile boolean _isHardRef;
 
-        private MessageDataHardRef(final T metaData)
+        private MessageDataRef(final T metaData, boolean isHardRef)
+        {
+            this(metaData, null, isHardRef);
+        }
+
+        private MessageDataRef(final T metaData, Collection<QpidByteBuffer> data, boolean isHardRef)
         {
             _metaData = metaData;
+            _data = data;
+            _isHardRef = isHardRef;
         }
 
-        @Override
         public T getMetaData()
         {
             return _metaData;
         }
 
-        @Override
         public Collection<QpidByteBuffer> getData()
         {
             return _data;
         }
 
-        @Override
         public void setData(final Collection<QpidByteBuffer> data)
         {
             _data = data;
         }
 
-        @Override
         public boolean isHardRef()
         {
-            return true;
+            return _isHardRef;
+        }
+
+        public void setSoft()
+        {
+            _isHardRef = false;
         }
 
-        @Override
         public void reallocate()
         {
             if(_metaData != null)
@@ -1391,37 +1406,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             }
             _data = QpidByteBuffer.reallocateIfNecessary(_data);
         }
-    }
-
-    private static final class MessageDataSoftRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
-    {
-
-        private T _metaData;
-        private volatile Collection<QpidByteBuffer> _data;
-
-        private MessageDataSoftRef(final T metaData, Collection<QpidByteBuffer> data)
-        {
-            _metaData = metaData;
-            _data = data;
-        }
-
-        @Override
-        public T getMetaData()
-        {
-            return _metaData;
-        }
-
-        @Override
-        public Collection<QpidByteBuffer> getData()
-        {
-            return _data;
-        }
-
-        @Override
-        public void setData(final Collection<QpidByteBuffer> data)
-        {
-            _data = data;
-        }
 
         public long clear()
         {
@@ -1443,23 +1427,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             }
             return bytesCleared;
         }
-
-        @Override
-        public boolean isHardRef()
-        {
-            return false;
-        }
-
-        @Override
-        public void reallocate()
-        {
-            if(_metaData != null)
-            {
-                _metaData.reallocate();
-            }
-
-            _data = QpidByteBuffer.reallocateIfNecessary(_data);
-        }
     }
 
     private class StoredJDBCMessage<T extends StorableMessageMetaData> implements StoredMessage<T>, MessageHandle<T>
@@ -1470,26 +1437,12 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
 
         private MessageDataRef<T> _messageDataRef;
 
-
-        StoredJDBCMessage(long messageId, T metaData)
-        {
-            this(messageId, metaData, false);
-        }
-
-
         StoredJDBCMessage(long messageId,
                           T metaData, boolean isRecovered)
         {
             _messageId = messageId;
 
-            if(!isRecovered)
-            {
-                _messageDataRef = new MessageDataHardRef<>(metaData);
-            }
-            else
-            {
-                _messageDataRef = new MessageDataSoftRef<>(metaData, null);
-            }
+            _messageDataRef = new MessageDataRef<>(metaData, !isRecovered);
 
             _contentSize = metaData.getContentSize();
             _inMemorySize.addAndGet(metaData.getStorableSize());
@@ -1513,7 +1466,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
                     try
                     {
                         metaData = (T) AbstractJDBCMessageStore.this.getMetaData(_messageId);
-                        _messageDataRef = new MessageDataSoftRef<>(metaData, _messageDataRef.getData());
+                        _messageDataRef = new MessageDataRef<>(metaData, _messageDataRef.getData(), false);
                     }
                     catch (SQLException e)
                     {
@@ -1635,7 +1588,6 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         {
             if (!stored())
             {
-
                 AbstractJDBCMessageStore.this.storeMetaData(conn, _messageId, _messageDataRef.getMetaData());
                 AbstractJDBCMessageStore.this.addContent(conn, _messageId,
                                                          _messageDataRef.getData() == null
@@ -1644,14 +1596,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
 
                 getLogger().debug("Storing message {} to store", _messageId);
 
-                MessageDataRef<T> hardRef = _messageDataRef;
-                MessageDataSoftRef<T> messageDataSoftRef;
-
-                messageDataSoftRef = new MessageDataSoftRef<>(hardRef.getMetaData(), hardRef.getData());
-
-                _messageDataRef = messageDataSoftRef;
-
-
+                _messageDataRef.setSoft();
             }
         }
 
@@ -1683,6 +1628,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             getLogger().debug("REMOVE called on message: {}", _messageId);
 
             checkMessageStoreOpen();
+            _messages.remove(this);
             if(stored())
             {
                 AbstractJDBCMessageStore.this.removeMessage(_messageId);
@@ -1729,7 +1675,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             flushToStore();
             if(_messageDataRef != null && !_messageDataRef.isHardRef())
             {
-                final long bytesCleared = ((MessageDataSoftRef) _messageDataRef).clear();
+                final long bytesCleared = _messageDataRef.clear();
                 _inMemorySize.addAndGet(-bytesCleared);
                 _bytesEvacuatedFromMemory.addAndGet(bytesCleared);
             }
@@ -1745,6 +1691,14 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             }
         }
 
+        public synchronized void clear()
+        {
+            if (_messageDataRef != null)
+            {
+                _messageDataRef.clear();
+            }
+        }
+
         @Override
         public String toString()
         {
@@ -1792,7 +1746,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
                             MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
                             StorableMessageMetaData metaData = type.createMetaData(buf);
                             buf.dispose();
-                            message = new StoredJDBCMessage(messageId, metaData, true);
+                            message = createStoredJDBCMessage(messageId, metaData, true);
 
                         }
                         else
@@ -1845,7 +1799,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
                             MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(((int)dataAsBytes[0]) &0xff);
                             StorableMessageMetaData metaData = type.createMetaData(buf);
                             buf.dispose();
-                            StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true);
+                            StoredJDBCMessage message = createStoredJDBCMessage(messageId, metaData, true);
                             if (!handler.handle(message))
                             {
                                 break;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org