You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/01/29 22:57:41 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-5003

Updated Branches:
  refs/heads/trunk 2f9c43f11 -> c7d66e944


https://issues.apache.org/jira/browse/AMQ-5003

Gate the session clear in progress code so that overlapping
transportInterrupted calls don't start consuming lots of memory for no
reason.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c7d66e94
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c7d66e94
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c7d66e94

Branch: refs/heads/trunk
Commit: c7d66e944d94c7452273d8ca998e7c506e385ade
Parents: 2f9c43f
Author: Timothy Bish <ta...@gmai.com>
Authored: Wed Jan 29 16:57:33 2014 -0500
Committer: Timothy Bish <ta...@gmai.com>
Committed: Wed Jan 29 16:57:33 2014 -0500

----------------------------------------------------------------------
 .../org/apache/activemq/ActiveMQSession.java    | 41 ++++++++++++++------
 1 file changed, 30 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/c7d66e94/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index 0a96134..47ed980 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -221,6 +221,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
     protected boolean sessionAsyncDispatch;
     protected final boolean debug;
     protected Object sendMutex = new Object();
+    private final AtomicBoolean clearInProgress = new AtomicBoolean();
 
     private MessageListener messageListener;
     private final JMSSessionStatsImpl stats;
@@ -650,21 +651,39 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
 
     void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) {
         executor.clearMessagesInProgress();
-        // we are called from inside the transport reconnection logic
-        // which involves us clearing all the connections' consumers
-        // dispatch and delivered lists. So rather than trying to
-        // grab a mutex (which could be already owned by the message
-        // listener calling the send or an ack) we allow it to complete in
-        // a separate thread via the scheduler and notify us via
-        // connection.transportInterruptionProcessingComplete()
+        // we are called from inside the transport reconnection logic which involves us
+        // clearing all the connections' consumers dispatch and delivered lists. So rather
+        // than trying to grab a mutex (which could be already owned by the message listener
+        // calling the send or an ack) we allow it to complete in a separate thread via the
+        // scheduler and notify us via connection.transportInterruptionProcessingComplete()
         //
-        for (final ActiveMQMessageConsumer consumer : consumers) {
-            consumer.inProgressClearRequired();
-            transportInterruptionProcessingComplete.incrementAndGet();
+        // We must be careful though not to allow multiple calls to this method from a
+        // connection that is having issue becoming fully established from causing a large
+        // build up of scheduled tasks to clear the same consumers over and over.
+        if (consumers.isEmpty()) {
+            return;
+        }
+
+        if (clearInProgress.compareAndSet(false, true)) {
+            for (final ActiveMQMessageConsumer consumer : consumers) {
+                consumer.inProgressClearRequired();
+                transportInterruptionProcessingComplete.incrementAndGet();
+                try {
+                    connection.getScheduler().executeAfterDelay(new Runnable() {
+                        @Override
+                        public void run() {
+                            consumer.clearMessagesInProgress();
+                        }}, 0l);
+                } catch (JMSException e) {
+                    connection.onClientInternalException(e);
+                }
+            }
+
             try {
                 connection.getScheduler().executeAfterDelay(new Runnable() {
+                    @Override
                     public void run() {
-                        consumer.clearMessagesInProgress();
+                        clearInProgress.set(false);
                     }}, 0l);
             } catch (JMSException e) {
                 connection.onClientInternalException(e);