You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2011/02/04 14:10:28 UTC
svn commit: r1067175 - in /activemq/trunk/assembly/src/release/example:
build.xml src/ConsumerTool.java
Author: dejanb
Date: Fri Feb 4 13:10:27 2011
New Revision: 1067175
URL: http://svn.apache.org/viewvc?rev=1067175&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3174 - add batch parameter for transactions and client ack to consumer tool
Modified:
activemq/trunk/assembly/src/release/example/build.xml
activemq/trunk/assembly/src/release/example/src/ConsumerTool.java
Modified: activemq/trunk/assembly/src/release/example/build.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/example/build.xml?rev=1067175&r1=1067174&r2=1067175&view=diff
==============================================================================
--- activemq/trunk/assembly/src/release/example/build.xml (original)
+++ activemq/trunk/assembly/src/release/example/build.xml Fri Feb 4 13:10:27 2011
@@ -94,6 +94,7 @@
receive-time-out - An integer to specify the time to wait for
message consumption
parallelThreads - The number of parallel threads
+ batch - Batch size for transactions and client acknowledgment (default 10)
--------------------------------------------------------
ant producer <options> - Creates a producer publishing a number of messages
@@ -218,7 +219,8 @@
<arg value="--sleep-time=${sleepTime}" />
<arg value="--verbose=${verbose}"/>
<arg value="--ack-mode=${ack-mode}"/>
- <arg value="--receive-time-out=${receive-time-out}"/>
+ <arg value="--receive-time-out=${receive-time-out}"/>
+ <arg value="--batch=${batch}"/>
</java>
</target>
Modified: activemq/trunk/assembly/src/release/example/src/ConsumerTool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/example/src/ConsumerTool.java?rev=1067175&r1=1067174&r2=1067175&view=diff
==============================================================================
--- activemq/trunk/assembly/src/release/example/src/ConsumerTool.java (original)
+++ activemq/trunk/assembly/src/release/example/src/ConsumerTool.java Fri Feb 4 13:10:27 2011
@@ -65,6 +65,8 @@ public class ConsumerTool extends Thread
private String consumerName = "James";
private long sleepTime;
private long receiveTimeOut;
+ private long batch = 10; // Default batch size for CLIENT_ACKNOWLEDGE or SESSION_TRANSACTED
+ private long messagesReceived = 0;
public static void main(String[] args) {
ArrayList<ConsumerTool> threads = new ArrayList();
@@ -161,6 +163,9 @@ public class ConsumerTool extends Thread
}
public void onMessage(Message message) {
+
+ messagesReceived++;
+
try {
if (message instanceof TextMessage) {
@@ -185,9 +190,15 @@ public class ConsumerTool extends Thread
}
if (transacted) {
- session.commit();
+ if ((messagesReceived % batch) == 0) {
+ System.out.println("Commiting transaction for last " + batch + " messages; messages so far = " + messagesReceived);
+ session.commit();
+ }
} else if (ackMode == Session.CLIENT_ACKNOWLEDGE) {
- message.acknowledge();
+ if ((messagesReceived % batch) == 0) {
+ System.out.println("Acknowledging last " + batch + " messages; messages so far = " + messagesReceived);
+ message.acknowledge();
+ }
}
} catch (JMSException e) {
@@ -335,4 +346,8 @@ public class ConsumerTool extends Thread
public void setVerbose(boolean verbose) {
this.verbose = verbose;
}
+
+ public void setBatch(long batch) {
+ this.batch = batch;
+ }
}