You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/04/27 00:51:50 UTC

svn commit: r1476433 - in /activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region: DurableTopicSubscription.java PrefetchSubscription.java Topic.java

Author: tabish
Date: Fri Apr 26 22:51:50 2013
New Revision: 1476433

URL: http://svn.apache.org/r1476433
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4351

Ensure the destination statistics are updated when a durable subscription is deactivated.

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1476433&r1=1476432&r2=1476433&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Fri Apr 26 22:51:50 2013
@@ -19,6 +19,7 @@ package org.apache.activemq.broker.regio
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -191,6 +192,8 @@ public class DurableTopicSubscription ex
         this.usageManager.getMemoryUsage().removeUsageListener(this);
 
         ArrayList<Topic> topicsToDeactivate = new ArrayList<Topic>();
+        List<MessageReference> savedDispateched = null;
+
         synchronized (pendingLock) {
             pending.stop();
 
@@ -224,6 +227,9 @@ public class DurableTopicSubscription ex
                     }
                 }
 
+                if (!topicsToDeactivate.isEmpty()) {
+                    savedDispateched = new ArrayList<MessageReference>(dispatched);
+                }
                 dispatched.clear();
             }
             if (!keepDurableSubsActive && pending.isTransient()) {
@@ -240,7 +246,7 @@ public class DurableTopicSubscription ex
             }
         }
         for(Topic topic: topicsToDeactivate) {
-            topic.deactivate(context, this);
+            topic.deactivate(context, this, savedDispateched);
         }
         prefetchExtension.set(0);
     }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=1476433&r1=1476432&r2=1476433&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Apr 26 22:51:50 2013
@@ -582,7 +582,7 @@ public abstract class PrefetchSubscripti
         }
     }
 
-   @Override
+    @Override
     public void add(ConnectionContext context, Destination destination) throws Exception {
         synchronized(pendingLock) {
             super.add(context, destination);
@@ -592,6 +592,10 @@ public abstract class PrefetchSubscripti
 
     @Override
     public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
+        return remove(context, destination, dispatched);
+    }
+
+    public List<MessageReference> remove(ConnectionContext context, Destination destination, List<MessageReference> dispatched) throws Exception {
         List<MessageReference> rc = new ArrayList<MessageReference>();
         synchronized(pendingLock) {
             super.remove(context, destination);
@@ -600,23 +604,35 @@ public abstract class PrefetchSubscripti
             // Except if each commit or rollback callback action comes before remove of subscriber.
             rc.addAll(pending.remove(context, destination));
 
-            // Synchronized to DispatchLock
-            synchronized(dispatchLock) {
-                ArrayList<MessageReference> references = new ArrayList<MessageReference>();
-                for (MessageReference r : dispatched) {
-                    if( r.getRegionDestination() == destination) {
-                        references.add(r);
-                    }
+            if (dispatched == null) {
+                return rc;
+            }
+
+            // Synchronized to DispatchLock if necessary
+            if (dispatched == this.dispatched) {
+                synchronized(dispatchLock) {
+                    updateDestinationStats(rc, destination, dispatched);
                 }
-                rc.addAll(references);
-                destination.getDestinationStatistics().getDispatched().subtract(references.size());
-                destination.getDestinationStatistics().getInflight().subtract(references.size());
-                dispatched.removeAll(references);
+            } else {
+                updateDestinationStats(rc, destination, dispatched);
             }
         }
         return rc;
     }
 
+    private void updateDestinationStats(List<MessageReference> rc, Destination destination, List<MessageReference> dispatched) {
+        ArrayList<MessageReference> references = new ArrayList<MessageReference>();
+        for (MessageReference r : dispatched) {
+            if (r.getRegionDestination() == destination) {
+                references.add(r);
+            }
+        }
+        rc.addAll(references);
+        destination.getDestinationStatistics().getDispatched().subtract(references.size());
+        destination.getDestinationStatistics().getInflight().subtract(references.size());
+        dispatched.removeAll(references);
+    }
+
     protected void dispatchPending() throws IOException {
        synchronized(pendingLock) {
             try {

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1476433&r1=1476432&r2=1476433&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java Fri Apr 26 22:51:50 2013
@@ -73,6 +73,7 @@ public class Topic extends BaseDestinati
     private final TaskRunner taskRunner;
     private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
     private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
+        @Override
         public void run() {
             try {
                 Topic.this.taskRunner.wakeup();
@@ -106,6 +107,7 @@ public class Topic extends BaseDestinati
         }
     }
 
+    @Override
     public List<Subscription> getConsumers() {
         synchronized (consumers) {
             return new ArrayList<Subscription>(consumers);
@@ -116,6 +118,7 @@ public class Topic extends BaseDestinati
         return true;
     }
 
+    @Override
     public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
         if (!sub.getConsumerInfo().isDurable()) {
 
@@ -182,6 +185,7 @@ public class Topic extends BaseDestinati
         }
     }
 
+    @Override
     public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
         if (!sub.getConsumerInfo().isDurable()) {
             super.removeSubscription(context, sub, lastDeliveredSequenceId);
@@ -228,13 +232,13 @@ public class Topic extends BaseDestinati
                     topicStore.deleteSubscription(clientId, subscriptionName);
                     info = null;
                     synchronized (consumers) {
-                    	consumers.remove(subscription);
+                        consumers.remove(subscription);
                     }
                 } else {
                     synchronized (consumers) {
-                    	if (!consumers.contains(subscription)) {
-                    		consumers.add(subscription);
-                    	}
+                        if (!consumers.contains(subscription)) {
+                            consumers.add(subscription);
+                        }
                     }
                 }
             }
@@ -259,6 +263,7 @@ public class Topic extends BaseDestinati
             msgContext.setDestination(destination);
             if (subscription.isRecoveryRequired()) {
                 topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
+                    @Override
                     public boolean recoverMessage(Message message) throws Exception {
                         message.setRegionDestination(Topic.this);
                         try {
@@ -272,14 +277,17 @@ public class Topic extends BaseDestinati
                         return true;
                     }
 
+                    @Override
                     public boolean recoverMessageReference(MessageId messageReference) throws Exception {
                         throw new RuntimeException("Should not be called.");
                     }
 
+                    @Override
                     public boolean hasSpace() {
                         return true;
                     }
 
+                    @Override
                     public boolean isDuplicate(MessageId id) {
                         return false;
                     }
@@ -290,11 +298,11 @@ public class Topic extends BaseDestinati
         }
     }
 
-    public void deactivate(ConnectionContext context, DurableTopicSubscription sub) throws Exception {
+    public void deactivate(ConnectionContext context, DurableTopicSubscription sub, List<MessageReference> dispatched) throws Exception {
         synchronized (consumers) {
             consumers.remove(sub);
         }
-        sub.remove(context, this);
+        sub.remove(context, this, dispatched);
     }
 
     protected void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
@@ -303,6 +311,7 @@ public class Topic extends BaseDestinati
         }
     }
 
+    @Override
     public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
         final ConnectionContext context = producerExchange.getConnectionContext();
 
@@ -348,6 +357,7 @@ public class Topic extends BaseDestinati
                 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
                     synchronized (messagesWaitingForSpace) {
                         messagesWaitingForSpace.add(new Runnable() {
+                            @Override
                             public void run() {
                                 try {
 
@@ -377,7 +387,6 @@ public class Topic extends BaseDestinati
                                         context.getConnection().dispatchAsync(response);
                                     }
                                 }
-
                             }
                         });
 
@@ -521,6 +530,7 @@ public class Topic extends BaseDestinati
         return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
     }
 
+    @Override
     public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
             final MessageReference node) throws IOException {
         if (topicStore != null && node.isPersistent()) {
@@ -532,6 +542,7 @@ public class Topic extends BaseDestinati
         messageConsumed(context, node);
     }
 
+    @Override
     public void gc() {
     }
 
@@ -539,6 +550,7 @@ public class Topic extends BaseDestinati
         return topicStore != null ? topicStore.getMessage(messageId) : null;
     }
 
+    @Override
     public void start() throws Exception {
         this.subscriptionRecoveryPolicy.start();
         if (memoryUsage != null) {
@@ -550,6 +562,7 @@ public class Topic extends BaseDestinati
         }
     }
 
+    @Override
     public void stop() throws Exception {
         if (taskRunner != null) {
             taskRunner.shutdown();
@@ -565,6 +578,7 @@ public class Topic extends BaseDestinati
          scheduler.cancel(expireMessagesTask);
     }
 
+    @Override
     public Message[] browse() {
         final List<Message> result = new ArrayList<Message>();
         doBrowse(result, getMaxBrowsePageSize());
@@ -576,6 +590,7 @@ public class Topic extends BaseDestinati
             if (topicStore != null) {
                 final List<Message> toExpire = new ArrayList<Message>();
                 topicStore.recover(new MessageRecoveryListener() {
+                    @Override
                     public boolean recoverMessage(Message message) throws Exception {
                         if (message.isExpired()) {
                             toExpire.add(message);
@@ -584,14 +599,17 @@ public class Topic extends BaseDestinati
                         return true;
                     }
 
+                    @Override
                     public boolean recoverMessageReference(MessageId messageReference) throws Exception {
                         return true;
                     }
 
+                    @Override
                     public boolean hasSpace() {
                         return browseList.size() < max;
                     }
 
+                    @Override
                     public boolean isDuplicate(MessageId id) {
                         return false;
                     }
@@ -616,6 +634,7 @@ public class Topic extends BaseDestinati
         }
     }
 
+    @Override
     public boolean iterate() {
         synchronized (messagesWaitingForSpace) {
             while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
@@ -661,6 +680,7 @@ public class Topic extends BaseDestinati
     // Implementation methods
     // -------------------------------------------------------------------------
 
+    @Override
     public final void wakeup() {
     }
 
@@ -698,12 +718,14 @@ public class Topic extends BaseDestinati
     }
 
     private final Runnable expireMessagesTask = new Runnable() {
+        @Override
         public void run() {
             List<Message> browsedMessages = new InsertionCountList<Message>();
             doBrowse(browsedMessages, getMaxExpirePageSize());
         }
     };
 
+    @Override
     public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
         broker.messageExpired(context, reference, subs);
         // AMQ-2586: Better to leave this stat at zero than to give the user
@@ -760,6 +782,7 @@ public class Topic extends BaseDestinati
     /**
      * force a reread of the store - after transaction recovery completion
      */
+    @Override
     public void clearPendingMessages() {
         dispatchLock.readLock().lock();
         try {