You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/04/15 15:02:44 UTC

[1/2] activemq git commit: Revert "https://issues.apache.org/jira/browse/AMQ-6218"

Repository: activemq
Updated Branches:
  refs/heads/master 6d20cba0e -> b9f9f0382


Revert "https://issues.apache.org/jira/browse/AMQ-6218"

Reverting commit in favor of a better approach

This reverts commit ea09159a4087212964787e7cf68ef30170d115f0.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9c929b68
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9c929b68
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9c929b68

Branch: refs/heads/master
Commit: 9c929b6870ae4c19cf4f227fa13472b09ebac729
Parents: 6d20cba
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Fri Apr 15 11:48:18 2016 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Fri Apr 15 11:48:18 2016 +0000

----------------------------------------------------------------------
 .../org/apache/activemq/command/ActiveMQTextMessage.java     | 8 --------
 1 file changed, 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/9c929b68/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
index 4618341..97fc9e4 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
@@ -55,12 +55,6 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
     }
 
     private void copy(ActiveMQTextMessage copy) {
-        //AMQ-6218 - Save text before calling super.copy() to prevent a race condition when
-        //concurrent store and dispatch is enabled in KahaDB
-        //The issue is sometimes beforeMarshall() gets called in between the time content and
-        //text are copied to the new object leading to both fields being null when text should
-        //not be null
-        String text = this.text;
         super.copy(copy);
         copy.text = text;
     }
@@ -85,11 +79,9 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
     @Override
     public String getText() throws JMSException {
         ByteSequence content = getContent();
-        String text = this.text;
 
         if (text == null && content != null) {
             text = decodeContent(content);
-            this.text = text;
             setContent(null);
             setCompressed(false);
         }


[2/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6256

Posted by cs...@apache.org.
https://issues.apache.org/jira/browse/AMQ-6256

Call beforeMarshall on messages that are stored asynchronously, before the
store task is run and before consumer dispatch, to prevent two threads
from trying to mutate the message state at the same time.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b9f9f038
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b9f9f038
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b9f9f038

Branch: refs/heads/master
Commit: b9f9f03829a65efa2956c347d2cafa41905313c6
Parents: 9c929b6
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Fri Apr 15 13:01:21 2016 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Fri Apr 15 13:01:21 2016 +0000

----------------------------------------------------------------------
 .../activemq/store/kahadb/KahaDBStore.java      |   3 +-
 .../apache/activemq/leveldb/LevelDBStore.scala  |   1 +
 .../org/apache/activemq/bugs/AMQ3436Test.java   |   1 +
 .../org/apache/activemq/bugs/AMQ6222Test.java   | 312 -------------------
 .../store/AbstractVmConcurrentDispatchTest.java | 294 +++++++++++++++++
 .../kahadb/KahaDbVmConcurrentDispatchTest.java  |  68 ++++
 .../MultiKahaDbVmConcurrentDispatchTest.java    |  81 +++++
 .../LevelDbVmConcurrentDispatchTest.java        |  61 ++++
 8 files changed, 508 insertions(+), 313 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b9f9f038/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index e1c1df4..7f8283d 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -66,7 +66,6 @@ import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionIdTransformer;
 import org.apache.activemq.store.TransactionStore;
-import org.apache.activemq.store.kahadb.MessageDatabase.Metadata;
 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
 import org.apache.activemq.store.kahadb.data.KahaDestination;
 import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
@@ -384,6 +383,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
         public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
                 throws IOException {
             if (isConcurrentStoreAndDispatchQueues()) {
+                message.beforeMarshall(wireFormat);
                 StoreQueueTask result = new StoreQueueTask(this, context, message);
                 ListenableFuture<Object> future = result.getFuture();
                 message.getMessageId().setFutureOrSequenceLong(future);
@@ -754,6 +754,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
         public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
                 throws IOException {
             if (isConcurrentStoreAndDispatchTopics()) {
+                message.beforeMarshall(wireFormat);
                 StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
                 result.aquireLocks();
                 addTopicTask(this, result);

http://git-wip-us.apache.org/repos/asf/activemq/blob/b9f9f038/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
index f80e722..5865f35 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
@@ -754,6 +754,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
 
     def doAdd(uow: DelayableUOW, context: ConnectionContext, message: Message, delay:Boolean): CountDownFuture[AnyRef] = {
       check_running
+      message.beforeMarshall(wireFormat);
       message.incrementReferenceCount()
       uow.addCompleteListener({
         message.decrementReferenceCount()

http://git-wip-us.apache.org/repos/asf/activemq/blob/b9f9f038/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java
index 65e0783..d9156de 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java
@@ -145,6 +145,7 @@ public class AMQ3436Test {
 
             boolean firstMessage = true;
 
+            @Override
             public void onMessage(Message msg) {
                 try {
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/b9f9f038/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6222Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6222Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6222Test.java
deleted file mode 100644
index 512ef9d..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6222Test.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQSession;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This test can also be used to debug AMQ-6218 and AMQ-6221
- *
- * This test shows that messages are received with non-null data while
- * several consumers are used.
- */
-@RunWith(Parameterized.class)
-public class AMQ6222Test {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AMQ6222Test.class);
-
-    private final MessageType messageType;
-    private final boolean reduceMemoryFootPrint;
-    private final boolean concurrentDispatch;
-
-    private static enum MessageType {TEXT, MAP, OBJECT}
-    private final static boolean[] booleanVals = {true, false};
-    private static boolean[] reduceMemoryFootPrintVals = booleanVals;
-    private static boolean[] concurrentDispatchVals = booleanVals;
-
-    @Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}; ConcurrentDispatch:{2}")
-    public static Collection<Object[]> data() {
-        List<Object[]> values = new ArrayList<>();
-
-        for (MessageType mt : MessageType.values()) {
-            for (boolean rmfVal : reduceMemoryFootPrintVals) {
-                for (boolean cdVal : concurrentDispatchVals) {
-                    values.add(new Object[] {mt, rmfVal, cdVal});
-                }
-            }
-        }
-
-        return values;
-    }
-
-    public AMQ6222Test(MessageType messageType, boolean reduceMemoryFootPrint,
-            boolean concurrentDispatch) {
-        this.messageType = messageType;
-        this.reduceMemoryFootPrint = reduceMemoryFootPrint;
-        this.concurrentDispatch = concurrentDispatch;
-    }
-
-    private BrokerService broker;
-    private final AtomicBoolean failure = new AtomicBoolean();
-    private CountDownLatch ready;
-    private URI connectionURI;
-    private URI vmConnectionURI;
-
-    private final boolean USE_VM_TRANSPORT = true;
-
-    private final int NUM_CONSUMERS = 30;
-    private final int NUM_PRODUCERS = 1;
-    private final int NUM_TASKS = NUM_CONSUMERS + NUM_PRODUCERS;
-
-    private int i = 0;
-    private String MessageId = null;
-    private int MessageCount = 0;
-
-    @Before
-    public void setUp() throws Exception {
-        broker = new BrokerService();
-        TransportConnector connector = broker.addConnector("tcp://0.0.0.0:0");
-        broker.setDeleteAllMessagesOnStartup(true);
-        PolicyMap policyMap = new PolicyMap();
-        PolicyEntry defaultPolicy = new PolicyEntry();
-        defaultPolicy.setReduceMemoryFootprint(reduceMemoryFootPrint);
-        policyMap.setDefaultEntry(defaultPolicy);
-        broker.setDestinationPolicy(policyMap);
-        KahaDBPersistenceAdapter ad = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
-        ad.setConcurrentStoreAndDispatchQueues(concurrentDispatch);
-        broker.start();
-        broker.waitUntilStarted();
-
-        ready = new CountDownLatch(NUM_TASKS);
-        connectionURI = connector.getPublishableConnectURI();
-        vmConnectionURI = broker.getVmConnectorURI();
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        if (broker != null) {
-            broker.stop();
-        }
-    }
-
-    @Test(timeout=180000)
-    public void testMessagesAreValid() throws Exception {
-
-        ExecutorService tasks = Executors.newFixedThreadPool(NUM_TASKS);
-        for (int i = 0; i < NUM_CONSUMERS; i++) {
-            LOG.info("Created Consumer: {}", i + 1);
-            tasks.execute(new HelloWorldConsumer());
-        }
-
-        for (int i = 0; i < NUM_PRODUCERS; i++) {
-            LOG.info("Created Producer: {}", i + 1);
-            tasks.execute(new HelloWorldProducer());
-        }
-
-        assertTrue(ready.await(20, TimeUnit.SECONDS));
-
-        try {
-            tasks.shutdown();
-            tasks.awaitTermination(10, TimeUnit.SECONDS);
-        } catch (Exception e) {
-            //should get exception with no errors
-        }
-
-        assertFalse("Test Encountered a null bodied message", failure.get());
-    }
-
-    public URI getBrokerURI() {
-        if (USE_VM_TRANSPORT) {
-            return vmConnectionURI;
-        } else {
-            return connectionURI;
-        }
-    }
-
-    public class HelloWorldProducer implements Runnable {
-
-        @Override
-        public void run() {
-            try {
-                ActiveMQConnectionFactory connectionFactory =
-                    new ActiveMQConnectionFactory(getBrokerURI());
-
-                Connection connection = connectionFactory.createConnection();
-                connection.start();
-
-                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-                Destination destination = session.createTopic("VirtualTopic.AMQ6218Test");
-
-                MessageProducer producer = session.createProducer(destination);
-
-                LOG.info("Producer: {}", destination);
-
-                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-                producer.setPriority(4);
-                producer.setTimeToLive(0);
-
-                ready.countDown();
-
-                int j = 0;
-                while (!failure.get()) {
-                    j++;
-                    String text = "AMQ Message Number :" + j;
-                    Message message = null;
-                    if (messageType.equals(MessageType.MAP)) {
-                        MapMessage mapMessage = session.createMapMessage();
-                        mapMessage.setString("text", text);
-                        message = mapMessage;
-                    } else if (messageType.equals(MessageType.OBJECT)) {
-                        ObjectMessage objectMessage = session.createObjectMessage();
-                        objectMessage.setObject(text);
-                        message = objectMessage;
-                    } else {
-                        message = session.createTextMessage(text);
-                    }
-                    producer.send(message);
-                    LOG.info("Sent message: {}", message.getJMSMessageID());
-                }
-
-                connection.close();
-            } catch (Exception e) {
-                LOG.error("Caught: " + e);
-                e.printStackTrace();
-            }
-        }
-    }
-
-    public class HelloWorldConsumer implements Runnable, ExceptionListener {
-        String queueName;
-
-        @Override
-        public void run() {
-            try {
-
-                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(getBrokerURI());
-                Connection connection = connectionFactory.createConnection();
-                connection.start();
-
-                Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
-                synchronized (this) {
-                    queueName = "Consumer.Q" + i + ".VirtualTopic.AMQ6218Test";
-                    i++;
-                    LOG.info(queueName);
-                }
-
-                Destination destination = session.createQueue(queueName);
-                MessageConsumer consumer = session.createConsumer(destination);
-
-                ready.countDown();
-
-                while (!failure.get()) {
-
-                    Message message = consumer.receive(500);
-
-                    if (message != null) {
-                        synchronized (this) {
-                            if (MessageId != null) {
-                                if (message.getJMSMessageID().equalsIgnoreCase(MessageId)) {
-                                    MessageCount++;
-                                } else {
-                                    LOG.info("Count of message " + MessageId + " is " + MessageCount);
-                                    MessageCount = 1;
-                                    MessageId = message.getJMSMessageID();
-                                }
-                            } else {
-                                MessageId = message.getJMSMessageID();
-                                MessageCount = 1;
-                            }
-                        }
-
-                        String text = null;
-                        if (messageType.equals(MessageType.OBJECT) && message instanceof ObjectMessage) {
-                            ObjectMessage objectMessage = (ObjectMessage) message;
-                            text = (String) objectMessage.getObject();
-                        } else if (messageType.equals(MessageType.TEXT) && message instanceof TextMessage) {
-                            TextMessage textMessage = (TextMessage) message;
-                            text = textMessage.getText();
-                        } else if (messageType.equals(MessageType.MAP) && message instanceof MapMessage) {
-                            MapMessage mapMessage = (MapMessage) message;
-                            text = mapMessage.getString("text");
-                        } else {
-                            LOG.info(queueName + " Message is not a instanceof " + messageType + " message id: " + message.getJMSMessageID() + message);
-                        }
-
-                        if (text == null) {
-                            LOG.warn(queueName + " text received as a null " + message);
-                            failure.set(true);
-                        } else {
-                            LOG.info(queueName + " text " + text + " message id: " + message.getJMSMessageID());
-                        }
-
-                        message.acknowledge();
-                    }
-                }
-
-                connection.close();
-            } catch (Exception e) {
-                LOG.error("Caught: ", e);
-            }
-        }
-
-        @Override
-        public synchronized void onException(JMSException ex) {
-            LOG.error("JMS Exception occurred.  Shutting down client.");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b9f9f038/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractVmConcurrentDispatchTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractVmConcurrentDispatchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractVmConcurrentDispatchTest.java
new file mode 100644
index 0000000..aaaaf69
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractVmConcurrentDispatchTest.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This test can also be used to debug AMQ-6218 and AMQ-6221
+ *
+ * This test shows that messages are received with non-null data while
+ * several consumers are used.
+ */
+public abstract class AbstractVmConcurrentDispatchTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractVmConcurrentDispatchTest.class);
+
+    private final MessageType messageType;
+    private final boolean reduceMemoryFootPrint;
+
+    protected static enum MessageType {TEXT, MAP, OBJECT}
+    protected final static boolean[] booleanVals = {true, false};
+    protected static boolean[] reduceMemoryFootPrintVals = booleanVals;
+
+    @Rule
+    public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
+
+    public AbstractVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint) {
+        this.messageType = messageType;
+        this.reduceMemoryFootPrint = reduceMemoryFootPrint;
+    }
+
+    private BrokerService broker;
+    private final AtomicBoolean failure = new AtomicBoolean();
+    private CountDownLatch ready;
+    private URI connectionURI;
+    private URI vmConnectionURI;
+
+    private final boolean USE_VM_TRANSPORT = true;
+
+    private final int NUM_CONSUMERS = 30;
+    private final int NUM_PRODUCERS = 1;
+    private final int NUM_TASKS = NUM_CONSUMERS + NUM_PRODUCERS;
+
+    private int i = 0;
+    private String MessageId = null;
+    private int MessageCount = 0;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+        TransportConnector connector = broker.addConnector("tcp://0.0.0.0:0");
+        broker.setDeleteAllMessagesOnStartup(true);
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultPolicy = new PolicyEntry();
+        defaultPolicy.setReduceMemoryFootprint(reduceMemoryFootPrint);
+        policyMap.setDefaultEntry(defaultPolicy);
+        broker.setDestinationPolicy(policyMap);
+        broker.setDataDirectoryFile(dataFileDir.getRoot());
+        configurePersistenceAdapter(broker);
+        broker.start();
+        broker.waitUntilStarted();
+
+        ready = new CountDownLatch(NUM_TASKS);
+        connectionURI = connector.getPublishableConnectURI();
+        vmConnectionURI = broker.getVmConnectorURI();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    protected abstract void configurePersistenceAdapter(final BrokerService broker) throws IOException;
+
+    @Test(timeout=180000)
+    public void testMessagesAreValid() throws Exception {
+
+        ExecutorService tasks = Executors.newFixedThreadPool(NUM_TASKS);
+        for (int i = 0; i < NUM_CONSUMERS; i++) {
+            LOG.info("Created Consumer: {}", i + 1);
+            tasks.execute(new HelloWorldConsumer());
+        }
+
+        for (int i = 0; i < NUM_PRODUCERS; i++) {
+            LOG.info("Created Producer: {}", i + 1);
+            tasks.execute(new HelloWorldProducer());
+        }
+
+        assertTrue(ready.await(20, TimeUnit.SECONDS));
+
+        try {
+            tasks.shutdown();
+            tasks.awaitTermination(20, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            //should get exception with no errors
+        }
+
+        assertFalse("Test Encountered a null bodied message", failure.get());
+    }
+
+    public URI getBrokerURI() {
+        if (USE_VM_TRANSPORT) {
+            return vmConnectionURI;
+        } else {
+            return connectionURI;
+        }
+    }
+
+    public class HelloWorldProducer implements Runnable {
+
+        @Override
+        public void run() {
+            try {
+                ActiveMQConnectionFactory connectionFactory =
+                    new ActiveMQConnectionFactory(getBrokerURI());
+
+                Connection connection = connectionFactory.createConnection();
+                connection.start();
+
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+                Destination destination = session.createTopic("VirtualTopic.AMQ6218Test");
+
+                MessageProducer producer = session.createProducer(destination);
+
+                LOG.info("Producer: {}", destination);
+
+                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+                producer.setPriority(4);
+                producer.setTimeToLive(0);
+
+                ready.countDown();
+
+                int j = 0;
+                while (!failure.get()) {
+                    j++;
+                    String text = "AMQ Message Number :" + j;
+                    Message message = null;
+                    if (messageType.equals(MessageType.MAP)) {
+                        MapMessage mapMessage = session.createMapMessage();
+                        mapMessage.setString("text", text);
+                        message = mapMessage;
+                    } else if (messageType.equals(MessageType.OBJECT)) {
+                        ObjectMessage objectMessage = session.createObjectMessage();
+                        objectMessage.setObject(text);
+                        message = objectMessage;
+                    } else {
+                        message = session.createTextMessage(text);
+                    }
+                    producer.send(message);
+                    LOG.info("Sent message: {}", message.getJMSMessageID());
+                }
+
+                connection.close();
+            } catch (Exception e) {
+                LOG.error("Caught: " + e);
+                e.printStackTrace();
+            }
+        }
+    }
+
+    public class HelloWorldConsumer implements Runnable, ExceptionListener {
+        String queueName;
+
+        @Override
+        public void run() {
+            try {
+
+                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(getBrokerURI());
+                Connection connection = connectionFactory.createConnection();
+                connection.start();
+
+                Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+                synchronized (this) {
+                    queueName = "Consumer.Q" + i + ".VirtualTopic.AMQ6218Test";
+                    i++;
+                    LOG.info(queueName);
+                }
+
+                Destination destination = session.createQueue(queueName);
+                MessageConsumer consumer = session.createConsumer(destination);
+
+                ready.countDown();
+
+                while (!failure.get()) {
+
+                    Message message = consumer.receive(500);
+
+                    if (message != null) {
+                        synchronized (this) {
+                            if (MessageId != null) {
+                                if (message.getJMSMessageID().equalsIgnoreCase(MessageId)) {
+                                    MessageCount++;
+                                } else {
+                                    LOG.info("Count of message " + MessageId + " is " + MessageCount);
+                                    MessageCount = 1;
+                                    MessageId = message.getJMSMessageID();
+                                }
+                            } else {
+                                MessageId = message.getJMSMessageID();
+                                MessageCount = 1;
+                            }
+                        }
+
+                        String text = null;
+                        if (messageType.equals(MessageType.OBJECT) && message instanceof ObjectMessage) {
+                            ObjectMessage objectMessage = (ObjectMessage) message;
+                            text = (String) objectMessage.getObject();
+                        } else if (messageType.equals(MessageType.TEXT) && message instanceof TextMessage) {
+                            TextMessage textMessage = (TextMessage) message;
+                            text = textMessage.getText();
+                        } else if (messageType.equals(MessageType.MAP) && message instanceof MapMessage) {
+                            MapMessage mapMessage = (MapMessage) message;
+                            text = mapMessage.getString("text");
+                        } else {
+                            LOG.info(queueName + " Message is not a instanceof " + messageType + " message id: " + message.getJMSMessageID() + message);
+                        }
+
+                        if (text == null) {
+                            LOG.warn(queueName + " text received as a null " + message);
+                            failure.set(true);
+                        } else {
+                            LOG.info(queueName + " text " + text + " message id: " + message.getJMSMessageID());
+                        }
+
+                        message.acknowledge();
+                    }
+                }
+
+                connection.close();
+            } catch (Exception e) {
+                LOG.error("Caught: ", e);
+            }
+        }
+
+        @Override
+        public synchronized void onException(JMSException ex) {
+            LOG.error("JMS Exception occurred.  Shutting down client.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b9f9f038/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDbVmConcurrentDispatchTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDbVmConcurrentDispatchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDbVmConcurrentDispatchTest.java
new file mode 100644
index 0000000..217a7c7
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDbVmConcurrentDispatchTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.AbstractVmConcurrentDispatchTest;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class KahaDbVmConcurrentDispatchTest extends AbstractVmConcurrentDispatchTest {
+
+    private final boolean concurrentDispatch; // applied to the KahaDB adapter in configurePersistenceAdapter
+    private static boolean[] concurrentDispatchVals = booleanVals; // values iterated by data(); booleanVals is declared outside this class
+
+      @Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}; ConcurrentDispatch:{2}")
+      public static Collection<Object[]> data() {
+          List<Object[]> values = new ArrayList<>();
+
+          for (MessageType mt : MessageType.values()) {
+              for (boolean rmfVal : reduceMemoryFootPrintVals) {
+                  for (boolean cdVal : concurrentDispatchVals) {
+                      values.add(new Object[] {mt, rmfVal, cdVal}); // one row per combination, in constructor-argument order
+                  }
+              }
+          }
+
+          return values;
+      }
+
+    /**
+     * @param messageType message type for this parameterized run; passed to the superclass constructor
+     * @param reduceMemoryFootPrint flag for this parameterized run; passed to the superclass constructor
+     * @param concurrentDispatch value later handed to setConcurrentStoreAndDispatchQueues
+     */
+    public KahaDbVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint,
+            boolean concurrentDispatch) {
+        super(messageType, reduceMemoryFootPrint);
+        this.concurrentDispatch = concurrentDispatch;
+    }
+
+    @Override
+    protected void configurePersistenceAdapter(BrokerService broker) throws IOException {
+        KahaDBPersistenceAdapter ad = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); // NOTE(review): assumes the broker's adapter is KahaDB - confirm
+        ad.setConcurrentStoreAndDispatchQueues(concurrentDispatch);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b9f9f038/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDbVmConcurrentDispatchTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDbVmConcurrentDispatchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDbVmConcurrentDispatchTest.java
new file mode 100644
index 0000000..3d16ce7
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDbVmConcurrentDispatchTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.AbstractVmConcurrentDispatchTest;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class MultiKahaDbVmConcurrentDispatchTest extends AbstractVmConcurrentDispatchTest {
+
+    private final boolean concurrentDispatch; // applied to the nested KahaDB store in configurePersistenceAdapter
+    private static boolean[] concurrentDispatchVals = booleanVals; // values iterated by data(); booleanVals is declared outside this class
+
+      @Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}; ConcurrentDispatch:{2}")
+      public static Collection<Object[]> data() {
+          List<Object[]> values = new ArrayList<>();
+
+          for (MessageType mt : MessageType.values()) {
+              for (boolean rmfVal : reduceMemoryFootPrintVals) {
+                  for (boolean cdVal : concurrentDispatchVals) {
+                      values.add(new Object[] {mt, rmfVal, cdVal}); // one row per combination, in constructor-argument order
+                  }
+              }
+          }
+
+          return values;
+      }
+
+    /**
+     * @param messageType message type for this parameterized run; passed to the superclass constructor
+     * @param reduceMemoryFootPrint flag for this parameterized run; passed to the superclass constructor
+     * @param concurrentDispatch value later handed to setConcurrentStoreAndDispatchQueues
+     */
+    public MultiKahaDbVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint,
+            boolean concurrentDispatch) {
+        super(messageType, reduceMemoryFootPrint);
+        this.concurrentDispatch = concurrentDispatch;
+    }
+
+    @Override
+    protected void configurePersistenceAdapter(BrokerService broker) throws IOException {
+        // Build a multi-KahaDB adapter holding one filtered KahaDB store and install it on the broker
+        MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter();
+        persistenceAdapter.setDirectory(dataFileDir.getRoot());
+
+        KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
+        kahaStore.setConcurrentStoreAndDispatchQueues(concurrentDispatch);
+
+        FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
+        filtered.setPersistenceAdapter(kahaStore);
+        filtered.setPerDestination(false);
+        List<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<>();
+        stores.add(filtered);
+
+        persistenceAdapter.setFilteredPersistenceAdapters(stores);
+        broker.setPersistenceAdapter(persistenceAdapter);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b9f9f038/activemq-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDbVmConcurrentDispatchTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDbVmConcurrentDispatchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDbVmConcurrentDispatchTest.java
new file mode 100644
index 0000000..d1b7e43
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDbVmConcurrentDispatchTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.leveldb;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.leveldb.LevelDBStoreFactory;
+import org.apache.activemq.store.AbstractVmConcurrentDispatchTest;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class LevelDbVmConcurrentDispatchTest extends AbstractVmConcurrentDispatchTest {
+
+      @Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}")
+      public static Collection<Object[]> data() {
+          List<Object[]> values = new ArrayList<>();
+
+          for (MessageType mt : MessageType.values()) {
+              for (boolean rmfVal : reduceMemoryFootPrintVals) {
+                  values.add(new Object[] {mt, rmfVal}); // one row per combination, in constructor-argument order
+              }
+          }
+
+          return values;
+      }
+
+    /**
+     * Passes both arguments straight through to the superclass constructor.
+     * @param messageType message type for this parameterized run
+     * @param reduceMemoryFootPrint flag for this parameterized run
+     */
+    public LevelDbVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint) {
+        super(messageType, reduceMemoryFootPrint);
+    }
+
+    @Override
+    protected void configurePersistenceAdapter(BrokerService broker) throws IOException {
+        broker.setPersistenceFactory(new LevelDBStoreFactory());
+    }
+
+}