You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2016/05/11 14:39:28 UTC

svn commit: r1743383 - in /qpid/java/trunk/client/src/main/java/org/apache/qpid/client: AMQSession.java util/FlowControllingBlockingQueue.java

Author: orudyy
Date: Wed May 11 14:39:28 2016
New Revision: 1743383

URL: http://svn.apache.org/viewvc?rev=1743383&view=rev
Log:
QPID-7237: Coalesce flow control tasks for no-ack sessions, and set the lower prefetch threshold to half of the upper prefetch threshold when the same value is specified for both thresholds

Modified:
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1743383&r1=1743382&r2=1743383&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed May 11 14:39:28 2016
@@ -29,9 +29,9 @@ import java.util.concurrent.ConcurrentLi
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -161,10 +161,10 @@ public abstract class AMQSession<C exten
     private int _ticket;
 
     /** Holds the high mark for prefetched message, at which the session is suspended. */
-    private int _prefetchHighMark;
+    private final int _prefetchHighMark;
 
     /** Holds the low mark for prefetched messages, below which the session is resumed. */
-    private int _prefetchLowMark;
+    private final int _prefetchLowMark;
 
     /** Holds the message listener, if any, which is attached to this session. */
     private MessageListener _messageListener = null;
@@ -364,12 +364,20 @@ public abstract class AMQSession<C exten
         _messageEncryptionHelper = new MessageEncryptionHelper(this);
         _channelId = channelId;
         _messageFactoryRegistry = MessageFactoryRegistry.newDefaultRegistry(this);
-        _prefetchHighMark = defaultPrefetchHighMark;
-        _prefetchLowMark = defaultPrefetchLowMark;
 
         if (_acknowledgeMode == NO_ACKNOWLEDGE)
         {
-            _flowControlNoAckTaskPool = Executors.newSingleThreadExecutor(new ThreadFactory()
+            _prefetchHighMark = defaultPrefetchHighMark;
+            _prefetchLowMark = defaultPrefetchLowMark == defaultPrefetchHighMark && defaultPrefetchHighMark > 0
+                   ? Math.max(defaultPrefetchHighMark / 2, 1)
+                    : defaultPrefetchLowMark;
+
+            // we coalesce suspend jobs using single threaded pool executor with queue length of one
+            // and discarding policy
+            _flowControlNoAckTaskPool = new ThreadPoolExecutor(1, 1,
+                                                               0L, TimeUnit.MILLISECONDS,
+                                                               new LinkedBlockingQueue<Runnable>(1),
+                                                               new ThreadFactory()
             {
                 @Override
                 public Thread newThread(final Runnable r)
@@ -382,7 +390,7 @@ public abstract class AMQSession<C exten
 
                     return thread;
                 }
-            });
+            }, new ThreadPoolExecutor.DiscardPolicy());
 
             final FlowControllingBlockingQueue.ThresholdListener listener =
                     new FlowControllingBlockingQueue.ThresholdListener()
@@ -424,20 +432,15 @@ public abstract class AMQSession<C exten
 
                         private void doSuspend()
                         {
-                            try
-                            {
-                                _flowControlNoAckTaskPool.execute(new SuspenderRunner(_suspendState));
-                            }
-                            catch (RejectedExecutionException e)
-                            {
-                                // Ignore - session must be closing/closed.
-                            }
+                            _flowControlNoAckTaskPool.execute(new SuspenderRunner(_suspendState));
                         }
                     };
             _queue = new FlowControllingBlockingQueue<>(_prefetchHighMark, _prefetchLowMark, listener);
         }
         else
         {
+            _prefetchHighMark = defaultPrefetchHighMark;
+            _prefetchLowMark = defaultPrefetchLowMark;
             _flowControlNoAckTaskPool = null;
             _queue = new FlowControllingBlockingQueue<>(_prefetchHighMark, null);
         }

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java?rev=1743383&r1=1743382&r2=1743383&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java Wed May 11 14:39:28 2016
@@ -88,9 +88,21 @@ public class FlowControllingBlockingQueu
         _flowControlHighThreshold = highThreshold;
         _flowControlLowThreshold = lowThreshold;
         _listener = listener;
-        if (highThreshold == 0)
+        if (highThreshold <= 0)
         {
-        	disableFlowControl = true;
+            disableFlowControl = true;
+        }
+        else if (lowThreshold > highThreshold)
+        {
+            throw new IllegalArgumentException(String.format(
+                    "Invalid low threshold %d : it should be less or equal high threshold %d",
+                    lowThreshold,
+                    highThreshold));
+        }
+        else if (lowThreshold < 1)
+        {
+            throw new IllegalArgumentException(String.format("Invalid low threshold %d: it should be greater than 0",
+                                                             lowThreshold));
         }
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org