You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2015/08/07 19:23:55 UTC

activemq git commit: AMQ-5748 - Fixing MessageStore cache

Repository: activemq
Updated Branches:
  refs/heads/master 41ee3ec8d -> de24980a6


AMQ-5748 - Fixing MessageStore cache

This fixes KahaDBStore to properly check for an existing MessageStore
in the cache before creating a new one.  This will prevent potential
issues with metrics.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/de24980a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/de24980a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/de24980a

Branch: refs/heads/master
Commit: de24980a623c864d24cecf9ed852bec38cf09ae3
Parents: 41ee3ec
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Fri Aug 7 17:13:37 2015 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Fri Aug 7 17:23:25 2015 +0000

----------------------------------------------------------------------
 .../activemq/store/kahadb/KahaDBStore.java      | 26 ++++++++++++++++----
 .../store/AbstractMessageStoreSizeStatTest.java | 20 +++++++++++++++
 2 files changed, 41 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/de24980a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index bf14d69..84aba07 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -1004,16 +1004,32 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
 
     @Override
     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
-        MessageStore store = this.transactionStore.proxy(new KahaDBMessageStore(destination));
-        storeCache.put(key(convert(destination)), store);
+        String key = key(convert(destination));
+        MessageStore store = storeCache.get(key(convert(destination)));
+        if (store == null) {
+            final MessageStore queueStore = this.transactionStore.proxy(new KahaDBMessageStore(destination));
+            store = storeCache.putIfAbsent(key, queueStore);
+            if (store == null) {
+                store = queueStore;
+            }
+        }
+
         return store;
     }
 
     @Override
     public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
-        TopicMessageStore store = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
-        storeCache.put(key(convert(destination)), store);
-        return store;
+        String key = key(convert(destination));
+        MessageStore store = storeCache.get(key(convert(destination)));
+        if (store == null) {
+            final TopicMessageStore topicStore = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
+            store = storeCache.putIfAbsent(key, topicStore);
+            if (store == null) {
+                store = topicStore;
+            }
+        }
+
+        return (TopicMessageStore) store;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/de24980a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
index 59ae44b..1b927b4 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.store;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -34,10 +35,14 @@ import javax.jms.QueueSession;
 import javax.jms.Session;
 import javax.jms.Topic;
 import javax.jms.TopicSession;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -247,6 +252,12 @@ public abstract class AbstractMessageStoreSizeStatTest {
         Topic topic = session.createTopic("test.topic");
         session.createDurableSubscriber(topic, "sub1");
 
+        // Browse the durable sub. This verifies that browsing (which calls createTopicMessageStore
+        // in KahaDBStore) does not create a brand new store, i.e. it uses the cache. If the cache is not
+        // used, the statistics won't be updated properly because a new store would overwrite the old
+        // store, which is still in use.
+        ObjectName[] subs = broker.getAdminView().getDurableTopicSubscribers();
+
         try {
             // publish a bunch of non-persistent messages to fill up the temp
             // store
@@ -256,6 +267,15 @@ public abstract class AbstractMessageStoreSizeStatTest {
                 prod.send(createMessage(session));
             }
 
+            //verify the view has 200 messages
+            assertEquals(1, subs.length);
+            ObjectName subName = subs[0];
+            DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean)
+                    broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true);
+            CompositeData[] data  = sub.browse();
+            assertNotNull(data);
+            assertEquals(200, data.length);
+
         } finally {
             connection.stop();
         }