You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/02/29 22:59:42 UTC
svn commit: r632455 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker: ./
jmx/ region/
Author: chirino
Date: Fri Feb 29 13:59:33 2008
New Revision: 632455
URL: http://svn.apache.org/viewvc?rev=632455&view=rev
Log:
The Producer MemoryLimit can lead to network deadlock when spooling is disabled.
So we now disable using it when sooling is used on a queue.
see:
https://issues.apache.org/activemq/browse/AMQ-1606
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=632455&r1=632454&r2=632455&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Fri Feb 29 13:59:33 2008
@@ -160,6 +160,7 @@
private CountDownLatch stoppedLatch = new CountDownLatch(1);
private boolean supportFailOver;
private boolean clustered;
+ private Broker regionBroker;
static {
@@ -1430,7 +1431,7 @@
* @throws
*/
protected Broker createBroker() throws Exception {
- Broker regionBroker = createRegionBroker();
+ regionBroker = createRegionBroker();
Broker broker = addInterceptors(regionBroker);
// Add a filter that will stop access to the broker once stopped
@@ -1488,7 +1489,7 @@
DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
RegionBroker regionBroker = null;
if (destinationFactory == null) {
- destinationFactory = new DestinationFactoryImpl(getProducerSystemUsage(), getTaskRunnerFactory(), getPersistenceAdapter());
+ destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter());
}
if (isUseJmx()) {
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
@@ -1796,5 +1797,13 @@
broker.addDestination(adminConnectionContext, destination);
}
}
+ }
+
+ public Broker getRegionBroker() {
+ return regionBroker;
+ }
+
+ public void setRegionBroker(Broker regionBroker) {
+ this.regionBroker = regionBroker;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=632455&r1=632454&r2=632455&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Fri Feb 29 13:59:33 2008
@@ -126,7 +126,7 @@
}
protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
- return new ManagedTempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
+ return new ManagedTempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java?rev=632455&r1=632454&r2=632455&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java Fri Feb 29 13:59:33 2008
@@ -19,6 +19,7 @@
import javax.jms.InvalidSelectorException;
import javax.management.ObjectName;
+import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFactory;
@@ -34,10 +35,10 @@
private final ManagedRegionBroker regionBroker;
- public ManagedTempQueueRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
+ public ManagedTempQueueRegion(ManagedRegionBroker broker, BrokerService brokerService, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) {
- super(regionBroker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
- this.regionBroker = regionBroker;
+ super(broker, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
+ this.regionBroker = broker;
}
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=632455&r1=632454&r2=632455&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Fri Feb 29 13:59:33 2008
@@ -16,7 +16,10 @@
*/
package org.apache.activemq.broker.region;
+import java.io.IOException;
+
import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ProducerInfo;
@@ -33,8 +36,8 @@
protected final ActiveMQDestination destination;
protected final Broker broker;
protected final MessageStore store;
- protected final SystemUsage systemUsage;
- protected final MemoryUsage memoryUsage;
+ protected SystemUsage systemUsage;
+ protected MemoryUsage memoryUsage;
private boolean producerFlowControl = true;
private int maxProducersToAudit=1024;
private int maxAuditDepth=2048;
@@ -43,36 +46,41 @@
private boolean useCache=true;
private int minimumMessageSize=1024;
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
+ protected final BrokerService brokerService;
/**
* @param broker
* @param store
* @param destination
- * @param systemUsage
* @param parentStats
+ * @throws Exception
*/
- public BaseDestination(Broker broker,MessageStore store,ActiveMQDestination destination, SystemUsage systemUsage,DestinationStatistics parentStats) {
- this.broker=broker;
+ public BaseDestination(BrokerService brokerService,MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception {
+ this.brokerService = brokerService;
+ this.broker=brokerService.getBroker();
this.store=store;
this.destination=destination;
- this.systemUsage=systemUsage;
- this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString());
- this.memoryUsage.setUsagePortion(1.0f);
- // Let the store know what usage manager we are using so that he can
- // flush messages to disk when usage gets high.
- if (store != null) {
- store.setMemoryUsage(this.memoryUsage);
- }
// let's copy the enabled property from the parent DestinationStatistics
this.destinationStatistics.setEnabled(parentStats.isEnabled());
this.destinationStatistics.setParent(parentStats);
+
+ this.systemUsage = brokerService.getProducerSystemUsage();
+ this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString());
+ this.memoryUsage.setUsagePortion(1.0f);
}
/**
* initialize the destination
* @throws Exception
*/
- public abstract void initialize() throws Exception;
+ public void initialize() throws Exception {
+ // Let the store know what usage manager we are using so that he can
+ // flush messages to disk when usage gets high.
+ if (store != null) {
+ store.setMemoryUsage(this.memoryUsage);
+ }
+ }
+
/**
* @return the producerFlowControl
*/
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java?rev=632455&r1=632454&r2=632455&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java Fri Feb 29 13:59:33 2008
@@ -22,6 +22,7 @@
import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
@@ -44,13 +45,13 @@
*/
public class DestinationFactoryImpl extends DestinationFactory {
- protected final SystemUsage memoryManager;
protected final TaskRunnerFactory taskRunnerFactory;
protected final PersistenceAdapter persistenceAdapter;
protected RegionBroker broker;
+ private final BrokerService brokerService;
- public DestinationFactoryImpl(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
- this.memoryManager = memoryManager;
+ public DestinationFactoryImpl(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
+ this.brokerService = brokerService;
this.taskRunnerFactory = taskRunnerFactory;
if (persistenceAdapter == null) {
throw new IllegalArgumentException("null persistenceAdapter");
@@ -76,7 +77,7 @@
if (destination.isQueue()) {
if (destination.isTemporary()) {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
- return new Queue(broker.getRoot(), destination, memoryManager, null, destinationStatistics, taskRunnerFactory) {
+ return new Queue(brokerService, destination, null, destinationStatistics, taskRunnerFactory) {
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
// Only consumers on the same connection can consume
@@ -90,14 +91,14 @@
};
} else {
MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue)destination);
- Queue queue = new Queue(broker.getRoot(), destination, memoryManager, store, destinationStatistics, taskRunnerFactory);
+ Queue queue = new Queue(brokerService, destination, store, destinationStatistics, taskRunnerFactory);
configureQueue(queue, destination);
queue.initialize();
return queue;
}
} else if (destination.isTemporary()) {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
- return new Topic(broker.getRoot(), destination, null, memoryManager, destinationStatistics, taskRunnerFactory) {
+ return new Topic(brokerService, destination, null, destinationStatistics, taskRunnerFactory) {
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
// Only consumers on the same connection can consume from
@@ -113,7 +114,7 @@
if (!AdvisorySupport.isAdvisoryTopic(destination)) {
store = persistenceAdapter.createTopicMessageStore((ActiveMQTopic)destination);
}
- Topic topic = new Topic(broker.getRoot(), destination, store, memoryManager, destinationStatistics, taskRunnerFactory);
+ Topic topic = new Topic(brokerService, destination, store, destinationStatistics, taskRunnerFactory);
configureTopic(topic, destination);
topic.initialize();
return topic;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=632455&r1=632454&r2=632455&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Feb 29 13:59:33 2008
@@ -32,7 +32,7 @@
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
-import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@@ -65,7 +65,6 @@
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transaction.Synchronization;
-import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.BrokerSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -106,9 +105,9 @@
}
};
- public Queue(Broker broker, final ActiveMQDestination destination, final SystemUsage systemUsage,MessageStore store,DestinationStatistics parentStats,
+ public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store,DestinationStatistics parentStats,
TaskRunnerFactory taskFactory) throws Exception {
- super(broker, store, destination,systemUsage, parentStats);
+ super(brokerService, store, destination, parentStats);
if (destination.isTemporary() || broker == null || store==null ) {
this.messages = new VMPendingMessageCursor();
@@ -130,8 +129,17 @@
this.dispatchSelector=new QueueDispatchSelector(destination);
}
-
+
public void initialize() throws Exception {
+ // If a VMPendingMessageCursor don't use the default Producer System Usage
+ // since it turns into a shared blocking queue which can lead to a network deadlock.
+ // If we are ccursoring to disk..it's not and issue because it does not block due
+ // to large disk sizes.
+ if( messages instanceof VMPendingMessageCursor ) {
+ this.systemUsage = brokerService.getSystemUsage();
+ memoryUsage.setParent(systemUsage.getMemoryUsage());
+ }
+ super.initialize();
if (store != null) {
// Restore the persistent messages.
messages.setSystemUsage(systemUsage);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=632455&r1=632454&r2=632455&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Fri Feb 29 13:59:33 2008
@@ -82,7 +82,7 @@
private final Region topicRegion;
private final Region tempQueueRegion;
private final Region tempTopicRegion;
- private BrokerService brokerService;
+ protected BrokerService brokerService;
private boolean started;
private boolean keepDurableSubsActive;
@@ -161,7 +161,7 @@
}
protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
- return new TempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
+ return new TempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?rev=632455&r1=632454&r2=632455&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java Fri Feb 29 13:59:33 2008
@@ -17,9 +17,8 @@
package org.apache.activemq.broker.region;
import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import org.apache.activemq.broker.Connection;
+import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTempDestination;
@@ -34,18 +33,20 @@
*/
public class TempQueueRegion extends AbstractTempRegion {
private static final Log LOG = LogFactory.getLog(TempQueueRegion.class);
+ private final BrokerService brokerService;
- public TempQueueRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
+ public TempQueueRegion(RegionBroker broker, BrokerService brokerService, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) {
super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
// We should allow the following to be configurable via a Destination
// Policy
// setAutoCreateDestinations(false);
+ this.brokerService = brokerService;
}
protected Destination doCreateDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
- return new Queue(broker.getRoot(), destination, usageManager, null, destinationStatistics, taskRunnerFactory) {
+ return new Queue(brokerService, destination, null, destinationStatistics, taskRunnerFactory) {
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
// Only consumers on the same connection can consume from
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=632455&r1=632454&r2=632455&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Fri Feb 29 13:59:33 2008
@@ -25,6 +25,7 @@
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
@@ -87,9 +88,9 @@
};
- public Topic(Broker broker, ActiveMQDestination destination, TopicMessageStore store, SystemUsage systemUsage, DestinationStatistics parentStats,
+ public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, DestinationStatistics parentStats,
TaskRunnerFactory taskFactory) throws Exception {
- super(broker, store, destination,systemUsage, parentStats);
+ super(brokerService, store, destination, parentStats);
this.topicStore=store;
//set default subscription recovery policy
if (destination.isTemporary() || AdvisorySupport.isAdvisoryTopic(destination) ){
@@ -102,6 +103,7 @@
}
public void initialize() throws Exception{
+ super.initialize();
if (store != null) {
int messageCount = store.getMessageCount();
destinationStatistics.getMessages().setCount(messageCount);