You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ha...@apache.org on 2015/02/09 19:12:14 UTC

[1/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5558 - add producer/consumer commands

Repository: activemq
Updated Branches:
  refs/heads/master b0a1bd833 -> 3155c625c


https://issues.apache.org/jira/browse/AMQ-5558 - add producer/consumer commands


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9f0ab46e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9f0ab46e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9f0ab46e

Branch: refs/heads/master
Commit: 9f0ab46e293e6d31369f06f6669cd3d63db906fa
Parents: b0a1bd8
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Thu Feb 5 13:58:22 2015 +0100
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Thu Feb 5 13:58:31 2015 +0100

----------------------------------------------------------------------
 .../apache/activemq/util/ConsumerThread.java    | 154 +++++++++++++
 .../apache/activemq/util/ProducerThread.java    | 231 +++++++++++++++++++
 .../resources/org/apache/activemq/util/demo.txt |  15 ++
 .../console/command/AbstractCommand.java        |  31 +++
 .../console/command/ConsumerCommand.java        | 157 +++++++++++++
 .../console/command/ProducerCommand.java        | 206 +++++++++++++++++
 .../org.apache.activemq.console.command.Command |   2 +
 .../activemq/console/command/consumer.txt       |  11 +
 .../activemq/console/command/producer.txt       |  16 ++
 .../org/apache/activemq/bugs/AMQ3120Test.java   |   2 +-
 .../org/apache/activemq/bugs/AMQ4323Test.java   |   2 +-
 .../activemq/usecases/MemoryLimitTest.java      |   6 +-
 .../apache/activemq/util/ConsumerThread.java    |  80 -------
 .../apache/activemq/util/ProducerThread.java    |  82 -------
 14 files changed, 828 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/9f0ab46e/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java b/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java
new file mode 100644
index 0000000..402b2a5
--- /dev/null
+++ b/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.util.concurrent.CountDownLatch;
+
+public class ConsumerThread extends Thread {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ConsumerThread.class);
+
+    int messageCount = 1000;
+    int receiveTimeOut = 3000;
+    Destination destination;
+    Session session;
+    boolean breakOnNull = true;
+    int sleep;
+    int transactionBatchSize;
+
+    int received = 0;
+    int transactions = 0;
+    boolean running = false;
+    CountDownLatch finished;
+
+    public ConsumerThread(Session session, Destination destination) {
+        this.destination = destination;
+        this.session = session;
+    }
+
+    @Override
+    public void run() {
+        running = true;
+        MessageConsumer consumer = null;
+        String threadName = Thread.currentThread().getName();
+        LOG.info(threadName + " wait until " + messageCount + " messages are consumed");
+        try {
+            consumer = session.createConsumer(destination);
+            while (running && received < messageCount) {
+                Message msg = consumer.receive(receiveTimeOut);
+                if (msg != null) {
+                    LOG.info(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
+                    received++;
+                } else {
+                    if (breakOnNull) {
+                        break;
+                    }
+                }
+
+                if (transactionBatchSize > 0 && received > 0 && received % transactionBatchSize == 0) {
+                    LOG.info(threadName + " Committing transaction: " + transactions++);
+                    session.commit();
+                }
+
+                if (sleep > 0) {
+                    Thread.sleep(sleep);
+                }
+
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            if (finished != null) {
+                finished.countDown();
+            }
+            if (consumer != null) {
+                LOG.info(threadName + " Consumed: " + this.getReceived() + " messages");
+                try {
+                    consumer.close();
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+
+        LOG.info(threadName + " Consumer thread finished");
+    }
+
+    public int getReceived() {
+        return received;
+    }
+
+    public void setMessageCount(int messageCount) {
+        this.messageCount = messageCount;
+    }
+
+    public void setBreakOnNull(boolean breakOnNull) {
+        this.breakOnNull = breakOnNull;
+    }
+
+    public int getTransactionBatchSize() {
+        return transactionBatchSize;
+    }
+
+    public void setTransactionBatchSize(int transactionBatchSize) {
+        this.transactionBatchSize = transactionBatchSize;
+    }
+
+    public int getMessageCount() {
+        return messageCount;
+    }
+
+    public boolean isBreakOnNull() {
+        return breakOnNull;
+    }
+
+    public int getReceiveTimeOut() {
+        return receiveTimeOut;
+    }
+
+    public void setReceiveTimeOut(int receiveTimeOut) {
+        this.receiveTimeOut = receiveTimeOut;
+    }
+
+    public boolean isRunning() {
+        return running;
+    }
+
+    public void setRunning(boolean running) {
+        this.running = running;
+    }
+
+    public int getSleep() {
+        return sleep;
+    }
+
+    public void setSleep(int sleep) {
+        this.sleep = sleep;
+    }
+
+    public CountDownLatch getFinished() {
+        return finished;
+    }
+
+    public void setFinished(CountDownLatch finished) {
+        this.finished = finished;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/9f0ab46e/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java b/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java
new file mode 100644
index 0000000..ad44259
--- /dev/null
+++ b/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.io.File;
+import java.io.FileReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.util.concurrent.CountDownLatch;
+
+public class ProducerThread extends Thread {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ProducerThread.class);
+
+    int messageCount = 1000;
+    Destination destination;
+    protected Session session;
+    int sleep = 0;
+    boolean persistent = true;
+    int messageSize = 0;
+    int textMessageSize;
+    long msgTTL = 0L;
+    String msgGroupID=null;
+    int transactionBatchSize;
+
+    int transactions = 0;
+    int sentCount = 0;
+    byte[] payload = null;
+    boolean running = false;
+    CountDownLatch finished;
+
+
+    public ProducerThread(Session session, Destination destination) {
+        this.destination = destination;
+        this.session = session;
+    }
+
+    public void run() {
+        MessageProducer producer = null;
+        String threadName = Thread.currentThread().getName();
+        try {
+            producer = session.createProducer(destination);
+            producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+            producer.setTimeToLive(msgTTL);
+            initPayLoad();
+            running = true;
+
+            LOG.info(threadName +  " Started to calculate elapsed time ...\n");
+            long tStart = System.currentTimeMillis();
+
+            for (sentCount = 0; sentCount < messageCount && running; sentCount++) {
+                Message message = createMessage(sentCount);
+                producer.send(message);
+                LOG.info(threadName + " Sent: " + (message instanceof TextMessage ? ((TextMessage) message).getText() : message.getJMSMessageID()));
+
+                if (transactionBatchSize > 0 && sentCount > 0 && sentCount % transactionBatchSize == 0) {
+                    LOG.info(threadName + " Committing transaction: " + transactions++);
+                    session.commit();
+                }
+
+                if (sleep > 0) {
+                    Thread.sleep(sleep);
+                }
+            }
+
+            LOG.info(threadName + " Produced: " + this.getSentCount() + " messages");
+            long tEnd = System.currentTimeMillis();
+            long elapsed = (tEnd - tStart) / 1000;
+            LOG.info(threadName + " Elapsed time in second : " + elapsed + " s");
+            LOG.info(threadName + " Elapsed time in milli second : " + (tEnd - tStart) + " milli seconds");
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            if (finished != null) {
+                finished.countDown();
+            }
+            if (producer != null) {
+                try {
+                    producer.close();
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    private void initPayLoad() {
+        if (messageSize > 0) {
+            payload = new byte[messageSize];
+            for (int i = 0; i < payload.length; i++) {
+                payload[i] = '.';
+            }
+        }
+    }
+
+    protected Message createMessage(int i) throws Exception {
+        Message message = null;
+        if (payload != null) {
+            message = session.createBytesMessage();
+            ((BytesMessage)message).writeBytes(payload);
+        } else {
+            if (textMessageSize > 0) {
+                InputStreamReader reader = null;
+                try {
+                    InputStream is = getClass().getResourceAsStream("demo.txt");
+                    reader = new InputStreamReader(is);
+                    char[] chars = new char[textMessageSize];
+                    reader.read(chars);
+                    message = session.createTextMessage(String.valueOf(chars));
+                } catch (Exception e) {
+                    LOG.warn(Thread.currentThread().getName() + " Failed to load " + textMessageSize + " bytes of demo text. Using default text message instead");
+                    message = session.createTextMessage("test message: " + i);
+                } finally {
+                    if (reader != null) {
+                        reader.close();
+                    }
+                }
+            } else {
+                message = session.createTextMessage("test message: " + i);
+            }
+        }
+        if ((msgGroupID != null) && (!msgGroupID.isEmpty())) {
+            message.setStringProperty("JMSXGroupID", msgGroupID);
+        }
+        return message;
+    }
+
+    public void setMessageCount(int messageCount) {
+        this.messageCount = messageCount;
+    }
+
+    public int getSleep() {
+        return sleep;
+    }
+
+    public void setSleep(int sleep) {
+        this.sleep = sleep;
+    }
+
+    public int getMessageCount() {
+        return messageCount;
+    }
+
+    public int getSentCount() {
+        return sentCount;
+    }
+
+    public boolean isPersistent() {
+        return persistent;
+    }
+
+    public void setPersistent(boolean persistent) {
+        this.persistent = persistent;
+    }
+
+    public boolean isRunning() {
+        return running;
+    }
+
+    public void setRunning(boolean running) {
+        this.running = running;
+    }
+
+    public long getMsgTTL() {
+        return msgTTL;
+    }
+
+    public void setMsgTTL(long msgTTL) {
+        this.msgTTL = msgTTL;
+    }
+
+    public int getTransactionBatchSize() {
+        return transactionBatchSize;
+    }
+
+    public void setTransactionBatchSize(int transactionBatchSize) {
+        this.transactionBatchSize = transactionBatchSize;
+    }
+
+    public String getMsgGroupID() {
+        return msgGroupID;
+    }
+
+    public void setMsgGroupID(String msgGroupID) {
+        this.msgGroupID = msgGroupID;
+    }
+
+    public int getTextMessageSize() {
+        return textMessageSize;
+    }
+
+    public void setTextMessageSize(int textMessageSize) {
+        this.textMessageSize = textMessageSize;
+    }
+
+    public int getMessageSize() {
+        return messageSize;
+    }
+
+    public void setMessageSize(int messageSize) {
+        this.messageSize = messageSize;
+    }
+
+    public CountDownLatch getFinished() {
+        return finished;
+    }
+
+    public void setFinished(CountDownLatch finished) {
+        this.finished = finished;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/9f0ab46e/activemq-client/src/main/resources/org/apache/activemq/util/demo.txt
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/resources/org/apache/activemq/util/demo.txt b/activemq-client/src/main/resources/org/apache/activemq/util/demo.txt
new file mode 100644
index 0000000..4a6002e
--- /dev/null
+++ b/activemq-client/src/main/resources/org/apache/activemq/util/demo.txt
@@ -0,0 +1,15 @@
+Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet.
+Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat.
+Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi.
+Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet doming id quod mazim placerat facer possim assum. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat.
+Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis.
+At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, At accusam aliquyam diam diam dolore dolores duo eirmod eos erat, et nonumy sed tempor et et invidunt justo labore Stet clita ea et gubergren, kasd magna no rebum. sanctus sea sed takimata ut vero voluptua. est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat.
+Consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos 
 et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet.
+Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat.
+Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi.
+Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet doming id quod mazim placerat facer possim assum. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat.Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis.
+At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, At accusam aliquyam diam diam dolore dolores duo eirmod eos erat, et nonumy sed tempor et et invidunt justo labore Stet clita ea et gubergren, kasd magna no rebum. sanctus sea sed takimata ut vero voluptua. est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat.
+Consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos 
 et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet.
+Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat.
+Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi.
+Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet doming id quod mazim placerat facer possim assum. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/9f0ab46e/activemq-console/src/main/java/org/apache/activemq/console/command/AbstractCommand.java
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/AbstractCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/AbstractCommand.java
index de63347..057f9f3 100644
--- a/activemq-console/src/main/java/org/apache/activemq/console/command/AbstractCommand.java
+++ b/activemq-console/src/main/java/org/apache/activemq/console/command/AbstractCommand.java
@@ -16,10 +16,15 @@
  */
 package org.apache.activemq.console.command;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.util.List;
 
 import org.apache.activemq.ActiveMQConnectionMetaData;
 import org.apache.activemq.console.CommandContext;
+import org.apache.activemq.util.IntrospectionSupport;
 
 public abstract class AbstractCommand implements Command {
     public static final String COMMAND_OPTION_DELIMETER = ",";
@@ -110,6 +115,14 @@ public abstract class AbstractCommand implements Command {
             }
             System.setProperty(key, value);
         } else {
+            if (token.startsWith("--")) {
+                String prop = token.substring(2);
+                if (tokens.isEmpty() || tokens.get(0).startsWith("-")) {
+                    context.print("Property '" + prop + "' is not specified!");
+                } else if (IntrospectionSupport.setProperty(this, prop, tokens.remove(0))) {
+                    return;
+                }
+            }
             // Token is unrecognized
             context.printInfo("Unrecognized option: " + token);
             isPrintHelp = true;
@@ -128,4 +141,22 @@ public abstract class AbstractCommand implements Command {
      * Print the help messages for the specific task
      */
     protected abstract void printHelp();
+
+    protected void printHelpFromFile() {
+        BufferedReader reader = null;
+        try {
+            InputStream is = getClass().getResourceAsStream(getName() + ".txt");
+            reader = new BufferedReader(new InputStreamReader(is));
+            String line;
+            while ((line = reader.readLine()) != null) {
+                context.print(line);
+            }
+        } catch (Exception e) {} finally {
+            if (reader != null) {
+                try {
+                    reader.close();
+                } catch (IOException e) {}
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/9f0ab46e/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java
new file mode 100644
index 0000000..962b6ad
--- /dev/null
+++ b/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.console.command;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.util.ConsumerThread;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+public class ConsumerCommand extends AbstractCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(ConsumerCommand.class);
+
+    String brokerUrl = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
+    String user = ActiveMQConnectionFactory.DEFAULT_USER;
+    String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
+    String destination = "queue://TEST";
+    int messageCount = 1000;
+    int sleep;
+    int transactionBatchSize;
+    int parallelThreads = 1;
+
+    @Override
+    protected void runTask(List<String> tokens) throws Exception {
+        LOG.info("Connecting to URL: " + brokerUrl + " (" + user + ":" + password + ")");
+        LOG.info("Consuming " + destination);
+        LOG.info("Sleeping between receives " + sleep + " ms");
+        LOG.info("Running " + parallelThreads + " parallel threads");
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
+        Connection conn = factory.createConnection(user, password);
+        conn.start();
+
+        Session sess;
+        if (transactionBatchSize != 0) {
+            sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+        } else {
+            sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        }
+
+
+        CountDownLatch active = new CountDownLatch(parallelThreads);
+
+        for (int i = 1; i <= parallelThreads; i++) {
+            ConsumerThread consumer = new ConsumerThread(sess, ActiveMQDestination.createDestination(destination, ActiveMQDestination.QUEUE_TYPE));
+            consumer.setName("consumer-" + i);
+            consumer.setBreakOnNull(false);
+            consumer.setMessageCount(messageCount);
+            consumer.setSleep(sleep);
+            consumer.setTransactionBatchSize(transactionBatchSize);
+            consumer.setFinished(active);
+            consumer.start();
+        }
+
+        active.await();
+    }
+
+    public String getBrokerUrl() {
+        return brokerUrl;
+    }
+
+    public void setBrokerUrl(String brokerUrl) {
+        this.brokerUrl = brokerUrl;
+    }
+
+    public String getUser() {
+        return user;
+    }
+
+    public void setUser(String user) {
+        this.user = user;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public String getDestination() {
+        return destination;
+    }
+
+    public void setDestination(String destination) {
+        this.destination = destination;
+    }
+
+    public int getMessageCount() {
+        return messageCount;
+    }
+
+    public void setMessageCount(int messageCount) {
+        this.messageCount = messageCount;
+    }
+
+    public int getSleep() {
+        return sleep;
+    }
+
+    public void setSleep(int sleep) {
+        this.sleep = sleep;
+    }
+
+    public int getTransactionBatchSize() {
+        return transactionBatchSize;
+    }
+
+    public void setTransactionBatchSize(int transactionBatchSize) {
+        this.transactionBatchSize = transactionBatchSize;
+    }
+
+    public int getParallelThreads() {
+        return parallelThreads;
+    }
+
+    public void setParallelThreads(int parallelThreads) {
+        this.parallelThreads = parallelThreads;
+    }
+
+    @Override
+    protected void printHelp() {
+        printHelpFromFile();
+    }
+
+    @Override
+    public String getName() {
+        return "consumer";
+    }
+
+    @Override
+    public String getOneLineDescription() {
+        return "Receives messages from the broker";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/9f0ab46e/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java
new file mode 100644
index 0000000..ba696bb
--- /dev/null
+++ b/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.console.command;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.ProducerThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import java.io.*;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+public class ProducerCommand extends AbstractCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(ProducerCommand.class);
+
+    String brokerUrl = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
+    String user = ActiveMQConnectionFactory.DEFAULT_USER;
+    String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
+    String destination = "queue://TEST";
+    int messageCount = 1000;
+    int sleep = 0;
+    boolean persistent = true;
+    int messageSize = 0;
+    int textMessageSize;
+    long msgTTL = 0L;
+    String msgGroupID=null;
+    int transactionBatchSize;
+    private int parallelThreads = 1;
+
+    @Override
+    protected void runTask(List<String> tokens) throws Exception {
+        LOG.info("Connecting to URL: " + brokerUrl + " (" + user + ":" + password + ")");
+        LOG.info("Producing messages to " + destination);
+        LOG.info("Using " + (persistent ? "persistent" : "non-persistent") + " messages");
+        LOG.info("Sleeping between sends " + sleep + " ms");
+        LOG.info("Running " + parallelThreads + " parallel threads");
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
+        Connection conn = factory.createConnection(user, password);
+        conn.start();
+
+        Session sess;
+        if (transactionBatchSize != 0) {
+            sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+        } else {
+            sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        }
+
+        CountDownLatch active = new CountDownLatch(parallelThreads);
+
+        for (int i = 1; i <= parallelThreads; i++) {
+            ProducerThread producer = new ProducerThread(sess, ActiveMQDestination.createDestination(destination, ActiveMQDestination.QUEUE_TYPE));
+            producer.setName("producer-" + i);
+            producer.setMessageCount(messageCount);
+            producer.setSleep(sleep);
+            producer.setMsgTTL(msgTTL);
+            producer.setPersistent(persistent);
+            producer.setTransactionBatchSize(transactionBatchSize);
+            producer.setMessageSize(messageSize);
+            producer.setMsgGroupID(msgGroupID);
+            producer.setTextMessageSize(textMessageSize);
+            producer.setFinished(active);
+            producer.start();
+        }
+
+        active.await();
+    }
+
+    public String getBrokerUrl() {
+        return brokerUrl;
+    }
+
+    public void setBrokerUrl(String brokerUrl) {
+        this.brokerUrl = brokerUrl;
+    }
+
+    public String getDestination() {
+        return destination;
+    }
+
+    public void setDestination(String destination) {
+        this.destination = destination;
+    }
+
+    public int getMessageCount() {
+        return messageCount;
+    }
+
+    public void setMessageCount(int messageCount) {
+        this.messageCount = messageCount;
+    }
+
+    public int getSleep() {
+        return sleep;
+    }
+
+    public void setSleep(int sleep) {
+        this.sleep = sleep;
+    }
+
+    public boolean isPersistent() {
+        return persistent;
+    }
+
+    public void setPersistent(boolean persistent) {
+        this.persistent = persistent;
+    }
+
+    public int getMessageSize() {
+        return messageSize;
+    }
+
+    public void setMessageSize(int messageSize) {
+        this.messageSize = messageSize;
+    }
+
+    public int getTextMessageSize() {
+        return textMessageSize;
+    }
+
+    public void setTextMessageSize(int textMessageSize) {
+        this.textMessageSize = textMessageSize;
+    }
+
+    public long getMsgTTL() {
+        return msgTTL;
+    }
+
+    public void setMsgTTL(long msgTTL) {
+        this.msgTTL = msgTTL;
+    }
+
+    public String getMsgGroupID() {
+        return msgGroupID;
+    }
+
+    public void setMsgGroupID(String msgGroupID) {
+        this.msgGroupID = msgGroupID;
+    }
+
+    public int getTransactionBatchSize() {
+        return transactionBatchSize;
+    }
+
+    public void setTransactionBatchSize(int transactionBatchSize) {
+        this.transactionBatchSize = transactionBatchSize;
+    }
+
+    public String getUser() {
+        return user;
+    }
+
+    public void setUser(String user) {
+        this.user = user;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public int getParallelThreads() {
+        return parallelThreads;
+    }
+
+    public void setParallelThreads(int parallelThreads) {
+        this.parallelThreads = parallelThreads;
+    }
+
+    @Override
+    protected void printHelp() {
+        printHelpFromFile();
+    }
+
+    @Override
+    public String getName() {
+        return "producer";
+    }
+
+    @Override
+    public String getOneLineDescription() {
+        return "Sends messages to the broker";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/9f0ab46e/activemq-console/src/main/resources/META-INF/services/org.apache.activemq.console.command.Command
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/resources/META-INF/services/org.apache.activemq.console.command.Command b/activemq-console/src/main/resources/META-INF/services/org.apache.activemq.console.command.Command
index 64749c6..bb7ddb6 100644
--- a/activemq-console/src/main/resources/META-INF/services/org.apache.activemq.console.command.Command
+++ b/activemq-console/src/main/resources/META-INF/services/org.apache.activemq.console.command.Command
@@ -26,3 +26,5 @@ org.apache.activemq.console.command.EncryptCommand
 org.apache.activemq.console.command.DecryptCommand
 org.apache.activemq.console.command.StoreExportCommand
 org.apache.activemq.console.command.PurgeCommand
+org.apache.activemq.console.command.ProducerCommand
+org.apache.activemq.console.command.ConsumerCommand

http://git-wip-us.apache.org/repos/asf/activemq/blob/9f0ab46e/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt b/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt
new file mode 100644
index 0000000..a834ca1
--- /dev/null
+++ b/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt
@@ -0,0 +1,11 @@
+Usage: consumer [OPTIONS]
+Description: Demo consumer that can be used to receive messages to the broker
+Options :
+    [--brokerUrl                                   URL] - connection factory url; default " + ActiveMQConnectionFactory.DEFAULT_BROKER_URL
+    [--user                                         ..] - connection user name
+    [--password                                     ..] - connection password
+    [--destination               queue://..|topic://..] - ; default TEST
+    [--messageCount                                  N] - number of messages to send; default 1000
+    [--sleep                                         N] - millisecond sleep period between sends or receives; default 0
+    [--transactionBatchSize                          N] - use send transaction batches of size N; default 0, no jms transactions
+    [--parallelThreads                               N] - number of threads to run in parallel; default 1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/9f0ab46e/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt b/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt
new file mode 100644
index 0000000..3cf5550
--- /dev/null
+++ b/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt
@@ -0,0 +1,16 @@
+Usage: producer [OPTIONS]
+Description: Demo producer that can be used to send messages to the broker
+Options :
+    [--brokerUrl                                   URL] - connection factory url; default " + ActiveMQConnectionFactory.DEFAULT_BROKER_URL
+    [--user                                         ..] - connection user name
+    [--password                                     ..] - connection password
+    [--destination               queue://..|topic://..] - ; default TEST
+    [--persistent                           true|false] - use persistent or non persistent messages; default true
+    [--messageCount                                  N] - number of messages to send; default 1000
+    [--sleep                                         N] - millisecond sleep period between sends or receives; default 0
+    [--transactionBatchSize                          N] - use send transaction batches of size N; default 0, no jms transactions
+    [--parallelThreads                               N] - number of threads to run in parallel; default 1
+    [--msgTTL                                        N] - message TTL in milliseconds
+    [--messageSize                                   N] - size in bytes of a BytesMessage; default 0, a simple TextMessage is used
+    [--textMessageSize                               N] - size in bytes of a TextMessage, a Lorem ipsum demo TextMessage is used
+    [--msgGroupID                                   ..] - JMS message group identifier
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/9f0ab46e/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java
index bfff0fd..6494efe 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java
@@ -123,7 +123,7 @@ public class AMQ3120Test {
         ProducerThread producer = new ProducerThread(producerSess, destination) {
             @Override
             protected Message createMessage(int i) throws Exception {
-                return sess.createTextMessage(payload + "::" + i);
+                return session.createTextMessage(payload + "::" + i);
             }
         };
         producer.setSleep(650);

http://git-wip-us.apache.org/repos/asf/activemq/blob/9f0ab46e/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
index 8e6a96f..e965731 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
@@ -127,7 +127,7 @@ public class AMQ4323Test {
         ProducerThread producer = new ProducerThread(producerSess, destination) {
             @Override
             protected Message createMessage(int i) throws Exception {
-                return sess.createTextMessage(payload + "::" + i);
+                return session.createTextMessage(payload + "::" + i);
             }
         };
         producer.setMessageCount(messageCount);

http://git-wip-us.apache.org/repos/asf/activemq/blob/9f0ab46e/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
index 49026bd..c481172 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
@@ -105,7 +105,7 @@ public class MemoryLimitTest extends TestSupport {
         final ProducerThread producer = new ProducerThread(sess, queue) {
             @Override
             protected Message createMessage(int i) throws Exception {
-                BytesMessage bytesMessage = sess.createBytesMessage();
+                BytesMessage bytesMessage = session.createBytesMessage();
                 bytesMessage.writeBytes(payload);
                 return bytesMessage;
             }
@@ -168,7 +168,7 @@ public class MemoryLimitTest extends TestSupport {
         final ProducerThread producer = new ProducerThread(sess, sess.createQueue("STORE.1")) {
             @Override
             protected Message createMessage(int i) throws Exception {
-                return sess.createTextMessage(payload + "::" + i);
+                return session.createTextMessage(payload + "::" + i);
             }
         };
         producer.setMessageCount(1000);
@@ -176,7 +176,7 @@ public class MemoryLimitTest extends TestSupport {
         final ProducerThread producer2 = new ProducerThread(sess, sess.createQueue("STORE.2")) {
             @Override
             protected Message createMessage(int i) throws Exception {
-                return sess.createTextMessage(payload + "::" + i);
+                return session.createTextMessage(payload + "::" + i);
             }
         };
         producer2.setMessageCount(1000);

http://git-wip-us.apache.org/repos/asf/activemq/blob/9f0ab46e/activemq-unit-tests/src/test/java/org/apache/activemq/util/ConsumerThread.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/util/ConsumerThread.java b/activemq-unit-tests/src/test/java/org/apache/activemq/util/ConsumerThread.java
deleted file mode 100644
index 6b4bad2..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/util/ConsumerThread.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.util;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.*;
-
-public class ConsumerThread extends Thread {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ConsumerThread.class);
-
-    int messageCount = 1000;
-    int received = 0;
-    Destination dest;
-    Session sess;
-    boolean breakOnNull = true;
-
-    public ConsumerThread(Session sess, Destination dest) {
-        this.dest = dest;
-        this.sess = sess;
-    }
-
-    @Override
-    public void run() {
-      MessageConsumer consumer = null;
-
-        try {
-            consumer = sess.createConsumer(dest);
-            while (received < messageCount) {
-                Message msg = consumer.receive(3000);
-                if (msg != null) {
-                    LOG.info("Received " + received + ": " + ((TextMessage)msg).getText());
-                    received++;
-                } else {
-                    if (breakOnNull) {
-                        break;
-                    }
-                }
-            }
-        } catch (JMSException e) {
-            e.printStackTrace();
-        } finally {
-            if (consumer != null) {
-                try {
-                    consumer.close();
-                } catch (JMSException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-
-    public int getReceived() {
-        return received;
-    }
-
-    public void setMessageCount(int messageCount) {
-        this.messageCount = messageCount;
-    }
-
-    public void setBreakOnNull(boolean breakOnNull) {
-        this.breakOnNull = breakOnNull;
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/9f0ab46e/activemq-unit-tests/src/test/java/org/apache/activemq/util/ProducerThread.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/util/ProducerThread.java b/activemq-unit-tests/src/test/java/org/apache/activemq/util/ProducerThread.java
deleted file mode 100644
index c7cf90d..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/util/ProducerThread.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.util;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.*;
-
-public class ProducerThread extends Thread {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ProducerThread.class);
-
-    int messageCount = 1000;
-    Destination dest;
-    protected Session sess;
-    int sleep = 0;
-    int sentCount = 0;
-
-    public ProducerThread(Session sess, Destination dest) {
-        this.dest = dest;
-        this.sess = sess;
-    }
-
-    public void run() {
-        MessageProducer producer = null;
-        try {
-            producer = sess.createProducer(dest);
-            for (sentCount = 0; sentCount < messageCount; sentCount++) {
-                producer.send(createMessage(sentCount));
-                LOG.info("Sent 'test message: " + sentCount + "'");
-                if (sleep > 0) {
-                    Thread.sleep(sleep);
-                }
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-        } finally {
-            if (producer != null) {
-                try {
-                    producer.close();
-                } catch (JMSException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-
-    protected Message createMessage(int i) throws Exception {
-        return sess.createTextMessage("test message: " + i);
-    }
-
-    public void setMessageCount(int messageCount) {
-        this.messageCount = messageCount;
-    }
-
-    public void setSleep(int sleep) {
-        this.sleep = sleep;
-    }
-
-    public int getMessageCount() {
-        return messageCount;
-    }
-
-    public int getSentCount() {
-        return sentCount;
-    }
-}


[2/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5564

Posted by ha...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5564

Fixed session in the pool losing their reference to the anonymous
producer created when useAnonymousProducers is true.  The anonymous
producer stays live for the life of the pooled session.

Also added some synchronization safety to some methods that could get
into NPE trouble.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f91abd3d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f91abd3d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f91abd3d

Branch: refs/heads/master
Commit: f91abd3d46ac1f7ded399ce3bbc36fdfb70a91cb
Parents: 9f0ab46
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Feb 5 17:50:43 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Feb 5 17:50:43 2015 -0500

----------------------------------------------------------------------
 .../activemq/jms/pool/ConnectionPool.java       | 25 +++--
 .../activemq/jms/pool/PooledConnection.java     |  6 +-
 .../apache/activemq/jms/pool/PooledSession.java | 81 ++++++----------
 .../apache/activemq/jms/pool/SessionHolder.java | 98 ++++++++++++++++++++
 .../activemq/jms/pool/PooledSessionTest.java    | 53 +++++++++--
 5 files changed, 193 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/f91abd3d/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
index c802a17..6c3fdc3 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
@@ -21,8 +21,13 @@ import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
 import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
 
 import org.apache.commons.pool.KeyedPoolableObjectFactory;
 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
@@ -51,7 +56,7 @@ public class ConnectionPool implements ExceptionListener {
     private boolean useAnonymousProducers = true;
 
     private final AtomicBoolean started = new AtomicBoolean(false);
-    private final GenericKeyedObjectPool<SessionKey, Session> sessionPool;
+    private final GenericKeyedObjectPool<SessionKey, SessionHolder> sessionPool;
     private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
     private boolean reconnectOnException;
     private ExceptionListener parentExceptionListener;
@@ -61,29 +66,29 @@ public class ConnectionPool implements ExceptionListener {
         this.connection = wrap(connection);
 
         // Create our internal Pool of session instances.
-        this.sessionPool = new GenericKeyedObjectPool<SessionKey, Session>(
-            new KeyedPoolableObjectFactory<SessionKey, Session>() {
+        this.sessionPool = new GenericKeyedObjectPool<SessionKey, SessionHolder>(
+            new KeyedPoolableObjectFactory<SessionKey, SessionHolder>() {
 
                 @Override
-                public void activateObject(SessionKey key, Session session) throws Exception {
+                public void activateObject(SessionKey key, SessionHolder session) throws Exception {
                 }
 
                 @Override
-                public void destroyObject(SessionKey key, Session session) throws Exception {
+                public void destroyObject(SessionKey key, SessionHolder session) throws Exception {
                     session.close();
                 }
 
                 @Override
-                public Session makeObject(SessionKey key) throws Exception {
-                    return makeSession(key);
+                public SessionHolder makeObject(SessionKey key) throws Exception {
+                    return new SessionHolder(makeSession(key));
                 }
 
                 @Override
-                public void passivateObject(SessionKey key, Session session) throws Exception {
+                public void passivateObject(SessionKey key, SessionHolder session) throws Exception {
                 }
 
                 @Override
-                public boolean validateObject(SessionKey key, Session session) {
+                public boolean validateObject(SessionKey key, SessionHolder session) {
                     return true;
                 }
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f91abd3d/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
index b268862..b7b56ba 100755
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
@@ -24,6 +24,7 @@ import javax.jms.ConnectionConsumer;
 import javax.jms.ConnectionMetaData;
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
 import javax.jms.Queue;
 import javax.jms.QueueConnection;
@@ -35,7 +36,7 @@ import javax.jms.TemporaryTopic;
 import javax.jms.Topic;
 import javax.jms.TopicConnection;
 import javax.jms.TopicSession;
-import javax.jms.IllegalStateException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -163,8 +164,7 @@ public class PooledConnection implements TopicConnection, QueueConnection, Poole
 
     @Override
     public Session createSession(boolean transacted, int ackMode) throws JMSException {
-        PooledSession result;
-        result = (PooledSession) pool.createSession(transacted, ackMode);
+        PooledSession result = (PooledSession) pool.createSession(transacted, ackMode);
 
         // Store the session so we can close the sessions that this PooledConnection
         // created in order to ensure that consumers etc are closed per the JMS contract.

http://git-wip-us.apache.org/repos/asf/activemq/blob/f91abd3d/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java
index 3a2e698..cbfec29 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java
@@ -55,25 +55,21 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
     private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
 
     private final SessionKey key;
-    private final KeyedObjectPool<SessionKey, Session> sessionPool;
+    private final KeyedObjectPool<SessionKey, SessionHolder> sessionPool;
     private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
     private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
     private final CopyOnWriteArrayList<PooledSessionEventListener> sessionEventListeners = new CopyOnWriteArrayList<PooledSessionEventListener>();
     private final AtomicBoolean closed = new AtomicBoolean();
 
-    private MessageProducer producer;
-    private TopicPublisher publisher;
-    private QueueSender sender;
-
-    private Session session;
+    private SessionHolder sessionHolder;
     private boolean transactional = true;
     private boolean ignoreClose;
     private boolean isXa;
     private boolean useAnonymousProducers = true;
 
-    public PooledSession(SessionKey key, Session session, KeyedObjectPool<SessionKey, Session> sessionPool, boolean transactional, boolean anonymous) {
+    public PooledSession(SessionKey key, SessionHolder sessionHolder, KeyedObjectPool<SessionKey, SessionHolder> sessionPool, boolean transactional, boolean anonymous) {
         this.key = key;
-        this.session = session;
+        this.sessionHolder = sessionHolder;
         this.sessionPool = sessionPool;
         this.transactional = transactional;
         this.useAnonymousProducers = anonymous;
@@ -140,21 +136,21 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
             if (invalidate) {
                 // lets close the session and not put the session back into the pool
                 // instead invalidate it so the pool can create a new one on demand.
-                if (session != null) {
+                if (sessionHolder != null) {
                     try {
-                        session.close();
+                        sessionHolder.close();
                     } catch (JMSException e1) {
                         LOG.trace("Ignoring exception on close as discarding session: " + e1, e1);
                     }
                 }
                 try {
-                    sessionPool.invalidateObject(key, session);
+                    sessionPool.invalidateObject(key, sessionHolder);
                 } catch (Exception e) {
                     LOG.trace("Ignoring exception on invalidateObject as discarding session: " + e, e);
                 }
             } else {
                 try {
-                    sessionPool.returnObject(key, session);
+                    sessionPool.returnObject(key, sessionHolder);
                 } catch (Exception e) {
                     javax.jms.IllegalStateException illegalStateException = new javax.jms.IllegalStateException(e.toString());
                     illegalStateException.initCause(e);
@@ -162,7 +158,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
                 }
             }
 
-            session = null;
+            sessionHolder = null;
         }
     }
 
@@ -276,9 +272,12 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
 
     @Override
     public XAResource getXAResource() {
-        if (session instanceof XASession) {
-            return ((XASession) session).getXAResource();
+        SessionHolder session = safeGetSessionHolder();
+
+        if (session.getSession() instanceof XASession) {
+            return ((XASession) session.getSession()).getXAResource();
         }
+
         return null;
     }
 
@@ -289,8 +288,9 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
 
     @Override
     public void run() {
+        SessionHolder session = safeGetSessionHolder();
         if (session != null) {
-            session.run();
+            session.getSession().run();
         }
     }
 
@@ -379,10 +379,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
     }
 
     public Session getInternalSession() throws IllegalStateException {
-        if (session == null) {
-            throw new IllegalStateException("The session has already been closed");
-        }
-        return session;
+        return safeGetSessionHolder().getSession();
     }
 
     public MessageProducer getMessageProducer() throws JMSException {
@@ -393,16 +390,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
         MessageProducer result = null;
 
         if (useAnonymousProducers) {
-            if (producer == null) {
-                // Don't allow for duplicate anonymous producers.
-                synchronized (this) {
-                    if (producer == null) {
-                        producer = getInternalSession().createProducer(null);
-                    }
-                }
-            }
-
-            result = producer;
+            result = safeGetSessionHolder().getOrCreateProducer();
         } else {
             result = getInternalSession().createProducer(destination);
         }
@@ -418,16 +406,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
         QueueSender result = null;
 
         if (useAnonymousProducers) {
-            if (sender == null) {
-                // Don't allow for duplicate anonymous producers.
-                synchronized (this) {
-                    if (sender == null) {
-                        sender = ((QueueSession) getInternalSession()).createSender(null);
-                    }
-                }
-            }
-
-            result = sender;
+            result = safeGetSessionHolder().getOrCreateSender();
         } else {
             result = ((QueueSession) getInternalSession()).createSender(destination);
         }
@@ -443,16 +422,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
         TopicPublisher result = null;
 
         if (useAnonymousProducers) {
-            if (publisher == null) {
-                // Don't allow for duplicate anonymous producers.
-                synchronized (this) {
-                    if (publisher == null) {
-                        publisher = ((TopicSession) getInternalSession()).createPublisher(null);
-                    }
-                }
-            }
-
-            result = publisher;
+            result = safeGetSessionHolder().getOrCreatePublisher();
         } else {
             result = ((TopicSession) getInternalSession()).createPublisher(destination);
         }
@@ -489,7 +459,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
 
     @Override
     public String toString() {
-        return "PooledSession { " + session + " }";
+        return "PooledSession { " + safeGetSessionHolder() + " }";
     }
 
     /**
@@ -505,4 +475,13 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
     protected void onConsumerClose(MessageConsumer consumer) {
         consumers.remove(consumer);
     }
+
+    private SessionHolder safeGetSessionHolder() {
+        SessionHolder sessionHolder = this.sessionHolder;
+        if (sessionHolder == null) {
+            throw new IllegalStateException("The session has already been closed");
+        }
+
+        return sessionHolder;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f91abd3d/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/SessionHolder.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/SessionHolder.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/SessionHolder.java
new file mode 100644
index 0000000..afa75d6
--- /dev/null
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/SessionHolder.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+
+/**
+ * Used to store a pooled session instance and any resources that can
+ * be left open and carried along with the pooled instance such as the
+ * anonymous producer used for all MessageProducer instances created
+ * from this pooled session when enabled.
+ */
+public class SessionHolder {
+
+    private final Session session;
+    private MessageProducer producer;
+    private TopicPublisher publisher;
+    private QueueSender sender;
+
+    public SessionHolder(Session session) {
+        this.session = session;
+    }
+
+    public void close() throws JMSException {
+        try {
+            session.close();
+        } finally {
+            producer = null;
+            publisher = null;
+            sender = null;
+        }
+    }
+
+    public Session getSession() {
+        return session;
+    }
+
+    public MessageProducer getOrCreateProducer() throws JMSException {
+        if (producer == null) {
+            synchronized (this) {
+                if (producer == null) {
+                    producer = session.createProducer(null);
+                }
+            }
+        }
+
+        return producer;
+    }
+
+    public TopicPublisher getOrCreatePublisher() throws JMSException {
+        if (publisher == null) {
+            synchronized (this) {
+                if (publisher == null) {
+                    publisher = ((TopicSession) session).createPublisher(null);
+                }
+            }
+        }
+
+        return publisher;
+    }
+
+    public QueueSender getOrCreateSender() throws JMSException {
+        if (sender == null) {
+            synchronized (this) {
+                if (sender == null) {
+                    sender = ((QueueSession) session).createSender(null);
+                }
+            }
+        }
+
+        return sender;
+    }
+
+    @Override
+    public String toString() {
+        return session.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/f91abd3d/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java
index 7483e6b..9432add 100644
--- a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java
+++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java
@@ -17,9 +17,13 @@
 package org.apache.activemq.jms.pool;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.QueueSession;
 import javax.jms.Session;
@@ -44,7 +48,8 @@ public class PooledSessionTest {
     public void setUp() throws Exception {
         broker = new BrokerService();
         broker.setPersistent(false);
-        broker.setUseJmx(false);
+        broker.setUseJmx(true);
+        broker.getManagementContext().setCreateMBeanServer(false);
         TransportConnector connector = broker.addConnector("tcp://localhost:0");
         broker.start();
         connectionUri = connector.getPublishableConnectString();
@@ -62,7 +67,7 @@ public class PooledSessionTest {
         broker = null;
     }
 
-    @Test
+    @Test(timeout = 60000)
     public void testPooledSessionStats() throws Exception {
         PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
 
@@ -73,9 +78,11 @@ public class PooledSessionTest {
         assertEquals(0, connection.getNumActiveSessions());
         assertEquals(1, connection.getNumtIdleSessions());
         assertEquals(1, connection.getNumSessions());
+
+        connection.close();
     }
 
-    @Test
+    @Test(timeout = 60000)
     public void testMessageProducersAreAllTheSame() throws Exception {
         PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -87,9 +94,11 @@ public class PooledSessionTest {
         PooledProducer producer2 = (PooledProducer) session.createProducer(queue2);
 
         assertSame(producer1.getMessageProducer(), producer2.getMessageProducer());
+
+        connection.close();
     }
 
-    @Test
+    @Test(timeout = 60000)
     public void testThrowsWhenDifferentDestinationGiven() throws Exception {
         PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -110,9 +119,11 @@ public class PooledSessionTest {
             fail("Should only be able to send to queue 1");
         } catch (Exception ex) {
         }
+
+        connection.close();
     }
 
-    @Test
+    @Test(timeout = 60000)
     public void testCreateTopicPublisher() throws Exception {
         PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
         TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -124,9 +135,10 @@ public class PooledSessionTest {
         PooledTopicPublisher publisher2 = (PooledTopicPublisher) session.createPublisher(topic2);
 
         assertSame(publisher1.getMessageProducer(), publisher2.getMessageProducer());
+        connection.close();
     }
 
-    @Test
+    @Test(timeout = 60000)
     public void testQueueSender() throws Exception {
         PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
         QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -138,5 +150,34 @@ public class PooledSessionTest {
         PooledQueueSender sender2 = (PooledQueueSender) session.createSender(queue2);
 
         assertSame(sender1.getMessageProducer(), sender2.getMessageProducer());
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testRepeatedCreateSessionProducerResultsInSame() throws Exception {
+        PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
+
+        assertTrue(pooledFactory.isUseAnonymousProducers());
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createTopic("test-topic");
+        PooledProducer producer = (PooledProducer) session.createProducer(destination);
+        MessageProducer original = producer.getMessageProducer();
+        assertNotNull(original);
+        session.close();
+
+        assertEquals(1, broker.getAdminView().getDynamicDestinationProducers().length);
+
+        for (int i = 0; i < 20; ++i) {
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            producer = (PooledProducer) session.createProducer(destination);
+            assertSame(original, producer.getMessageProducer());
+            session.close();
+        }
+
+        assertEquals(1, broker.getAdminView().getDynamicDestinationProducers().length);
+
+        connection.close();
+        pooledFactory.clear();
     }
 }


[3/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5567 - fix and test - issue was eager dispatch with concurrent sends, recovery and outcome delivery now assign a new sequence id to message additions which sorts sequence order in the x

Posted by ha...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5567 - fix and test - issue was eager dispatch with concurrent sends, recovery and outcome delivery now assign a new sequence id to message additions which sorts sequence order in the xa case


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3155c625
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3155c625
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3155c625

Branch: refs/heads/master
Commit: 3155c625c69e3251e3ecd73d50f1dc5ed48387d3
Parents: f91abd3
Author: gtully <ga...@gmail.com>
Authored: Fri Feb 6 12:55:43 2015 +0000
Committer: gtully <ga...@gmail.com>
Committed: Fri Feb 6 12:56:29 2015 +0000

----------------------------------------------------------------------
 .../activemq/store/AbstractMessageStore.java    |   4 +
 .../apache/activemq/store/jdbc/JDBCAdapter.java |   2 +-
 .../activemq/store/jdbc/JDBCMessageStore.java   |  68 +++---
 .../store/jdbc/JDBCPersistenceAdapter.java      |   6 +-
 .../store/jdbc/JDBCTopicMessageStore.java       |   2 +-
 .../store/jdbc/JdbcMemoryTransactionStore.java  | 111 ++++-----
 .../apache/activemq/store/jdbc/Statements.java  |   2 +-
 .../store/jdbc/adapter/DefaultJDBCAdapter.java  |   3 +-
 activemq-unit-tests/pom.xml                     |   2 +-
 .../store/RecoverExpiredMessagesTest.java       |  12 +-
 .../org/apache/activemq/bugs/AMQ5567Test.java   | 228 +++++++++++++++++++
 11 files changed, 339 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/3155c625/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
index 43713e6..faa6c1f 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
@@ -125,6 +125,10 @@ abstract public class AbstractMessageStore implements MessageStore {
         this.indexListener = indexListener;
     }
 
+    public IndexListener getIndexListener() {
+        return indexListener;
+    }
+
     static {
        FUTURE = new InlineListenableFuture();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/3155c625/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
index 1d368b6..df4fcf3 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
@@ -105,7 +105,7 @@ public interface JDBCAdapter {
 
     void doRecoverPreparedOps(TransactionContext c, JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws SQLException, IOException;
 
-    void doCommitAddOp(TransactionContext c, long storeSequenceIdForMessageId) throws SQLException, IOException;
+    void doCommitAddOp(TransactionContext c, long preparedSequence, long sequence) throws SQLException, IOException;
 
     void doClearLastAck(TransactionContext c, ActiveMQDestination destination, byte priority, String subId, String subName) throws SQLException, IOException;
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/3155c625/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
index 75a68c7..923f3f1 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
@@ -30,6 +30,7 @@ import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.store.AbstractMessageStore;
 import org.apache.activemq.store.IndexListener;
 import org.apache.activemq.store.MessageRecoveryListener;
@@ -70,7 +71,6 @@ public class JDBCMessageStore extends AbstractMessageStore {
     protected final JDBCPersistenceAdapter persistenceAdapter;
     protected AtomicLong lastRecoveredSequenceId = new AtomicLong(-1);
     protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE -1);
-    final Set<Long> recoveredAdditions = new LinkedHashSet<Long>();
     protected ActiveMQMessageAudit audit;
     protected final LinkedList<Long> pendingAdditions = new LinkedList<Long>();
     
@@ -112,6 +112,9 @@ public class JDBCMessageStore extends AbstractMessageStore {
             return;
         }
 
+        // if xaXid present - this is a prepare - so we don't yet have an outcome
+        final XATransactionId xaXid =  context != null ? context.getXid() : null;
+
         // Serialize the Message..
         byte data[];
         try {
@@ -127,38 +130,44 @@ public class JDBCMessageStore extends AbstractMessageStore {
         synchronized (pendingAdditions) {
             sequenceId = persistenceAdapter.getNextSequenceId();
             final long sequence = sequenceId;
-            pendingAdditions.add(sequence);
-            c.onCompletion(new Runnable() {
-                public void run() {
-                    // jdbc close or jms commit - while futureOrSequenceLong==null ordered
-                    // work will remain pending on the Queue
-                    message.getMessageId().setFutureOrSequenceLong(sequence);
-                    message.getMessageId().setEntryLocator(sequence);
-                }
-            });
+            message.getMessageId().setEntryLocator(sequence);
+
+            if (xaXid == null) {
+                pendingAdditions.add(sequence);
 
-            if (indexListener != null) {
-                indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() {
-                    @Override
+                c.onCompletion(new Runnable() {
                     public void run() {
-                        // cursor add complete
-                        synchronized (pendingAdditions) { pendingAdditions.remove(sequence); }
+                        // jdbc close or jms commit - while futureOrSequenceLong==null ordered
+                        // work will remain pending on the Queue
+                        message.getMessageId().setFutureOrSequenceLong(sequence);
                     }
-                }));
-            } else {
-                pendingAdditions.remove(sequence);
+                });
+
+                if (indexListener != null) {
+                    indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() {
+                        @Override
+                        public void run() {
+                            // cursor add complete
+                            synchronized (pendingAdditions) { pendingAdditions.remove(sequence); }
+                        }
+                    }));
+                } else {
+                    pendingAdditions.remove(sequence);
+                }
             }
         }
         try {
             adapter.doAddMessage(c, sequenceId, messageId, destination, data, message.getExpiration(),
-                    this.isPrioritizedMessages() ? message.getPriority() : 0, context != null ? context.getXid() : null);
+                    this.isPrioritizedMessages() ? message.getPriority() : 0, xaXid);
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
         } finally {
             c.close();
         }
-        onAdd(message, sequenceId, message.getPriority());
+        if (xaXid == null) {
+            onAdd(message, sequenceId, message.getPriority());
+        }
     }
 
     // jdbc commit order is random with concurrent connections - limit scan to lowest pending
@@ -186,12 +195,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
         }
     }
 
-    protected void onAdd(Message message, long sequenceId, byte priority) {
-        if (message.getTransactionId() != null && message.getTransactionId().isXATransaction()
-                && lastRecoveredSequenceId.get() > 0 && sequenceId < lastRecoveredSequenceId.get()) {
-            recoveredAdditions.add(sequenceId);
-        }
-    }
+    protected void onAdd(Message message, long sequenceId, byte priority) {}
 
     public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
         // Get a connection and insert the message into the DB.
@@ -329,18 +333,6 @@ public class JDBCMessageStore extends AbstractMessageStore {
     public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception {
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
-            if (!recoveredAdditions.isEmpty()) {
-                for (Iterator<Long> iterator = recoveredAdditions.iterator(); iterator.hasNext(); )  {
-                    Long sequenceId = iterator.next();
-                    iterator.remove();
-                    maxReturned--;
-                    if (sequenceId <= lastRecoveredSequenceId.get()) {
-                        Message msg = (Message)wireFormat.unmarshal(new ByteSequence(adapter.doGetMessageById(c, sequenceId)));
-                        LOG.trace("recovered add {} {}", this, msg.getMessageId());
-                        listener.recoverMessage(msg);
-                    }
-                }
-            }
             if (LOG.isTraceEnabled()) {
                 LOG.trace(this + " recoverNext lastRecovered:" + lastRecoveredSequenceId.get() + ", minPending:" + minPendingSequeunceId());
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/3155c625/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
index e335926..4236e9d 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
@@ -766,11 +766,11 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
         }
     }
 
-    public void commitAdd(ConnectionContext context, MessageId messageId) throws IOException {
+    public void commitAdd(ConnectionContext context, MessageId messageId, long preparedSequenceId) throws IOException {
         TransactionContext c = getTransactionContext(context);
         try {
-            long sequence = (Long)messageId.getFutureOrSequenceLong();
-            getAdapter().doCommitAddOp(c, sequence);
+            long sequence = (Long)messageId.getEntryLocator();
+            getAdapter().doCommitAddOp(c, preparedSequenceId, sequence);
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to commit add: " + messageId + ". Reason: " + e, e);

http://git-wip-us.apache.org/repos/asf/activemq/blob/3155c625/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
index 1841f11..a0cb133 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
@@ -316,6 +316,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
         LOG.trace(this + ", completion for: " + getSubscriptionKey(clientId, subscriptionName));
     }
 
+    @Override
     protected void onAdd(Message message, long sequenceId, byte priority) {
         // update last recovered state
         for (LastRecovered last : subscriberLastRecoveredMap.values()) {
@@ -329,7 +330,6 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
         }
     }
 
-
     public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {

http://git-wip-us.apache.org/repos/asf/activemq/blob/3155c625/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
index b2fedf7..7f42c7f 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
@@ -27,6 +27,7 @@ import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.store.IndexListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.ProxyMessageStore;
 import org.apache.activemq.store.ProxyTopicMessageStore;
@@ -90,41 +91,68 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
         ArrayList<AddMessageCommand> updateFromPreparedStateCommands = new ArrayList<AddMessageCommand>();
         for (Iterator<AddMessageCommand> iter = tx.messages.iterator(); iter.hasNext();) {
             final AddMessageCommand addMessageCommand = iter.next();
-            updateFromPreparedStateCommands.add(new AddMessageCommand() {
-                @Override
-                public Message getMessage() {
-                    return addMessageCommand.getMessage();
-                }
+            updateFromPreparedStateCommands.add(new CommitAddOutcome(addMessageCommand));
+        }
+        tx.messages = updateFromPreparedStateCommands;
+        preparedTransactions.put(txid, tx);
 
-                @Override
-                public MessageStore getMessageStore() {
-                    return addMessageCommand.getMessageStore();
-                }
+    }
 
-                @Override
-                public void run(ConnectionContext context) throws IOException {
-                    JDBCPersistenceAdapter jdbcPersistenceAdapter = (JDBCPersistenceAdapter) persistenceAdapter;
-                    Message message = addMessageCommand.getMessage();
-                    jdbcPersistenceAdapter.commitAdd(context, message.getMessageId());
-                    ((JDBCMessageStore)addMessageCommand.getMessageStore()).onAdd(
-                            message,
-                            (Long)message.getMessageId().getFutureOrSequenceLong(),
-                            message.getPriority());
 
-                }
+    class CommitAddOutcome implements AddMessageCommand {
+        final Message message;
+        JDBCMessageStore jdbcMessageStore;
 
-                @Override
-                public void setMessageStore(MessageStore messageStore) {
-                    throw new RuntimeException("MessageStore already known");
+        public CommitAddOutcome(JDBCMessageStore jdbcMessageStore, Message message) {
+            this.jdbcMessageStore = jdbcMessageStore;
+            this.message = message;
+        }
+
+        public CommitAddOutcome(AddMessageCommand addMessageCommand) {
+            this((JDBCMessageStore)addMessageCommand.getMessageStore(), addMessageCommand.getMessage());
+        }
+
+        @Override
+        public Message getMessage() {
+            return message;
+        }
+
+        @Override
+        public MessageStore getMessageStore() {
+            return jdbcMessageStore;
+        }
+
+        @Override
+        public void run(final ConnectionContext context) throws IOException {
+            JDBCPersistenceAdapter jdbcPersistenceAdapter = (JDBCPersistenceAdapter) persistenceAdapter;
+            final Long preparedEntrySequence = (Long) message.getMessageId().getEntryLocator();
+            TransactionContext c = jdbcPersistenceAdapter.getTransactionContext(context);
+
+            synchronized (jdbcMessageStore.pendingAdditions) {
+                message.getMessageId().setEntryLocator(jdbcPersistenceAdapter.getNextSequenceId());
+
+                c.onCompletion(new Runnable() {
+                    @Override
+                    public void run() {
+                        message.getMessageId().setFutureOrSequenceLong(message.getMessageId().getEntryLocator());
+                    }
+                });
+
+                if (jdbcMessageStore.getIndexListener() != null) {
+                    jdbcMessageStore.getIndexListener().onAdd(new IndexListener.MessageContext(context, message, null));
                 }
-            });
+            }
+
+            jdbcPersistenceAdapter.commitAdd(context, message.getMessageId(), preparedEntrySequence);
+            jdbcMessageStore.onAdd(message, (Long)message.getMessageId().getEntryLocator(), message.getPriority());
         }
-        tx.messages = updateFromPreparedStateCommands;
-        preparedTransactions.put(txid, tx);
 
+        @Override
+        public void setMessageStore(MessageStore messageStore) {
+            jdbcMessageStore = (JDBCMessageStore) messageStore;
+        }
     }
 
-
     @Override
     public void rollback(TransactionId txid) throws IOException {
 
@@ -148,9 +176,9 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
                         if (removeMessageCommand instanceof LastAckCommand ) {
                             ((LastAckCommand)removeMessageCommand).rollback(ctx);
                         } else {
+                            MessageId messageId = removeMessageCommand.getMessageAck().getLastMessageId();
                             // need to unset the txid flag on the existing row
-                            ((JDBCPersistenceAdapter) persistenceAdapter).commitAdd(ctx,
-                                    removeMessageCommand.getMessageAck().getLastMessageId());
+                            ((JDBCPersistenceAdapter) persistenceAdapter).commitAdd(ctx, messageId, (Long)messageId.getEntryLocator());
                         }
                     }
                 } catch (IOException e) {
@@ -171,36 +199,15 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
     public void recoverAdd(long id, byte[] messageBytes) throws IOException {
         final Message message = (Message) ((JDBCPersistenceAdapter)persistenceAdapter).getWireFormat().unmarshal(new ByteSequence(messageBytes));
         message.getMessageId().setFutureOrSequenceLong(id);
+        message.getMessageId().setEntryLocator(id);
         Tx tx = getPreparedTx(message.getTransactionId());
-        tx.add(new AddMessageCommand() {
-            MessageStore messageStore;
-            @Override
-            public Message getMessage() {
-                return message;
-            }
-
-            @Override
-            public MessageStore getMessageStore() {
-                return messageStore;
-            }
-
-            @Override
-            public void run(ConnectionContext context) throws IOException {
-                ((JDBCPersistenceAdapter)persistenceAdapter).commitAdd(null, message.getMessageId());
-                ((JDBCMessageStore)messageStore).onAdd(message, ((Long)message.getMessageId().getFutureOrSequenceLong()).longValue(), message.getPriority());
-            }
-
-            @Override
-            public void setMessageStore(MessageStore messageStore) {
-                this.messageStore = messageStore;
-            }
-
-        });
+        tx.add(new CommitAddOutcome(null, message));
     }
 
     public void recoverAck(long id, byte[] xid, byte[] message) throws IOException {
         Message msg = (Message) ((JDBCPersistenceAdapter)persistenceAdapter).getWireFormat().unmarshal(new ByteSequence(message));
         msg.getMessageId().setFutureOrSequenceLong(id);
+        msg.getMessageId().setEntryLocator(id);
         Tx tx = getPreparedTx(new XATransactionId(xid));
         final MessageAck ack = new MessageAck(msg, MessageAck.STANDARD_ACK_TYPE, 1);
         tx.add(new RemoveMessageCommand() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/3155c625/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java
index a595f33..8ee3123 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java
@@ -617,7 +617,7 @@ public class Statements {
     public String getClearXidFlagStatement() {
         if (clearXidFlagStatement == null) {
             clearXidFlagStatement = "UPDATE "  + getFullMessageTableName()
-                    + " SET XID = NULL WHERE ID = ?";
+                    + " SET XID = NULL, ID = ? WHERE ID = ?";
         }
         return clearXidFlagStatement;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/3155c625/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
index 970e0f8..a94abdf 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
@@ -1048,12 +1048,13 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
     }
 
     @Override
-    public void doCommitAddOp(TransactionContext c, long sequence) throws SQLException, IOException {
+    public void doCommitAddOp(TransactionContext c, long preparedSequence, long sequence) throws SQLException, IOException {
         PreparedStatement s = null;
         cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getClearXidFlagStatement());
             s.setLong(1, sequence);
+            s.setLong(2, preparedSequence);
             if (s.executeUpdate() != 1) {
                 throw new IOException("Could not remove prepared transaction state from message add for sequenceId: " + sequence);
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/3155c625/activemq-unit-tests/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml
index 94ebc99..c7757b8 100755
--- a/activemq-unit-tests/pom.xml
+++ b/activemq-unit-tests/pom.xml
@@ -443,7 +443,7 @@
           </excludes>
           <systemPropertyVariables>
             <java.net.preferIPv4Stack>true</java.net.preferIPv4Stack>
-            <org.apache.activemq.default.directory.prefix>target</org.apache.activemq.default.directory.prefix>
+            <org.apache.activemq.default.directory.prefix>target/</org.apache.activemq.default.directory.prefix>
             <org.apache.activemq.AutoFailTestSupport.disableSystemExit>true</org.apache.activemq.AutoFailTestSupport.disableSystemExit>
           </systemPropertyVariables>
           <includes>

http://git-wip-us.apache.org/repos/asf/activemq/blob/3155c625/activemq-unit-tests/src/test/java/org/apache/activemq/broker/store/RecoverExpiredMessagesTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/store/RecoverExpiredMessagesTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/store/RecoverExpiredMessagesTest.java
index 38e8ee7..fb0296c 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/store/RecoverExpiredMessagesTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/store/RecoverExpiredMessagesTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.store;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.concurrent.TimeUnit;
 import junit.framework.Test;
@@ -53,9 +54,14 @@ public class RecoverExpiredMessagesTest extends BrokerRestartTestSupport {
 
     public void initCombosForTestRecovery() throws Exception {
         addCombinationValues("queuePendingPolicy", new PendingQueueMessageStoragePolicy[] {new FilePendingQueueMessageStoragePolicy(), new VMPendingQueueMessageStoragePolicy()});
-        addCombinationValues("persistenceAdapter", new PersistenceAdapter[] {new KahaDBPersistenceAdapter(),
-                // need to supply the dataSource as it is used in parameter matching via the toString
-                new JDBCPersistenceAdapter(JDBCPersistenceAdapter.createDataSource(IOHelper.getDefaultDataDirectory()), new OpenWireFormat())});
+        PersistenceAdapter[] persistenceAdapters = new PersistenceAdapter[] {
+                new KahaDBPersistenceAdapter(),
+                new JDBCPersistenceAdapter(JDBCPersistenceAdapter.createDataSource(IOHelper.getDefaultDataDirectory()), new OpenWireFormat())
+        };
+        for (PersistenceAdapter adapter : persistenceAdapters) {
+            adapter.setDirectory(new File(IOHelper.getDefaultDataDirectory()));
+        }
+        addCombinationValues("persistenceAdapter", persistenceAdapters);
     }
 
     public void testRecovery() throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/3155c625/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5567Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5567Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5567Test.java
new file mode 100755
index 0000000..a8739ae
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5567Test.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import junit.framework.Test;
+import org.apache.activemq.broker.BrokerRestartTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.leveldb.LevelDBPersistenceAdapter;
+import org.apache.activemq.util.IOHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ5567Test extends BrokerRestartTestSupport {
+    protected static final Logger LOG = LoggerFactory.getLogger(AMQ5567Test.class);
+    ActiveMQQueue destination = new ActiveMQQueue("Q");
+
+    @Override
+    protected void configureBroker(BrokerService broker) throws Exception {
+        super.configureBroker(broker);
+        broker.setPersistenceAdapter(persistenceAdapter);
+    }
+
+    protected PolicyEntry getDefaultPolicy() {
+        PolicyEntry policy = new PolicyEntry();
+        policy.setMemoryLimit(60*1024);
+        return policy;
+    }
+
+    public void initCombosForTestPreparedTransactionNotDispatched() throws Exception {
+        PersistenceAdapter[] persistenceAdapters = new PersistenceAdapter[]{
+                new KahaDBPersistenceAdapter(),
+                new LevelDBPersistenceAdapter(),
+                new JDBCPersistenceAdapter(JDBCPersistenceAdapter.createDataSource(IOHelper.getDefaultDataDirectory()), new OpenWireFormat())
+        };
+        for (PersistenceAdapter adapter : persistenceAdapters) {
+            adapter.setDirectory(new File(IOHelper.getDefaultDataDirectory()));
+        }
+        addCombinationValues("persistenceAdapter", persistenceAdapters);
+    }
+
+    public void testPreparedTransactionNotDispatched() throws Exception {
+
+        ActiveMQDestination destination = new ActiveMQQueue("Q");
+
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+
+        XATransactionId txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+        Message message = createMessage(producerInfo, destination);
+        message.setPersistent(true);
+        message.setTransactionId(txid);
+        connection.send(message);
+
+        connection.send(createPrepareTransaction(connectionInfo, txid));
+
+
+        // send another non tx, will poke dispatch
+        message = createMessage(producerInfo, destination);
+        message.setPersistent(true);
+        connection.send(message);
+
+
+        // Since prepared but not committed.. only one should get delivered
+        StubConnection connectionC = createConnection();
+        ConnectionInfo connectionInfoC = createConnectionInfo();
+        SessionInfo sessionInfoC = createSessionInfo(connectionInfoC);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfoC, destination);
+        connectionC.send(connectionInfoC);
+        connectionC.send(sessionInfoC);
+        connectionC.send(consumerInfo);
+
+        Message m = receiveMessage(connectionC, TimeUnit.SECONDS.toMillis(10));
+        LOG.info("received: " + m);
+        assertNotNull("Got message", m);
+        assertNull("Got non tx message", m.getTransactionId());
+
+        // cannot get the prepared message till commit
+        assertNull(receiveMessage(connectionC));
+        assertNoMessagesLeft(connectionC);
+
+        LOG.info("commit: " + txid);
+        connection.request(createCommitTransaction2Phase(connectionInfo, txid));
+
+        m = receiveMessage(connectionC, TimeUnit.SECONDS.toMillis(10));
+        LOG.info("received: " + m);
+        assertNotNull("Got non null message", m);
+
+    }
+
+    public void initCombosForTestCursorStoreSync() throws Exception {
+        PersistenceAdapter[] persistenceAdapters = new PersistenceAdapter[]{
+                new KahaDBPersistenceAdapter(),
+                new LevelDBPersistenceAdapter(),
+                new JDBCPersistenceAdapter(JDBCPersistenceAdapter.createDataSource(IOHelper.getDefaultDataDirectory()), new OpenWireFormat())
+        };
+        for (PersistenceAdapter adapter : persistenceAdapters) {
+            adapter.setDirectory(new File(IOHelper.getDefaultDataDirectory()));
+        }
+        addCombinationValues("persistenceAdapter", persistenceAdapters);
+    }
+
+    public void testCursorStoreSync() throws Exception {
+
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+
+        XATransactionId txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+        Message message = createMessage(producerInfo, destination);
+        message.setPersistent(true);
+        message.setTransactionId(txid);
+        connection.request(message);
+
+        connection.request(createPrepareTransaction(connectionInfo, txid));
+
+        QueueViewMBean proxy = getProxyToQueueViewMBean();
+        assertTrue("cache is enabled", proxy.isCacheEnabled());
+
+        // send another non tx, will fill cursor
+        String payload = new String(new byte[10*1024]);
+        for (int i=0; i<6; i++) {
+            message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            ((TextMessage)message).setText(payload);
+            connection.request(message);
+        }
+
+        assertTrue("cache is disabled", !proxy.isCacheEnabled());
+
+        StubConnection connectionC = createConnection();
+        ConnectionInfo connectionInfoC = createConnectionInfo();
+        SessionInfo sessionInfoC = createSessionInfo(connectionInfoC);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfoC, destination);
+        connectionC.send(connectionInfoC);
+        connectionC.send(sessionInfoC);
+        connectionC.send(consumerInfo);
+
+        Message m = null;
+        for (int i=0; i<3; i++) {
+            m = receiveMessage(connectionC, TimeUnit.SECONDS.toMillis(10));
+            LOG.info("received: " + m);
+            assertNotNull("Got message", m);
+            assertNull("Got non tx message", m.getTransactionId());
+            connectionC.request(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
+        }
+
+        LOG.info("commit: " + txid);
+        connection.request(createCommitTransaction2Phase(connectionInfo, txid));
+        // consume the rest including the 2pc send in TX
+
+        for (int i=0; i<4; i++) {
+            m = receiveMessage(connectionC, TimeUnit.SECONDS.toMillis(10));
+            LOG.info("received[" + i + "] " + m);
+            assertNotNull("Got message", m);
+            if (i==3 ) {
+                assertNotNull("Got  tx message", m.getTransactionId());
+            } else {
+                assertNull("Got non tx message", m.getTransactionId());
+            }
+            connectionC.request(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
+        }
+    }
+
+    private QueueViewMBean getProxyToQueueViewMBean()
+            throws MalformedObjectNameException, JMSException {
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq"
+                + ":destinationType=Queue,destinationName=" + destination.getQueueName()
+                + ",type=Broker,brokerName=localhost");
+        QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
+                .newProxyInstance(queueViewMBeanName,
+                        QueueViewMBean.class, true);
+        return proxy;
+    }
+
+    public static Test suite() {
+        return suite(AMQ5567Test.class);
+    }
+
+}