You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/05/22 16:22:46 UTC

[4/7] qpid-broker-j git commit: QPID-7784: [Java Broker] Dispose QpidByteBuffers associated with pooled threads when shutting down executors.

QPID-7784: [Java Broker] Dispose QpidByteBuffers associated with pooled threads when shutting down executors.

Cherry picked from d9af2660089139e2f4fdad8c0aa0e0c8e6529ff5


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/c42f1589
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/c42f1589
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/c42f1589

Branch: refs/heads/6.1.x
Commit: c42f1589a5da52c549d5c52bb7b224ba5d9a6f4e
Parents: 6d6f801
Author: Lorenz Quack <lq...@apache.org>
Authored: Mon May 22 15:29:19 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Mon May 22 16:17:32 2017 +0100

----------------------------------------------------------------------
 .../transport/NetworkConnectionScheduler.java   | 34 +++++++++++++++++++-
 .../qpid/server/util/HousekeepingExecutor.java  | 25 ++++++++++++++
 .../apache/qpid/bytebuffer/QpidByteBuffer.java  |  9 ++++++
 3 files changed, 67 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c42f1589/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java b/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
index b0e40bb..3c9ab46 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.transport;
 
 import java.io.IOException;
 import java.nio.channels.ServerSocketChannel;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
@@ -32,8 +34,10 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.transport.TransportException;
 
+
 public class NetworkConnectionScheduler
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(NetworkConnectionScheduler.class);
@@ -101,7 +105,35 @@ public class NetworkConnectionScheduler
             _selectorThread = new SelectorThread(this, _numberOfSelectors);
             _executor = new ThreadPoolExecutor(_poolSize, _poolSize,
                                                _threadKeepAliveTimeout, TimeUnit.MINUTES,
-                                               new LinkedBlockingQueue<Runnable>(), _factory);
+                                               new LinkedBlockingQueue<Runnable>(), _factory)
+            {
+                private final Map<Thread, QpidByteBuffer> _cachedBufferMap = new ConcurrentHashMap<>();
+
+                @Override
+                protected void afterExecute(final Runnable r, final Throwable t)
+                {
+                    super.afterExecute(r, t);
+                    final QpidByteBuffer cachedThreadLocalBuffer = QpidByteBuffer.getCachedThreadLocalBuffer();
+                    if (cachedThreadLocalBuffer != null)
+                    {
+                        _cachedBufferMap.put(Thread.currentThread(), cachedThreadLocalBuffer);
+                    }
+                    else
+                    {
+                        _cachedBufferMap.remove(Thread.currentThread());
+                    }
+                }
+
+                @Override
+                protected void terminated()
+                {
+                    super.terminated();
+                    for (QpidByteBuffer qpidByteBuffer : _cachedBufferMap.values())
+                    {
+                        qpidByteBuffer.dispose();
+                    }
+                }
+            };
             _executor.prestartAllCoreThreads();
             _executor.allowCoreThreadTimeOut(true);
             for(int i = 0 ; i < _poolSize; i++)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c42f1589/broker-core/src/main/java/org/apache/qpid/server/util/HousekeepingExecutor.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/util/HousekeepingExecutor.java b/broker-core/src/main/java/org/apache/qpid/server/util/HousekeepingExecutor.java
index 9821f38..3742833 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/util/HousekeepingExecutor.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/util/HousekeepingExecutor.java
@@ -20,7 +20,9 @@
  */
 package org.apache.qpid.server.util;
 
+import java.util.Map;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -31,12 +33,14 @@ import com.google.common.util.concurrent.UncheckedExecutionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.pool.SuppressingInheritedAccessControlContextThreadFactory;
 
 public class HousekeepingExecutor extends ScheduledThreadPoolExecutor
 {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(HousekeepingExecutor.class);
+    private final Map<Thread, QpidByteBuffer> _cachedBufferMap = new ConcurrentHashMap<>();
 
     public HousekeepingExecutor(final String threadPrefix, final int threadCount, final Subject subject)
     {
@@ -53,6 +57,17 @@ public class HousekeepingExecutor extends ScheduledThreadPoolExecutor
     protected void afterExecute(Runnable r, Throwable t)
     {
         super.afterExecute(r, t);
+
+        final QpidByteBuffer cachedThreadLocalBuffer = QpidByteBuffer.getCachedThreadLocalBuffer();
+        if (cachedThreadLocalBuffer != null)
+        {
+            _cachedBufferMap.put(Thread.currentThread(), cachedThreadLocalBuffer);
+        }
+        else
+        {
+            _cachedBufferMap.remove(Thread.currentThread());
+        }
+
         if (t == null && r instanceof Future<?>)
         {
             Future future = (Future<?>) r;
@@ -96,4 +111,14 @@ public class HousekeepingExecutor extends ScheduledThreadPoolExecutor
             }
         }
     }
+
+    @Override
+    protected void terminated()
+    {
+        super.terminated();
+        for (QpidByteBuffer qpidByteBuffer : _cachedBufferMap.values())
+        {
+            qpidByteBuffer.dispose();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c42f1589/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java b/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
index 8b6005f..46aea6b 100644
--- a/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
+++ b/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
@@ -819,6 +819,15 @@ public class QpidByteBuffer
         _isPoolInitialized = true;
     }
 
+    /**
+     * Not for general use!
+     * Used to clear threadlocal buffer when shutting down thread pools.
+     */
+    public static QpidByteBuffer getCachedThreadLocalBuffer()
+    {
+        return _cachedBuffer.get();
+    }
+
     public static int getPooledBufferSize()
     {
         return _pooledBufferSize;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org