You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/02/06 23:54:47 UTC

svn commit: r1241246 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java

Author: tabish
Date: Mon Feb  6 22:54:47 2012
New Revision: 1241246

URL: http://svn.apache.org/viewvc?rev=1241246&view=rev
Log:
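fix for: https://issues.apache.org/jira/browse/AMQ-3703
Replace the Valve-based dispatch gate in Topic with a ReentrantReadWriteLock:
message dispatch now holds the read lock, while retroactive subscription
recovery and durable subscription activation hold the write lock.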

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1241246&r1=1241245&r2=1241246&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Mon Feb  6 22:54:47 2012
@@ -24,6 +24,8 @@ import java.util.concurrent.Cancellation
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Future;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -48,7 +50,6 @@ import org.apache.activemq.store.TopicMe
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
-import org.apache.activemq.thread.Valve;
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.util.SubscriptionKey;
 import org.slf4j.Logger;
@@ -57,14 +58,12 @@ import org.slf4j.LoggerFactory;
 /**
  * The Topic is a destination that sends a copy of a message to every active
  * Subscription registered.
- * 
- * 
  */
 public class Topic extends BaseDestination implements Task {
     protected static final Logger LOG = LoggerFactory.getLogger(Topic.class);
     private final TopicMessageStore topicStore;
     protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
-    private final Valve dispatchValve = new Valve(true);
+    private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock();
     private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
     private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
     private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
@@ -118,21 +117,17 @@ public class Topic extends BaseDestinati
             // Do a retroactive recovery if needed.
             if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) {
 
-                // synchronize with dispatch method so that no new messages are
-                // sent
-                // while we are recovering a subscription to avoid out of order
-                // messages.
-                dispatchValve.turnOff();
+                // synchronize with dispatch method so that no new messages are sent
+                // while we are recovering a subscription to avoid out of order messages.
+                dispatchLock.writeLock().lock();
                 try {
-
                     synchronized (consumers) {
                         sub.add(context, this);
                         consumers.add(sub);
                     }
                     subscriptionRecoveryPolicy.recover(context, this, sub);
-
                 } finally {
-                    dispatchValve.turnOn();
+                    dispatchLock.writeLock().unlock();
                 }
 
             } else {
@@ -174,9 +169,8 @@ public class Topic extends BaseDestinati
 
     public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception {
         // synchronize with dispatch method so that no new messages are sent
-        // while
-        // we are recovering a subscription to avoid out of order messages.
-        dispatchValve.turnOff();
+        // while we are recovering a subscription to avoid out of order messages.
+        dispatchLock.writeLock().lock();
         try {
 
             if (topicStore == null) {
@@ -201,6 +195,7 @@ public class Topic extends BaseDestinati
                     }
                 }
             }
+
             // Do we need to create the subscription?
             if (info == null) {
                 info = new SubscriptionInfo();
@@ -248,7 +243,7 @@ public class Topic extends BaseDestinati
                 });
             }
         } finally {
-            dispatchValve.turnOn();
+            dispatchLock.writeLock().unlock();
         }
     }
 
@@ -408,7 +403,7 @@ public class Topic extends BaseDestinati
     /**
      * do send the message - this needs to be synchronized to ensure messages
      * are stored AND dispatched in the right order
-     * 
+     *
      * @param producerExchange
      * @param message
      * @throws IOException
@@ -466,6 +461,7 @@ public class Topic extends BaseDestinati
                 message.decrementReferenceCount();
             }
         }
+
         if (result != null && !result.isCancelled()) {
             try {
                 result.get();
@@ -474,7 +470,6 @@ public class Topic extends BaseDestinati
                 // has already been deleted
             }
         }
-
     }
 
     private boolean canOptimizeOutPersistence() {
@@ -512,7 +507,6 @@ public class Topic extends BaseDestinati
         if (getExpireMessagesPeriod() > 0) {
             scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
         }
-
     }
 
     public void stop() throws Exception {
@@ -634,8 +628,9 @@ public class Topic extends BaseDestinati
         // misleading metrics.
         // destinationStatistics.getMessages().increment();
         destinationStatistics.getEnqueues().increment();
-        dispatchValve.increment();
         MessageEvaluationContext msgContext = null;
+
+        dispatchLock.readLock().lock();
         try {
             if (!subscriptionRecoveryPolicy.add(context, message)) {
                 return;
@@ -654,7 +649,7 @@ public class Topic extends BaseDestinati
             }
 
         } finally {
-            dispatchValve.decrement();
+            dispatchLock.readLock().unlock();
             if (msgContext != null) {
                 msgContext.clear();
             }
@@ -693,6 +688,4 @@ public class Topic extends BaseDestinati
     protected Logger getLog() {
         return LOG;
     }
-
-
 }