You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/12/29 17:48:29 UTC
svn commit: r729939 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/kaha/
main/java/org/apache/activemq/kaha/impl/container/
main/java/org/apache/activemq/kaha/impl/index/ main/java/org...
Author: rajdavies
Date: Mon Dec 29 08:48:28 2008
New Revision: 729939
URL: http://svn.apache.org/viewvc?rev=729939&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1842
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java (with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/Index.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Mon Dec 29 08:48:28 2008
@@ -28,31 +28,29 @@
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
-
/**
* @version $Revision: 1.12 $
*/
public abstract class BaseDestination implements Destination {
/**
- * The maximum number of messages to page in to the destination
- * from persistent storage
+ * The maximum number of messages to page in to the destination from persistent storage
*/
- public static final int MAX_PAGE_SIZE=200;
- public static final int MAX_BROWSE_PAGE_SIZE=MAX_PAGE_SIZE*2;
+ public static final int MAX_PAGE_SIZE = 200;
+ public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
protected final ActiveMQDestination destination;
protected final Broker broker;
protected final MessageStore store;
protected SystemUsage systemUsage;
protected MemoryUsage memoryUsage;
private boolean producerFlowControl = true;
- private int maxProducersToAudit=1024;
- private int maxAuditDepth=2048;
- private boolean enableAudit=true;
- private int maxPageSize=MAX_PAGE_SIZE;
- private int maxBrowsePageSize=MAX_BROWSE_PAGE_SIZE;
- private boolean useCache=true;
- private int minimumMessageSize=1024;
- private boolean lazyDispatch=false;
+ private int maxProducersToAudit = 1024;
+ private int maxAuditDepth = 2048;
+ private boolean enableAudit = true;
+ private int maxPageSize = MAX_PAGE_SIZE;
+ private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE;
+ private boolean useCache = true;
+ private int minimumMessageSize = 1024;
+ private boolean lazyDispatch = false;
private boolean advisoryForSlowConsumers;
private boolean advisdoryForFastProducers;
private boolean advisoryForDiscardingMessages;
@@ -63,31 +61,32 @@
protected final BrokerService brokerService;
protected final Broker regionBroker;
protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY;
-
+
/**
- * @param broker
- * @param store
+ * @param broker
+ * @param store
* @param destination
* @param parentStats
- * @throws Exception
+ * @throws Exception
*/
- public BaseDestination(BrokerService brokerService,MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception {
+ public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination,
+ DestinationStatistics parentStats) throws Exception {
this.brokerService = brokerService;
- this.broker=brokerService.getBroker();
- this.store=store;
- this.destination=destination;
+ this.broker = brokerService.getBroker();
+ this.store = store;
+ this.destination = destination;
// let's copy the enabled property from the parent DestinationStatistics
this.destinationStatistics.setEnabled(parentStats.isEnabled());
- this.destinationStatistics.setParent(parentStats);
-
+ this.destinationStatistics.setParent(parentStats);
this.systemUsage = brokerService.getProducerSystemUsage();
this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString());
this.memoryUsage.setUsagePortion(1.0f);
this.regionBroker = brokerService.getRegionBroker();
}
-
+
/**
* initialize the destination
+ *
* @throws Exception
*/
public void initialize() throws Exception {
@@ -95,66 +94,77 @@
// flush messages to disk when usage gets high.
if (store != null) {
store.setMemoryUsage(this.memoryUsage);
- }
+ }
}
-
+
/**
* @return the producerFlowControl
*/
public boolean isProducerFlowControl() {
return producerFlowControl;
}
+
/**
- * @param producerFlowControl the producerFlowControl to set
+ * @param producerFlowControl
+ * the producerFlowControl to set
*/
public void setProducerFlowControl(boolean producerFlowControl) {
this.producerFlowControl = producerFlowControl;
}
+
/**
* @return the maxProducersToAudit
*/
public int getMaxProducersToAudit() {
return maxProducersToAudit;
}
+
/**
- * @param maxProducersToAudit the maxProducersToAudit to set
+ * @param maxProducersToAudit
+ * the maxProducersToAudit to set
*/
public void setMaxProducersToAudit(int maxProducersToAudit) {
this.maxProducersToAudit = maxProducersToAudit;
}
+
/**
* @return the maxAuditDepth
*/
public int getMaxAuditDepth() {
return maxAuditDepth;
}
+
/**
- * @param maxAuditDepth the maxAuditDepth to set
+ * @param maxAuditDepth
+ * the maxAuditDepth to set
*/
public void setMaxAuditDepth(int maxAuditDepth) {
this.maxAuditDepth = maxAuditDepth;
}
+
/**
* @return the enableAudit
*/
public boolean isEnableAudit() {
return enableAudit;
}
+
/**
- * @param enableAudit the enableAudit to set
+ * @param enableAudit
+ * the enableAudit to set
*/
public void setEnableAudit(boolean enableAudit) {
this.enableAudit = enableAudit;
}
-
- public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception{
+
+ public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
destinationStatistics.getProducers().increment();
}
- public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception{
+ public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
destinationStatistics.getProducers().decrement();
}
-
+
public final MemoryUsage getMemoryUsage() {
return memoryUsage;
}
@@ -167,20 +177,19 @@
return destination;
}
-
public final String getName() {
return getActiveMQDestination().getPhysicalName();
}
-
+
public final MessageStore getMessageStore() {
return store;
}
-
+
public final boolean isActive() {
- return destinationStatistics.getConsumers().getCount() != 0 ||
- destinationStatistics.getProducers().getCount() != 0;
+ return destinationStatistics.getConsumers().getCount() != 0
+ || destinationStatistics.getProducers().getCount() != 0;
}
-
+
public int getMaxPageSize() {
return maxPageSize;
}
@@ -188,14 +197,14 @@
public void setMaxPageSize(int maxPageSize) {
this.maxPageSize = maxPageSize;
}
-
+
public int getMaxBrowsePageSize() {
return this.maxBrowsePageSize;
}
public void setMaxBrowsePageSize(int maxPageSize) {
this.maxBrowsePageSize = maxPageSize;
- }
+ }
public boolean isUseCache() {
return useCache;
@@ -219,8 +228,8 @@
public void setLazyDispatch(boolean lazyDispatch) {
this.lazyDispatch = lazyDispatch;
- }
-
+ }
+
protected long getDestinationSequenceId() {
return regionBroker.getBrokerSequenceId();
}
@@ -233,7 +242,8 @@
}
/**
- * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
+ * @param advisoryForSlowConsumers
+ * the advisoryForSlowConsumers to set
*/
public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
this.advisoryForSlowConsumers = advisoryForSlowConsumers;
@@ -247,10 +257,10 @@
}
/**
- * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to set
+ * @param advisoryForDiscardingMessages
+ * the advisoryForDiscardingMessages to set
*/
- public void setAdvisoryForDiscardingMessages(
- boolean advisoryForDiscardingMessages) {
+ public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) {
this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
}
@@ -262,7 +272,8 @@
}
/**
- * @param advisoryWhenFull the advisoryWhenFull to set
+ * @param advisoryWhenFull
+ * the advisoryWhenFull to set
*/
public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
this.advisoryWhenFull = advisoryWhenFull;
@@ -276,7 +287,8 @@
}
/**
- * @param advisoryForDelivery the advisoryForDelivery to set
+ * @param advisoryForDelivery
+ * the advisoryForDelivery to set
*/
public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
this.advisoryForDelivery = advisoryForDelivery;
@@ -290,7 +302,8 @@
}
/**
- * @param advisoryForConsumed the advisoryForConsumed to set
+ * @param advisoryForConsumed
+ * the advisoryForConsumed to set
*/
public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
this.advisoryForConsumed = advisoryForConsumed;
@@ -304,12 +317,13 @@
}
/**
- * @param advisdoryForFastProducers the advisdoryForFastProducers to set
+ * @param advisdoryForFastProducers
+ * the advisdoryForFastProducers to set
*/
public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) {
this.advisdoryForFastProducers = advisdoryForFastProducers;
}
-
+
/**
* @return the dead letter strategy
*/
@@ -319,13 +333,16 @@
/**
* set the dead letter strategy
+ *
* @param deadLetterStrategy
*/
public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
this.deadLetterStrategy = deadLetterStrategy;
}
+
/**
* called when message is consumed
+ *
* @param context
* @param messageReference
*/
@@ -334,21 +351,23 @@
broker.messageConsumed(context, messageReference);
}
}
-
+
/**
* Called when message is delivered to the broker
+ *
* @param context
* @param messageReference
*/
public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
- if(advisoryForDelivery) {
+ if (advisoryForDelivery) {
broker.messageDelivered(context, messageReference);
}
}
-
+
/**
- * Called when a message is discarded - e.g. running low on memory
- * This will happen only if the policy is enabled - e.g. non durable topics
+ * Called when a message is discarded - e.g. running low on memory This will happen only if the policy is enabled -
+ * e.g. non durable topics
+ *
* @param context
* @param messageReference
*/
@@ -357,43 +376,48 @@
broker.messageDiscarded(context, messageReference);
}
}
-
+
/**
* Called when there is a slow consumer
+ *
* @param context
* @param subs
*/
public void slowConsumer(ConnectionContext context, Subscription subs) {
- if(advisoryForSlowConsumers) {
+ if (advisoryForSlowConsumers) {
broker.slowConsumer(context, this, subs);
}
}
-
+
/**
* Called to notify a producer is too fast
+ *
* @param context
* @param producerInfo
*/
- public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
- if(advisdoryForFastProducers) {
+ public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
+ if (advisdoryForFastProducers) {
broker.fastProducer(context, producerInfo);
}
}
-
+
/**
* Called when a Usage reaches a limit
+ *
* @param context
* @param usage
*/
- public void isFull(ConnectionContext context,Usage usage) {
- if(advisoryWhenFull) {
- broker.isFull(context,this, usage);
+ public void isFull(ConnectionContext context, Usage usage) {
+ if (advisoryWhenFull) {
+ broker.isFull(context, this, usage);
}
}
-
+
public void dispose(ConnectionContext context) throws IOException {
- destinationStatistics.setParent(null);
+ if (this.store != null) {
+ this.store.dispose(context);
+ }
+ this.destinationStatistics.setParent(null);
this.memoryUsage.stop();
}
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Mon Dec 29 08:48:28 2008
@@ -475,13 +475,6 @@
}
}
- public void dispose(ConnectionContext context) throws IOException {
- super.dispose(context);
- if (store != null) {
- store.removeAllMessages(context);
- }
- }
-
public void gc(){
}
@@ -543,6 +536,9 @@
if (memoryUsage != null) {
memoryUsage.stop();
}
+ if (store!=null) {
+ store.stop();
+ }
}
// Properties
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Mon Dec 29 08:48:28 2008
@@ -449,13 +449,7 @@
messageConsumed(context, node);
}
- public void dispose(ConnectionContext context) throws IOException {
- super.dispose(context);
- if (topicStore != null) {
- topicStore.removeAllMessages(context);
- }
- }
-
+
public void gc() {
}
@@ -479,6 +473,9 @@
if (memoryUsage != null) {
memoryUsage.stop();
}
+ if(this.topicStore != null) {
+ this.topicStore.stop();
+ }
}
public Message[] browse() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java Mon Dec 29 08:48:28 2008
@@ -283,4 +283,9 @@
* @return the Index MBean
*/
IndexMBean getIndexMBean();
+
+ /**
+ * Clean up all state associated with this container.
+ */
+ void delete();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java Mon Dec 29 08:48:28 2008
@@ -142,6 +142,16 @@
}
}
+ public synchronized void delete() {
+ unload();
+ try {
+ index.delete();
+ } catch (IOException e) {
+ LOG.warn("Failed to unload the index", e);
+ }
+ }
+
+
public synchronized void setKeyMarshaller(Marshaller keyMarshaller) {
checkClosed();
this.keyMarshaller = keyMarshaller;
@@ -578,7 +588,6 @@
public IndexMBean getIndexMBean() {
return (IndexMBean) index;
}
-
public int getIndexMaxBinSize() {
return indexMaxBinSize;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/Index.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/Index.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/Index.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/Index.java Mon Dec 29 08:48:28 2008
@@ -96,4 +96,11 @@
* @return
*/
int getSize();
+
+ /**
+ * delete all state associated with the index
+ *
+ * @throws IOException
+ */
+ void delete() throws IOException;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java Mon Dec 29 08:48:28 2008
@@ -28,7 +28,7 @@
/**
* Index implementation using a HashMap
- *
+ *
* @version $Revision: 1.2 $
*/
public class VMIndex implements Index, IndexMBean {
@@ -41,7 +41,7 @@
}
/**
- *
+ *
* @see org.apache.activemq.kaha.impl.index.Index#clear()
*/
public void clear() {
@@ -122,9 +122,13 @@
map.clear();
}
+ public void delete() throws IOException {
+ unload();
+ }
+
public void setKeyMarshaller(Marshaller marshaller) {
}
-
+
public int getSize() {
return map.size();
}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java?rev=729939&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java Mon Dec 29 08:48:28 2008
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.usage.MemoryUsage;
+
+abstract public class AbstractMessageStore implements MessageStore {
+ protected final ActiveMQDestination destination;
+
+ public AbstractMessageStore(ActiveMQDestination destination) {
+ this.destination = destination;
+ }
+
+ public void dispose(ConnectionContext context) {
+ }
+
+ public void start() throws Exception {
+ }
+
+ public void stop() throws Exception {
+ }
+
+ public ActiveMQDestination getDestination() {
+ return destination;
+ }
+
+ public void setMemoryUsage(MemoryUsage memoryUsage) {
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java Mon Dec 29 08:48:28 2008
@@ -109,4 +109,5 @@
void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception;
+ void dispose(ConnectionContext context);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java Mon Dec 29 08:48:28 2008
@@ -67,6 +67,10 @@
delegate.stop();
}
+ public void dispose(ConnectionContext context) {
+ delegate.dispose(context);
+ }
+
public ActiveMQDestination getDestination() {
return delegate.getDestination();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java Mon Dec 29 08:48:28 2008
@@ -126,6 +126,10 @@
}
+ public void dispose(ConnectionContext context) {
+ delegate.dispose(context);
+ }
+
public void resetBatching() {
delegate.resetBatching();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java Mon Dec 29 08:48:28 2008
@@ -30,7 +30,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
-
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.DataStructure;
@@ -40,8 +39,8 @@
import org.apache.activemq.command.MessageId;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageRecoveryListener;
-import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.ReferenceStore.ReferenceData;
@@ -59,21 +58,17 @@
*
* @version $Revision: 1.14 $
*/
-public class AMQMessageStore implements MessageStore {
-
+public class AMQMessageStore extends AbstractMessageStore {
private static final Log LOG = LogFactory.getLog(AMQMessageStore.class);
-
protected final AMQPersistenceAdapter peristenceAdapter;
protected final AMQTransactionStore transactionStore;
protected final ReferenceStore referenceStore;
- protected final ActiveMQDestination destination;
protected final TransactionTemplate transactionTemplate;
protected Location lastLocation;
protected Location lastWrittenLocation;
protected Set<Location> inFlightTxLocations = new HashSet<Location>();
protected final TaskRunner asyncWriteTask;
protected CountDownLatch flushLatch;
-
private Map<MessageId, ReferenceData> messages = new LinkedHashMap<MessageId, ReferenceData>();
private List<MessageAck> messageAcks = new ArrayList<MessageAck>();
/** A MessageStore that we can use to retrieve messages quickly. */
@@ -82,15 +77,15 @@
private final AtomicReference<Location> mark = new AtomicReference<Location>();
protected final Lock lock;
- public AMQMessageStore(AMQPersistenceAdapter adapter,ReferenceStore referenceStore, ActiveMQDestination destination) {
+ public AMQMessageStore(AMQPersistenceAdapter adapter, ReferenceStore referenceStore, ActiveMQDestination destination) {
+ super(destination);
this.peristenceAdapter = adapter;
- this.lock=referenceStore.getStoreLock();
+ this.lock = referenceStore.getStoreLock();
this.transactionStore = adapter.getTransactionStore();
this.referenceStore = referenceStore;
- this.destination = destination;
- this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext()));
+ this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(
+ new NonCachedMessageEvaluationContext()));
asyncWriteTask = adapter.getTaskRunnerFactory().createTaskRunner(new Task() {
-
public boolean iterate() {
asyncWrite();
return false;
@@ -103,8 +98,8 @@
}
/**
- * Not synchronize since the Journal has better throughput if you increase
- * the number of concurrent writes that it is doing.
+ * Not synchronize since the Journal has better throughput if you increase the number of concurrent writes that it
+ * is doing.
*/
public final void addMessage(ConnectionContext context, final Message message) throws IOException {
final MessageId id = message.getMessageId();
@@ -122,12 +117,11 @@
lock.lock();
try {
inFlightTxLocations.add(location);
- }finally {
+ } finally {
lock.unlock();
}
transactionStore.addMessage(this, message, location);
context.getTransaction().addSynchronization(new Synchronization() {
-
public void afterCommit() throws Exception {
if (debug) {
LOG.debug("Transacted message add commit for: " + id + ", at: " + location);
@@ -135,7 +129,7 @@
lock.lock();
try {
inFlightTxLocations.remove(location);
- }finally {
+ } finally {
lock.unlock();
}
addMessage(message, location);
@@ -148,7 +142,7 @@
lock.lock();
try {
inFlightTxLocations.remove(location);
- }finally {
+ } finally {
lock.unlock();
}
}
@@ -161,15 +155,14 @@
data.setExpiration(message.getExpiration());
data.setFileId(location.getDataFileId());
data.setOffset(location.getOffset());
- lock.lock();
- try {
+ lock.lock();
+ try {
lastLocation = location;
messages.put(message.getMessageId(), data);
- }finally {
+ } finally {
lock.unlock();
}
- if (messages.size() > this.peristenceAdapter
- .getMaxCheckpointMessageAddSize()) {
+ if (messages.size() > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) {
flush();
} else {
try {
@@ -194,7 +187,8 @@
return true;
}
} catch (Throwable e) {
- LOG.warn("Could not replay add for message '" + id + "'. Message may have already been added. reason: " + e, e);
+ LOG.warn("Could not replay add for message '" + id + "'. Message may have already been added. reason: "
+ + e, e);
}
return false;
}
@@ -210,7 +204,7 @@
if (debug) {
LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location);
}
- removeMessage(ack,location);
+ removeMessage(ack, location);
} else {
if (debug) {
LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location);
@@ -218,33 +212,34 @@
lock.lock();
try {
inFlightTxLocations.add(location);
- }finally {
+ } finally {
lock.unlock();
}
transactionStore.removeMessage(this, ack, location);
context.getTransaction().addSynchronization(new Synchronization() {
-
public void afterCommit() throws Exception {
if (debug) {
- LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: " + location);
+ LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: "
+ + location);
}
lock.lock();
try {
inFlightTxLocations.remove(location);
- }finally {
+ } finally {
lock.unlock();
}
- removeMessage(ack,location);
+ removeMessage(ack, location);
}
public void afterRollback() throws Exception {
if (debug) {
- LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: " + location);
+ LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: "
+ + location);
}
lock.lock();
try {
inFlightTxLocations.remove(location);
- }finally {
+ } finally {
lock.unlock();
}
}
@@ -255,7 +250,7 @@
final void removeMessage(final MessageAck ack, final Location location) throws InterruptedIOException {
ReferenceData data;
lock.lock();
- try{
+ try {
lastLocation = location;
MessageId id = ack.getLastMessageId();
data = messages.remove(id);
@@ -265,13 +260,12 @@
// message never got written so datafileReference will still exist
AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, data.getFileId());
}
- }finally {
+ } finally {
lock.unlock();
}
if (messageAcks.size() > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) {
flush();
- }
- else if (data == null) {
+ } else if (data == null) {
try {
asyncWriteTask.wakeup();
} catch (InterruptedException e) {
@@ -279,7 +273,7 @@
}
}
}
-
+
public boolean replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
try {
// Only remove the message if it has not already been removed.
@@ -289,7 +283,8 @@
return true;
}
} catch (Throwable e) {
- LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e);
+ LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId()
+ + "'. Message may have already been acknowledged. reason: " + e);
}
return false;
}
@@ -313,7 +308,7 @@
flushLatch = new CountDownLatch(1);
}
countDown = flushLatch;
- }finally {
+ } finally {
lock.unlock();
}
try {
@@ -338,7 +333,7 @@
try {
countDown = flushLatch;
flushLatch = null;
- }finally {
+ } finally {
lock.unlock();
}
mark.set(doAsyncWrite());
@@ -368,14 +363,14 @@
this.messages = new LinkedHashMap<MessageId, ReferenceData>();
this.messageAcks = new ArrayList<MessageAck>();
lastLocation = this.lastLocation;
- }finally {
+ } finally {
lock.unlock();
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Doing batch update... adding: " + cpAddedMessageIds.size() + " removing: " + cpRemovedMessageLocations.size() + " ");
+ LOG.debug("Doing batch update... adding: " + cpAddedMessageIds.size() + " removing: "
+ + cpRemovedMessageLocations.size() + " ");
}
transactionTemplate.run(new Callback() {
-
public void execute() throws Exception {
int size = 0;
PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter();
@@ -386,7 +381,8 @@
Entry<MessageId, ReferenceData> entry = iterator.next();
try {
referenceStore.addMessageReference(context, entry.getKey(), entry.getValue());
- AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this,entry.getValue().getFileId());
+ AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, entry
+ .getValue().getFileId());
} catch (Throwable e) {
LOG.warn("Message could not be added to long term store: " + e.getMessage(), e);
}
@@ -415,7 +411,7 @@
try {
cpAddedMessageIds = null;
lastWrittenLocation = lastLocation;
- }finally {
+ } finally {
lock.unlock();
}
if (cpActiveJournalLocations.size() > 0) {
@@ -436,14 +432,13 @@
try {
return (Message) rc;
} catch (ClassCastException e) {
- throw new IOException("Could not read message " + identity
- + " at location " + location
+ throw new IOException("Could not read message " + identity + " at location " + location
+ ", expected a message, but got: " + rc);
}
}
return null;
}
-
+
protected Location getLocation(MessageId messageId) throws IOException {
ReferenceData data = null;
lock.lock();
@@ -453,7 +448,7 @@
if (data == null && cpAddedMessageIds != null) {
data = cpAddedMessageIds.get(messageId);
}
- }finally {
+ } finally {
lock.unlock();
}
if (data == null) {
@@ -469,16 +464,15 @@
}
/**
- * Replays the referenceStore first as those messages are the oldest ones,
- * then messages are replayed from the transaction log and then the cache is
- * updated.
+ * Replays the referenceStore first as those messages are the oldest ones, then messages are replayed from the
+ * transaction log and then the cache is updated.
*
* @param listener
* @throws Exception
*/
public void recover(final MessageRecoveryListener listener) throws Exception {
flush();
- referenceStore.recover(new RecoveryListenerAdapter(this, listener));
+ referenceStore.recover(new RecoveryListenerAdapter(this, listener));
}
public void start() throws Exception {
@@ -506,11 +500,8 @@
referenceStore.removeAllMessages(context);
}
- public ActiveMQDestination getDestination() {
- return destination;
- }
-
- public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
+ public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime,
+ String messageRef) throws IOException {
throw new IOException("The journal does not support message references.");
}
@@ -543,9 +534,10 @@
location.setOffset(data.getOffset());
DataStructure rc = peristenceAdapter.readCommand(location);
try {
- return (Message)rc;
+ return (Message) rc;
} catch (ClassCastException e) {
- throw new IOException("Could not read message at location " + location + ", expected a message, but got: " + rc);
+ throw new IOException("Could not read message at location " + location + ", expected a message, but got: "
+ + rc);
}
}
@@ -556,4 +548,14 @@
public Location getMark() {
return mark.get();
}
+
+ public void dispose(ConnectionContext context) {
+ try {
+ flush();
+ } catch (InterruptedIOException e) {
+ Thread.currentThread().interrupt();
+ }
+ referenceStore.dispose(context);
+ super.dispose(context);
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Mon Dec 29 08:48:28 2008
@@ -27,8 +27,8 @@
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.usage.MemoryUsage;
-import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ByteSequenceData;
import org.apache.activemq.util.IOExceptionSupport;
@@ -37,19 +37,18 @@
/**
* @version $Revision: 1.10 $
*/
-public class JDBCMessageStore implements MessageStore {
+public class JDBCMessageStore extends AbstractMessageStore {
protected final WireFormat wireFormat;
- protected final ActiveMQDestination destination;
protected final JDBCAdapter adapter;
protected final JDBCPersistenceAdapter persistenceAdapter;
protected AtomicLong lastMessageId = new AtomicLong(-1);
public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination) {
+ super(destination);
this.persistenceAdapter = persistenceAdapter;
this.adapter = adapter;
this.wireFormat = wireFormat;
- this.destination = destination;
}
public void addMessage(ConnectionContext context, Message message) throws IOException {
@@ -169,12 +168,6 @@
}
}
- public void start() {
- }
-
- public void stop() {
- }
-
/**
* @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
*/
@@ -191,15 +184,6 @@
}
}
- public ActiveMQDestination getDestination() {
- return destination;
- }
-
- public void setMemoryUsage(MemoryUsage memoryUsage) {
- //can ignore as messages aren't buffered
- }
-
-
public int getMessageCount() throws IOException {
int result = 0;
TransactionContext c = persistenceAdapter.getTransactionContext();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java Mon Dec 29 08:48:28 2008
@@ -37,6 +37,7 @@
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
@@ -50,14 +51,13 @@
*
* @version $Revision: 1.14 $
*/
-public class JournalMessageStore implements MessageStore {
+public class JournalMessageStore extends AbstractMessageStore {
private static final Log LOG = LogFactory.getLog(JournalMessageStore.class);
protected final JournalPersistenceAdapter peristenceAdapter;
protected final JournalTransactionStore transactionStore;
protected final MessageStore longTermStore;
- protected final ActiveMQDestination destination;
protected final TransactionTemplate transactionTemplate;
protected RecordLocation lastLocation;
protected Set<RecordLocation> inFlightTxLocations = new HashSet<RecordLocation>();
@@ -72,10 +72,10 @@
private MemoryUsage memoryUsage;
public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) {
+ super(destination);
this.peristenceAdapter = adapter;
this.transactionStore = adapter.getTransactionStore();
this.longTermStore = checkpointStore;
- this.destination = destination;
this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext()));
}
@@ -382,10 +382,6 @@
longTermStore.removeAllMessages(context);
}
- public ActiveMQDestination getDestination() {
- return destination;
- }
-
public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
throw new IOException("The journal does not support message references.");
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Mon Dec 29 08:48:28 2008
@@ -26,6 +26,7 @@
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
@@ -35,16 +36,15 @@
*
* @version $Revision: 1.7 $
*/
-public class KahaMessageStore implements MessageStore {
+public class KahaMessageStore extends AbstractMessageStore {
- protected final ActiveMQDestination destination;
protected final MapContainer<MessageId, Message> messageContainer;
protected StoreEntry batchEntry;
public KahaMessageStore(MapContainer<MessageId, Message> container, ActiveMQDestination destination)
throws IOException {
+ super(destination);
this.messageContainer = container;
- this.destination = destination;
}
protected MessageId getMessageId(Object object) {
@@ -101,27 +101,14 @@
}
}
- public void start() {
- }
-
- public void stop() {
- }
-
public synchronized void removeAllMessages(ConnectionContext context) throws IOException {
messageContainer.clear();
}
- public ActiveMQDestination getDestination() {
- return destination;
- }
-
public synchronized void delete() {
messageContainer.clear();
}
- public void setMemoryUsage(MemoryUsage memoryUsage) {
- }
-
/**
* @return the number of messages held by this destination
* @see org.apache.activemq.store.MessageStore#getMessageCount()
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Mon Dec 29 08:48:28 2008
@@ -31,15 +31,15 @@
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.ReferenceStore;
+import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.usage.MemoryUsage;
/**
* @author rajdavies
*
*/
-public class KahaReferenceStore implements ReferenceStore {
+public class KahaReferenceStore extends AbstractMessageStore implements ReferenceStore {
- protected final ActiveMQDestination destination;
protected final MapContainer<MessageId, ReferenceRecord> messageContainer;
protected KahaReferenceStoreAdapter adapter;
private StoreEntry batchEntry;
@@ -48,19 +48,19 @@
public KahaReferenceStore(KahaReferenceStoreAdapter adapter, MapContainer<MessageId, ReferenceRecord> container,
ActiveMQDestination destination) throws IOException {
+ super(destination);
this.adapter = adapter;
this.messageContainer = container;
- this.destination = destination;
}
public Lock getStoreLock() {
return lock;
}
- public void start() {
- }
-
- public void stop() {
+ public void dispose(ConnectionContext context) {
+ super.dispose(context);
+ this.messageContainer.delete();
+ this.adapter.removeReferenceStore(this);
}
protected MessageId getMessageId(Object object) {
@@ -204,10 +204,6 @@
}
}
- public ActiveMQDestination getDestination() {
- return destination;
- }
-
public void delete() {
lock.lock();
try {
@@ -231,9 +227,6 @@
return messageContainer.size();
}
- public void setMemoryUsage(MemoryUsage memoryUsage) {
- }
-
public boolean isSupportForCursors() {
return true;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Mon Dec 29 08:48:28 2008
@@ -33,6 +33,7 @@
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.kaha.CommandMarshaller;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
@@ -179,6 +180,16 @@
}
return rc;
}
+
+ public void removeReferenceStore(KahaReferenceStore store) {
+ ActiveMQDestination destination = store.getDestination();
+ if (destination.isQueue()) {
+ queues.remove(destination);
+ } else {
+ topics.remove(destination);
+ }
+ messageStores.remove(destination);
+ }
/*
public void buildReferenceFileIdsInUse() throws IOException {
recordReferences = new HashMap<Integer, AtomicInteger>();
@@ -239,7 +250,7 @@
}
/**
- *
+ *
* @throws IOException
* @see org.apache.activemq.store.ReferenceStoreAdapter#clearMessages()
*/
@@ -250,13 +261,13 @@
}
/**
- *
+ *
* @throws IOException
* @see org.apache.activemq.store.ReferenceStoreAdapter#recoverState()
*/
-
+
public void recoverState() throws IOException {
- Set<SubscriptionInfo> set = new HashSet<SubscriptionInfo>(this.durableSubscribers);
+ Set<SubscriptionInfo> set = new HashSet<SubscriptionInfo>(this.durableSubscribers);
for (SubscriptionInfo info:set) {
LOG.info("Recovering subscriber state for durable subscriber: " + info);
TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination());
@@ -312,7 +323,7 @@
StoreFactory.delete(stateDirectory);
}
}
-
+
public boolean isPersistentIndex() {
return persistentIndex;
}
@@ -363,7 +374,7 @@
public void setIndexPageSize(int indexPageSize) {
this.indexPageSize = indexPageSize;
}
-
+
public int getIndexMaxBinSize() {
return indexMaxBinSize;
}
@@ -371,7 +382,7 @@
public void setIndexMaxBinSize(int maxBinSize) {
this.indexMaxBinSize = maxBinSize;
}
-
+
/**
* @return the loadFactor
*/
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Mon Dec 29 08:48:28 2008
@@ -39,7 +39,7 @@
protected ListContainer<TopicSubAck> ackContainer;
protected Map<String, TopicSubContainer> subscriberMessages = new ConcurrentHashMap<String, TopicSubContainer>();
- private Map<String, SubscriptionInfo> subscriberContainer;
+ private MapContainer<String, SubscriptionInfo> subscriberContainer;
private Store store;
private static final String TOPIC_SUB_NAME = "tsn";
@@ -58,6 +58,11 @@
}
}
+ public void dispose(ConnectionContext context) {
+ super.dispose(context);
+ subscriberContainer.delete();
+ }
+
protected MessageId getMessageId(Object object) {
return new MessageId(((ReferenceRecord)object).getMessageId());
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java Mon Dec 29 08:48:28 2008
@@ -30,6 +30,7 @@
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
@@ -39,9 +40,8 @@
*
* @version $Revision: 1.7 $
*/
-public class MemoryMessageStore implements MessageStore {
+public class MemoryMessageStore extends AbstractMessageStore {
- protected final ActiveMQDestination destination;
protected final Map<MessageId, Message> messageTable;
protected MessageId lastBatchId;
@@ -50,7 +50,7 @@
}
public MemoryMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable) {
- this.destination = destination;
+ super(destination);
this.messageTable = Collections.synchronizedMap(messageTable);
}
@@ -108,22 +108,12 @@
}
}
- public void start() {
- }
-
- public void stop() {
- }
-
public void removeAllMessages(ConnectionContext context) throws IOException {
synchronized (messageTable) {
messageTable.clear();
}
}
- public ActiveMQDestination getDestination() {
- return destination;
- }
-
public void delete() {
synchronized (messageTable) {
messageTable.clear();
@@ -160,13 +150,4 @@
public void resetBatching() {
lastBatchId = null;
}
-
- /**
- * @param memoeyUSage
- * @see org.apache.activemq.store.MessageStore#setMemoryUsage(org.apache.activemq.usage.MemoryUsage)
- */
- public void setMemoryUsage(MemoryUsage memoeyUSage){
- // TODO Auto-generated method stub
-
- }
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java Mon Dec 29 08:48:28 2008
@@ -35,9 +35,9 @@
private static final Log LOG = LogFactory.getLog(SimpleTopicTest.class);
protected BrokerService broker;
- protected String clientURI="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=false&wireFormat.maxInactivityDuration=50000";
+ protected String clientURI="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=false&wireFormat.maxInactivityDuration=0";
//protected String clientURI="tcp://localhost:61616";
- protected String bindAddress="tcp://localhost:61616";
+ protected String bindAddress="tcp://localhost:61616?wireFormat.maxInactivityDuration=0";
//protected String bindAddress = "tcp://localhost:61616";
//protected String bindAddress="vm://localhost?marshal=true";
//protected String bindAddress="vm://localhost";
@@ -51,7 +51,7 @@
protected int numberofProducers = 1;
protected int totalNumberOfProducers;
protected int totalNumberOfConsumers;
- protected int playloadSize = 1024;
+ protected int playloadSize = 12;
protected byte[] array;
protected ConnectionFactory factory;
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java Mon Dec 29 08:48:28 2008
@@ -42,28 +42,32 @@
protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException {
- return new SlowConsumer(fac, dest);
+ PerfConsumer result = new SlowConsumer(fac, dest);
+ return result;
}
protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, int number, byte[] payload) throws JMSException {
PerfProducer result = super.createProducer(fac, dest, number, payload);
result.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ result.setSleep(10);
return result;
}
- protected BrokerService createBroker() throws Exception {
+ protected BrokerService createBroker(String url) throws Exception {
Resource resource = new ClassPathResource("org/apache/activemq/perf/slowConsumerBroker.xml");
+ System.err.println("CREATE BROKER FROM " + resource);
BrokerFactoryBean factory = new BrokerFactoryBean(resource);
factory.afterPropertiesSet();
BrokerService broker = factory.getBroker();
+
broker.start();
return broker;
}
- protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
- ActiveMQConnectionFactory result = super.createConnectionFactory(bindAddress);
+ protected ActiveMQConnectionFactory createConnectionFactory(String uri) throws Exception {
+ ActiveMQConnectionFactory result = super.createConnectionFactory(uri);
ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy();
- policy.setTopicPrefetch(1000);
+ policy.setTopicPrefetch(10);
result.setPrefetchPolicy(policy);
return result;
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java?rev=729939&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java Mon Dec 29 08:48:28 2008
@@ -0,0 +1,235 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+/**
+ * @author Rajani Chennamaneni
+ *
+ */
+public class DispatchMultipleConsumersTest extends TestCase {
+ private final static Log logger = LogFactory.getLog(DispatchMultipleConsumersTest.class);
+ BrokerService broker;
+ Destination dest;
+ String destinationName = "TEST.Q";
+ String msgStr = "Test text message";
+ int messagesPerThread = 20;
+ int producerThreads = 50;
+ int consumerCount = 2;
+ AtomicInteger sentCount;
+ AtomicInteger consumedCount;
+ CountDownLatch producerLatch;
+ CountDownLatch consumerLatch;
+ String brokerURL = "tcp://localhost:61616";
+ String userName = "";
+ String password = "";
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ broker = new BrokerService();
+ broker.setPersistent(true);
+ broker.setUseJmx(true);
+ broker.deleteAllMessages();
+ broker.addConnector("tcp://localhost:61616");
+ broker.start();
+ dest = new ActiveMQQueue(destinationName);
+ resetCounters();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+// broker.stop();
+ super.tearDown();
+ }
+
+ private void resetCounters() {
+ sentCount = new AtomicInteger(0);
+ consumedCount = new AtomicInteger(0);
+ producerLatch = new CountDownLatch(producerThreads);
+ consumerLatch = new CountDownLatch(consumerCount);
+ }
+
+ public void testDispatch1() {
+ for (int i = 1; i <= 5; i++) {
+ resetCounters();
+ dispatch();
+ /*try {
+ System.out.print("Press Enter to continue/finish:");
+ //pause to check the counts on JConsole
+ System.in.read();
+ System.in.read();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }*/
+ //check for consumed messages count
+ assertEquals("Incorrect messages in Iteration " + i, sentCount.get(), consumedCount.get());
+ }
+ }
+
+ private void dispatch() {
+ startConsumers();
+ startProducers();
+ try {
+ producerLatch.await();
+ consumerLatch.await();
+ } catch (InterruptedException e) {
+ fail("test interrupted!");
+ }
+ }
+
+ private void startConsumers() {
+ ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(userName, password, brokerURL);
+ Connection conn;
+ try {
+ conn = connFactory.createConnection();
+ conn.start();
+ for (int i = 0; i < consumerCount; i++) {
+ new ConsumerThread(conn, "ConsumerThread"+i);
+ }
+ } catch (JMSException e) {
+ logger.error("Failed to start consumers", e);
+ }
+ }
+
+ private void startProducers() {
+ ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(userName, password, brokerURL);
+ for (int i = 0; i < producerThreads; i++) {
+ new ProducerThread(connFactory, messagesPerThread, "ProducerThread"+i);
+ }
+ }
+
+ private class ConsumerThread extends Thread {
+ Connection conn;
+ Session session;
+ MessageConsumer consumer;
+
+ public ConsumerThread(Connection conn, String name) {
+ super();
+ this.conn = conn;
+ this.setName(name);
+ logger.info("Created new consumer thread:" + name);
+ try {
+ session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumer = session.createConsumer(dest);
+ start();
+ } catch (JMSException e) {
+ logger.error("Failed to start consumer thread:" + name, e);
+ }
+ }
+
+ @Override
+ public void run() {
+ int msgCount = 0;
+ int nullCount = 0;
+ while (true) {
+ try {
+ Message msg = consumer.receive(1000);
+ if (msg == null) {
+ if (producerLatch.getCount() > 0) {
+ continue;
+ }
+ nullCount++;
+ if (nullCount > 10) {
+ //assume that we are not getting any more messages
+ break;
+ } else {
+ continue;
+ }
+ } else {
+ nullCount = 0;
+ }
+ Thread.sleep(100);
+ logger.info("Message received:" + msg.getJMSMessageID());
+ msgCount++;
+ } catch (JMSException e) {
+ logger.error("Failed to consume:", e);
+ } catch (InterruptedException e) {
+ logger.error("Interrupted!", e);
+ }
+ }
+ try {
+ consumer.close();
+ } catch (JMSException e) {
+ logger.error("Failed to close consumer " + getName(), e);
+ }
+ consumedCount.addAndGet(msgCount);
+ consumerLatch.countDown();
+ logger.info("Consumed " + msgCount + " messages using thread " + getName());
+ }
+
+ }
+
+ private class ProducerThread extends Thread {
+ int count;
+ Connection conn;
+ Session session;
+ MessageProducer producer;
+
+ public ProducerThread(ActiveMQConnectionFactory connFactory, int count, String name) {
+ super();
+ this.count = count;
+ this.setName(name);
+ logger.info("Created new producer thread:" + name);
+ try {
+ conn = connFactory.createConnection();
+ conn.start();
+ session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ producer = session.createProducer(dest);
+ start();
+ } catch (JMSException e) {
+ logger.error("Failed to start producer thread:" + name, e);
+ }
+ }
+
+ @Override
+ public void run() {
+ int i = 0;
+ try {
+ for (; i < count; i++) {
+ producer.send(session.createTextMessage(msgStr));
+ Thread.sleep(500);
+ }
+ conn.close();
+ } catch (JMSException e) {
+ logger.error(e.getMessage(), e);
+ } catch (InterruptedException e) {
+ logger.error("Interrupted!", e);
+ }
+ sentCount.addAndGet(i);
+ producerLatch.countDown();
+ logger.info("Sent " + i + " messages from thread " + getName());
+ }
+ }
+
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml (original)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml Mon Dec 29 08:48:28 2008
@@ -25,10 +25,10 @@
<destinationPolicy>
<policyMap>
<policyEntries>
- <policyEntry topic="blob">
+ <policyEntry topic=">" producerFlowControl="false">
<!-- lets force old messages to be discarded for slow consumers -->
<pendingMessageLimitStrategy>
- <constantPendingMessageLimitStrategy limit="10"/>
+ <constantPendingMessageLimitStrategy limit="0"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>