You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2015/03/24 13:18:14 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5558 - support durable consumers and ack modes for consumers

Repository: activemq
Updated Branches:
  refs/heads/master 3051882f9 -> ebb3df768


https://issues.apache.org/jira/browse/AMQ-5558 - support durable consumers and ack modes for consumers


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ebb3df76
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ebb3df76
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ebb3df76

Branch: refs/heads/master
Commit: ebb3df7681dcf4a17ed0036ec14cab22d2ab3bdf
Parents: 3051882
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Tue Mar 24 13:17:50 2015 +0100
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Tue Mar 24 13:18:10 2015 +0100

----------------------------------------------------------------------
 .../apache/activemq/util/ConsumerThread.java    | 39 +++++++++---
 .../apache/activemq/util/ProducerThread.java    | 14 ++---
 .../console/command/ConsumerCommand.java        | 65 +++++++++++++++++---
 .../console/command/ProducerCommand.java        | 12 ++--
 .../activemq/console/command/consumer.txt       |  5 +-
 .../activemq/console/command/producer.txt       |  2 +-
 6 files changed, 103 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ebb3df76/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java b/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java
index 86bcadb..18dd20a 100644
--- a/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java
@@ -30,9 +30,10 @@ public class ConsumerThread extends Thread {
     int receiveTimeOut = 3000;
     Destination destination;
     Session session;
+    boolean durable;
     boolean breakOnNull = true;
     int sleep;
-    int transactionBatchSize;
+    int batchSize;
 
     int received = 0;
     int transactions = 0;
@@ -52,7 +53,11 @@ public class ConsumerThread extends Thread {
         String threadName = Thread.currentThread().getName();
         LOG.info(threadName + " wait until " + messageCount + " messages are consumed");
         try {
-            consumer = session.createConsumer(destination);
+            if (durable && destination instanceof Topic) {
+                consumer = session.createDurableSubscriber((Topic) destination, getName());
+            } else {
+                consumer = session.createConsumer(destination);
+            }
             while (running && received < messageCount) {
                 Message msg = consumer.receive(receiveTimeOut);
                 if (msg != null) {
@@ -70,11 +75,17 @@ public class ConsumerThread extends Thread {
                     }
                 }
 
-                if (transactionBatchSize > 0 && received > 0 && received % transactionBatchSize == 0) {
-                    LOG.info(threadName + " Committing transaction: " + transactions++);
-                    session.commit();
+                if (session.getTransacted()) {
+                    if (batchSize > 0 && received > 0 && received % batchSize == 0) {
+                        LOG.info(threadName + " Committing transaction: " + transactions++);
+                        session.commit();
+                    }
+                } else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) {
+                    if (batchSize > 0 && received > 0 && received % batchSize == 0) {
+                        LOG.info("Acknowledging last " + batchSize + " messages; messages so far = " + received);
+                        msg.acknowledge();
+                    }
                 }
-
                 if (sleep > 0) {
                     Thread.sleep(sleep);
                 }
@@ -103,6 +114,14 @@ public class ConsumerThread extends Thread {
         return received;
     }
 
+    public boolean isDurable() {
+        return durable;
+    }
+
+    public void setDurable(boolean durable) {
+        this.durable = durable;
+    }
+
     public void setMessageCount(int messageCount) {
         this.messageCount = messageCount;
     }
@@ -111,12 +130,12 @@ public class ConsumerThread extends Thread {
         this.breakOnNull = breakOnNull;
     }
 
-    public int getTransactionBatchSize() {
-        return transactionBatchSize;
+    public int getBatchSize() {
+        return batchSize;
     }
 
-    public void setTransactionBatchSize(int transactionBatchSize) {
-        this.transactionBatchSize = transactionBatchSize;
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
     }
 
     public int getMessageCount() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/ebb3df76/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java b/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java
index 638f60b..ffaa735 100644
--- a/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java
@@ -43,7 +43,7 @@ public class ProducerThread extends Thread {
     int sentCount = 0;
     String message;
     String messageText = null;
-    String url = null;
+    String payloadUrl = null;
     byte[] payload = null;
     boolean running = false;
     CountDownLatch finished;
@@ -123,8 +123,8 @@ public class ProducerThread extends Thread {
                 if (messageText == null) {
                     messageText = readInputStream(getClass().getResourceAsStream("demo.txt"), textMessageSize, i);
                 }
-            } else if (url != null) {
-                messageText = readInputStream(new URL(url).openStream(), -1, i);
+            } else if (payloadUrl != null) {
+                messageText = readInputStream(new URL(payloadUrl).openStream(), -1, i);
             } else if (message != null) {
                 messageText = message;
             } else {
@@ -249,12 +249,12 @@ public class ProducerThread extends Thread {
         this.finished = finished;
     }
 
-    public String getUrl() {
-        return url;
+    public String getPayloadUrl() {
+        return payloadUrl;
     }
 
-    public void setUrl(String url) {
-        this.url = url;
+    public void setPayloadUrl(String payloadUrl) {
+        this.payloadUrl = payloadUrl;
     }
 
     public String getMessage() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/ebb3df76/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java
index 58f37b8..9439f95 100644
--- a/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java
+++ b/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java
@@ -19,7 +19,6 @@ package org.apache.activemq.console.command;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.util.ConsumerThread;
-import org.apache.activemq.util.IntrospectionSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,7 +36,11 @@ public class ConsumerCommand extends AbstractCommand {
     String destination = "queue://TEST";
     int messageCount = 1000;
     int sleep;
-    int transactionBatchSize;
+    boolean transacted;
+    private boolean durable;
+    private String clientId;
+    int batchSize = 10;
+    int ackMode = Session.AUTO_ACKNOWLEDGE;
     int parallelThreads = 1;
     boolean bytesAsText;
 
@@ -52,13 +55,16 @@ public class ConsumerCommand extends AbstractCommand {
         Connection conn = null;
         try {
             conn = factory.createConnection(user, password);
+            if (durable && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) {
+                conn.setClientID(clientId);
+            }
             conn.start();
 
             Session sess;
-            if (transactionBatchSize != 0) {
+            if (transacted) {
                 sess = conn.createSession(true, Session.SESSION_TRANSACTED);
             } else {
-                sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                sess = conn.createSession(false, ackMode);
             }
 
 
@@ -67,10 +73,11 @@ public class ConsumerCommand extends AbstractCommand {
             for (int i = 1; i <= parallelThreads; i++) {
                 ConsumerThread consumer = new ConsumerThread(sess, ActiveMQDestination.createDestination(destination, ActiveMQDestination.QUEUE_TYPE));
                 consumer.setName("consumer-" + i);
+                consumer.setDurable(durable);
                 consumer.setBreakOnNull(false);
                 consumer.setMessageCount(messageCount);
                 consumer.setSleep(sleep);
-                consumer.setTransactionBatchSize(transactionBatchSize);
+                consumer.setBatchSize(batchSize);
                 consumer.setFinished(active);
                 consumer.setBytesAsText(bytesAsText);
                 consumer.start();
@@ -132,12 +139,12 @@ public class ConsumerCommand extends AbstractCommand {
         this.sleep = sleep;
     }
 
-    public int getTransactionBatchSize() {
-        return transactionBatchSize;
+    public int getBatchSize() {
+        return batchSize;
     }
 
-    public void setTransactionBatchSize(int transactionBatchSize) {
-        this.transactionBatchSize = transactionBatchSize;
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
     }
 
     public int getParallelThreads() {
@@ -156,6 +163,46 @@ public class ConsumerCommand extends AbstractCommand {
         this.bytesAsText = bytesAsText;
     }
 
+    public boolean isTransacted() {
+        return transacted;
+    }
+
+    public void setTransacted(boolean transacted) {
+        this.transacted = transacted;
+    }
+
+    public int getAckMode() {
+        return ackMode;
+    }
+
+    public void setAckMode(String ackMode) {
+        if ("CLIENT_ACKNOWLEDGE".equals(ackMode)) {
+            this.ackMode = Session.CLIENT_ACKNOWLEDGE;
+        }
+        if ("AUTO_ACKNOWLEDGE".equals(ackMode)) {
+            this.ackMode = Session.AUTO_ACKNOWLEDGE;
+        }
+        if ("DUPS_OK_ACKNOWLEDGE".equals(ackMode)) {
+            this.ackMode = Session.DUPS_OK_ACKNOWLEDGE;
+        }
+    }
+
+    public boolean isDurable() {
+        return durable;
+    }
+
+    public void setDurable(boolean durable) {
+        this.durable = durable;
+    }
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
     @Override
     protected void printHelp() {
         printHelpFromFile();

http://git-wip-us.apache.org/repos/asf/activemq/blob/ebb3df76/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java
index e4c53fe..8138a2c 100644
--- a/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java
+++ b/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java
@@ -38,7 +38,7 @@ public class ProducerCommand extends AbstractCommand {
     int sleep = 0;
     boolean persistent = true;
     String message = null;
-    String url = null;
+    String payloadUrl = null;
     int messageSize = 0;
     int textMessageSize;
     long msgTTL = 0L;
@@ -78,7 +78,7 @@ public class ProducerCommand extends AbstractCommand {
                 producer.setPersistent(persistent);
                 producer.setTransactionBatchSize(transactionBatchSize);
                 producer.setMessage(message);
-                producer.setUrl(url);
+                producer.setPayloadUrl(payloadUrl);
                 producer.setMessageSize(messageSize);
                 producer.setMsgGroupID(msgGroupID);
                 producer.setTextMessageSize(textMessageSize);
@@ -198,12 +198,12 @@ public class ProducerCommand extends AbstractCommand {
         this.parallelThreads = parallelThreads;
     }
 
-    public String getUrl() {
-        return url;
+    public String getPayloadUrl() {
+        return payloadUrl;
     }
 
-    public void setUrl(String url) {
-        this.url = url;
+    public void setPayloadUrl(String payloadUrl) {
+        this.payloadUrl = payloadUrl;
     }
 
     public String getMessage() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/ebb3df76/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt b/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt
index 986a771..9cf7c55 100644
--- a/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt
+++ b/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt
@@ -7,6 +7,9 @@ Options :
     [--destination               queue://..|topic://..] - consumer destination; default queue://TEST
     [--messageCount                                  N] - number of messages to send; default 1000
     [--sleep                                         N] - millisecond sleep period between sends or receives; default 0
-    [--transactionBatchSize                          N] - use send transaction batches of size N; default 0, no jms transactions
+    [--ackMode     AUTO_ACKNOWLEDGE|CLIENT_ACKNOWLEDGE] - the type of message acknowledgement to use; default auto acknowledge
+    [--batchSize                                     N] - batch size for transactions and client acknowledgment (default 10)
+    [--durable                              true|false] - create durable topic
+    [--clientId                                     ..] - connection client id; must be set for durable topics
     [--parallelThreads                               N] - number of threads to run in parallel; default 1
     [--bytesAsText                          true|false] - try to treat a BytesMessage as a text string
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/ebb3df76/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt b/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt
index 65f437a..1f37586 100644
--- a/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt
+++ b/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt
@@ -14,5 +14,5 @@ Options :
     [--messageSize                                   N] - size in bytes of a BytesMessage; default 0, a simple TextMessage is used
     [--textMessageSize                               N] - size in bytes of a TextMessage, a Lorem ipsum demo TextMessage is used
     [--message                                      ..] - a text string to use as the message body
-    [--url                                         URL] - a url pointing to a document to use as the message body
+    [--payloadUrl                                  URL] - a url pointing to a document to use as the message body
     [--msgGroupID                                   ..] - JMS message group identifier
\ No newline at end of file