You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2016/05/09 09:20:06 UTC
svn commit: r1742900 -
/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
Author: kwall
Date: Mon May 9 09:20:06 2016
New Revision: 1742900
URL: http://svn.apache.org/viewvc?rev=1742900&view=rev
Log:
QPID-7237: [Java Client] Use a single-threaded thread pool to perform flow control on no-ack sessions
* Avoids spawning a new thread for each state change
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1742900&r1=1742899&r2=1742900&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon May 9 09:20:06 2016
@@ -28,6 +28,10 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -293,6 +297,8 @@ public abstract class AMQSession<C exten
return _messageFactoryRegistry;
}
+ private final ExecutorService _flowControlNoAckTaskPool;
+
/**
* Consumers associated with this session
*/
@@ -360,73 +366,77 @@ public abstract class AMQSession<C exten
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
- _queue =
- new FlowControllingBlockingQueue<Dispatchable>(_prefetchHighMark, _prefetchLowMark,
- new FlowControllingBlockingQueue.ThresholdListener()
- {
- private final AtomicBoolean _suspendState = new AtomicBoolean();
-
- public void aboveThreshold(int currentValue)
- {
- // If the session has been closed don't waste time creating a thread to do
- // flow control
- if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
- {
- // Only execute change if previous state
- // was False
- if (!_suspendState.getAndSet(true))
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug(
- "Above threshold(" + _prefetchHighMark
- + ") so suspending channel. Current value is " + currentValue);
- }
- try
- {
- Threading.getThreadFactory().createThread(new SuspenderRunner(_suspendState)).start();
- }
- catch (Exception e)
- {
- throw new RuntimeException("Failed to create thread", e);
- }
- }
- }
- }
-
- public void underThreshold(int currentValue)
- {
- // If the session has been closed don't waste time creating a thread to do
- // flow control
- if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
- {
- // Only execute change if previous state
- // was true
- if (_suspendState.getAndSet(false))
- {
- if (_logger.isDebugEnabled())
- {
-
- _logger.debug(
- "Below threshold(" + _prefetchLowMark
- + ") so unsuspending channel. Current value is " + currentValue);
- }
- try
- {
- Threading.getThreadFactory().createThread(new SuspenderRunner(_suspendState)).start();
- }
- catch (Exception e)
- {
- throw new RuntimeException("Failed to create thread", e);
- }
- }
- }
- }
- });
+ _flowControlNoAckTaskPool = Executors.newSingleThreadExecutor(new ThreadFactory()
+ {
+ @Override
+ public Thread newThread(final Runnable r)
+ {
+ Thread thread = new Thread(r, "Connection_" + _connection.getConnectionNumber() + "_session_" + _channelId);
+ if (!thread.isDaemon())
+ {
+ thread.setDaemon(true);
+ }
+
+ return thread;
+ }
+ });
+
+ final FlowControllingBlockingQueue.ThresholdListener listener =
+ new FlowControllingBlockingQueue.ThresholdListener()
+ {
+ private final AtomicBoolean _suspendState = new AtomicBoolean();
+
+ public void aboveThreshold(int currentValue)
+ {
+ if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
+ {
+ // Only execute change if previous state was false
+ if (!_suspendState.getAndSet(true))
+ {
+ _logger.debug(
+ "Above threshold ({}) so suspending channel. Current value is {}",
+ _prefetchHighMark,
+ currentValue);
+
+ doSuspend();
+ }
+ }
+ }
+
+ public void underThreshold(int currentValue)
+ {
+ if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
+ {
+ // Only execute change if previous state was true
+ if (_suspendState.getAndSet(false))
+ {
+ _logger.debug(
+ "Below threshold ({}) so unsuspending channel. Current value is {}",
+ _prefetchLowMark,
+ currentValue);
+ doSuspend();
+ }
+ }
+ }
+
+ private void doSuspend()
+ {
+ try
+ {
+ _flowControlNoAckTaskPool.execute(new SuspenderRunner(_suspendState));
+ }
+ catch (RejectedExecutionException e)
+ {
+ // Ignore - session must be closing/closed.
+ }
+ }
+ };
+ _queue = new FlowControllingBlockingQueue<>(_prefetchHighMark, _prefetchLowMark, listener);
}
else
{
- _queue = new FlowControllingBlockingQueue<Dispatchable>(_prefetchHighMark, null);
+ _flowControlNoAckTaskPool = null;
+ _queue = new FlowControllingBlockingQueue<>(_prefetchHighMark, null);
}
// Add creation logging to tie in with the existing close logging
@@ -775,6 +785,7 @@ public abstract class AMQSession<C exten
}
finally
{
+ shutdownFlowControlNoAckTaskPool();
_connection.deregisterSession(_channelId);
}
}
@@ -794,7 +805,7 @@ public abstract class AMQSession<C exten
// with a null cause
// When we are closing the Session due to a protocol session error we simply create a new AMQException
// with the correct error code and text this is cleary WRONG as the instanceof check below will fail.
- // We need to determin here if the connection should be
+ // We need to determine here if the connection should be
if (e instanceof AMQDisconnectedException)
{
@@ -822,6 +833,7 @@ public abstract class AMQSession<C exten
_connection.deregisterSession(_channelId);
closeProducersAndConsumers(amqe);
+ shutdownFlowControlNoAckTaskPool();
}
}
@@ -3191,7 +3203,7 @@ public abstract class AMQSession<C exten
* @throws QpidException If the session cannot be suspended for any reason.
* TODO Be aware of possible changes to parameter order as versions change.
*/
- protected void suspendChannel(boolean suspend) throws QpidException // , FailoverException
+ protected void suspendChannel(boolean suspend) throws QpidException
{
synchronized (_suspensionLock)
{
@@ -3648,7 +3660,7 @@ public abstract class AMQSession<C exten
synchronized (_suspensionLock)
{
// If the session has closed by the time we get here
- // then we should not attempt to write to the sesion/channel.
+ // then we should not attempt to write to the session/channel.
if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
{
suspendChannel(_suspend.get());
@@ -3797,5 +3809,14 @@ public abstract class AMQSession<C exten
{
_queue.clear();
}
+
+ private void shutdownFlowControlNoAckTaskPool()
+ {
+ if (_flowControlNoAckTaskPool != null)
+ {
+ _flowControlNoAckTaskPool.shutdown();
+ }
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org