You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2015/08/07 19:23:55 UTC
activemq git commit: AMQ-5748 - Fixing MessageStore cache
Repository: activemq
Updated Branches:
refs/heads/master 41ee3ec8d -> de24980a6
AMQ-5748 - Fixing MessageStore cache
This fixes KahaDBStore to properly check for an existing MessageStore
in the cache before creating a new one. This will prevent potential
issues with metrics.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/de24980a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/de24980a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/de24980a
Branch: refs/heads/master
Commit: de24980a623c864d24cecf9ed852bec38cf09ae3
Parents: 41ee3ec
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Fri Aug 7 17:13:37 2015 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Fri Aug 7 17:23:25 2015 +0000
----------------------------------------------------------------------
.../activemq/store/kahadb/KahaDBStore.java | 26 ++++++++++++++++----
.../store/AbstractMessageStoreSizeStatTest.java | 20 +++++++++++++++
2 files changed, 41 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/de24980a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index bf14d69..84aba07 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -1004,16 +1004,32 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
@Override
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
- MessageStore store = this.transactionStore.proxy(new KahaDBMessageStore(destination));
- storeCache.put(key(convert(destination)), store);
+ String key = key(convert(destination));
+ MessageStore store = storeCache.get(key(convert(destination)));
+ if (store == null) {
+ final MessageStore queueStore = this.transactionStore.proxy(new KahaDBMessageStore(destination));
+ store = storeCache.putIfAbsent(key, queueStore);
+ if (store == null) {
+ store = queueStore;
+ }
+ }
+
return store;
}
@Override
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
- TopicMessageStore store = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
- storeCache.put(key(convert(destination)), store);
- return store;
+ String key = key(convert(destination));
+ MessageStore store = storeCache.get(key(convert(destination)));
+ if (store == null) {
+ final TopicMessageStore topicStore = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
+ store = storeCache.putIfAbsent(key, topicStore);
+ if (store == null) {
+ store = topicStore;
+ }
+ }
+
+ return (TopicMessageStore) store;
}
/**
http://git-wip-us.apache.org/repos/asf/activemq/blob/de24980a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
index 59ae44b..1b927b4 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
@@ -17,6 +17,7 @@
package org.apache.activemq.store;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@@ -34,10 +35,14 @@ import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSession;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@@ -247,6 +252,12 @@ public abstract class AbstractMessageStoreSizeStatTest {
Topic topic = session.createTopic("test.topic");
session.createDurableSubscriber(topic, "sub1");
+ // browse the durable sub - this test is to verify that browsing (which calls createTopicMessageStore)
+ //in KahaDBStore will not create a brand new store (ie uses the cache) If the cache is not used,
+ //then the statistics won't be updated properly because a new store would overwrite the old store
+ //which is still in use
+ ObjectName[] subs = broker.getAdminView().getDurableTopicSubscribers();
+
try {
// publish a bunch of non-persistent messages to fill up the temp
// store
@@ -256,6 +267,15 @@ public abstract class AbstractMessageStoreSizeStatTest {
prod.send(createMessage(session));
}
+ //verify the view has 200 messages
+ assertEquals(1, subs.length);
+ ObjectName subName = subs[0];
+ DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean)
+ broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true);
+ CompositeData[] data = sub.browse();
+ assertNotNull(data);
+ assertEquals(200, data.length);
+
} finally {
connection.stop();
}