You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/10/12 16:13:14 UTC
qpid-broker-j git commit: QPID-7969: [Stress Test Tools] Add ability
into StressTestClient to pause indefinitely after creation of test session
and before session close
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 39a6419b0 -> b53d39aab
QPID-7969: [Stress Test Tools] Add ability into StressTestClient to pause indefinitely after creation of test session and before session close
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/b53d39aa
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/b53d39aa
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/b53d39aa
Branch: refs/heads/master
Commit: b53d39aab93fe1a71920c21cae60debc075f4a8b
Parents: 39a6419
Author: Alex Rudyy <or...@apache.org>
Authored: Thu Oct 12 16:59:38 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Thu Oct 12 16:59:38 2017 +0100
----------------------------------------------------------------------
.../org/apache/qpid/tools/StressTestClient.java | 198 ++++++++++++-------
1 file changed, 125 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b53d39aa/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java b/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java
index 566ce42..fb0dd1e 100644
--- a/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java
+++ b/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java
@@ -70,7 +70,11 @@ public class StressTestClient
private static final String CLOSE_SESSION_ARG = "closeSession";
private static final String PAUSE_AFTER_CONNECTION_OPEN_ARG = "pauseAfterConnectionOpen";
private static final String PAUSE_BEFORE_CONNECTION_CLOSE_ARG = "pauseBeforeConnectionClose";
- private static final String ITERATIONS = "iterations";
+ private static final String CLOSE_PRODUCERS_ARG = "closeProducers";
+ private static final String PAUSE_AFTER_SESSION_CREATE_ARG = "pauseAfterSessionCreate";
+ private static final String PAUSE_BEFORE_SESSION_CLOSE_ARG = "pauseBeforeSessionClose";
+ private static final String SESSION_ITERATIONS_ARG = "sessionIterations";
+ private static final String MESSAGING_ITERATIONS_ARG = "messagingIterations";
private static final String CONSUMER_MESSAGE_COUNT = "consumerMessageCount";
private static final String CONSUMER_SELECTOR = "selector";
@@ -96,7 +100,11 @@ public class StressTestClient
private static final String CLOSE_SESSION_DEFAULT = "false";
private static final String PAUSE_AFTER_CONNECTION_OPEN_DEFAULT = "false";
private static final String PAUSE_BEFORE_CONNECTION_CLOSE_DEFAULT = "false";
- private static final String ITERATIONS_DEFAULT = "1";
+ private static final String CLOSE_PRODUCERS_DEFAULT = "false";
+ private static final String PAUSE_AFTER_SESSION_CREATE_DEFAULT = "false";
+ private static final String PAUSE_BEFORE_SESSION_CLOSE_DEFAULT = "false";
+ private static final String SESSION_ITERATIONS_DEFAULT = "1";
+ private static final String MESSAGING_ITERATIONS_DEFAULT = "1";
private static final String CONSUMERS_SELECTOR_DEFAULT = "";
private static final String CLASS = "StressTestClient";
private static final String DISABLE_MESSAGE_ID_DEFAULT = Boolean.FALSE.toString();
@@ -130,7 +138,11 @@ public class StressTestClient
options.put(CLOSE_SESSION_ARG, CLOSE_SESSION_DEFAULT);
options.put(PAUSE_AFTER_CONNECTION_OPEN_ARG, PAUSE_AFTER_CONNECTION_OPEN_DEFAULT);
options.put(PAUSE_BEFORE_CONNECTION_CLOSE_ARG, PAUSE_BEFORE_CONNECTION_CLOSE_DEFAULT);
- options.put(ITERATIONS, ITERATIONS_DEFAULT);
+ options.put(CLOSE_PRODUCERS_ARG, CLOSE_PRODUCERS_DEFAULT);
+ options.put(PAUSE_AFTER_SESSION_CREATE_ARG, PAUSE_AFTER_SESSION_CREATE_DEFAULT);
+ options.put(PAUSE_BEFORE_SESSION_CLOSE_ARG, PAUSE_BEFORE_SESSION_CLOSE_DEFAULT);
+ options.put(SESSION_ITERATIONS_ARG, SESSION_ITERATIONS_DEFAULT);
+ options.put(MESSAGING_ITERATIONS_ARG, MESSAGING_ITERATIONS_DEFAULT);
options.put(CONSUMER_SELECTOR, CONSUMERS_SELECTOR_DEFAULT);
options.put(CONSUMER_MESSAGE_COUNT, "");
@@ -190,7 +202,11 @@ public class StressTestClient
boolean closeSession = Boolean.valueOf(options.get(CLOSE_SESSION_ARG));
boolean pauseAfterConnectionOpen = Boolean.valueOf(options.get(PAUSE_AFTER_CONNECTION_OPEN_ARG));
boolean pauseBeforeConnectionClose = Boolean.valueOf(options.get(PAUSE_BEFORE_CONNECTION_CLOSE_ARG));
- int iterations = Integer.parseInt(options.get(ITERATIONS));
+ boolean closeProducers = Boolean.valueOf(options.get(CLOSE_PRODUCERS_ARG));
+ boolean pauseAfterSessionCreate = Boolean.valueOf(options.get(PAUSE_AFTER_SESSION_CREATE_ARG));
+ boolean pauseBeforeSessionClose = Boolean.valueOf(options.get(PAUSE_BEFORE_SESSION_CLOSE_ARG));
+ int sessionIterations = Integer.parseInt(options.get(SESSION_ITERATIONS_ARG));
+ int messagingIterations = Integer.parseInt(options.get(MESSAGING_ITERATIONS_ARG));
String consumerSelector = options.get(CONSUMER_SELECTOR);
int consumerMessageCount = !"".equals(options.get(CONSUMER_MESSAGE_COUNT)) ?
Integer.parseInt(options.get(CONSUMER_MESSAGE_COUNT)) : numMessage;
@@ -273,6 +289,15 @@ public class StressTestClient
sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
+ if (pauseAfterSessionCreate)
+ {
+ System.out.println(String.format(
+ "Session %d is created on connection %d. Press any key to continue...",
+ se,
+ co));
+ System.in.read();
+ }
+
final Message message;
if (messageSize > 0)
{
@@ -291,109 +316,136 @@ public class StressTestClient
sentBytes = null;
}
- MessageConsumer consumer = null;
- MessageConsumer[] consumers = new MessageConsumer[numConsumers];
- for(int cns = 1 ; cns <= numConsumers ; cns++)
+ for (int sessionIteration = 1; sessionIteration <= sessionIterations; sessionIteration++)
{
- if( cns % reportingMod == 0)
+ if (sessionIterations > 1 && sessionIteration % reportingMod == 0)
{
- System.out.println(CLASS + ": Creating Consumer " + cns);
+ System.out.println(CLASS + ": Session iteration " + sessionIteration);
}
- consumer = sess.createConsumer(destination, consumerSelector);
- consumers[cns - 1] = consumer;
- }
- MessageProducer[] producers = new MessageProducer[numProducers];
- for(int pr = 1 ; pr <= numProducers ; pr++)
- {
- if( pr % reportingMod == 0)
+ MessageConsumer consumer = null;
+ MessageConsumer[] consumers = new MessageConsumer[numConsumers];
+ for (int cns = 1; cns <= numConsumers; cns++)
{
- System.out.println(CLASS + ": Creating Producer " + pr);
+ if (cns % reportingMod == 0)
+ {
+ System.out.println(CLASS + ": Creating Consumer " + cns);
+ }
+ consumer = sess.createConsumer(destination, consumerSelector);
+ consumers[cns - 1] = consumer;
}
- producers[pr-1] = sess.createProducer(destination);
- if (disableMessageID)
+ MessageProducer[] producers = new MessageProducer[numProducers];
+ for (int pr = 1; pr <= numProducers; pr++)
{
- producers[pr-1].setDisableMessageID(true);
- }
+ if (pr % reportingMod == 0)
+ {
+ System.out.println(CLASS + ": Creating Producer " + pr);
+ }
+ producers[pr - 1] = sess.createProducer(destination);
- if (disableMessageTimestamp)
- {
- producers[pr-1].setDisableMessageTimestamp(true);
- }
- }
+ if (disableMessageID)
+ {
+ producers[pr - 1].setDisableMessageID(true);
+ }
- for (int iteration = 1; iteration <= iterations; iteration++)
- {
- if (iterations > 1 && iteration % reportingMod == 0)
- {
- System.out.println(CLASS + ": Iteration " + iteration);
+ if (disableMessageTimestamp)
+ {
+ producers[pr - 1].setDisableMessageTimestamp(true);
+ }
}
- for (int pr = 1; pr <= numProducers; pr++)
+ for (int iteration = 1; iteration <= messagingIterations; iteration++)
{
- MessageProducer prod = producers[pr - 1];
- for (int me = 1; me <= numMessage; me++)
+ if (messagingIterations > 1 && iteration % reportingMod == 0)
{
- int messageNumber = (iteration - 1) * numProducers * numMessage
- + (pr - 1) * numMessage + (me - 1);
- if (messageNumber % reportingMod == 0)
- {
- System.out.println(CLASS + ": Sending Message " + messageNumber);
- }
- message.setIntProperty("index", me - 1);
- prod.send(message, deliveryMode,
- Message.DEFAULT_PRIORITY,
- Message.DEFAULT_TIME_TO_LIVE);
- if (sess.getTransacted() && me % txBatch == 0)
- {
- sess.commit();
- }
+ System.out.println(CLASS + ": Iteration " + iteration);
}
- }
- if(numConsumers == 1 && consumeImmediately)
- {
- for(int cs = 1; cs <= consumerMessageCount; cs++)
+ for (int pr = 1; pr <= numProducers; pr++)
{
- if(cs % reportingMod == 0)
+ MessageProducer prod = producers[pr - 1];
+ for (int me = 1; me <= numMessage; me++)
{
- System.out.println(CLASS + ": Consuming Message " + cs);
+ int messageNumber = (iteration - 1) * numProducers * numMessage
+ + (pr - 1) * numMessage + (me - 1);
+ if (messageNumber % reportingMod == 0)
+ {
+ System.out.println(CLASS + ": Sending Message " + messageNumber);
+ }
+ message.setIntProperty("index", me - 1);
+ prod.send(message, deliveryMode,
+ Message.DEFAULT_PRIORITY,
+ Message.DEFAULT_TIME_TO_LIVE);
+ if (sess.getTransacted() && me % txBatch == 0)
+ {
+ sess.commit();
+ }
}
- Message msg = consumer.receive(recieveTimeout);
+ }
- if(sess.getTransacted() && cs % txBatch == 0)
+ if (numConsumers == 1 && consumeImmediately)
+ {
+ for (int cs = 1; cs <= consumerMessageCount; cs++)
{
- sess.commit();
+ if (cs % reportingMod == 0)
+ {
+ System.out.println(CLASS + ": Consuming Message " + cs);
+ }
+ Message msg = consumer.receive(recieveTimeout);
+
+ if (sess.getTransacted() && cs % txBatch == 0)
+ {
+ sess.commit();
+ }
+
+ if (msg == null)
+ {
+ throw new RuntimeException(
+ "Expected message not received in allowed time: "
+ + recieveTimeout);
+ }
+
+ if (messageSize > 0)
+ {
+ validateReceivedMessageContent(sentBytes,
+ (BytesMessage) msg, random, messageSize);
+ }
}
+ }
+ }
- if(msg == null)
- {
- throw new RuntimeException("Expected message not received in allowed time: " + recieveTimeout);
- }
+ if (closeProducers)
+ {
+ for (MessageProducer messageProducer : producers)
+ {
+ messageProducer.close();
+ }
+ }
- if (messageSize > 0)
- {
- validateReceivedMessageContent(sentBytes,
- (BytesMessage) msg, random, messageSize);
- }
+ if (closeConsumers)
+ {
+ for (MessageConsumer messageConsumer : consumers)
+ {
+ messageConsumer.close();
}
}
+
}
- if (closeConsumers)
+ if (pauseBeforeSessionClose)
{
- for (MessageConsumer messageConsumer: consumers)
- {
- messageConsumer.close();
- }
+ System.out.println(String.format(
+ "Session %d on connection %d is about to be closed. Press any key to continue...",
+ se,
+ co));
+ System.in.read();
}
- if(closeSession)
+ if (closeSession)
{
sess.close();
}
-
}
catch (Exception exp)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org