You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2011/02/04 14:10:28 UTC

svn commit: r1067175 - in /activemq/trunk/assembly/src/release/example: build.xml src/ConsumerTool.java

Author: dejanb
Date: Fri Feb  4 13:10:27 2011
New Revision: 1067175

URL: http://svn.apache.org/viewvc?rev=1067175&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3174 - add batch parameter for transactions and client ack to consumer tool

Modified:
    activemq/trunk/assembly/src/release/example/build.xml
    activemq/trunk/assembly/src/release/example/src/ConsumerTool.java

Modified: activemq/trunk/assembly/src/release/example/build.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/example/build.xml?rev=1067175&r1=1067174&r2=1067175&view=diff
==============================================================================
--- activemq/trunk/assembly/src/release/example/build.xml (original)
+++ activemq/trunk/assembly/src/release/example/build.xml Fri Feb  4 13:10:27 2011
@@ -94,6 +94,7 @@
                  receive-time-out - An integer to specify the time to wait for
                                     message consumption
                   parallelThreads - The number of parallel threads
+                            batch - Batch size for transactions and client acknowledgment (default 10)
 				
             --------------------------------------------------------				
             ant producer <options> - Creates a producer publishing a number of messages
@@ -218,7 +219,8 @@
 			<arg value="--sleep-time=${sleepTime}" />
 			<arg value="--verbose=${verbose}"/>
 			<arg value="--ack-mode=${ack-mode}"/>		
-			<arg value="--receive-time-out=${receive-time-out}"/>					
+			<arg value="--receive-time-out=${receive-time-out}"/>
+            <arg value="--batch=${batch}"/>
 		</java>
 	</target>
 

Modified: activemq/trunk/assembly/src/release/example/src/ConsumerTool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/example/src/ConsumerTool.java?rev=1067175&r1=1067174&r2=1067175&view=diff
==============================================================================
--- activemq/trunk/assembly/src/release/example/src/ConsumerTool.java (original)
+++ activemq/trunk/assembly/src/release/example/src/ConsumerTool.java Fri Feb  4 13:10:27 2011
@@ -65,6 +65,8 @@ public class ConsumerTool extends Thread
     private String consumerName = "James";
     private long sleepTime;
     private long receiveTimeOut;
+	private long batch = 10; // Default batch size for CLIENT_ACKNOWLEDGEMENT or SESSION_TRANSACTED
+	private long messagesReceived = 0;
 
     public static void main(String[] args) {
         ArrayList<ConsumerTool> threads = new ArrayList();
@@ -161,6 +163,9 @@ public class ConsumerTool extends Thread
     }
 
     public void onMessage(Message message) {
+
+		messagesReceived++;
+
         try {
 
             if (message instanceof TextMessage) {
@@ -185,9 +190,15 @@ public class ConsumerTool extends Thread
             }
 
             if (transacted) {
-                session.commit();
+				if ((messagesReceived % batch) == 0) {
+					System.out.println("Commiting transaction for last " + batch + " messages; messages so far = " + messagesReceived);
+					session.commit();
+				}
             } else if (ackMode == Session.CLIENT_ACKNOWLEDGE) {
-                message.acknowledge();
+				if ((messagesReceived % batch) == 0) {
+					System.out.println("Acknowledging last " + batch + " messages; messages so far = " + messagesReceived);
+					message.acknowledge();
+				}
             }
 
         } catch (JMSException e) {
@@ -335,4 +346,8 @@ public class ConsumerTool extends Thread
     public void setVerbose(boolean verbose) {
         this.verbose = verbose;
     }
+
+    public void setBatch(long batch) {
+        this.batch = batch;
+    }
 }