You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/02/29 22:59:42 UTC

svn commit: r632455 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker: ./ jmx/ region/

Author: chirino
Date: Fri Feb 29 13:59:33 2008
New Revision: 632455

URL: http://svn.apache.org/viewvc?rev=632455&view=rev
Log:
The Producer MemoryLimit can lead to network deadlock when spooling is disabled.
So we now disable using it when spooling is not used on a queue.

see:
https://issues.apache.org/activemq/browse/AMQ-1606


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=632455&r1=632454&r2=632455&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Fri Feb 29 13:59:33 2008
@@ -160,6 +160,7 @@
     private CountDownLatch stoppedLatch = new CountDownLatch(1);
     private boolean supportFailOver;
     private boolean clustered;
+    private Broker regionBroker;
    
 
     static {
@@ -1430,7 +1431,7 @@
      * @throws
      */
     protected Broker createBroker() throws Exception {
-        Broker regionBroker = createRegionBroker();
+        regionBroker = createRegionBroker();
         Broker broker = addInterceptors(regionBroker);
 
         // Add a filter that will stop access to the broker once stopped
@@ -1488,7 +1489,7 @@
         DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
         RegionBroker regionBroker = null;
         if (destinationFactory == null) {
-            destinationFactory = new DestinationFactoryImpl(getProducerSystemUsage(), getTaskRunnerFactory(), getPersistenceAdapter());
+            destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter());
         }
         if (isUseJmx()) {
             MBeanServer mbeanServer = getManagementContext().getMBeanServer();
@@ -1796,5 +1797,13 @@
                 broker.addDestination(adminConnectionContext, destination);
             }
         }
+    }
+
+    public Broker getRegionBroker() {
+        return regionBroker;
+    }
+
+    public void setRegionBroker(Broker regionBroker) {
+        this.regionBroker = regionBroker;
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=632455&r1=632454&r2=632455&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Fri Feb 29 13:59:33 2008
@@ -126,7 +126,7 @@
     }
 
     protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
-        return new ManagedTempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
+        return new ManagedTempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
     }
 
     protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java?rev=632455&r1=632454&r2=632455&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java Fri Feb 29 13:59:33 2008
@@ -19,6 +19,7 @@
 import javax.jms.InvalidSelectorException;
 import javax.management.ObjectName;
 
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationFactory;
@@ -34,10 +35,10 @@
 
     private final ManagedRegionBroker regionBroker;
 
-    public ManagedTempQueueRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
+    public ManagedTempQueueRegion(ManagedRegionBroker broker, BrokerService brokerService, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
                                   DestinationFactory destinationFactory) {
-        super(regionBroker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
-        this.regionBroker = regionBroker;
+        super(broker, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
+        this.regionBroker = broker;
     }
 
     protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=632455&r1=632454&r2=632455&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Fri Feb 29 13:59:33 2008
@@ -16,7 +16,10 @@
  */
 package org.apache.activemq.broker.region;
 
+import java.io.IOException;
+
 import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ProducerInfo;
@@ -33,8 +36,8 @@
     protected final ActiveMQDestination destination;
     protected final Broker broker;
     protected final MessageStore store;
-    protected final SystemUsage systemUsage;
-    protected final MemoryUsage memoryUsage;
+    protected SystemUsage systemUsage;
+    protected MemoryUsage memoryUsage;
     private boolean producerFlowControl = true;
     private int maxProducersToAudit=1024;
     private int maxAuditDepth=2048;
@@ -43,36 +46,41 @@
     private boolean useCache=true;
     private int minimumMessageSize=1024;
     protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
+    protected final BrokerService brokerService;
     
     /**
      * @param broker 
      * @param store 
      * @param destination
-     * @param systemUsage 
      * @param parentStats
+     * @throws Exception 
      */
-    public BaseDestination(Broker broker,MessageStore store,ActiveMQDestination destination, SystemUsage systemUsage,DestinationStatistics parentStats) {
-        this.broker=broker;
+    public BaseDestination(BrokerService brokerService,MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception {
+        this.brokerService = brokerService;
+        this.broker=brokerService.getBroker();
         this.store=store;
         this.destination=destination;
-        this.systemUsage=systemUsage;
-        this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString());
-        this.memoryUsage.setUsagePortion(1.0f);
-        // Let the store know what usage manager we are using so that he can
-        // flush messages to disk when usage gets high.
-        if (store != null) {
-            store.setMemoryUsage(this.memoryUsage);
-        } 
         // let's copy the enabled property from the parent DestinationStatistics
         this.destinationStatistics.setEnabled(parentStats.isEnabled());
         this.destinationStatistics.setParent(parentStats);        
+
+        this.systemUsage = brokerService.getProducerSystemUsage();
+        this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString());
+        this.memoryUsage.setUsagePortion(1.0f);
     }
     
     /**
      * initialize the destination
      * @throws Exception
      */
-    public abstract void initialize() throws Exception;
+    public void initialize() throws Exception {
+        // Let the store know what usage manager we are using so that he can
+        // flush messages to disk when usage gets high.
+        if (store != null) {
+            store.setMemoryUsage(this.memoryUsage);
+        } 
+    }
+    
     /**
      * @return the producerFlowControl
      */

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java?rev=632455&r1=632454&r2=632455&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java Fri Feb 29 13:59:33 2008
@@ -22,6 +22,7 @@
 import javax.jms.JMSException;
 
 import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -44,13 +45,13 @@
  */
 public class DestinationFactoryImpl extends DestinationFactory {
 
-    protected final SystemUsage memoryManager;
     protected final TaskRunnerFactory taskRunnerFactory;
     protected final PersistenceAdapter persistenceAdapter;
     protected RegionBroker broker;
+    private final BrokerService brokerService;
 
-    public DestinationFactoryImpl(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
-        this.memoryManager = memoryManager;
+    public DestinationFactoryImpl(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
+        this.brokerService = brokerService;
         this.taskRunnerFactory = taskRunnerFactory;
         if (persistenceAdapter == null) {
             throw new IllegalArgumentException("null persistenceAdapter");
@@ -76,7 +77,7 @@
         if (destination.isQueue()) {
             if (destination.isTemporary()) {
                 final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
-                return new Queue(broker.getRoot(), destination, memoryManager, null, destinationStatistics, taskRunnerFactory) {
+                return new Queue(brokerService, destination, null, destinationStatistics, taskRunnerFactory) {
 
                     public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
                         // Only consumers on the same connection can consume
@@ -90,14 +91,14 @@
                 };
             } else {
                 MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue)destination);
-                Queue queue = new Queue(broker.getRoot(), destination, memoryManager, store, destinationStatistics, taskRunnerFactory);
+                Queue queue = new Queue(brokerService, destination, store, destinationStatistics, taskRunnerFactory);
                 configureQueue(queue, destination);
                 queue.initialize();
                 return queue;
             }
         } else if (destination.isTemporary()) {
             final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
-            return new Topic(broker.getRoot(), destination, null, memoryManager, destinationStatistics, taskRunnerFactory) {
+            return new Topic(brokerService, destination, null, destinationStatistics, taskRunnerFactory) {
 
                 public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
                     // Only consumers on the same connection can consume from
@@ -113,7 +114,7 @@
             if (!AdvisorySupport.isAdvisoryTopic(destination)) {
                 store = persistenceAdapter.createTopicMessageStore((ActiveMQTopic)destination);
             }
-            Topic topic = new Topic(broker.getRoot(), destination, store, memoryManager, destinationStatistics, taskRunnerFactory);
+            Topic topic = new Topic(brokerService, destination, store, destinationStatistics, taskRunnerFactory);
             configureTopic(topic, destination);
             topic.initialize();
             return topic;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=632455&r1=632454&r2=632455&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Feb 29 13:59:33 2008
@@ -32,7 +32,7 @@
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 
-import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@@ -65,7 +65,6 @@
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transaction.Synchronization;
-import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.BrokerSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -106,9 +105,9 @@
         }        
     };
                
-    public Queue(Broker broker, final ActiveMQDestination destination, final SystemUsage systemUsage,MessageStore store,DestinationStatistics parentStats,
+    public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store,DestinationStatistics parentStats,
                  TaskRunnerFactory taskFactory) throws Exception {
-        super(broker, store, destination,systemUsage, parentStats);
+        super(brokerService, store, destination, parentStats);
         
         if (destination.isTemporary() || broker == null || store==null ) {
             this.messages = new VMPendingMessageCursor();
@@ -130,8 +129,17 @@
         this.dispatchSelector=new QueueDispatchSelector(destination);
        
     }
-
+        
     public void initialize() throws Exception {
+        // If using a VMPendingMessageCursor, don't use the default Producer System Usage
+        // since it turns into a shared blocking queue which can lead to a network deadlock.
+        // If we are cursoring to disk, it's not an issue because it does not block due
+        // to large disk sizes.
+        if( messages instanceof VMPendingMessageCursor ) {
+            this.systemUsage = brokerService.getSystemUsage();
+            memoryUsage.setParent(systemUsage.getMemoryUsage());
+        }
+        super.initialize();
         if (store != null) {
             // Restore the persistent messages.
             messages.setSystemUsage(systemUsage);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=632455&r1=632454&r2=632455&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Fri Feb 29 13:59:33 2008
@@ -82,7 +82,7 @@
     private final Region topicRegion;
     private final Region tempQueueRegion;
     private final Region tempTopicRegion;
-    private BrokerService brokerService;
+    protected BrokerService brokerService;
     private boolean started;
     private boolean keepDurableSubsActive;
 
@@ -161,7 +161,7 @@
     }
 
     protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
-        return new TempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
+        return new TempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
     }
 
     protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?rev=632455&r1=632454&r2=632455&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java Fri Feb 29 13:59:33 2008
@@ -17,9 +17,8 @@
 package org.apache.activemq.broker.region;
 
 import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
 
-import org.apache.activemq.broker.Connection;
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTempDestination;
@@ -34,18 +33,20 @@
  */
 public class TempQueueRegion extends AbstractTempRegion {
     private static final Log LOG = LogFactory.getLog(TempQueueRegion.class);
+    private final BrokerService brokerService;
     
-    public TempQueueRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
+    public TempQueueRegion(RegionBroker broker, BrokerService brokerService, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
                            DestinationFactory destinationFactory) {
         super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
         // We should allow the following to be configurable via a Destination
         // Policy
         // setAutoCreateDestinations(false);
+        this.brokerService = brokerService;
     }
 
     protected Destination doCreateDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
         final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
-        return new Queue(broker.getRoot(), destination, usageManager, null, destinationStatistics, taskRunnerFactory) {
+        return new Queue(brokerService, destination, null, destinationStatistics, taskRunnerFactory) {
 
             public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
                 // Only consumers on the same connection can consume from

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=632455&r1=632454&r2=632455&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Fri Feb 29 13:59:33 2008
@@ -25,6 +25,7 @@
 
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
@@ -87,9 +88,9 @@
     };
    
 
-    public Topic(Broker broker, ActiveMQDestination destination, TopicMessageStore store, SystemUsage systemUsage, DestinationStatistics parentStats,
+    public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, DestinationStatistics parentStats,
                  TaskRunnerFactory taskFactory) throws Exception {
-        super(broker, store, destination,systemUsage, parentStats);
+        super(brokerService, store, destination, parentStats);
         this.topicStore=store;
         //set default subscription recovery policy
         if (destination.isTemporary() || AdvisorySupport.isAdvisoryTopic(destination) ){
@@ -102,6 +103,7 @@
     }
     
     public void initialize() throws Exception{
+        super.initialize();
         if (store != null) {
             int messageCount = store.getMessageCount();
             destinationStatistics.getMessages().setCount(messageCount);