You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/01/29 22:57:41 UTC
git commit: https://issues.apache.org/jira/browse/AMQ-5003
Updated Branches:
refs/heads/trunk 2f9c43f11 -> c7d66e944
https://issues.apache.org/jira/browse/AMQ-5003
Gate the session clear in progress code so that overlapping
transportInterrupted calls don't start consuming lots of memory for no
reason.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c7d66e94
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c7d66e94
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c7d66e94
Branch: refs/heads/trunk
Commit: c7d66e944d94c7452273d8ca998e7c506e385ade
Parents: 2f9c43f
Author: Timothy Bish <ta...@gmai.com>
Authored: Wed Jan 29 16:57:33 2014 -0500
Committer: Timothy Bish <ta...@gmai.com>
Committed: Wed Jan 29 16:57:33 2014 -0500
----------------------------------------------------------------------
.../org/apache/activemq/ActiveMQSession.java | 41 ++++++++++++++------
1 file changed, 30 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/c7d66e94/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index 0a96134..47ed980 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -221,6 +221,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
protected boolean sessionAsyncDispatch;
protected final boolean debug;
protected Object sendMutex = new Object();
+ private final AtomicBoolean clearInProgress = new AtomicBoolean();
private MessageListener messageListener;
private final JMSSessionStatsImpl stats;
@@ -650,21 +651,39 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) {
executor.clearMessagesInProgress();
- // we are called from inside the transport reconnection logic
- // which involves us clearing all the connections' consumers
- // dispatch and delivered lists. So rather than trying to
- // grab a mutex (which could be already owned by the message
- // listener calling the send or an ack) we allow it to complete in
- // a separate thread via the scheduler and notify us via
- // connection.transportInterruptionProcessingComplete()
+ // we are called from inside the transport reconnection logic which involves us
+ // clearing all the connections' consumers dispatch and delivered lists. So rather
+ // than trying to grab a mutex (which could be already owned by the message listener
+ // calling the send or an ack) we allow it to complete in a separate thread via the
+ // scheduler and notify us via connection.transportInterruptionProcessingComplete()
//
- for (final ActiveMQMessageConsumer consumer : consumers) {
- consumer.inProgressClearRequired();
- transportInterruptionProcessingComplete.incrementAndGet();
+ // We must be careful though not to allow multiple calls to this method from a
+ // connection that is having issue becoming fully established from causing a large
+ // build up of scheduled tasks to clear the same consumers over and over.
+ if (consumers.isEmpty()) {
+ return;
+ }
+
+ if (clearInProgress.compareAndSet(false, true)) {
+ for (final ActiveMQMessageConsumer consumer : consumers) {
+ consumer.inProgressClearRequired();
+ transportInterruptionProcessingComplete.incrementAndGet();
+ try {
+ connection.getScheduler().executeAfterDelay(new Runnable() {
+ @Override
+ public void run() {
+ consumer.clearMessagesInProgress();
+ }}, 0l);
+ } catch (JMSException e) {
+ connection.onClientInternalException(e);
+ }
+ }
+
try {
connection.getScheduler().executeAfterDelay(new Runnable() {
+ @Override
public void run() {
- consumer.clearMessagesInProgress();
+ clearInProgress.set(false);
}}, 0l);
} catch (JMSException e) {
connection.onClientInternalException(e);