You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/12/29 17:48:29 UTC

svn commit: r729939 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/kaha/ main/java/org/apache/activemq/kaha/impl/container/ main/java/org/apache/activemq/kaha/impl/index/ main/java/org...

Author: rajdavies
Date: Mon Dec 29 08:48:28 2008
New Revision: 729939

URL: http://svn.apache.org/viewvc?rev=729939&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1842

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/Index.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Mon Dec 29 08:48:28 2008
@@ -28,31 +28,29 @@
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.usage.Usage;
 
-
 /**
  * @version $Revision: 1.12 $
  */
 public abstract class BaseDestination implements Destination {
     /**
-     * The maximum number of messages to page in to the destination
-     * from persistent storage
+     * The maximum number of messages to page in to the destination from persistent storage
      */
-    public static final int MAX_PAGE_SIZE=200;
-    public static final int MAX_BROWSE_PAGE_SIZE=MAX_PAGE_SIZE*2;
+    public static final int MAX_PAGE_SIZE = 200;
+    public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
     protected final ActiveMQDestination destination;
     protected final Broker broker;
     protected final MessageStore store;
     protected SystemUsage systemUsage;
     protected MemoryUsage memoryUsage;
     private boolean producerFlowControl = true;
-    private int maxProducersToAudit=1024;
-    private int maxAuditDepth=2048;
-    private boolean enableAudit=true;
-    private int maxPageSize=MAX_PAGE_SIZE;
-    private int maxBrowsePageSize=MAX_BROWSE_PAGE_SIZE;
-    private boolean useCache=true;
-    private int minimumMessageSize=1024;
-    private boolean lazyDispatch=false;
+    private int maxProducersToAudit = 1024;
+    private int maxAuditDepth = 2048;
+    private boolean enableAudit = true;
+    private int maxPageSize = MAX_PAGE_SIZE;
+    private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE;
+    private boolean useCache = true;
+    private int minimumMessageSize = 1024;
+    private boolean lazyDispatch = false;
     private boolean advisoryForSlowConsumers;
     private boolean advisdoryForFastProducers;
     private boolean advisoryForDiscardingMessages;
@@ -63,31 +61,32 @@
     protected final BrokerService brokerService;
     protected final Broker regionBroker;
     protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY;
-    
+
     /**
-     * @param broker 
-     * @param store 
+     * @param broker
+     * @param store
      * @param destination
      * @param parentStats
-     * @throws Exception 
+     * @throws Exception
      */
-    public BaseDestination(BrokerService brokerService,MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception {
+    public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination,
+            DestinationStatistics parentStats) throws Exception {
         this.brokerService = brokerService;
-        this.broker=brokerService.getBroker();
-        this.store=store;
-        this.destination=destination;
+        this.broker = brokerService.getBroker();
+        this.store = store;
+        this.destination = destination;
         // let's copy the enabled property from the parent DestinationStatistics
         this.destinationStatistics.setEnabled(parentStats.isEnabled());
-        this.destinationStatistics.setParent(parentStats);        
-
+        this.destinationStatistics.setParent(parentStats);
         this.systemUsage = brokerService.getProducerSystemUsage();
         this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString());
         this.memoryUsage.setUsagePortion(1.0f);
         this.regionBroker = brokerService.getRegionBroker();
     }
-    
+
     /**
      * initialize the destination
+     * 
      * @throws Exception
      */
     public void initialize() throws Exception {
@@ -95,66 +94,77 @@
         // flush messages to disk when usage gets high.
         if (store != null) {
             store.setMemoryUsage(this.memoryUsage);
-        } 
+        }
     }
-    
+
     /**
      * @return the producerFlowControl
      */
     public boolean isProducerFlowControl() {
         return producerFlowControl;
     }
+
     /**
-     * @param producerFlowControl the producerFlowControl to set
+     * @param producerFlowControl
+     *            the producerFlowControl to set
      */
     public void setProducerFlowControl(boolean producerFlowControl) {
         this.producerFlowControl = producerFlowControl;
     }
+
     /**
      * @return the maxProducersToAudit
      */
     public int getMaxProducersToAudit() {
         return maxProducersToAudit;
     }
+
     /**
-     * @param maxProducersToAudit the maxProducersToAudit to set
+     * @param maxProducersToAudit
+     *            the maxProducersToAudit to set
      */
     public void setMaxProducersToAudit(int maxProducersToAudit) {
         this.maxProducersToAudit = maxProducersToAudit;
     }
+
     /**
      * @return the maxAuditDepth
      */
     public int getMaxAuditDepth() {
         return maxAuditDepth;
     }
+
     /**
-     * @param maxAuditDepth the maxAuditDepth to set
+     * @param maxAuditDepth
+     *            the maxAuditDepth to set
      */
     public void setMaxAuditDepth(int maxAuditDepth) {
         this.maxAuditDepth = maxAuditDepth;
     }
+
     /**
      * @return the enableAudit
      */
     public boolean isEnableAudit() {
         return enableAudit;
     }
+
     /**
-     * @param enableAudit the enableAudit to set
+     * @param enableAudit
+     *            the enableAudit to set
      */
     public void setEnableAudit(boolean enableAudit) {
         this.enableAudit = enableAudit;
     }
-    
-    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception{
+
+    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
         destinationStatistics.getProducers().increment();
     }
 
-    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception{
+    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
         destinationStatistics.getProducers().decrement();
     }
-    
+
     public final MemoryUsage getMemoryUsage() {
         return memoryUsage;
     }
@@ -167,20 +177,19 @@
         return destination;
     }
 
-        
     public final String getName() {
         return getActiveMQDestination().getPhysicalName();
     }
-    
+
     public final MessageStore getMessageStore() {
         return store;
     }
-    
+
     public final boolean isActive() {
-        return destinationStatistics.getConsumers().getCount() != 0 ||
-            destinationStatistics.getProducers().getCount() != 0;
+        return destinationStatistics.getConsumers().getCount() != 0
+                || destinationStatistics.getProducers().getCount() != 0;
     }
-    
+
     public int getMaxPageSize() {
         return maxPageSize;
     }
@@ -188,14 +197,14 @@
     public void setMaxPageSize(int maxPageSize) {
         this.maxPageSize = maxPageSize;
     }
-    
+
     public int getMaxBrowsePageSize() {
         return this.maxBrowsePageSize;
     }
 
     public void setMaxBrowsePageSize(int maxPageSize) {
         this.maxBrowsePageSize = maxPageSize;
-    } 
+    }
 
     public boolean isUseCache() {
         return useCache;
@@ -219,8 +228,8 @@
 
     public void setLazyDispatch(boolean lazyDispatch) {
         this.lazyDispatch = lazyDispatch;
-    } 
-    
+    }
+
     protected long getDestinationSequenceId() {
         return regionBroker.getBrokerSequenceId();
     }
@@ -233,7 +242,8 @@
     }
 
     /**
-     * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
+     * @param advisoryForSlowConsumers
+     *            the advisoryForSlowConsumers to set
      */
     public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
         this.advisoryForSlowConsumers = advisoryForSlowConsumers;
@@ -247,10 +257,10 @@
     }
 
     /**
-     * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to set
+     * @param advisoryForDiscardingMessages
+     *            the advisoryForDiscardingMessages to set
      */
-    public void setAdvisoryForDiscardingMessages(
-            boolean advisoryForDiscardingMessages) {
+    public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) {
         this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
     }
 
@@ -262,7 +272,8 @@
     }
 
     /**
-     * @param advisoryWhenFull the advisoryWhenFull to set
+     * @param advisoryWhenFull
+     *            the advisoryWhenFull to set
      */
     public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
         this.advisoryWhenFull = advisoryWhenFull;
@@ -276,7 +287,8 @@
     }
 
     /**
-     * @param advisoryForDelivery the advisoryForDelivery to set
+     * @param advisoryForDelivery
+     *            the advisoryForDelivery to set
      */
     public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
         this.advisoryForDelivery = advisoryForDelivery;
@@ -290,7 +302,8 @@
     }
 
     /**
-     * @param advisoryForConsumed the advisoryForConsumed to set
+     * @param advisoryForConsumed
+     *            the advisoryForConsumed to set
      */
     public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
         this.advisoryForConsumed = advisoryForConsumed;
@@ -304,12 +317,13 @@
     }
 
     /**
-     * @param advisdoryForFastProducers the advisdoryForFastProducers to set
+     * @param advisdoryForFastProducers
+     *            the advisdoryForFastProducers to set
      */
     public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) {
         this.advisdoryForFastProducers = advisdoryForFastProducers;
     }
-    
+
     /**
      * @return the dead letter strategy
      */
@@ -319,13 +333,16 @@
 
     /**
      * set the dead letter strategy
+     * 
      * @param deadLetterStrategy
      */
     public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
         this.deadLetterStrategy = deadLetterStrategy;
     }
+
     /**
      * called when message is consumed
+     * 
      * @param context
      * @param messageReference
      */
@@ -334,21 +351,23 @@
             broker.messageConsumed(context, messageReference);
         }
     }
-    
+
     /**
      * Called when message is delivered to the broker
+     * 
      * @param context
      * @param messageReference
      */
     public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
-        if(advisoryForDelivery) {
+        if (advisoryForDelivery) {
             broker.messageDelivered(context, messageReference);
         }
     }
-    
+
     /**
-     * Called when a message is discarded - e.g. running low on memory
-     * This will happen only if the policy is enabled - e.g. non durable topics
+     * Called when a message is discarded - e.g. running low on memory. This will happen only if the policy is enabled -
+     * e.g. non durable topics
+     * 
      * @param context
      * @param messageReference
      */
@@ -357,43 +376,48 @@
             broker.messageDiscarded(context, messageReference);
         }
     }
-    
+
     /**
      * Called when there is a slow consumer
+     * 
      * @param context
      * @param subs
      */
     public void slowConsumer(ConnectionContext context, Subscription subs) {
-        if(advisoryForSlowConsumers) {
+        if (advisoryForSlowConsumers) {
             broker.slowConsumer(context, this, subs);
         }
     }
-    
+
     /**
      * Called to notify a producer is too fast
+     * 
      * @param context
      * @param producerInfo
      */
-    public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
-        if(advisdoryForFastProducers) {
+    public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
+        if (advisdoryForFastProducers) {
             broker.fastProducer(context, producerInfo);
         }
     }
-    
+
     /**
      * Called when a Usage reaches a limit
+     * 
      * @param context
      * @param usage
      */
-    public void isFull(ConnectionContext context,Usage usage) {
-        if(advisoryWhenFull) {
-            broker.isFull(context,this, usage);
+    public void isFull(ConnectionContext context, Usage usage) {
+        if (advisoryWhenFull) {
+            broker.isFull(context, this, usage);
         }
     }
-    
+
     public void dispose(ConnectionContext context) throws IOException {
-        destinationStatistics.setParent(null);
+        if (this.store != null) {
+            this.store.dispose(context);
+        }
+        this.destinationStatistics.setParent(null);
         this.memoryUsage.stop();
     }
-
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Mon Dec 29 08:48:28 2008
@@ -475,13 +475,6 @@
         }
     }
 
-    public void dispose(ConnectionContext context) throws IOException {
-        super.dispose(context);
-        if (store != null) {
-            store.removeAllMessages(context);
-        }
-    }
-
 	public void gc(){
 	}
     
@@ -543,6 +536,9 @@
         if (memoryUsage != null) {
             memoryUsage.stop();
         }
+        if (store!=null) {
+            store.stop();
+        }
     }
 
     // Properties

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Mon Dec 29 08:48:28 2008
@@ -449,13 +449,7 @@
         messageConsumed(context, node);
     }
 
-    public void dispose(ConnectionContext context) throws IOException {
-        super.dispose(context);
-        if (topicStore != null) {
-            topicStore.removeAllMessages(context);
-        }
-    }
-
+    
     public void gc() {
     }
 
@@ -479,6 +473,9 @@
         if (memoryUsage != null) {
             memoryUsage.stop();
         }
+        if(this.topicStore != null) {
+            this.topicStore.stop();
+        }
     }
 
     public Message[] browse() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java Mon Dec 29 08:48:28 2008
@@ -283,4 +283,9 @@
      * @return the Index MBean
      */
     IndexMBean getIndexMBean();
+
+    /**
+     * Clean up all state associated with this container.
+     */
+    void delete();
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java Mon Dec 29 08:48:28 2008
@@ -142,6 +142,16 @@
         }
     }
 
+    public synchronized void delete() {
+        unload();
+        try {
+            index.delete();
+        } catch (IOException e) {
+            LOG.warn("Failed to unload the index", e);
+        }
+    }
+
+
     public synchronized void setKeyMarshaller(Marshaller keyMarshaller) {
         checkClosed();
         this.keyMarshaller = keyMarshaller;
@@ -578,7 +588,6 @@
     public IndexMBean getIndexMBean() {
       return (IndexMBean) index;
     }
-  
     public int getIndexMaxBinSize() {
         return indexMaxBinSize;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/Index.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/Index.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/Index.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/Index.java Mon Dec 29 08:48:28 2008
@@ -96,4 +96,11 @@
      * @return
      */
     int getSize();
+
+    /**
+     * delete all state associated with the index
+     *
+     * @throws IOException
+     */
+    void delete() throws IOException;
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java Mon Dec 29 08:48:28 2008
@@ -28,7 +28,7 @@
 
 /**
  * Index implementation using a HashMap
- * 
+ *
  * @version $Revision: 1.2 $
  */
 public class VMIndex implements Index, IndexMBean {
@@ -41,7 +41,7 @@
     }
 
     /**
-     * 
+     *
      * @see org.apache.activemq.kaha.impl.index.Index#clear()
      */
     public void clear() {
@@ -122,9 +122,13 @@
         map.clear();
     }
 
+    public void delete() throws IOException {
+        unload();
+    }
+
     public void setKeyMarshaller(Marshaller marshaller) {
     }
-    
+
     public int getSize() {
         return map.size();
     }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java?rev=729939&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java Mon Dec 29 08:48:28 2008
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.usage.MemoryUsage;
+
+abstract public class AbstractMessageStore implements MessageStore {
+    protected final ActiveMQDestination destination;
+
+    public AbstractMessageStore(ActiveMQDestination destination) {
+        this.destination = destination;
+    }
+
+    public void dispose(ConnectionContext context) {
+    }
+
+    public void start() throws Exception {
+    }
+
+    public void stop() throws Exception {
+    }
+
+    public ActiveMQDestination getDestination() {
+        return destination;
+    }
+
+    public void setMemoryUsage(MemoryUsage memoryUsage) {
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java Mon Dec 29 08:48:28 2008
@@ -109,4 +109,5 @@
 
     void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception;
 
+    void dispose(ConnectionContext context);
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java Mon Dec 29 08:48:28 2008
@@ -67,6 +67,10 @@
         delegate.stop();
     }
 
+    public void dispose(ConnectionContext context) {
+        delegate.dispose(context);
+    }
+
     public ActiveMQDestination getDestination() {
         return delegate.getDestination();
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java Mon Dec 29 08:48:28 2008
@@ -126,6 +126,10 @@
 
     }
 
+    public void dispose(ConnectionContext context) {
+        delegate.dispose(context);
+    }
+
     public void resetBatching() {
         delegate.resetBatching();
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java Mon Dec 29 08:48:28 2008
@@ -30,7 +30,6 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
-
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.DataStructure;
@@ -40,8 +39,8 @@
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.store.AbstractMessageStore;
 import org.apache.activemq.store.MessageRecoveryListener;
-import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.ReferenceStore;
 import org.apache.activemq.store.ReferenceStore.ReferenceData;
@@ -59,21 +58,17 @@
  * 
  * @version $Revision: 1.14 $
  */
-public class AMQMessageStore implements MessageStore {
-
+public class AMQMessageStore extends AbstractMessageStore {
     private static final Log LOG = LogFactory.getLog(AMQMessageStore.class);
-    
     protected final AMQPersistenceAdapter peristenceAdapter;
     protected final AMQTransactionStore transactionStore;
     protected final ReferenceStore referenceStore;
-    protected final ActiveMQDestination destination;
     protected final TransactionTemplate transactionTemplate;
     protected Location lastLocation;
     protected Location lastWrittenLocation;
     protected Set<Location> inFlightTxLocations = new HashSet<Location>();
     protected final TaskRunner asyncWriteTask;
     protected CountDownLatch flushLatch;
-    
     private Map<MessageId, ReferenceData> messages = new LinkedHashMap<MessageId, ReferenceData>();
     private List<MessageAck> messageAcks = new ArrayList<MessageAck>();
     /** A MessageStore that we can use to retrieve messages quickly. */
@@ -82,15 +77,15 @@
     private final AtomicReference<Location> mark = new AtomicReference<Location>();
     protected final Lock lock;
 
-    public AMQMessageStore(AMQPersistenceAdapter adapter,ReferenceStore referenceStore, ActiveMQDestination destination) {
+    public AMQMessageStore(AMQPersistenceAdapter adapter, ReferenceStore referenceStore, ActiveMQDestination destination) {
+        super(destination);
         this.peristenceAdapter = adapter;
-        this.lock=referenceStore.getStoreLock();
+        this.lock = referenceStore.getStoreLock();
         this.transactionStore = adapter.getTransactionStore();
         this.referenceStore = referenceStore;
-        this.destination = destination;
-        this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext()));
+        this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(
+                new NonCachedMessageEvaluationContext()));
         asyncWriteTask = adapter.getTaskRunnerFactory().createTaskRunner(new Task() {
-
             public boolean iterate() {
                 asyncWrite();
                 return false;
@@ -103,8 +98,8 @@
     }
 
     /**
-     * Not synchronize since the Journal has better throughput if you increase
-     * the number of concurrent writes that it is doing.
+     * Not synchronized since the Journal has better throughput if you increase the number of concurrent writes that it
+     * is doing.
      */
     public final void addMessage(ConnectionContext context, final Message message) throws IOException {
         final MessageId id = message.getMessageId();
@@ -122,12 +117,11 @@
             lock.lock();
             try {
                 inFlightTxLocations.add(location);
-            }finally {
+            } finally {
                 lock.unlock();
             }
             transactionStore.addMessage(this, message, location);
             context.getTransaction().addSynchronization(new Synchronization() {
-
                 public void afterCommit() throws Exception {
                     if (debug) {
                         LOG.debug("Transacted message add commit for: " + id + ", at: " + location);
@@ -135,7 +129,7 @@
                     lock.lock();
                     try {
                         inFlightTxLocations.remove(location);
-                    }finally {
+                    } finally {
                         lock.unlock();
                     }
                     addMessage(message, location);
@@ -148,7 +142,7 @@
                     lock.lock();
                     try {
                         inFlightTxLocations.remove(location);
-                    }finally {
+                    } finally {
                         lock.unlock();
                     }
                 }
@@ -161,15 +155,14 @@
         data.setExpiration(message.getExpiration());
         data.setFileId(location.getDataFileId());
         data.setOffset(location.getOffset());
-         lock.lock();
-         try {
+        lock.lock();
+        try {
             lastLocation = location;
             messages.put(message.getMessageId(), data);
-        }finally {
+        } finally {
             lock.unlock();
         }
-        if (messages.size() > this.peristenceAdapter
-                .getMaxCheckpointMessageAddSize()) {
+        if (messages.size() > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) {
             flush();
         } else {
             try {
@@ -194,7 +187,8 @@
                 return true;
             }
         } catch (Throwable e) {
-            LOG.warn("Could not replay add for message '" + id + "'.  Message may have already been added. reason: " + e, e);
+            LOG.warn("Could not replay add for message '" + id + "'.  Message may have already been added. reason: "
+                    + e, e);
         }
         return false;
     }
@@ -210,7 +204,7 @@
             if (debug) {
                 LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location);
             }
-            removeMessage(ack,location);
+            removeMessage(ack, location);
         } else {
             if (debug) {
                 LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location);
@@ -218,33 +212,34 @@
             lock.lock();
             try {
                 inFlightTxLocations.add(location);
-            }finally {
+            } finally {
                 lock.unlock();
             }
             transactionStore.removeMessage(this, ack, location);
             context.getTransaction().addSynchronization(new Synchronization() {
-
                 public void afterCommit() throws Exception {
                     if (debug) {
-                        LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: " + location);
+                        LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: "
+                                + location);
                     }
                     lock.lock();
                     try {
                         inFlightTxLocations.remove(location);
-                    }finally {
+                    } finally {
                         lock.unlock();
                     }
-                    removeMessage(ack,location);
+                    removeMessage(ack, location);
                 }
 
                 public void afterRollback() throws Exception {
                     if (debug) {
-                        LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: " + location);
+                        LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: "
+                                + location);
                     }
                     lock.lock();
                     try {
                         inFlightTxLocations.remove(location);
-                    }finally {
+                    } finally {
                         lock.unlock();
                     }
                 }
@@ -255,7 +250,7 @@
     final void removeMessage(final MessageAck ack, final Location location) throws InterruptedIOException {
         ReferenceData data;
         lock.lock();
-        try{
+        try {
             lastLocation = location;
             MessageId id = ack.getLastMessageId();
             data = messages.remove(id);
@@ -265,13 +260,12 @@
                 // message never got written so datafileReference will still exist
                 AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, data.getFileId());
             }
-        }finally {
+        } finally {
             lock.unlock();
         }
         if (messageAcks.size() > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) {
             flush();
-        }
-        else if (data == null) {
+        } else if (data == null) {
             try {
                 asyncWriteTask.wakeup();
             } catch (InterruptedException e) {
@@ -279,7 +273,7 @@
             }
         }
     }
-      
+
     public boolean replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
         try {
             // Only remove the message if it has not already been removed.
@@ -289,7 +283,8 @@
                 return true;
             }
         } catch (Throwable e) {
-            LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'.  Message may have already been acknowledged. reason: " + e);
+            LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId()
+                    + "'.  Message may have already been acknowledged. reason: " + e);
         }
         return false;
     }
@@ -313,7 +308,7 @@
                 flushLatch = new CountDownLatch(1);
             }
             countDown = flushLatch;
-        }finally {
+        } finally {
             lock.unlock();
         }
         try {
@@ -338,7 +333,7 @@
             try {
                 countDown = flushLatch;
                 flushLatch = null;
-            }finally {
+            } finally {
                 lock.unlock();
             }
             mark.set(doAsyncWrite());
@@ -368,14 +363,14 @@
             this.messages = new LinkedHashMap<MessageId, ReferenceData>();
             this.messageAcks = new ArrayList<MessageAck>();
             lastLocation = this.lastLocation;
-        }finally {
+        } finally {
             lock.unlock();
         }
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Doing batch update... adding: " + cpAddedMessageIds.size() + " removing: " + cpRemovedMessageLocations.size() + " ");
+            LOG.debug("Doing batch update... adding: " + cpAddedMessageIds.size() + " removing: "
+                    + cpRemovedMessageLocations.size() + " ");
         }
         transactionTemplate.run(new Callback() {
-
             public void execute() throws Exception {
                 int size = 0;
                 PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter();
@@ -386,7 +381,8 @@
                     Entry<MessageId, ReferenceData> entry = iterator.next();
                     try {
                         referenceStore.addMessageReference(context, entry.getKey(), entry.getValue());
-                        AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this,entry.getValue().getFileId());
+                        AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, entry
+                                .getValue().getFileId());
                     } catch (Throwable e) {
                         LOG.warn("Message could not be added to long term store: " + e.getMessage(), e);
                     }
@@ -415,7 +411,7 @@
         try {
             cpAddedMessageIds = null;
             lastWrittenLocation = lastLocation;
-        }finally {
+        } finally {
             lock.unlock();
         }
         if (cpActiveJournalLocations.size() > 0) {
@@ -436,14 +432,13 @@
             try {
                 return (Message) rc;
             } catch (ClassCastException e) {
-                throw new IOException("Could not read message " + identity
-                        + " at location " + location
+                throw new IOException("Could not read message " + identity + " at location " + location
                         + ", expected a message, but got: " + rc);
             }
         }
         return null;
     }
-    
+
     protected Location getLocation(MessageId messageId) throws IOException {
         ReferenceData data = null;
         lock.lock();
@@ -453,7 +448,7 @@
             if (data == null && cpAddedMessageIds != null) {
                 data = cpAddedMessageIds.get(messageId);
             }
-        }finally {
+        } finally {
             lock.unlock();
         }
         if (data == null) {
@@ -469,16 +464,15 @@
     }
 
     /**
-     * Replays the referenceStore first as those messages are the oldest ones,
-     * then messages are replayed from the transaction log and then the cache is
-     * updated.
+     * Replays the referenceStore first as those messages are the oldest ones, then messages are replayed from the
+     * transaction log and then the cache is updated.
      * 
      * @param listener
      * @throws Exception
      */
     public void recover(final MessageRecoveryListener listener) throws Exception {
         flush();
-        referenceStore.recover(new RecoveryListenerAdapter(this, listener));         
+        referenceStore.recover(new RecoveryListenerAdapter(this, listener));
     }
 
     public void start() throws Exception {
@@ -506,11 +500,8 @@
         referenceStore.removeAllMessages(context);
     }
 
-    public ActiveMQDestination getDestination() {
-        return destination;
-    }
-
-    public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
+    public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime,
+            String messageRef) throws IOException {
         throw new IOException("The journal does not support message references.");
     }
 
@@ -543,9 +534,10 @@
         location.setOffset(data.getOffset());
         DataStructure rc = peristenceAdapter.readCommand(location);
         try {
-            return (Message)rc;
+            return (Message) rc;
         } catch (ClassCastException e) {
-            throw new IOException("Could not read message  at location " + location + ", expected a message, but got: " + rc);
+            throw new IOException("Could not read message  at location " + location + ", expected a message, but got: "
+                    + rc);
         }
     }
 
@@ -556,4 +548,14 @@
     public Location getMark() {
         return mark.get();
     }
+
+    public void dispose(ConnectionContext context) {
+        try {
+            flush();
+        } catch (InterruptedIOException e) {
+            Thread.currentThread().interrupt();
+        }
+        referenceStore.dispose(context);
+        super.dispose(context);
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Mon Dec 29 08:48:28 2008
@@ -27,8 +27,8 @@
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.AbstractMessageStore;
 import org.apache.activemq.usage.MemoryUsage;
-import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.ByteSequenceData;
 import org.apache.activemq.util.IOExceptionSupport;
@@ -37,19 +37,18 @@
 /**
  * @version $Revision: 1.10 $
  */
-public class JDBCMessageStore implements MessageStore {
+public class JDBCMessageStore extends AbstractMessageStore {
 
     protected final WireFormat wireFormat;
-    protected final ActiveMQDestination destination;
     protected final JDBCAdapter adapter;
     protected final JDBCPersistenceAdapter persistenceAdapter;
     protected AtomicLong lastMessageId = new AtomicLong(-1);
 
     public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination) {
+        super(destination);
         this.persistenceAdapter = persistenceAdapter;
         this.adapter = adapter;
         this.wireFormat = wireFormat;
-        this.destination = destination;
     }
 
     public void addMessage(ConnectionContext context, Message message) throws IOException {
@@ -169,12 +168,6 @@
         }
     }
 
-    public void start() {
-    }
-
-    public void stop() {
-    }
-
     /**
      * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
      */
@@ -191,15 +184,6 @@
         }
     }
 
-    public ActiveMQDestination getDestination() {
-        return destination;
-    }
-
-    public void setMemoryUsage(MemoryUsage memoryUsage) {
-       //can ignore as messages aren't buffered
-    }
-   
-
     public int getMessageCount() throws IOException {
         int result = 0;
         TransactionContext c = persistenceAdapter.getTransactionContext();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java Mon Dec 29 08:48:28 2008
@@ -37,6 +37,7 @@
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.AbstractMessageStore;
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
@@ -50,14 +51,13 @@
  * 
  * @version $Revision: 1.14 $
  */
-public class JournalMessageStore implements MessageStore {
+public class JournalMessageStore extends AbstractMessageStore {
 
     private static final Log LOG = LogFactory.getLog(JournalMessageStore.class);
 
     protected final JournalPersistenceAdapter peristenceAdapter;
     protected final JournalTransactionStore transactionStore;
     protected final MessageStore longTermStore;
-    protected final ActiveMQDestination destination;
     protected final TransactionTemplate transactionTemplate;
     protected RecordLocation lastLocation;
     protected Set<RecordLocation> inFlightTxLocations = new HashSet<RecordLocation>();
@@ -72,10 +72,10 @@
     private MemoryUsage memoryUsage;
 
     public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) {
+        super(destination);
         this.peristenceAdapter = adapter;
         this.transactionStore = adapter.getTransactionStore();
         this.longTermStore = checkpointStore;
-        this.destination = destination;
         this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext()));
     }
 
@@ -382,10 +382,6 @@
         longTermStore.removeAllMessages(context);
     }
 
-    public ActiveMQDestination getDestination() {
-        return destination;
-    }
-
     public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
         throw new IOException("The journal does not support message references.");
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Mon Dec 29 08:48:28 2008
@@ -26,6 +26,7 @@
 import org.apache.activemq.kaha.StoreEntry;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.AbstractMessageStore;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
 
@@ -35,16 +36,15 @@
  * 
  * @version $Revision: 1.7 $
  */
-public class KahaMessageStore implements MessageStore {
+public class KahaMessageStore extends AbstractMessageStore {
 
-    protected final ActiveMQDestination destination;
     protected final MapContainer<MessageId, Message> messageContainer;
     protected StoreEntry batchEntry;
 
     public KahaMessageStore(MapContainer<MessageId, Message> container, ActiveMQDestination destination)
         throws IOException {
+        super(destination);
         this.messageContainer = container;
-        this.destination = destination;
     }
 
     protected MessageId getMessageId(Object object) {
@@ -101,27 +101,14 @@
         }
     }
 
-    public void start() {
-    }
-
-    public void stop() {
-    }
-
     public synchronized void removeAllMessages(ConnectionContext context) throws IOException {
         messageContainer.clear();
     }
 
-    public ActiveMQDestination getDestination() {
-        return destination;
-    }
-
     public synchronized void delete() {
         messageContainer.clear();
     }
 
-    public void setMemoryUsage(MemoryUsage memoryUsage) {
-    }
-
     /**
      * @return the number of messages held by this destination
      * @see org.apache.activemq.store.MessageStore#getMessageCount()

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Mon Dec 29 08:48:28 2008
@@ -31,15 +31,15 @@
 import org.apache.activemq.kaha.StoreEntry;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.ReferenceStore;
+import org.apache.activemq.store.AbstractMessageStore;
 import org.apache.activemq.usage.MemoryUsage;
 
 /**
  * @author rajdavies
  *
  */
-public class KahaReferenceStore implements ReferenceStore {
+public class KahaReferenceStore extends AbstractMessageStore implements ReferenceStore {
 
-    protected final ActiveMQDestination destination;
     protected final MapContainer<MessageId, ReferenceRecord> messageContainer;
     protected KahaReferenceStoreAdapter adapter;
     private StoreEntry batchEntry;
@@ -48,19 +48,19 @@
 
     public KahaReferenceStore(KahaReferenceStoreAdapter adapter, MapContainer<MessageId, ReferenceRecord> container,
                               ActiveMQDestination destination) throws IOException {
+        super(destination);
         this.adapter = adapter;
         this.messageContainer = container;
-        this.destination = destination;
     }
     
     public Lock getStoreLock() {
         return lock;
     }
 
-    public void start() {
-    }
-
-    public void stop() {
+    public void dispose(ConnectionContext context) {
+        super.dispose(context);
+        this.messageContainer.delete();
+        this.adapter.removeReferenceStore(this);
     }
 
     protected MessageId getMessageId(Object object) {
@@ -204,10 +204,6 @@
         }
     }
 
-    public ActiveMQDestination getDestination() {
-        return destination;
-    }
-
     public void delete() {
         lock.lock();
         try {
@@ -231,9 +227,6 @@
         return messageContainer.size();
     }
 
-    public void setMemoryUsage(MemoryUsage memoryUsage) {
-    }
-    
     public boolean isSupportForCursors() {
         return true;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Mon Dec 29 08:48:28 2008
@@ -33,6 +33,7 @@
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.kaha.CommandMarshaller;
 import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.MapContainer;
@@ -179,6 +180,16 @@
         }
         return rc;
     }
+
+    public void removeReferenceStore(KahaReferenceStore store) {
+        ActiveMQDestination destination = store.getDestination();
+        if (destination.isQueue()) {
+            queues.remove(destination);
+        } else {
+            topics.remove(destination);
+        }
+        messageStores.remove(destination);
+    }
 /*
     public void buildReferenceFileIdsInUse() throws IOException {
         recordReferences = new HashMap<Integer, AtomicInteger>();
@@ -239,7 +250,7 @@
     }
 
     /**
-     * 
+     *
      * @throws IOException
      * @see org.apache.activemq.store.ReferenceStoreAdapter#clearMessages()
      */
@@ -250,13 +261,13 @@
     }
 
     /**
-     * 
+     *
      * @throws IOException
      * @see org.apache.activemq.store.ReferenceStoreAdapter#recoverState()
      */
-   
+
     public void recoverState() throws IOException {
-        Set<SubscriptionInfo> set = new HashSet<SubscriptionInfo>(this.durableSubscribers);   
+        Set<SubscriptionInfo> set = new HashSet<SubscriptionInfo>(this.durableSubscribers);
         for (SubscriptionInfo info:set) {
             LOG.info("Recovering subscriber state for durable subscriber: " + info);
             TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination());
@@ -312,7 +323,7 @@
             StoreFactory.delete(stateDirectory);
         }
     }
-    
+
     public boolean isPersistentIndex() {
 		return persistentIndex;
 	}
@@ -363,7 +374,7 @@
     public void setIndexPageSize(int indexPageSize) {
         this.indexPageSize = indexPageSize;
     }
-    
+
     public int getIndexMaxBinSize() {
         return indexMaxBinSize;
     }
@@ -371,7 +382,7 @@
     public void setIndexMaxBinSize(int maxBinSize) {
         this.indexMaxBinSize = maxBinSize;
     }
-    
+
     /**
      * @return the loadFactor
      */

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Mon Dec 29 08:48:28 2008
@@ -39,7 +39,7 @@
 
     protected ListContainer<TopicSubAck> ackContainer;
     protected Map<String, TopicSubContainer> subscriberMessages = new ConcurrentHashMap<String, TopicSubContainer>();
-    private Map<String, SubscriptionInfo> subscriberContainer;
+    private MapContainer<String, SubscriptionInfo> subscriberContainer;
     private Store store;
     private static final String TOPIC_SUB_NAME = "tsn";
 
@@ -58,6 +58,11 @@
         }
     }
 
+    public void dispose(ConnectionContext context) {
+        super.dispose(context);
+        subscriberContainer.delete();
+    }
+
     protected MessageId getMessageId(Object object) {
         return new MessageId(((ReferenceRecord)object).getMessageId());
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java Mon Dec 29 08:48:28 2008
@@ -30,6 +30,7 @@
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.AbstractMessageStore;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
 
@@ -39,9 +40,8 @@
  * 
  * @version $Revision: 1.7 $
  */
-public class MemoryMessageStore implements MessageStore {
+public class MemoryMessageStore extends AbstractMessageStore {
 
-    protected final ActiveMQDestination destination;
     protected final Map<MessageId, Message> messageTable;
     protected MessageId lastBatchId;
 
@@ -50,7 +50,7 @@
     }
 
     public MemoryMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable) {
-        this.destination = destination;
+        super(destination);
         this.messageTable = Collections.synchronizedMap(messageTable);
     }
 
@@ -108,22 +108,12 @@
         }
     }
 
-    public void start() {
-    }
-
-    public void stop() {
-    }
-
     public void removeAllMessages(ConnectionContext context) throws IOException {
         synchronized (messageTable) {
             messageTable.clear();
         }
     }
 
-    public ActiveMQDestination getDestination() {
-        return destination;
-    }
-
     public void delete() {
         synchronized (messageTable) {
             messageTable.clear();
@@ -160,13 +150,4 @@
     public void resetBatching() {
         lastBatchId = null;
     }
-
-    /**
-     * @param memoeyUSage
-     * @see org.apache.activemq.store.MessageStore#setMemoryUsage(org.apache.activemq.usage.MemoryUsage)
-     */
-    public void setMemoryUsage(MemoryUsage memoeyUSage){
-        // TODO Auto-generated method stub
-        
-    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java Mon Dec 29 08:48:28 2008
@@ -35,9 +35,9 @@
     private static final Log LOG = LogFactory.getLog(SimpleTopicTest.class);
     
     protected BrokerService broker;
-    protected String clientURI="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=false&wireFormat.maxInactivityDuration=50000";
+    protected String clientURI="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=false&wireFormat.maxInactivityDuration=0";
     //protected String clientURI="tcp://localhost:61616";
-    protected String bindAddress="tcp://localhost:61616";
+    protected String bindAddress="tcp://localhost:61616?wireFormat.maxInactivityDuration=0";
     //protected String bindAddress = "tcp://localhost:61616";
     //protected String bindAddress="vm://localhost?marshal=true";
     //protected String bindAddress="vm://localhost";
@@ -51,7 +51,7 @@
     protected int numberofProducers = 1;
     protected int totalNumberOfProducers;
     protected int totalNumberOfConsumers;
-    protected int playloadSize = 1024;
+    protected int playloadSize = 12;
     protected byte[] array;
     protected ConnectionFactory factory;
     

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java Mon Dec 29 08:48:28 2008
@@ -42,28 +42,32 @@
    
 
     protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException {
-        return new SlowConsumer(fac, dest);
+        PerfConsumer result = new SlowConsumer(fac, dest);
+        return result;
     }
 
     protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, int number, byte[] payload) throws JMSException {
         PerfProducer result = super.createProducer(fac, dest, number, payload);
         result.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        result.setSleep(10);
         return result;
     }
 
-    protected BrokerService createBroker() throws Exception {
+    protected BrokerService createBroker(String url) throws Exception {
         Resource resource = new ClassPathResource("org/apache/activemq/perf/slowConsumerBroker.xml");
+        System.err.println("CREATE BROKER FROM " + resource);
         BrokerFactoryBean factory = new BrokerFactoryBean(resource);
         factory.afterPropertiesSet();
         BrokerService broker = factory.getBroker();
+        
         broker.start();
         return broker;
     }
 
-    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
-        ActiveMQConnectionFactory result = super.createConnectionFactory(bindAddress);
+    protected ActiveMQConnectionFactory createConnectionFactory(String uri) throws Exception {
+        ActiveMQConnectionFactory result = super.createConnectionFactory(uri);
         ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy();
-        policy.setTopicPrefetch(1000);
+        policy.setTopicPrefetch(10);
         result.setPrefetchPolicy(policy);
         return result;
     }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java?rev=729939&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java Mon Dec 29 08:48:28 2008
@@ -0,0 +1,235 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+/**
+ * Checks that every message sent to a queue by many concurrent producers is
+ * received and counted by a small number of slow consumers.
+ * <p>
+ * Each call to {@code dispatch()} starts {@code consumerCount} consumers that
+ * share one connection and {@code producerThreads} producers that each own a
+ * connection. An iteration passes when the total consumed equals the total sent.
+ *
+ * @author Rajani Chennamaneni
+ */
+public class DispatchMultipleConsumersTest extends TestCase {
+    private static final Log logger = LogFactory.getLog(DispatchMultipleConsumersTest.class);
+    BrokerService broker;
+    Destination dest;
+    String destinationName = "TEST.Q";
+    String msgStr = "Test text message";
+    int messagesPerThread = 20;
+    int producerThreads = 50;
+    int consumerCount = 2;
+    AtomicInteger sentCount;
+    AtomicInteger consumedCount;
+    CountDownLatch producerLatch;
+    CountDownLatch consumerLatch;
+    String brokerURL = "tcp://localhost:61616";
+    String userName = "";
+    String password = "";
+    // Connection shared by the consumers of the current dispatch() run; kept in a
+    // field so it can be closed once the run is over.
+    Connection consumerConnection;
+
+    /**
+     * Starts a persistent, JMX-enabled broker with an empty store, listening on
+     * {@code brokerURL}, and prepares the destination and counters.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        broker = new BrokerService();
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+        broker.deleteAllMessages();
+        // Use the same URL the clients connect to instead of repeating the literal.
+        broker.addConnector(brokerURL);
+        broker.start();
+        dest = new ActiveMQQueue(destinationName);
+        resetCounters();
+    }
+
+    /**
+     * Closes any consumer connection still open and stops the broker so that the
+     * listening port and the message store are released after the test.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            closeConsumerConnection();
+            if (broker != null) {
+                broker.stop();
+                broker = null;
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /** Creates fresh counters and latches sized for one dispatch() run. */
+    private void resetCounters() {
+        sentCount = new AtomicInteger(0);
+        consumedCount = new AtomicInteger(0);
+        producerLatch = new CountDownLatch(producerThreads);
+        consumerLatch = new CountDownLatch(consumerCount);
+    }
+
+    /**
+     * Runs five produce/consume rounds and asserts after each one that the number
+     * of consumed messages matches the number of sent messages.
+     */
+    public void testDispatch1() {
+        for (int i = 1; i <= 5; i++) {
+            resetCounters();
+            dispatch();
+            //check for consumed messages count
+            assertEquals("Incorrect messages in Iteration " + i, sentCount.get(), consumedCount.get());
+        }
+    }
+
+    /**
+     * Starts consumers and producers, waits for all of them to finish, and then
+     * closes the consumers' shared connection.
+     */
+    private void dispatch() {
+        startConsumers();
+        startProducers();
+        try {
+            producerLatch.await();
+            consumerLatch.await();
+        } catch (InterruptedException e) {
+            // Preserve the interrupt for whoever is running the test.
+            Thread.currentThread().interrupt();
+            fail("test interrupted!");
+        } finally {
+            closeConsumerConnection();
+        }
+    }
+
+    /**
+     * Opens the shared consumer connection and starts {@code consumerCount}
+     * consumer threads on it. Fails the test if the connection cannot be opened,
+     * because no consumer would ever release the consumer latch.
+     */
+    private void startConsumers() {
+        ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(userName, password, brokerURL);
+        try {
+            consumerConnection = connFactory.createConnection();
+            consumerConnection.start();
+        } catch (JMSException e) {
+            logger.error("Failed to start consumers", e);
+            closeConsumerConnection();
+            fail("Failed to start consumers: " + e);
+        }
+        for (int i = 0; i < consumerCount; i++) {
+            new ConsumerThread(consumerConnection, "ConsumerThread"+i);
+        }
+    }
+
+    /** Starts {@code producerThreads} producer threads, each with its own connection. */
+    private void startProducers() {
+        ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(userName, password, brokerURL);
+        for (int i = 0; i < producerThreads; i++) {
+            new ProducerThread(connFactory, messagesPerThread, "ProducerThread"+i);
+        }
+    }
+
+    /** Closes the shared consumer connection, if one is open, and forgets it. */
+    private void closeConsumerConnection() {
+        Connection toClose = consumerConnection;
+        consumerConnection = null;
+        closeQuietly(toClose);
+    }
+
+    /** Closes the given connection, logging instead of propagating a failure; accepts null. */
+    private void closeQuietly(Connection connection) {
+        if (connection == null) {
+            return;
+        }
+        try {
+            connection.close();
+        } catch (JMSException e) {
+            logger.warn("Failed to close connection", e);
+        }
+    }
+
+    /**
+     * Slow consumer: receives from the test queue until the producers are done and
+     * more than ten consecutive one-second receives return nothing, then adds its
+     * tally to {@code consumedCount} and releases one slot of the consumer latch.
+     */
+    private class ConsumerThread extends Thread {
+        Connection conn;
+        Session session;
+        MessageConsumer consumer;
+
+        public ConsumerThread(Connection conn, String name) {
+            super();
+            this.conn = conn;
+            this.setName(name);
+            logger.info("Created new consumer thread:" + name);
+            try {
+                session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                consumer = session.createConsumer(dest);
+                start();
+            } catch (JMSException e) {
+                logger.error("Failed to start consumer thread:" + name, e);
+                // The thread never ran, so release its latch slot here; otherwise
+                // dispatch() would wait forever.
+                consumerLatch.countDown();
+            }
+        }
+
+        @Override
+        public void run() {
+            int msgCount = 0;
+            int nullCount = 0;
+            try {
+                while (true) {
+                    Message msg = consumer.receive(1000);
+                    if (msg == null) {
+                        if (producerLatch.getCount() > 0) {
+                            // Producers are still sending; an empty receive means nothing yet.
+                            continue;
+                        }
+                        nullCount++;
+                        if (nullCount > 10) {
+                            //assume that we are not getting any more messages
+                            break;
+                        }
+                        continue;
+                    }
+                    nullCount = 0;
+                    // Count immediately: the message is already consumed, so a later
+                    // failure in this iteration must not lose it from the tally.
+                    msgCount++;
+                    Thread.sleep(100);
+                    logger.info("Message received:" + msg.getJMSMessageID());
+                }
+            } catch (JMSException e) {
+                // Stop instead of retrying forever on a broken consumer.
+                logger.error("Failed to consume:", e);
+            } catch (InterruptedException e) {
+                logger.error("Interrupted!", e);
+                Thread.currentThread().interrupt();
+            } finally {
+                try {
+                    consumer.close();
+                } catch (JMSException e) {
+                    logger.error("Failed to close consumer " + getName(), e);
+                }
+                consumedCount.addAndGet(msgCount);
+                consumerLatch.countDown();
+                logger.info("Consumed " + msgCount + " messages using thread " + getName());
+            }
+        }
+
+    }
+
+    /**
+     * Producer: sends {@code count} text messages to the test queue, pausing
+     * 500 ms after each, then adds the number actually sent to {@code sentCount}
+     * and releases one slot of the producer latch.
+     */
+    private class ProducerThread extends Thread {
+        int count;
+        Connection conn;
+        Session session;
+        MessageProducer producer;
+
+        public ProducerThread(ActiveMQConnectionFactory connFactory, int count, String name) {
+            super();
+            this.count = count;
+            this.setName(name);
+            logger.info("Created new producer thread:" + name);
+            try {
+                conn = connFactory.createConnection();
+                conn.start();
+                session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                producer = session.createProducer(dest);
+                start();
+            } catch (JMSException e) {
+                logger.error("Failed to start producer thread:" + name, e);
+                // The thread never ran: free the connection and release its latch
+                // slot so that dispatch() and the consumers can still finish.
+                closeQuietly(conn);
+                producerLatch.countDown();
+            }
+        }
+
+        @Override
+        public void run() {
+            int sent = 0;
+            try {
+                for (int i = 0; i < count; i++) {
+                    producer.send(session.createTextMessage(msgStr));
+                    // Count right after the send so an interrupted sleep cannot
+                    // under-report what was actually sent.
+                    sent++;
+                    Thread.sleep(500);
+                }
+            } catch (JMSException e) {
+                logger.error(e.getMessage(), e);
+            } catch (InterruptedException e) {
+                logger.error("Interrupted!", e);
+                Thread.currentThread().interrupt();
+            } finally {
+                // Close on every path, not only after a fully successful run.
+                closeQuietly(conn);
+                sentCount.addAndGet(sent);
+                producerLatch.countDown();
+                logger.info("Sent " + sent + " messages from thread " + getName());
+            }
+        }
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml?rev=729939&r1=729938&r2=729939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml (original)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml Mon Dec 29 08:48:28 2008
@@ -25,10 +25,10 @@
      <destinationPolicy>
       <policyMap>
         <policyEntries>
-          <policyEntry topic="blob">            
+          <policyEntry topic=">" producerFlowControl="false">            
             <!-- lets force old messages to be discarded for slow consumers -->
             <pendingMessageLimitStrategy>
-              <constantPendingMessageLimitStrategy limit="10"/>
+              <constantPendingMessageLimitStrategy limit="0"/>
             </pendingMessageLimitStrategy>
           </policyEntry>
         </policyEntries>