You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2019/09/20 09:23:05 UTC
[activemq] branch master updated: AMQ-7308 - ensure kahadb message add does not auto create the message store in error, expect an existing store. fix and test
This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/master by this push:
new 289750d AMQ-7308 - ensure kahadb message add does not auto create the message store in error, expect an existing store. fix and test
289750d is described below
commit 289750d7c9849bd26f19e9116457eb72a3412d05
Author: gtully <ga...@gmail.com>
AuthorDate: Fri Sep 20 10:22:56 2019 +0100
AMQ-7308 - ensure kahadb message add does not auto create the message store in error, expect an existing store. fix and test
---
.../store/PersistenceAdapterTestSupport.java | 1 +
.../activemq/store/kahadb/MessageDatabase.java | 25 ++-
.../store/kahadb/MessageDatabaseSizeTest.java | 2 +
.../VirtualTopicConcurrentSendDeleteTest.java | 177 +++++++++++++++++++++
4 files changed, 203 insertions(+), 2 deletions(-)
diff --git a/activemq-broker/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java b/activemq-broker/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java
index 16d46e1..39a63ae 100644
--- a/activemq-broker/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java
+++ b/activemq-broker/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java
@@ -57,6 +57,7 @@ abstract public class PersistenceAdapterTestSupport extends TestCase {
MessageStore ms = pa.createQueueMessageStore(new ActiveMQQueue("TEST"));
+ ms.start();
ConnectionContext context = new ConnectionContext();
ActiveMQTextMessage message = new ActiveMQTextMessage();
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 1a120a2..ac8ea48 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1205,6 +1205,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
*/
void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
+ initMessageStore(data);
process(data, location, (IndexAware) null);
} else {
// just recover producer audit
@@ -1217,6 +1218,23 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
+ private void initMessageStore(JournalCommand<?> data) throws IOException { // called from process(data, location, inDoubtlocation) for in-doubt journal records, just before the index update is applied
+ data.visit(new Visitor() {
+ @Override
+ public void visit(KahaAddMessageCommand command) throws IOException { // only message-add commands are handled by this visitor
+ final KahaDestination destination = command.getDestination();
+ if (!storedDestinations.containsKey(key(destination))) { // nothing registered under this destination's key yet
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ @Override
+ public void execute(Transaction tx) throws IOException {
+ getStoredDestination(destination, tx); // NOTE(review): presumably loads or creates the destination store, since updateIndex now only looks up an existing one - confirm
+ }
+ });
+ }
+ }
+ });
+ }
+
// /////////////////////////////////////////////////////////////////
// Journaled record processing methods. Once the record is journaled,
// these methods handle applying the index updates. These may be called
@@ -1486,8 +1504,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<>();
long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
- StoredDestination sd = getStoredDestination(command.getDestination(), tx);
-
+ StoredDestination sd = getExistingStoredDestination(command.getDestination(), tx);
+ if (sd == null) {
+ // if the store no longer exists, skip
+ return -1;
+ }
// Skip adding the message to the index if this is a topic and there are
// no subscriptions.
if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java
index 4deb1e0..82d4018 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java
@@ -114,6 +114,7 @@ public class MessageDatabaseSizeTest {
//Add a single message and update once so we can compare the size consistently
MessageStore messageStore = store.createQueueMessageStore(destination);
+ messageStore.start();
messageStore.addMessage(broker.getAdminConnectionContext(), textMessage);
messageStore.updateMessage(textMessage);
@@ -134,6 +135,7 @@ public class MessageDatabaseSizeTest {
//Add a single message and update once so we can compare the size consistently
MessageStore messageStore = store.createQueueMessageStore(destination);
+ messageStore.start();
messageStore.addMessage(broker.getAdminConnectionContext(), textMessage);
textMessage.setText("new size of message");
messageStore.updateMessage(textMessage);
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicConcurrentSendDeleteTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicConcurrentSendDeleteTest.java
new file mode 100644
index 0000000..d4be5ae
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicConcurrentSendDeleteTest.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.ObjectName;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+public class VirtualTopicConcurrentSendDeleteTest { // AMQ-7308: sends to a virtual topic run concurrently with removal of its consumer queues
+
+ private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicConcurrentSendDeleteTest.class);
+
+ BrokerService brokerService;
+ ConnectionFactory connectionFactory;
+
+ @Before
+ public void createBroker() throws Exception {
+ createBroker(true);
+ }
+
+ public void createBroker(boolean delete) throws Exception { // delete: passed to setDeleteAllMessagesOnStartup
+ brokerService = new BrokerService();
+ //brokerService.setPersistent(false);
+ brokerService.setDeleteAllMessagesOnStartup(delete);
+ brokerService.setAdvisorySupport(false);
+ ((KahaDBPersistenceAdapter)brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(false);
+ brokerService.start();
+
+ ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost");
+ activeMQConnectionFactory.setWatchTopicAdvisories(false);
+ activeMQConnectionFactory.setAlwaysSyncSend(false);
+ ActiveMQPrefetchPolicy zeroPrefetch = new ActiveMQPrefetchPolicy();
+ zeroPrefetch.setAll(0);
+ activeMQConnectionFactory.setPrefetchPolicy(zeroPrefetch);
+ connectionFactory = activeMQConnectionFactory;
+ }
+
+ @After
+ public void stopBroker() throws Exception {
+ brokerService.stop();
+ }
+
+ @Test
+ public void testConsumerQueueDeleteOk() throws Exception {
+
+ final int numConnections = 1;
+ final int numDestinations = 10;
+ final int numMessages = 4000;
+
+ ExecutorService executorService = Executors.newFixedThreadPool(numConnections * 2);
+
+ brokerService.getRegionBroker().addDestination(
+ brokerService.getAdminConnectionContext(), new ActiveMQTopic("VirtualTopic.TEST"), false);
+
+ // pre-create the consumer queue destinations to accentuate read access
+ for (int i=0; i<numDestinations; i++ ) {
+ brokerService.getRegionBroker().addDestination(
+ brokerService.getAdminConnectionContext(),
+ new ActiveMQQueue("Consumer." + i + ".VirtualTopic.TEST"),
+ false);
+ }
+
+ final CountDownLatch doneOne = new CountDownLatch(numConnections);
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+
+ try {
+ int messagestoSend = 0;
+
+ Connection connection1 = connectionFactory.createConnection();
+ connection1.start();
+
+ Session session = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(null);
+
+ do {
+ producer.send(new ActiveMQTopic("VirtualTopic.TEST"), new ActiveMQMessage());
+ messagestoSend++;
+
+ if (messagestoSend == 1000) { // after 1000 sends, release the deleting task that waits on doneOne
+ doneOne.countDown();
+ }
+ } while (messagestoSend < numMessages);
+
+ connection1.close();
+
+ } catch (Exception e) {
+ e.printStackTrace(); // NOTE(review): a send failure is only printed here; it does not fail the test
+ }
+ }
+ };
+
+ for (int i = 0; i < numConnections; i++) {
+ executorService.execute(runnable);
+ }
+
+ // delete all of the consumer queues; this prefix builds the JMX ObjectName of each queue's MBean
+ final String prefix = "org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=";
+
+ executorService.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ doneOne.await(30, TimeUnit.SECONDS); // waits up to 30s for the producer's signal; the boolean result is not checked
+
+ // delete in reverse to clash with send in forward direction
+ for (int i=numDestinations-1; i>=0; i--) {
+ final ActiveMQQueue toDelete = new ActiveMQQueue("Consumer." + i + ".VirtualTopic.TEST");
+
+ ObjectName queueViewMBeanName = new ObjectName(prefix + toDelete.getQueueName());
+ QueueViewMBean proxy = (QueueViewMBean)
+ brokerService.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+ LOG.info("Q len: " + toDelete.getQueueName() + ", " + proxy.getQueueSize());
+ brokerService.getAdminView().removeQueue(toDelete.getPhysicalName());
+
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace(); // NOTE(review): a failed delete is only printed; the assertions below would then report leftover queues
+ }
+
+ }
+ });
+
+
+ executorService.shutdown();
+ executorService.awaitTermination(5, TimeUnit.MINUTES); // the timeout result is ignored; the assertions run either way
+
+ LOG.info("Enqueues: " + ((RegionBroker)brokerService.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
+ final int numQueues = ((RegionBroker)brokerService.getRegionBroker()).getQueueRegion().getDestinationMap().size();
+ LOG.info("Destinations: " + numQueues );
+
+ assertEquals("no queues left", 0, numQueues);
+
+ // the bug (AMQ-7308): a message add racing with queue removal must not leave a re-created queue store in kahadb
+ assertEquals("no queues, just one topic, in kahadb", 1, brokerService.getPersistenceAdapter().getDestinations().size());
+ }
+}