You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2015/03/24 13:18:14 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5558 -
support durable consumers and ack modes for consumers
Repository: activemq
Updated Branches:
refs/heads/master 3051882f9 -> ebb3df768
https://issues.apache.org/jira/browse/AMQ-5558 - support durable consumers and ack modes for consumers
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ebb3df76
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ebb3df76
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ebb3df76
Branch: refs/heads/master
Commit: ebb3df7681dcf4a17ed0036ec14cab22d2ab3bdf
Parents: 3051882
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Tue Mar 24 13:17:50 2015 +0100
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Tue Mar 24 13:18:10 2015 +0100
----------------------------------------------------------------------
.../apache/activemq/util/ConsumerThread.java | 39 +++++++++---
.../apache/activemq/util/ProducerThread.java | 14 ++---
.../console/command/ConsumerCommand.java | 65 +++++++++++++++++---
.../console/command/ProducerCommand.java | 12 ++--
.../activemq/console/command/consumer.txt | 5 +-
.../activemq/console/command/producer.txt | 2 +-
6 files changed, 103 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/ebb3df76/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java b/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java
index 86bcadb..18dd20a 100644
--- a/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java
@@ -30,9 +30,10 @@ public class ConsumerThread extends Thread {
int receiveTimeOut = 3000;
Destination destination;
Session session;
+ boolean durable;
boolean breakOnNull = true;
int sleep;
- int transactionBatchSize;
+ int batchSize;
int received = 0;
int transactions = 0;
@@ -52,7 +53,11 @@ public class ConsumerThread extends Thread {
String threadName = Thread.currentThread().getName();
LOG.info(threadName + " wait until " + messageCount + " messages are consumed");
try {
- consumer = session.createConsumer(destination);
+ if (durable && destination instanceof Topic) {
+ consumer = session.createDurableSubscriber((Topic) destination, getName());
+ } else {
+ consumer = session.createConsumer(destination);
+ }
while (running && received < messageCount) {
Message msg = consumer.receive(receiveTimeOut);
if (msg != null) {
@@ -70,11 +75,17 @@ public class ConsumerThread extends Thread {
}
}
- if (transactionBatchSize > 0 && received > 0 && received % transactionBatchSize == 0) {
- LOG.info(threadName + " Committing transaction: " + transactions++);
- session.commit();
+ if (session.getTransacted()) {
+ if (batchSize > 0 && received > 0 && received % batchSize == 0) {
+ LOG.info(threadName + " Committing transaction: " + transactions++);
+ session.commit();
+ }
+ } else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) {
+ if (batchSize > 0 && received > 0 && received % batchSize == 0) {
+ LOG.info("Acknowledging last " + batchSize + " messages; messages so far = " + received);
+ msg.acknowledge();
+ }
}
-
if (sleep > 0) {
Thread.sleep(sleep);
}
@@ -103,6 +114,14 @@ public class ConsumerThread extends Thread {
return received;
}
+ public boolean isDurable() {
+ return durable;
+ }
+
+ public void setDurable(boolean durable) {
+ this.durable = durable;
+ }
+
public void setMessageCount(int messageCount) {
this.messageCount = messageCount;
}
@@ -111,12 +130,12 @@ public class ConsumerThread extends Thread {
this.breakOnNull = breakOnNull;
}
- public int getTransactionBatchSize() {
- return transactionBatchSize;
+ public int getBatchSize() {
+ return batchSize;
}
- public void setTransactionBatchSize(int transactionBatchSize) {
- this.transactionBatchSize = transactionBatchSize;
+ public void setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
}
public int getMessageCount() {
http://git-wip-us.apache.org/repos/asf/activemq/blob/ebb3df76/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java b/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java
index 638f60b..ffaa735 100644
--- a/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java
@@ -43,7 +43,7 @@ public class ProducerThread extends Thread {
int sentCount = 0;
String message;
String messageText = null;
- String url = null;
+ String payloadUrl = null;
byte[] payload = null;
boolean running = false;
CountDownLatch finished;
@@ -123,8 +123,8 @@ public class ProducerThread extends Thread {
if (messageText == null) {
messageText = readInputStream(getClass().getResourceAsStream("demo.txt"), textMessageSize, i);
}
- } else if (url != null) {
- messageText = readInputStream(new URL(url).openStream(), -1, i);
+ } else if (payloadUrl != null) {
+ messageText = readInputStream(new URL(payloadUrl).openStream(), -1, i);
} else if (message != null) {
messageText = message;
} else {
@@ -249,12 +249,12 @@ public class ProducerThread extends Thread {
this.finished = finished;
}
- public String getUrl() {
- return url;
+ public String getPayloadUrl() {
+ return payloadUrl;
}
- public void setUrl(String url) {
- this.url = url;
+ public void setPayloadUrl(String payloadUrl) {
+ this.payloadUrl = payloadUrl;
}
public String getMessage() {
http://git-wip-us.apache.org/repos/asf/activemq/blob/ebb3df76/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java
index 58f37b8..9439f95 100644
--- a/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java
+++ b/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java
@@ -19,7 +19,6 @@ package org.apache.activemq.console.command;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.ConsumerThread;
-import org.apache.activemq.util.IntrospectionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,7 +36,11 @@ public class ConsumerCommand extends AbstractCommand {
String destination = "queue://TEST";
int messageCount = 1000;
int sleep;
- int transactionBatchSize;
+ boolean transacted;
+ private boolean durable;
+ private String clientId;
+ int batchSize = 10;
+ int ackMode = Session.AUTO_ACKNOWLEDGE;
int parallelThreads = 1;
boolean bytesAsText;
@@ -52,13 +55,16 @@ public class ConsumerCommand extends AbstractCommand {
Connection conn = null;
try {
conn = factory.createConnection(user, password);
+ if (durable && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) {
+ conn.setClientID(clientId);
+ }
conn.start();
Session sess;
- if (transactionBatchSize != 0) {
+ if (transacted) {
sess = conn.createSession(true, Session.SESSION_TRANSACTED);
} else {
- sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ sess = conn.createSession(false, ackMode);
}
@@ -67,10 +73,11 @@ public class ConsumerCommand extends AbstractCommand {
for (int i = 1; i <= parallelThreads; i++) {
ConsumerThread consumer = new ConsumerThread(sess, ActiveMQDestination.createDestination(destination, ActiveMQDestination.QUEUE_TYPE));
consumer.setName("consumer-" + i);
+ consumer.setDurable(durable);
consumer.setBreakOnNull(false);
consumer.setMessageCount(messageCount);
consumer.setSleep(sleep);
- consumer.setTransactionBatchSize(transactionBatchSize);
+ consumer.setBatchSize(batchSize);
consumer.setFinished(active);
consumer.setBytesAsText(bytesAsText);
consumer.start();
@@ -132,12 +139,12 @@ public class ConsumerCommand extends AbstractCommand {
this.sleep = sleep;
}
- public int getTransactionBatchSize() {
- return transactionBatchSize;
+ public int getBatchSize() {
+ return batchSize;
}
- public void setTransactionBatchSize(int transactionBatchSize) {
- this.transactionBatchSize = transactionBatchSize;
+ public void setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
}
public int getParallelThreads() {
@@ -156,6 +163,46 @@ public class ConsumerCommand extends AbstractCommand {
this.bytesAsText = bytesAsText;
}
+ public boolean isTransacted() {
+ return transacted;
+ }
+
+ public void setTransacted(boolean transacted) {
+ this.transacted = transacted;
+ }
+
+ public int getAckMode() {
+ return ackMode;
+ }
+
+ public void setAckMode(String ackMode) {
+ if ("CLIENT_ACKNOWLEDGE".equals(ackMode)) {
+ this.ackMode = Session.CLIENT_ACKNOWLEDGE;
+ }
+ if ("AUTO_ACKNOWLEDGE".equals(ackMode)) {
+ this.ackMode = Session.AUTO_ACKNOWLEDGE;
+ }
+ if ("DUPS_OK_ACKNOWLEDGE".equals(ackMode)) {
+ this.ackMode = Session.DUPS_OK_ACKNOWLEDGE;
+ }
+ }
+
+ public boolean isDurable() {
+ return durable;
+ }
+
+ public void setDurable(boolean durable) {
+ this.durable = durable;
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
@Override
protected void printHelp() {
printHelpFromFile();
http://git-wip-us.apache.org/repos/asf/activemq/blob/ebb3df76/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java
index e4c53fe..8138a2c 100644
--- a/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java
+++ b/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java
@@ -38,7 +38,7 @@ public class ProducerCommand extends AbstractCommand {
int sleep = 0;
boolean persistent = true;
String message = null;
- String url = null;
+ String payloadUrl = null;
int messageSize = 0;
int textMessageSize;
long msgTTL = 0L;
@@ -78,7 +78,7 @@ public class ProducerCommand extends AbstractCommand {
producer.setPersistent(persistent);
producer.setTransactionBatchSize(transactionBatchSize);
producer.setMessage(message);
- producer.setUrl(url);
+ producer.setPayloadUrl(payloadUrl);
producer.setMessageSize(messageSize);
producer.setMsgGroupID(msgGroupID);
producer.setTextMessageSize(textMessageSize);
@@ -198,12 +198,12 @@ public class ProducerCommand extends AbstractCommand {
this.parallelThreads = parallelThreads;
}
- public String getUrl() {
- return url;
+ public String getPayloadUrl() {
+ return payloadUrl;
}
- public void setUrl(String url) {
- this.url = url;
+ public void setPayloadUrl(String payloadUrl) {
+ this.payloadUrl = payloadUrl;
}
public String getMessage() {
http://git-wip-us.apache.org/repos/asf/activemq/blob/ebb3df76/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt b/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt
index 986a771..9cf7c55 100644
--- a/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt
+++ b/activemq-console/src/main/resources/org/apache/activemq/console/command/consumer.txt
@@ -7,6 +7,9 @@ Options :
[--destination queue://..|topic://..] - consumer destination; default queue://TEST
[--messageCount N] - number of messages to send; default 1000
[--sleep N] - millisecond sleep period between sends or receives; default 0
- [--transactionBatchSize N] - use send transaction batches of size N; default 0, no jms transactions
+ [--ackMode AUTO_ACKNOWLEDGE|CLIENT_ACKNOWLEDGE|DUPS_OK_ACKNOWLEDGE] - the type of message acknowledgement to use; default AUTO_ACKNOWLEDGE
+ [--batchSize N] - batch size for transactions and client acknowledgement; default 10
+ [--durable true|false] - create a durable topic subscription; default false
+ [--clientId ..] - connection client id; must be set for durable topic subscriptions
[--parallelThreads N] - number of threads to run in parallel; default 1
[--bytesAsText true|false] - try to treat a BytesMessage as a text string
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq/blob/ebb3df76/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt
----------------------------------------------------------------------
diff --git a/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt b/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt
index 65f437a..1f37586 100644
--- a/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt
+++ b/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt
@@ -14,5 +14,5 @@ Options :
[--messageSize N] - size in bytes of a BytesMessage; default 0, a simple TextMessage is used
[--textMessageSize N] - size in bytes of a TextMessage, a Lorem ipsum demo TextMessage is used
[--message ..] - a text string to use as the message body
- [--url URL] - a url pointing to a document to use as the message body
+ [--payloadUrl URL] - a url pointing to a document to use as the message body
[--msgGroupID ..] - JMS message group identifier
\ No newline at end of file