You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2016/05/09 09:20:06 UTC

svn commit: r1742900 - /qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java

Author: kwall
Date: Mon May  9 09:20:06 2016
New Revision: 1742900

URL: http://svn.apache.org/viewvc?rev=1742900&view=rev
Log:
QPID-7237: [Java Client] Use single thread thread-pool to perform flow control on no-ack sessions

* Avoids spawning new thread for each state change

Modified:
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1742900&r1=1742899&r2=1742900&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon May  9 09:20:06 2016
@@ -28,6 +28,10 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -293,6 +297,8 @@ public abstract class AMQSession<C exten
         return _messageFactoryRegistry;
     }
 
+    private final ExecutorService _flowControlNoAckTaskPool;
+
     /**
      * Consumers associated with this session
      */
@@ -360,73 +366,77 @@ public abstract class AMQSession<C exten
 
         if (_acknowledgeMode == NO_ACKNOWLEDGE)
         {
-            _queue =
-                    new FlowControllingBlockingQueue<Dispatchable>(_prefetchHighMark, _prefetchLowMark,
-                                                     new FlowControllingBlockingQueue.ThresholdListener()
-                                                     {
-                                                         private final AtomicBoolean _suspendState = new AtomicBoolean();
-
-                                                         public void aboveThreshold(int currentValue)
-                                                         {
-                                                             // If the session has been closed don't waste time creating a thread to do
-                                                             // flow control
-                                                             if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
-                                                             {
-                                                                 // Only execute change if previous state
-                                                                 // was False
-                                                                 if (!_suspendState.getAndSet(true))
-                                                                 {
-                                                                     if (_logger.isDebugEnabled())
-                                                                     {
-                                                                         _logger.debug(
-                                                                                 "Above threshold(" + _prefetchHighMark
-                                                                                 + ") so suspending channel. Current value is " + currentValue);
-                                                                     }
-                                                                     try
-                                                                     {
-                                                                         Threading.getThreadFactory().createThread(new SuspenderRunner(_suspendState)).start();
-                                                                     }
-                                                                     catch (Exception e)
-                                                                     {
-                                                                         throw new RuntimeException("Failed to create thread", e);
-                                                                     }
-                                                                 }
-                                                             }
-                                                         }
-
-                                                         public void underThreshold(int currentValue)
-                                                         {
-                                                             // If the session has been closed don't waste time creating a thread to do
-                                                             // flow control
-                                                             if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
-                                                             {
-                                                                 // Only execute change if previous state
-                                                                 // was true
-                                                                 if (_suspendState.getAndSet(false))
-                                                                 {
-                                                                     if (_logger.isDebugEnabled())
-                                                                     {
-
-                                                                         _logger.debug(
-                                                                                 "Below threshold(" + _prefetchLowMark
-                                                                                 + ") so unsuspending channel. Current value is " + currentValue);
-                                                                     }
-                                                                     try
-                                                                     {
-                                                                         Threading.getThreadFactory().createThread(new SuspenderRunner(_suspendState)).start();
-                                                                     }
-                                                                     catch (Exception e)
-                                                                     {
-                                                                         throw new RuntimeException("Failed to create thread", e);
-                                                                     }
-                                                                 }
-                                                             }
-                                                         }
-                                                     });
+            _flowControlNoAckTaskPool = Executors.newSingleThreadExecutor(new ThreadFactory()
+            {
+                @Override
+                public Thread newThread(final Runnable r)
+                {
+                    Thread thread = new Thread(r, "Connection_" + _connection.getConnectionNumber() + "_session_" + _channelId);
+                    if (!thread.isDaemon())
+                    {
+                        thread.setDaemon(true);
+                    }
+
+                    return thread;
+                }
+            });
+
+            final FlowControllingBlockingQueue.ThresholdListener listener =
+                    new FlowControllingBlockingQueue.ThresholdListener()
+                    {
+                        private final AtomicBoolean _suspendState = new AtomicBoolean();
+
+                        public void aboveThreshold(int currentValue)
+                        {
+                            if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
+                            {
+                                // Only execute change if previous state was false
+                                if (!_suspendState.getAndSet(true))
+                                {
+                                    _logger.debug(
+                                            "Above threshold ({}) so suspending channel. Current value is {}",
+                                            _prefetchHighMark,
+                                            currentValue);
+
+                                    doSuspend();
+                                }
+                            }
+                        }
+
+                        public void underThreshold(int currentValue)
+                        {
+                            if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
+                            {
+                                // Only execute change if previous state was true
+                                if (_suspendState.getAndSet(false))
+                                {
+                                    _logger.debug(
+                                            "Below threshold ({}) so unsuspending channel. Current value is {}",
+                                            _prefetchLowMark,
+                                            currentValue);
+                                    doSuspend();
+                                }
+                            }
+                        }
+
+                        private void doSuspend()
+                        {
+                            try
+                            {
+                                _flowControlNoAckTaskPool.execute(new SuspenderRunner(_suspendState));
+                            }
+                            catch (RejectedExecutionException e)
+                            {
+                                // Ignore - session must be closing/closed.
+                            }
+                        }
+                    };
+            _queue = new FlowControllingBlockingQueue<>(_prefetchHighMark, _prefetchLowMark, listener);
         }
         else
         {
-            _queue = new FlowControllingBlockingQueue<Dispatchable>(_prefetchHighMark, null);
+            _flowControlNoAckTaskPool = null;
+            _queue = new FlowControllingBlockingQueue<>(_prefetchHighMark, null);
         }
 
         // Add creation logging to tie in with the existing close logging
@@ -775,6 +785,7 @@ public abstract class AMQSession<C exten
             }
             finally
             {
+                shutdownFlowControlNoAckTaskPool();
                 _connection.deregisterSession(_channelId);
             }
         }
@@ -794,7 +805,7 @@ public abstract class AMQSession<C exten
         // with a null cause
         // When we are closing the Session due to a protocol session error we simply create a new AMQException
         // with the correct error code and text this is cleary WRONG as the instanceof check below will fail.
-        // We need to determin here if the connection should be
+        // We need to determine here if the connection should be
 
         if (e instanceof AMQDisconnectedException)
         {
@@ -822,6 +833,7 @@ public abstract class AMQSession<C exten
 
             _connection.deregisterSession(_channelId);
             closeProducersAndConsumers(amqe);
+            shutdownFlowControlNoAckTaskPool();
         }
 
     }
@@ -3191,7 +3203,7 @@ public abstract class AMQSession<C exten
      * @throws QpidException If the session cannot be suspended for any reason.
      * TODO  Be aware of possible changes to parameter order as versions change.
      */
-    protected void suspendChannel(boolean suspend) throws QpidException // , FailoverException
+    protected void suspendChannel(boolean suspend) throws QpidException
     {
         synchronized (_suspensionLock)
         {
@@ -3648,7 +3660,7 @@ public abstract class AMQSession<C exten
                 synchronized (_suspensionLock)
                 {
                     // If the session has closed by the time we get here
-                    // then we should not attempt to write to the sesion/channel.
+                    // then we should not attempt to write to the session/channel.
                     if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
                     {
                         suspendChannel(_suspend.get());
@@ -3797,5 +3809,14 @@ public abstract class AMQSession<C exten
     {
         _queue.clear();
     }
+
+    private void shutdownFlowControlNoAckTaskPool()
+    {
+        if (_flowControlNoAckTaskPool != null)
+        {
+            _flowControlNoAckTaskPool.shutdown();
+        }
+    }
+
 }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org