You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2011/04/15 18:10:31 UTC
svn commit: r1092753 - in
/activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool:
AbstractJmsClient.java JmsConsumerClient.java JmsProducerClient.java
properties/JmsClientProperties.java
Author: dejanb
Date: Fri Apr 15 16:10:30 2011
New Revision: 1092753
URL: http://svn.apache.org/viewvc?rev=1092753&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3283 - transaction support for perf plugin
Modified:
activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java
activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java
activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/properties/JmsClientProperties.java
Modified: activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java?rev=1092753&r1=1092752&r2=1092753&view=diff
==============================================================================
--- activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java (original)
+++ activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java Fri Apr 15 16:10:30 2011
@@ -37,6 +37,8 @@ public abstract class AbstractJmsClient
protected int destCount = 1;
protected int destIndex;
protected String clientName = "";
+
+ private int internalTxCounter = 0;
public AbstractJmsClient(ConnectionFactory factory) {
this.factory = factory;
@@ -159,4 +161,25 @@ public abstract class AbstractJmsClient
}
}
+ /**
+ * Helper method that checks if session is
+ * transacted and whether to commit the tx based on commitAfterXMsgs
+ * property.
+ *
+ * @return true if transaction was committed.
+ * @throws JMSException in case the call to JMS Session.commit() fails.
+ */
+ public boolean commitTxIfNecessary() throws JMSException {
+
+ internalTxCounter++;
+ if (getClient().isSessTransacted()) {
+ if ((internalTxCounter % getClient().getCommitAfterXMsgs()) == 0) {
+ LOG.debug("Committing transaction.");
+ internalTxCounter = 0;
+ getSession().commit();
+ return true;
+ }
+ }
+ return false;
+ }
}
Modified: activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java?rev=1092753&r1=1092752&r2=1092753&view=diff
==============================================================================
--- activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java (original)
+++ activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java Fri Apr 15 16:10:30 2011
@@ -82,10 +82,14 @@ public class JmsConsumerClient extends A
LOG.info("Starting to synchronously receive messages for " + duration + " ms...");
long endTime = System.currentTimeMillis() + duration;
+
+ int counter = 0;
while (System.currentTimeMillis() < endTime) {
getJmsConsumer().receive();
incThroughput();
+ counter++;
sleep();
+ commitTxIfNecessary();
}
} finally {
if (client.isDurable() && client.isUnsubscribe()) {
@@ -112,6 +116,7 @@ public class JmsConsumerClient extends A
incThroughput();
recvCount++;
sleep();
+ commitTxIfNecessary();
}
} finally {
if (client.isDurable() && client.isUnsubscribe()) {
@@ -132,6 +137,11 @@ public class JmsConsumerClient extends A
public void onMessage(Message msg) {
incThroughput();
sleep();
+ try {
+ commitTxIfNecessary();
+ } catch (JMSException ex) {
+ LOG.error("Error committing transaction: " + ex.getMessage());
+ }
}
});
@@ -165,6 +175,12 @@ public class JmsConsumerClient extends A
recvCount.incrementAndGet();
synchronized (recvCount) {
recvCount.notify();
+ }
+
+ try {
+ commitTxIfNecessary();
+ } catch (JMSException ex) {
+ LOG.error("Error committing transaction: " + ex.getMessage());
}
}
});
@@ -244,6 +260,10 @@ public class JmsConsumerClient extends A
client = (JmsConsumerProperties)clientProps;
}
+ /**
+ * A way to throttle the consumer. Time to sleep is
+ * configured via recvDelay property.
+ */
protected void sleep() {
if (client.getRecvDelay() > 0) {
try {
Modified: activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java?rev=1092753&r1=1092752&r2=1092753&view=diff
==============================================================================
--- activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java (original)
+++ activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java Fri Apr 15 16:10:30 2011
@@ -97,6 +97,7 @@ public class JmsProducerClient extends A
getJmsProducer().send(dest[j], getJmsTextMessage());
incThroughput();
sleep();
+ commitTxIfNecessary();
}
}
// Send to only one actual destination
@@ -105,6 +106,7 @@ public class JmsProducerClient extends A
getJmsProducer().send(getJmsTextMessage());
incThroughput();
sleep();
+ commitTxIfNecessary();
}
}
@@ -119,6 +121,7 @@ public class JmsProducerClient extends A
getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + i + "]"));
incThroughput();
sleep();
+ commitTxIfNecessary();
}
}
@@ -128,6 +131,7 @@ public class JmsProducerClient extends A
getJmsProducer().send(createJmsTextMessage("Text Message [" + i + "]"));
incThroughput();
sleep();
+ commitTxIfNecessary();
}
}
}
@@ -168,6 +172,7 @@ public class JmsProducerClient extends A
getJmsProducer().send(dest[j], getJmsTextMessage());
incThroughput();
sleep();
+ commitTxIfNecessary();
}
}
// Send to only one actual destination
@@ -176,6 +181,7 @@ public class JmsProducerClient extends A
getJmsProducer().send(getJmsTextMessage());
incThroughput();
sleep();
+ commitTxIfNecessary();
}
}
@@ -191,6 +197,7 @@ public class JmsProducerClient extends A
getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + count++ + "]"));
incThroughput();
sleep();
+ commitTxIfNecessary();
}
}
@@ -201,6 +208,7 @@ public class JmsProducerClient extends A
getJmsProducer().send(createJmsTextMessage("Text Message [" + count++ + "]"));
incThroughput();
sleep();
+ commitTxIfNecessary();
}
}
}
Modified: activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/properties/JmsClientProperties.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/properties/JmsClientProperties.java?rev=1092753&r1=1092752&r2=1092753&view=diff
==============================================================================
--- activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/properties/JmsClientProperties.java (original)
+++ activemq/trunk/activemq-tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/properties/JmsClientProperties.java Fri Apr 15 16:10:30 2011
@@ -27,6 +27,9 @@ public class JmsClientProperties extends
protected String sessAckMode = SESSION_AUTO_ACKNOWLEDGE;
protected boolean sessTransacted;
+
+ // commit transaction after X msgs only.
+ protected int commitAfterXMsgs = 1;
protected String jmsProvider;
protected String jmsVersion;
@@ -63,6 +66,14 @@ public class JmsClientProperties extends
public void setSessTransacted(boolean sessTransacted) {
this.sessTransacted = sessTransacted;
}
+
+ public void setCommitAfterXMsgs(int commitAfterXMsg) {
+ this.commitAfterXMsgs = commitAfterXMsg;
+ }
+
+ public int getCommitAfterXMsgs() {
+ return this.commitAfterXMsgs;
+ }
public String getJmsProvider() {
return jmsProvider;