You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2011/04/15 18:10:31 UTC

svn commit: r1092753 - in /activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool: AbstractJmsClient.java JmsConsumerClient.java JmsProducerClient.java properties/JmsClientProperties.java

Author: dejanb
Date: Fri Apr 15 16:10:30 2011
New Revision: 1092753

URL: http://svn.apache.org/viewvc?rev=1092753&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3283 - transaction support for perf plugin

Modified:
    activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java
    activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
    activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java
    activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/properties/JmsClientProperties.java

Modified: activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java?rev=1092753&r1=1092752&r2=1092753&view=diff
==============================================================================
--- activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java (original)
+++ activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java Fri Apr 15 16:10:30 2011
@@ -37,6 +37,8 @@ public abstract class AbstractJmsClient 
     protected int destCount = 1;
     protected int destIndex;
     protected String clientName = "";
+    
+    private int internalTxCounter = 0;
 
     public AbstractJmsClient(ConnectionFactory factory) {
         this.factory = factory;
@@ -159,4 +161,25 @@ public abstract class AbstractJmsClient 
         }
     }
 
+    /** 
+     * Helper method that checks if session is 
+     * transacted and whether to commit the tx based on commitAfterXMsgs 
+     * property. 
+     * 
+     * @return true if transaction was committed. 
+     * @throws JMSException in case the call to JMS Session.commit() fails.
+     */
+    public boolean commitTxIfNecessary() throws JMSException {
+    	
+    	internalTxCounter++;
+        if (getClient().isSessTransacted()) {
+        	if ((internalTxCounter % getClient().getCommitAfterXMsgs()) == 0) {
+        		LOG.debug("Committing transaction.");
+        		internalTxCounter = 0;
+        		getSession().commit();
+        		return true;
+        	}
+        }
+        return false;
+    }
 }

Modified: activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java?rev=1092753&r1=1092752&r2=1092753&view=diff
==============================================================================
--- activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java (original)
+++ activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java Fri Apr 15 16:10:30 2011
@@ -82,10 +82,14 @@ public class JmsConsumerClient extends A
 
             LOG.info("Starting to synchronously receive messages for " + duration + " ms...");
             long endTime = System.currentTimeMillis() + duration;
+
+            int counter = 0;
             while (System.currentTimeMillis() < endTime) {
                 getJmsConsumer().receive();
                 incThroughput();
+                counter++;
                 sleep();
+                commitTxIfNecessary();
             }
         } finally {
             if (client.isDurable() && client.isUnsubscribe()) {
@@ -112,6 +116,7 @@ public class JmsConsumerClient extends A
                 incThroughput();
                 recvCount++;
                 sleep();
+                commitTxIfNecessary();
             }
         } finally {
             if (client.isDurable() && client.isUnsubscribe()) {
@@ -132,6 +137,11 @@ public class JmsConsumerClient extends A
             public void onMessage(Message msg) {
                 incThroughput();
                 sleep();
+                try {
+                	commitTxIfNecessary();
+                } catch (JMSException ex) {
+                	LOG.error("Error committing transaction: " + ex.getMessage());
+                }
             }
         });
 
@@ -165,6 +175,12 @@ public class JmsConsumerClient extends A
                 recvCount.incrementAndGet();
                 synchronized (recvCount) {
                     recvCount.notify();
+                } 
+                
+                try {
+                	commitTxIfNecessary();
+                } catch (JMSException ex) {
+                	LOG.error("Error committing transaction: " + ex.getMessage());
                 }
             }
         });
@@ -244,6 +260,10 @@ public class JmsConsumerClient extends A
         client = (JmsConsumerProperties)clientProps;
     }
     
+    /**
+     * A way to throttle the consumer. Time to sleep is 
+     * configured via recvDelay property. 
+     */
     protected void sleep() {
         if (client.getRecvDelay() > 0) {
         	try {

Modified: activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java?rev=1092753&r1=1092752&r2=1092753&view=diff
==============================================================================
--- activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java (original)
+++ activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java Fri Apr 15 16:10:30 2011
@@ -97,6 +97,7 @@ public class JmsProducerClient extends A
                             getJmsProducer().send(dest[j], getJmsTextMessage());
                             incThroughput();
                             sleep();
+                            commitTxIfNecessary();
                         }
                     }
                     // Send to only one actual destination
@@ -105,6 +106,7 @@ public class JmsProducerClient extends A
                         getJmsProducer().send(getJmsTextMessage());
                         incThroughput();
                         sleep();
+                        commitTxIfNecessary();
                     }
                 }
 
@@ -119,6 +121,7 @@ public class JmsProducerClient extends A
                             getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + i + "]"));
                             incThroughput();
                             sleep();
+                            commitTxIfNecessary();
                         }
                     }
 
@@ -128,6 +131,7 @@ public class JmsProducerClient extends A
                         getJmsProducer().send(createJmsTextMessage("Text Message [" + i + "]"));
                         incThroughput();
                         sleep();
+                        commitTxIfNecessary();
                     }
                 }
             }
@@ -168,6 +172,7 @@ public class JmsProducerClient extends A
                             getJmsProducer().send(dest[j], getJmsTextMessage());
                             incThroughput();
                             sleep();
+                            commitTxIfNecessary();
                         }
                     }
                     // Send to only one actual destination
@@ -176,6 +181,7 @@ public class JmsProducerClient extends A
                         getJmsProducer().send(getJmsTextMessage());
                         incThroughput();
                         sleep();
+                        commitTxIfNecessary();
                     }
                 }
 
@@ -191,6 +197,7 @@ public class JmsProducerClient extends A
                             getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + count++ + "]"));
                             incThroughput();
                             sleep();
+                            commitTxIfNecessary();
                         }
                     }
 
@@ -201,6 +208,7 @@ public class JmsProducerClient extends A
                         getJmsProducer().send(createJmsTextMessage("Text Message [" + count++ + "]"));
                         incThroughput();
                         sleep();
+                        commitTxIfNecessary();
                     }
                 }
             }

Modified: activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/properties/JmsClientProperties.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/properties/JmsClientProperties.java?rev=1092753&r1=1092752&r2=1092753&view=diff
==============================================================================
--- activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/properties/JmsClientProperties.java (original)
+++ activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/properties/JmsClientProperties.java Fri Apr 15 16:10:30 2011
@@ -27,6 +27,9 @@ public class JmsClientProperties extends
 
     protected String sessAckMode = SESSION_AUTO_ACKNOWLEDGE;
     protected boolean sessTransacted;
+    
+    // commit transaction after X msgs only. 
+    protected int commitAfterXMsgs = 1;
 
     protected String jmsProvider;
     protected String jmsVersion;
@@ -63,6 +66,14 @@ public class JmsClientProperties extends
     public void setSessTransacted(boolean sessTransacted) {
         this.sessTransacted = sessTransacted;
     }
+    
+    public void setCommitAfterXMsgs(int commitAfterXMsg) {
+    	this.commitAfterXMsgs = commitAfterXMsg;
+    }
+    
+    public int getCommitAfterXMsgs() {
+    	return this.commitAfterXMsgs;
+    }
 
     public String getJmsProvider() {
         return jmsProvider;