You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/08/20 12:37:34 UTC
svn commit: r567647 [1/3] - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/jmx/
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/...
Author: rajdavies
Date: Mon Aug 20 03:37:29 2007
New Revision: 567647
URL: http://svn.apache.org/viewvc?rev=567647&view=rev
Log:
UpdateManager changed to account for Store and Temp data usage as well as memory usage
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/Cache.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntry.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntryList.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictor.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheFilter.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/LRUMap.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/MapCache.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManagerCacheFilter.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/DefaultUsageCapacity.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/StoreUsage.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/TempUsage.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/UsageCapacity.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/UsageListener.java
- copied, changed from r565381, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageListener.java
Removed:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageListener.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StoreFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/config/ConfigTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/hash/HashTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/tree/TreeTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/memory/buffer/MemoryBufferTestSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/memory/buffer/OrderBasedMemoryBufferTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/memory/buffer/SizeBasedMessageBufferTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkTestSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/proxy/ProxyTestSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQDeadlockTestW4Brokers.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStackOverFlowTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java Mon Aug 20 03:37:29 2007
@@ -33,7 +33,7 @@
import org.apache.activemq.management.JMSProducerStatsImpl;
import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl;
-import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.util.IntrospectionSupport;
/**
@@ -77,7 +77,7 @@
private AtomicLong messageSequence;
private long startTime;
private MessageTransformer transformer;
- private UsageManager producerWindow;
+ private MemoryUsage producerWindow;
protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination) throws JMSException {
super(session);
@@ -92,7 +92,7 @@
// Enable producer window flow control if protocol > 3 and the window
// size > 0
if (session.connection.getProtocolVersion() >= 3 && this.info.getWindowSize() > 0) {
- producerWindow = new UsageManager("Producer Window: " + producerId);
+ producerWindow = new MemoryUsage("Producer Window: " + producerId);
producerWindow.setLimit(this.info.getWindowSize());
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Mon Aug 20 03:37:29 2007
@@ -79,9 +79,11 @@
import org.apache.activemq.management.JMSSessionStatsImpl;
import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.Usage;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.commons.logging.Log;
@@ -1532,7 +1534,7 @@
* @throws JMSException
*/
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
- UsageManager producerWindow) throws JMSException {
+ MemoryUsage producerWindow) throws JMSException {
checkClosed();
if (destination.isTemporary() && connection.isDeleted(destination)) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Mon Aug 20 03:37:29 2007
@@ -62,7 +62,6 @@
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreFactory;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.network.ConnectionFilter;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
@@ -78,6 +77,7 @@
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.vm.VMTransportFactory;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.JMXSupport;
@@ -118,9 +118,9 @@
private ObjectName brokerObjectName;
private TaskRunnerFactory taskRunnerFactory;
private TaskRunnerFactory persistenceTaskRunnerFactory;
- private UsageManager usageManager;
- private UsageManager producerUsageManager;
- private UsageManager consumerUsageManager;
+ private SystemUsage usageManager;
+ private SystemUsage producerSystemUsage;
+ private SystemUsage consumerSystemUsage;
private PersistenceAdapter persistenceAdapter;
private PersistenceAdapterFactory persistenceFactory;
private DestinationFactory destinationFactory;
@@ -646,51 +646,61 @@
this.populateJMSXUserID = populateJMSXUserID;
}
- public UsageManager getMemoryManager() {
+ public SystemUsage getUsageManager() {
+ try {
if (usageManager == null) {
- usageManager = new UsageManager("Main");
- usageManager.setLimit(1024 * 1024 * 64); // Default to 64 Meg
- // limit
+ usageManager = new SystemUsage("Main",getPersistenceAdapter(),getTempDataStore());
+ usageManager.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default to 64 Meg
+ usageManager.getTempDiskUsage().setLimit(1024 * 1024 * 1024 * 100);//10 Gb
+ usageManager.getStoreUsage().setLimit(1024 * 1024 * 1024 * 100); //100 GB
}
return usageManager;
+ }catch(IOException e) {
+ LOG.fatal("Cannot create SystemUsage",e);
+ throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage());
+ }
}
- public void setMemoryManager(UsageManager memoryManager) {
+ public void setUsageManager(SystemUsage memoryManager) {
this.usageManager = memoryManager;
}
/**
* @return the consumerUsageManager
+ * @throws IOException
*/
- public UsageManager getConsumerUsageManager() {
- if (consumerUsageManager == null) {
- consumerUsageManager = new UsageManager(getMemoryManager(), "Consumer", 0.5f);
+ public SystemUsage getConsumerSystemUsage() throws IOException {
+ if (consumerSystemUsage == null) {
+ consumerSystemUsage = new SystemUsage(getUsageManager(), "Consumer");
+ consumerSystemUsage.getMemoryUsage().setUsagePortion(0.5f);
}
- return consumerUsageManager;
+ return consumerSystemUsage;
}
/**
* @param consumerUsageManager the consumerUsageManager to set
*/
- public void setConsumerUsageManager(UsageManager consumerUsageManager) {
- this.consumerUsageManager = consumerUsageManager;
+ public void setConsumerSystemUsage(SystemUsage consumerUsageManager) {
+ this.consumerSystemUsage = consumerUsageManager;
}
/**
* @return the producerUsageManager
+ * @throws IOException
*/
- public UsageManager getProducerUsageManager() {
- if (producerUsageManager == null) {
- producerUsageManager = new UsageManager(getMemoryManager(), "Producer", 0.45f);
+ public SystemUsage getProducerSystemUsage() throws IOException {
+ if (producerSystemUsage == null) {
+ producerSystemUsage = new SystemUsage(getUsageManager(), "Producer");
+ producerSystemUsage.getMemoryUsage().setUsagePortion(0.45f);
}
- return producerUsageManager;
+ return producerSystemUsage;
}
/**
* @param producerUsageManager the producerUsageManager to set
*/
- public void setProducerUsageManager(UsageManager producerUsageManager) {
- this.producerUsageManager = producerUsageManager;
+ public void setProducerSystemUsage(SystemUsage producerUsageManager) {
+ this.producerSystemUsage = producerUsageManager;
}
public PersistenceAdapter getPersistenceAdapter() throws IOException {
@@ -1377,7 +1387,7 @@
protected Broker createRegionBroker() throws Exception {
// we must start the persistence adaptor before we can create the region
// broker
- getPersistenceAdapter().setUsageManager(getProducerUsageManager());
+ getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
getPersistenceAdapter().setBrokerName(getBrokerName());
if (this.deleteAllMessagesOnStartup) {
getPersistenceAdapter().deleteAllMessages();
@@ -1392,14 +1402,14 @@
}
RegionBroker regionBroker = null;
if (destinationFactory == null) {
- destinationFactory = new DestinationFactoryImpl(getProducerUsageManager(), getTaskRunnerFactory(), getPersistenceAdapter());
+ destinationFactory = new DestinationFactoryImpl(getProducerSystemUsage(), getTaskRunnerFactory(), getPersistenceAdapter());
}
if (isUseJmx()) {
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
- regionBroker = new ManagedRegionBroker(this, mbeanServer, getBrokerObjectName(), getTaskRunnerFactory(), getConsumerUsageManager(), destinationFactory,
+ regionBroker = new ManagedRegionBroker(this, mbeanServer, getBrokerObjectName(), getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
destinationInterceptor);
} else {
- regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerUsageManager(), destinationFactory, destinationInterceptor);
+ regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor);
}
destinationFactory.setRegionBroker(regionBroker);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Mon Aug 20 03:37:29 2007
@@ -85,15 +85,15 @@
}
public int getMemoryPercentageUsed() {
- return brokerService.getMemoryManager().getPercentUsage();
+ return brokerService.getUsageManager().getMemoryUsage().getPercentUsage();
}
public long getMemoryLimit() {
- return brokerService.getMemoryManager().getLimit();
+ return brokerService.getUsageManager().getMemoryUsage().getLimit();
}
public void setMemoryLimit(long limit) {
- brokerService.getMemoryManager().setLimit(limit);
+ brokerService.getUsageManager().getMemoryUsage().setLimit(limit);
}
public void resetStatistics() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Mon Aug 20 03:37:29 2007
@@ -94,15 +94,15 @@
}
public int getMemoryPercentageUsed() {
- return destination.getUsageManager().getPercentUsage();
+ return destination.getBrokerMemoryUsage().getPercentUsage();
}
public long getMemoryLimit() {
- return destination.getUsageManager().getLimit();
+ return destination.getBrokerMemoryUsage().getLimit();
}
public void setMemoryLimit(long limit) {
- destination.getUsageManager().setLimit(limit);
+ destination.getBrokerMemoryUsage().setLimit(limit);
}
public double getAverageEnqueueTime() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java Mon Aug 20 03:37:29 2007
@@ -27,14 +27,14 @@
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
public class ManagedQueueRegion extends QueueRegion {
private final ManagedRegionBroker regionBroker;
- public ManagedQueueRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
+ public ManagedQueueRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) {
super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
regionBroker = broker;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Mon Aug 20 03:37:29 2007
@@ -61,11 +61,11 @@
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.SubscriptionKey;
@@ -92,7 +92,7 @@
/* This is the first broker in the broker interceptor chain. */
private Broker contextBroker;
- public ManagedRegionBroker(BrokerService brokerService, MBeanServer mbeanServer, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager,
+ public ManagedRegionBroker(BrokerService brokerService, MBeanServer mbeanServer, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager,
DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException {
super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor);
this.mbeanServer = mbeanServer;
@@ -121,19 +121,19 @@
registeredMBeans.clear();
}
- protected Region createQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
+ protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
- protected Region createTempQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
+ protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new ManagedTempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
- protected Region createTempTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
+ protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
- protected Region createTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
+ protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java Mon Aug 20 03:37:29 2007
@@ -27,14 +27,14 @@
import org.apache.activemq.broker.region.TempQueueRegion;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
public class ManagedTempQueueRegion extends TempQueueRegion {
private final ManagedRegionBroker regionBroker;
- public ManagedTempQueueRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
+ public ManagedTempQueueRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) {
super(regionBroker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
this.regionBroker = regionBroker;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java Mon Aug 20 03:37:29 2007
@@ -27,14 +27,14 @@
import org.apache.activemq.broker.region.TempTopicRegion;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
public class ManagedTempTopicRegion extends TempTopicRegion {
private final ManagedRegionBroker regionBroker;
- public ManagedTempTopicRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
+ public ManagedTempTopicRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) {
super(regionBroker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
this.regionBroker = regionBroker;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java Mon Aug 20 03:37:29 2007
@@ -27,14 +27,14 @@
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
public class ManagedTopicRegion extends TopicRegion {
private final ManagedRegionBroker regionBroker;
- public ManagedTopicRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
+ public ManagedTopicRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) {
super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
regionBroker = broker;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Mon Aug 20 03:37:29 2007
@@ -41,8 +41,8 @@
import org.apache.activemq.command.Response;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.DestinationMap;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -56,7 +56,7 @@
protected final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
protected final DestinationMap destinationMap = new DestinationMap();
protected final Map<ConsumerId, Subscription> subscriptions = new ConcurrentHashMap<ConsumerId, Subscription>();
- protected final UsageManager memoryManager;
+ protected final SystemUsage memoryManager;
protected final DestinationFactory destinationFactory;
protected final DestinationStatistics destinationStatistics;
protected final RegionBroker broker;
@@ -66,7 +66,7 @@
protected final Map<ConsumerId, Object> consumerChangeMutexMap = new HashMap<ConsumerId, Object>();
protected boolean started;
- public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
+ public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) {
if (broker == null) {
throw new IllegalArgumentException("null broker");
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Mon Aug 20 03:37:29 2007
@@ -25,8 +25,9 @@
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
/**
* @version $Revision: 1.12 $
@@ -47,7 +48,7 @@
ActiveMQDestination getActiveMQDestination();
- UsageManager getUsageManager();
+ MemoryUsage getBrokerMemoryUsage();
void dispose(ConnectionContext context) throws IOException;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java Mon Aug 20 03:37:29 2007
@@ -29,11 +29,11 @@
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.SubscriptionInfo;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
/**
* Creates standard ActiveMQ implementations of
@@ -44,12 +44,12 @@
*/
public class DestinationFactoryImpl extends DestinationFactory {
- protected final UsageManager memoryManager;
+ protected final SystemUsage memoryManager;
protected final TaskRunnerFactory taskRunnerFactory;
protected final PersistenceAdapter persistenceAdapter;
protected RegionBroker broker;
- public DestinationFactoryImpl(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
+ public DestinationFactoryImpl(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
this.memoryManager = memoryManager;
this.taskRunnerFactory = taskRunnerFactory;
if (persistenceAdapter == null) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Mon Aug 20 03:37:29 2007
@@ -27,8 +27,9 @@
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
/**
*
@@ -79,8 +80,8 @@
return next.getName();
}
- public UsageManager getUsageManager() {
- return next.getUsageManager();
+ public MemoryUsage getBrokerMemoryUsage() {
+ return next.getBrokerMemoryUsage();
}
public boolean lock(MessageReference node, LockOwner lockOwner) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Mon Aug 20 03:37:29 2007
@@ -29,8 +29,9 @@
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
-import org.apache.activemq.memory.UsageListener;
-import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.usage.Usage;
+import org.apache.activemq.usage.UsageListener;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -42,10 +43,10 @@
private final ConcurrentHashMap<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
private final SubscriptionKey subscriptionKey;
private final boolean keepDurableSubsActive;
- private final UsageManager usageManager;
+ private final SystemUsage usageManager;
private boolean active;
- public DurableTopicSubscription(Broker broker, UsageManager usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
+ public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
throws InvalidSelectorException {
super(broker, context, info, new StoreDurableSubscriberCursor(context.getClientId(), info.getSubscriptionName(), broker.getTempDataStore(), info.getPrefetchSize()));
this.usageManager = usageManager;
@@ -77,7 +78,7 @@
dispatchMatched();
}
- public synchronized void activate(UsageManager memoryManager, ConnectionContext context, ConsumerInfo info) throws Exception {
+ public synchronized void activate(SystemUsage memoryManager, ConnectionContext context, ConsumerInfo info) throws Exception {
LOG.debug("Activating " + this);
if (!active) {
this.active = true;
@@ -89,7 +90,7 @@
topic.activate(context, this);
}
}
- pending.setUsageManager(memoryManager);
+ pending.setSystemUsage(memoryManager);
pending.start();
// If nothing was in the persistent store, then try to use the
@@ -101,13 +102,13 @@
}
}
dispatchMatched();
- this.usageManager.addUsageListener(this);
+ this.usageManager.getMemoryUsage().addUsageListener(this);
}
}
public synchronized void deactivate(boolean keepDurableSubsActive) throws Exception {
active = false;
- this.usageManager.removeUsageListener(this);
+ this.usageManager.getMemoryUsage().removeUsageListener(this);
synchronized (pending) {
pending.stop();
}
@@ -239,10 +240,10 @@
* @param memoryManager
* @param oldPercentUsage
* @param newPercentUsage
- * @see org.apache.activemq.memory.UsageListener#onMemoryUseChanged(org.apache.activemq.memory.UsageManager,
+ * @see org.apache.activemq.usage.UsageListener#onMemoryUseChanged(org.apache.activemq.usage.SystemUsage,
* int, int)
*/
- public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
+ public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
try {
dispatchMatched();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Mon Aug 20 03:37:29 2007
@@ -17,6 +17,7 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
+
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
@@ -51,7 +52,6 @@
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.kaha.Store;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
@@ -60,6 +60,8 @@
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.thread.Valve;
import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.BrokerSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -78,7 +80,8 @@
private final ActiveMQDestination destination;
private final List<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
private final Valve dispatchValve = new Valve(true);
- private final UsageManager usageManager;
+ private final SystemUsage systemUsage;
+ private final MemoryUsage memoryUsage;
private final DestinationStatistics destinationStatistics = new DestinationStatistics();
private PendingMessageCursor messages;
private final LinkedList<MessageReference> pagedInMessages = new LinkedList<MessageReference>();
@@ -107,12 +110,13 @@
};
};
- public Queue(Broker broker, ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats,
+ public Queue(Broker broker, ActiveMQDestination destination, final SystemUsage systemUsage, MessageStore store, DestinationStatistics parentStats,
TaskRunnerFactory taskFactory, Store tmpStore) throws Exception {
this.broker = broker;
this.destination = destination;
- this.usageManager = new UsageManager(memoryManager, destination.toString());
- this.usageManager.setUsagePortion(1.0f);
+ this.systemUsage=systemUsage;
+ this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString());
+ this.memoryUsage.setUsagePortion(1.0f);
this.store = store;
if (destination.isTemporary()) {
this.messages = new VMPendingMessageCursor();
@@ -126,7 +130,7 @@
// flush messages to disk
// when usage gets high.
if (store != null) {
- store.setUsageManager(usageManager);
+ store.setMemoryUsage(memoryUsage);
}
// let's copy the enabled property from the parent DestinationStatistics
@@ -139,7 +143,7 @@
public void initialize() throws Exception {
if (store != null) {
// Restore the persistent messages.
- messages.setUsageManager(getUsageManager());
+ messages.setSystemUsage(systemUsage);
if (messages.isRecoveryRequired()) {
store.recover(new MessageRecoveryListener() {
@@ -359,9 +363,9 @@
}
return;
}
- if (context.isProducerFlowControl() && usageManager.isFull()) {
- if (usageManager.isSendFailIfNoSpace()) {
- throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
+ if (context.isProducerFlowControl() && memoryUsage.isFull()) {
+ if (systemUsage.isSendFailIfNoSpace()) {
+ throw new javax.jms.ResourceAllocationException("SystemUsage memory limit reached");
}
// We can avoid blocking due to low usage if the producer is sending
@@ -404,7 +408,7 @@
// If the user manager is not full, then the task will not
// get called..
- if (!usageManager.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
+ if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
// so call it directly here.
sendMessagesWaitingForSpaceTask.run();
}
@@ -417,7 +421,7 @@
// Producer flow control cannot be used, so we have do the flow
// control at the broker
// by blocking this thread until there is space available.
- while (!usageManager.waitForSpace(1000)) {
+ while (!memoryUsage.waitForSpace(1000)) {
if (context.getStopping().get()) {
throw new IOException("Connection closed, send aborted.");
}
@@ -444,6 +448,7 @@
final ConnectionContext context = producerExchange.getConnectionContext();
message.setRegionDestination(this);
if (store != null && message.isPersistent()) {
+ systemUsage.getStoreUsage().waitForSpace();
store.addMessage(context, message);
}
if (context.isInTransaction()) {
@@ -552,13 +557,13 @@
synchronized (messages) {
size = messages.size();
}
- return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size() + ", memory=" + usageManager.getPercentUsage() + "%, size=" + size
+ return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size() + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size
+ ", in flight groups=" + messageGroupOwners;
}
public void start() throws Exception {
- if (usageManager != null) {
- usageManager.start();
+ if (memoryUsage != null) {
+ memoryUsage.start();
}
messages.start();
doPageIn(false);
@@ -571,8 +576,8 @@
if (messages != null) {
messages.stop();
}
- if (usageManager != null) {
- usageManager.stop();
+ if (memoryUsage != null) {
+ memoryUsage.stop();
}
}
@@ -586,8 +591,8 @@
return destination.getPhysicalName();
}
- public UsageManager getUsageManager() {
- return usageManager;
+ public MemoryUsage getBrokerMemoryUsage() {
+ return memoryUsage;
}
public DestinationStatistics getDestinationStatistics() {
@@ -926,7 +931,7 @@
*/
public boolean iterate() {
- while (!usageManager.isFull() && !messagesWaitingForSpace.isEmpty()) {
+ while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
Runnable op = messagesWaitingForSpace.removeFirst();
op.run();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java Mon Aug 20 03:37:29 2007
@@ -24,8 +24,8 @@
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
/**
*
@@ -34,14 +34,14 @@
public class QueueRegion extends AbstractRegion {
public QueueRegion(RegionBroker broker, DestinationStatistics destinationStatistics,
- UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
+ SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) {
super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
public String toString() {
return "QueueRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size()
- + ", memory=" + memoryManager.getPercentUsage() + "%";
+ + ", memory=" + memoryManager.getMemoryUsage().getPercentUsage() + "%";
}
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info)
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Mon Aug 20 03:37:29 2007
@@ -55,11 +55,11 @@
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.BrokerSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
@@ -99,7 +99,7 @@
private final DestinationInterceptor destinationInterceptor;
private ConnectionContext adminConnectionContext;
- public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, DestinationFactory destinationFactory,
+ public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory,
DestinationInterceptor destinationInterceptor) throws IOException {
this.brokerService = brokerService;
if (destinationFactory == null) {
@@ -158,19 +158,19 @@
return topicRegion;
}
- protected Region createTempTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
+ protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new TempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
- protected Region createTempQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
+ protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new TempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
- protected Region createTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
+ protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new TopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
- protected Region createQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
+ protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new QueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java Mon Aug 20 03:37:29 2007
@@ -23,15 +23,15 @@
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
/**
* @version $Revision: 1.7 $
*/
public class TempQueueRegion extends AbstractRegion {
- public TempQueueRegion(RegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
+ public TempQueueRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) {
super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
// We should allow the following to be configurable via a Destination
@@ -65,7 +65,7 @@
}
public String toString() {
- return "TempQueueRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getPercentUsage() + "%";
+ return "TempQueueRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getMemoryUsage().getPercentUsage() + "%";
}
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java Mon Aug 20 03:37:29 2007
@@ -22,8 +22,8 @@
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,7 +34,7 @@
private static final Log LOG = LogFactory.getLog(TempTopicRegion.class);
- public TempTopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
+ public TempTopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) {
super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
// We should allow the following to be configurable via a Destination
@@ -67,7 +67,7 @@
}
public String toString() {
- return "TempTopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getPercentUsage() + "%";
+ return "TempTopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getMemoryUsage().getPercentUsage() + "%";
}
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Mon Aug 20 03:37:29 2007
@@ -43,13 +43,14 @@
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.filter.MessageEvaluationContext;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.thread.Valve;
import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -67,7 +68,8 @@
protected final Valve dispatchValve = new Valve(true);
// this could be NULL! (If an advisory)
protected final TopicMessageStore store;
- protected final UsageManager usageManager;
+ private final SystemUsage systemUsage;
+ private final MemoryUsage memoryUsage;
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
@@ -85,7 +87,7 @@
// that the UsageManager is holding.
synchronized (messagesWaitingForSpace) {
- while (!usageManager.isFull() && !messagesWaitingForSpace.isEmpty()) {
+ while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
Runnable op = messagesWaitingForSpace.removeFirst();
op.run();
}
@@ -95,19 +97,20 @@
};
private final Broker broker;
- public Topic(Broker broker, ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats,
+ public Topic(Broker broker, ActiveMQDestination destination, TopicMessageStore store, SystemUsage systemUsage, DestinationStatistics parentStats,
TaskRunnerFactory taskFactory) {
this.broker = broker;
this.destination = destination;
this.store = store; // this could be NULL! (If an advisory)
- this.usageManager = new UsageManager(memoryManager, destination.toString());
- this.usageManager.setUsagePortion(1.0f);
+ this.systemUsage=systemUsage;
+ this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString());
+ this.memoryUsage.setUsagePortion(1.0f);
// Let the store know what usage manager we are using so that he can
// flush messages to disk
// when usage gets high.
if (store != null) {
- store.setUsageManager(usageManager);
+ store.setMemoryUsage(memoryUsage);
}
// let's copy the enabled property from the parent DestinationStatistics
@@ -206,22 +209,16 @@
}
}
// Do we need to create the subscription?
- if (info == null) {
- info = new SubscriptionInfo();
+ if(info==null){
+ info=new SubscriptionInfo();
info.setClientId(clientId);
info.setSelector(selector);
info.setSubscriptionName(subscriptionName);
- info.setDestination(getActiveMQDestination()); // This
- // destination
- // is an actual
- // destination
- // id.
- info.setSubscribedDestination(subscription.getConsumerInfo().getDestination()); // This
- // destination
- // might
- // be a
- // pattern
- store.addSubsciption(info, subscription.getConsumerInfo().isRetroactive());
+ info.setDestination(getActiveMQDestination());
+ // Thi destination is an actual destination id.
+ info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
+ // This destination might be a pattern
+ store.addSubsciption(info,subscription.getConsumerInfo().isRetroactive());
}
final MessageEvaluationContext msgContext = new MessageEvaluationContext();
@@ -287,8 +284,8 @@
return;
}
- if (context.isProducerFlowControl() && usageManager.isFull()) {
- if (usageManager.isSendFailIfNoSpace()) {
+ if (context.isProducerFlowControl() && memoryUsage.isFull()) {
+ if (systemUsage.isSendFailIfNoSpace()) {
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
}
@@ -327,7 +324,7 @@
// If the user manager is not full, then the task will not
// get called..
- if (!usageManager.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
+ if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
// so call it directly here.
sendMessagesWaitingForSpaceTask.run();
}
@@ -340,7 +337,7 @@
// Producer flow control cannot be used, so we have do the flow
// control at the broker
// by blocking this thread until there is space available.
- while (!usageManager.waitForSpace(1000)) {
+ while (!memoryUsage.waitForSpace(1000)) {
if (context.getStopping().get()) {
throw new IOException("Connection closed, send aborted.");
}
@@ -365,6 +362,7 @@
message.setRegionDestination(this);
if (store != null && message.isPersistent() && !canOptimizeOutPersistence()) {
+ systemUsage.getStoreUsage().waitForSpace();
store.addMessage(context, message);
}
@@ -427,16 +425,16 @@
public void start() throws Exception {
this.subscriptionRecoveryPolicy.start();
- if (usageManager != null) {
- usageManager.start();
+ if (memoryUsage != null) {
+ memoryUsage.start();
}
}
public void stop() throws Exception {
this.subscriptionRecoveryPolicy.stop();
- if (usageManager != null) {
- usageManager.stop();
+ if (memoryUsage != null) {
+ memoryUsage.stop();
}
}
@@ -474,8 +472,8 @@
// Properties
// -------------------------------------------------------------------------
- public UsageManager getUsageManager() {
- return usageManager;
+ public MemoryUsage getBrokerMemoryUsage() {
+ return memoryUsage;
}
public DestinationStatistics getDestinationStatistics() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Mon Aug 20 03:37:29 2007
@@ -35,9 +35,9 @@
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SubscriptionInfo;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
@@ -53,7 +53,7 @@
private final SessionId recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), recoveredDurableSubIdGenerator.getNextSequenceId());
private boolean keepDurableSubsActive;
- public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
+ public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) {
super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
@@ -140,7 +140,7 @@
}
public String toString() {
- return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getPercentUsage() + "%";
+ return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getMemoryUsage().getPercentUsage() + "%";
}
@Override
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Mon Aug 20 03:37:29 2007
@@ -36,8 +36,8 @@
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -47,7 +47,7 @@
private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0);
protected PendingMessageCursor matched;
- protected final UsageManager usageManager;
+ protected final SystemUsage usageManager;
protected AtomicLong dispatchedCounter = new AtomicLong();
protected AtomicLong prefetchExtension = new AtomicLong();
@@ -62,7 +62,7 @@
private final AtomicLong dequeueCounter = new AtomicLong(0);
private int memoryUsageHighWaterMark = 95;
- public TopicSubscription(Broker broker, ConnectionContext context, ConsumerInfo info, UsageManager usageManager) throws Exception {
+ public TopicSubscription(Broker broker, ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
super(broker, context, info);
this.usageManager = usageManager;
String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
@@ -71,7 +71,7 @@
}
public void init() throws Exception {
- this.matched.setUsageManager(usageManager);
+ this.matched.setSystemUsage(usageManager);
this.matched.start();
}
@@ -317,7 +317,7 @@
/**
* @return the usageManager
*/
- public UsageManager getUsageManager() {
+ public SystemUsage getUsageManager() {
return this.usageManager;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Mon Aug 20 03:37:29 2007
@@ -20,7 +20,7 @@
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
-import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.usage.SystemUsage;
/**
* Abstract method holder for pending message (messages awaiting disptach to a
@@ -31,7 +31,7 @@
public class AbstractPendingMessageCursor implements PendingMessageCursor {
protected int memoryUsageHighWaterMark = 90;
protected int maxBatchSize = 100;
- protected UsageManager usageManager;
+ protected SystemUsage systemUsage;
public void start() throws Exception {
}
@@ -110,16 +110,16 @@
public void gc() {
}
- public void setUsageManager(UsageManager usageManager) {
- this.usageManager = usageManager;
+ public void setSystemUsage(SystemUsage usageManager) {
+ this.systemUsage = usageManager;
}
public boolean hasSpace() {
- return usageManager != null ? (usageManager.getPercentUsage() < memoryUsageHighWaterMark) : true;
+ return systemUsage != null ? (systemUsage.getMemoryUsage().getPercentUsage() < memoryUsageHighWaterMark) : true;
}
public boolean isFull() {
- return usageManager != null ? usageManager.isFull() : false;
+ return systemUsage != null ? systemUsage.getMemoryUsage().isFull() : false;
}
public void release() {
@@ -146,8 +146,8 @@
/**
* @return the usageManager
*/
- public UsageManager getUsageManager() {
- return this.usageManager;
+ public SystemUsage getSystemUsage() {
+ return this.systemUsage;
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Mon Aug 20 03:37:29 2007
@@ -27,9 +27,10 @@
import org.apache.activemq.kaha.CommandMarshaller;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.Store;
-import org.apache.activemq.memory.UsageListener;
-import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.usage.Usage;
+import org.apache.activemq.usage.UsageListener;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -65,8 +66,8 @@
public void start() {
if (started.compareAndSet(false, true)) {
- if (usageManager != null) {
- usageManager.addUsageListener(this);
+ if (systemUsage != null) {
+ systemUsage.getMemoryUsage().addUsageListener(this);
}
}
}
@@ -74,8 +75,8 @@
public void stop() {
if (started.compareAndSet(true, false)) {
gc();
- if (usageManager != null) {
- usageManager.removeUsageListener(this);
+ if (systemUsage != null) {
+ systemUsage.getMemoryUsage().removeUsageListener(this);
}
}
}
@@ -147,9 +148,10 @@
} else {
flushToDisk();
node.decrementReferenceCount();
+ systemUsage.getTempDiskUsage().waitForSpace();
getDiskList().addLast(node);
}
- } catch (IOException e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
@@ -166,10 +168,11 @@
memoryList.addFirst(node);
} else {
flushToDisk();
+ systemUsage.getTempDiskUsage().waitForSpace();
node.decrementReferenceCount();
getDiskList().addFirst(node);
}
- } catch (IOException e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
@@ -238,12 +241,12 @@
return !isEmpty();
}
- public void setUsageManager(UsageManager usageManager) {
- super.setUsageManager(usageManager);
- usageManager.addUsageListener(this);
+ public void setSystemUsage(SystemUsage usageManager) {
+ super.setSystemUsage(usageManager);
+ usageManager.getMemoryUsage().addUsageListener(this);
}
- public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
+ public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
synchronized (this) {
flushRequired = true;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Mon Aug 20 03:37:29 2007
@@ -23,7 +23,7 @@
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
-import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.usage.SystemUsage;
/**
* Interface to pending message (messages awaiting disptach to a consumer)
@@ -166,15 +166,15 @@
/**
* Set the UsageManager
*
- * @param usageManager
- * @see org.apache.activemq.memory.UsageManager
+ * @param systemUsage
+ * @see org.apache.activemq.usage.SystemUsage
*/
- void setUsageManager(UsageManager usageManager);
+ void setSystemUsage(SystemUsage systemUsage);
/**
* @return the usageManager
*/
- UsageManager getUsageManager();
+ SystemUsage getSystemUsage();
/**
* @return the memoryUsageHighWaterMark
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Mon Aug 20 03:37:29 2007
@@ -29,7 +29,7 @@
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
import org.apache.activemq.kaha.Store;
-import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -96,7 +96,7 @@
if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
TopicStorePrefetch tsp = new TopicStorePrefetch((Topic)destination, clientId, subscriberName);
tsp.setMaxBatchSize(getMaxBatchSize());
- tsp.setUsageManager(usageManager);
+ tsp.setSystemUsage(systemUsage);
topics.put(destination, tsp);
storePrefetches.add(tsp);
if (started) {
@@ -244,11 +244,11 @@
}
}
- public synchronized void setUsageManager(UsageManager usageManager) {
- super.setUsageManager(usageManager);
+ public synchronized void setSystemUsage(SystemUsage usageManager) {
+ super.setSystemUsage(usageManager);
for (Iterator<PendingMessageCursor> i = storePrefetches.iterator(); i.hasNext();) {
PendingMessageCursor tsp = i.next();
- tsp.setUsageManager(usageManager);
+ tsp.setSystemUsage(usageManager);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Mon Aug 20 03:37:29 2007
@@ -20,7 +20,7 @@
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.Message;
import org.apache.activemq.kaha.Store;
-import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -58,7 +58,7 @@
if (nonPersistent == null) {
nonPersistent = new FilePendingMessageCursor(queue.getDestination(), tmpStore);
nonPersistent.setMaxBatchSize(getMaxBatchSize());
- nonPersistent.setUsageManager(usageManager);
+ nonPersistent.setSystemUsage(systemUsage);
}
nonPersistent.start();
persistent.start();
@@ -201,13 +201,13 @@
}
}
- public synchronized void setUsageManager(UsageManager usageManager) {
- super.setUsageManager(usageManager);
+ public synchronized void setSystemUsage(SystemUsage usageManager) {
+ super.setSystemUsage(usageManager);
if (persistent != null) {
- persistent.setUsageManager(usageManager);
+ persistent.setSystemUsage(usageManager);
}
if (nonPersistent != null) {
- nonPersistent.setUsageManager(usageManager);
+ nonPersistent.setSystemUsage(usageManager);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Mon Aug 20 03:37:29 2007
@@ -26,7 +26,7 @@
import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
import org.apache.activemq.filter.DestinationMapEntry;
import org.apache.activemq.kaha.Store;
-import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -61,7 +61,7 @@
}
queue.setMessageGroupMapFactory(getMessageGroupMapFactory());
if (memoryLimit > 0) {
- queue.getUsageManager().setLimit(memoryLimit);
+ queue.getBrokerMemoryUsage().setLimit(memoryLimit);
}
if (pendingQueuePolicy != null) {
PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(queue, tmpStore);
@@ -81,11 +81,11 @@
}
topic.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
if (memoryLimit > 0) {
- topic.getUsageManager().setLimit(memoryLimit);
+ topic.getBrokerMemoryUsage().setLimit(memoryLimit);
}
}
- public void configure(Broker broker, UsageManager memoryManager, TopicSubscription subscription) {
+ public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
if (pendingMessageLimitStrategy != null) {
int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);
int consumerLimit = subscription.getInfo().getMaximumPendingMessageLimit();
@@ -111,13 +111,13 @@
}
}
- public void configure(Broker broker, UsageManager memoryManager, DurableTopicSubscription sub) {
+ public void configure(Broker broker, SystemUsage memoryManager, DurableTopicSubscription sub) {
String clientId = sub.getClientId();
String subName = sub.getSubscriptionName();
int prefetch = sub.getPrefetchSize();
if (pendingDurableSubscriberPolicy != null) {
PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(clientId, subName, broker.getTempDataStore(), prefetch);
- cursor.setUsageManager(memoryManager);
+ cursor.setSystemUsage(memoryManager);
sub.setPending(cursor);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Mon Aug 20 03:37:29 2007
@@ -582,7 +582,7 @@
}
if (rc == 1 && regionDestination != null) {
- regionDestination.getUsageManager().increaseUsage(size);
+ regionDestination.getBrokerMemoryUsage().increaseUsage(size);
}
// System.out.println(" + "+getDestination()+" :::: "+getMessageId()+"
@@ -599,7 +599,7 @@
}
if (rc == 0 && regionDestination != null) {
- regionDestination.getUsageManager().decreaseUsage(size);
+ regionDestination.getBrokerMemoryUsage().decreaseUsage(size);
}
// System.out.println(" - "+getDestination()+" :::: "+getMessageId()+"
// "+rc);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java Mon Aug 20 03:37:29 2007
@@ -256,22 +256,14 @@
*/
void setMaxDataFileLength(long maxDataFileLength);
- /**
- * @see org.apache.activemq.kaha.IndexTypes
- * @return the default index type
- */
- String getIndexTypeAsString();
-
- /**
- * Set the default index type
- *
- * @param type
- * @see org.apache.activemq.kaha.IndexTypes
- */
- void setIndexTypeAsString(String type);
/**
* @return true if the store has been initialized
*/
boolean isInitialized();
+
+ /**
+ * @return the amount of disk space the store is occupying
+ */
+ long size();
}