You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/10/12 16:13:14 UTC

qpid-broker-j git commit: QPID-7969: [Stress Test Tools] Add ability into StressTestClient to pause indefinitely after creation of test session and before session close

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 39a6419b0 -> b53d39aab


QPID-7969: [Stress Test Tools] Add ability into StressTestClient to pause indefinitely after creation of test session and before session close


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/b53d39aa
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/b53d39aa
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/b53d39aa

Branch: refs/heads/master
Commit: b53d39aab93fe1a71920c21cae60debc075f4a8b
Parents: 39a6419
Author: Alex Rudyy <or...@apache.org>
Authored: Thu Oct 12 16:59:38 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Thu Oct 12 16:59:38 2017 +0100

----------------------------------------------------------------------
 .../org/apache/qpid/tools/StressTestClient.java | 198 ++++++++++++-------
 1 file changed, 125 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b53d39aa/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java b/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java
index 566ce42..fb0dd1e 100644
--- a/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java
+++ b/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java
@@ -70,7 +70,11 @@ public class StressTestClient
     private static final String CLOSE_SESSION_ARG = "closeSession";
     private static final String PAUSE_AFTER_CONNECTION_OPEN_ARG = "pauseAfterConnectionOpen";
     private static final String PAUSE_BEFORE_CONNECTION_CLOSE_ARG = "pauseBeforeConnectionClose";
-    private static final String ITERATIONS = "iterations";
+    private static final String CLOSE_PRODUCERS_ARG = "closeProducers";
+    private static final String PAUSE_AFTER_SESSION_CREATE_ARG = "pauseAfterSessionCreate";
+    private static final String PAUSE_BEFORE_SESSION_CLOSE_ARG = "pauseBeforeSessionClose";
+    private static final String SESSION_ITERATIONS_ARG = "sessionIterations";
+    private static final String MESSAGING_ITERATIONS_ARG = "messagingIterations";
     private static final String CONSUMER_MESSAGE_COUNT = "consumerMessageCount";
     private static final String CONSUMER_SELECTOR = "selector";
 
@@ -96,7 +100,11 @@ public class StressTestClient
     private static final String CLOSE_SESSION_DEFAULT = "false";
     private static final String PAUSE_AFTER_CONNECTION_OPEN_DEFAULT = "false";
     private static final String PAUSE_BEFORE_CONNECTION_CLOSE_DEFAULT = "false";
-    private static final String ITERATIONS_DEFAULT = "1";
+    private static final String CLOSE_PRODUCERS_DEFAULT = "false";
+    private static final String PAUSE_AFTER_SESSION_CREATE_DEFAULT = "false";
+    private static final String PAUSE_BEFORE_SESSION_CLOSE_DEFAULT = "false";
+    private static final String SESSION_ITERATIONS_DEFAULT = "1";
+    private static final String MESSAGING_ITERATIONS_DEFAULT = "1";
     private static final String CONSUMERS_SELECTOR_DEFAULT = "";
     private static final String CLASS = "StressTestClient";
     private static final String DISABLE_MESSAGE_ID_DEFAULT = Boolean.FALSE.toString();
@@ -130,7 +138,11 @@ public class StressTestClient
         options.put(CLOSE_SESSION_ARG, CLOSE_SESSION_DEFAULT);
         options.put(PAUSE_AFTER_CONNECTION_OPEN_ARG, PAUSE_AFTER_CONNECTION_OPEN_DEFAULT);
         options.put(PAUSE_BEFORE_CONNECTION_CLOSE_ARG, PAUSE_BEFORE_CONNECTION_CLOSE_DEFAULT);
-        options.put(ITERATIONS, ITERATIONS_DEFAULT);
+        options.put(CLOSE_PRODUCERS_ARG, CLOSE_PRODUCERS_DEFAULT);
+        options.put(PAUSE_AFTER_SESSION_CREATE_ARG, PAUSE_AFTER_SESSION_CREATE_DEFAULT);
+        options.put(PAUSE_BEFORE_SESSION_CLOSE_ARG, PAUSE_BEFORE_SESSION_CLOSE_DEFAULT);
+        options.put(SESSION_ITERATIONS_ARG, SESSION_ITERATIONS_DEFAULT);
+        options.put(MESSAGING_ITERATIONS_ARG, MESSAGING_ITERATIONS_DEFAULT);
         options.put(CONSUMER_SELECTOR, CONSUMERS_SELECTOR_DEFAULT);
         options.put(CONSUMER_MESSAGE_COUNT, "");
 
@@ -190,7 +202,11 @@ public class StressTestClient
         boolean closeSession = Boolean.valueOf(options.get(CLOSE_SESSION_ARG));
         boolean pauseAfterConnectionOpen = Boolean.valueOf(options.get(PAUSE_AFTER_CONNECTION_OPEN_ARG));
         boolean pauseBeforeConnectionClose = Boolean.valueOf(options.get(PAUSE_BEFORE_CONNECTION_CLOSE_ARG));
-        int iterations = Integer.parseInt(options.get(ITERATIONS));
+        boolean closeProducers = Boolean.valueOf(options.get(CLOSE_PRODUCERS_ARG));
+        boolean pauseAfterSessionCreate = Boolean.valueOf(options.get(PAUSE_AFTER_SESSION_CREATE_ARG));
+        boolean pauseBeforeSessionClose = Boolean.valueOf(options.get(PAUSE_BEFORE_SESSION_CLOSE_ARG));
+        int sessionIterations = Integer.parseInt(options.get(SESSION_ITERATIONS_ARG));
+        int messagingIterations = Integer.parseInt(options.get(MESSAGING_ITERATIONS_ARG));
         String consumerSelector =  options.get(CONSUMER_SELECTOR);
         int consumerMessageCount = !"".equals(options.get(CONSUMER_MESSAGE_COUNT)) ?
                 Integer.parseInt(options.get(CONSUMER_MESSAGE_COUNT)) : numMessage;
@@ -273,6 +289,15 @@ public class StressTestClient
                                 sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                             }
 
+                            if (pauseAfterSessionCreate)
+                            {
+                                System.out.println(String.format(
+                                        "Session %d is created on connection %d. Press any key to continue...",
+                                        se,
+                                        co));
+                                System.in.read();
+                            }
+
                             final Message message;
                             if (messageSize > 0)
                             {
@@ -291,109 +316,136 @@ public class StressTestClient
                                 sentBytes = null;
                             }
 
-                            MessageConsumer consumer = null;
-                            MessageConsumer[] consumers = new MessageConsumer[numConsumers];
-                            for(int cns = 1 ; cns <= numConsumers ; cns++)
+                            for (int sessionIteration = 1; sessionIteration <= sessionIterations; sessionIteration++)
                             {
-                                if( cns % reportingMod == 0)
+                                if (sessionIterations > 1 && sessionIteration % reportingMod == 0)
                                 {
-                                    System.out.println(CLASS + ": Creating Consumer " + cns);
+                                    System.out.println(CLASS + ": Session iteration " + sessionIteration);
                                 }
-                                consumer = sess.createConsumer(destination, consumerSelector);
-                                consumers[cns - 1] = consumer;
-                            }
 
-                            MessageProducer[] producers = new MessageProducer[numProducers];
-                            for(int pr = 1 ; pr <= numProducers ; pr++)
-                            {
-                                if( pr % reportingMod == 0)
+                                MessageConsumer consumer = null;
+                                MessageConsumer[] consumers = new MessageConsumer[numConsumers];
+                                for (int cns = 1; cns <= numConsumers; cns++)
                                 {
-                                    System.out.println(CLASS + ": Creating Producer " + pr);
+                                    if (cns % reportingMod == 0)
+                                    {
+                                        System.out.println(CLASS + ": Creating Consumer " + cns);
+                                    }
+                                    consumer = sess.createConsumer(destination, consumerSelector);
+                                    consumers[cns - 1] = consumer;
                                 }
-                                producers[pr-1] = sess.createProducer(destination);
 
-                                if (disableMessageID)
+                                MessageProducer[] producers = new MessageProducer[numProducers];
+                                for (int pr = 1; pr <= numProducers; pr++)
                                 {
-                                    producers[pr-1].setDisableMessageID(true);
-                                }
+                                    if (pr % reportingMod == 0)
+                                    {
+                                        System.out.println(CLASS + ": Creating Producer " + pr);
+                                    }
+                                    producers[pr - 1] = sess.createProducer(destination);
 
-                                if (disableMessageTimestamp)
-                                {
-                                    producers[pr-1].setDisableMessageTimestamp(true);
-                                }
-                            }
+                                    if (disableMessageID)
+                                    {
+                                        producers[pr - 1].setDisableMessageID(true);
+                                    }
 
-                            for (int iteration = 1; iteration <= iterations; iteration++)
-                            {
-                                if (iterations > 1 && iteration % reportingMod == 0)
-                                {
-                                    System.out.println(CLASS + ": Iteration " + iteration);
+                                    if (disableMessageTimestamp)
+                                    {
+                                        producers[pr - 1].setDisableMessageTimestamp(true);
+                                    }
                                 }
 
-                                for (int pr = 1; pr <= numProducers; pr++)
+                                for (int iteration = 1; iteration <= messagingIterations; iteration++)
                                 {
-                                    MessageProducer prod = producers[pr - 1];
-                                    for (int me = 1; me <= numMessage; me++)
+                                    if (messagingIterations > 1 && iteration % reportingMod == 0)
                                     {
-                                        int messageNumber = (iteration - 1) * numProducers * numMessage
-                                                            + (pr - 1) * numMessage + (me - 1);
-                                        if (messageNumber % reportingMod == 0)
-                                        {
-                                            System.out.println(CLASS + ": Sending Message " + messageNumber);
-                                        }
-                                        message.setIntProperty("index", me - 1);
-                                        prod.send(message, deliveryMode,
-                                                  Message.DEFAULT_PRIORITY,
-                                                  Message.DEFAULT_TIME_TO_LIVE);
-                                        if (sess.getTransacted() && me % txBatch == 0)
-                                        {
-                                            sess.commit();
-                                        }
+                                        System.out.println(CLASS + ": Iteration " + iteration);
                                     }
-                                }
 
-                                if(numConsumers == 1 && consumeImmediately)
-                                {
-                                    for(int cs = 1; cs <= consumerMessageCount; cs++)
+                                    for (int pr = 1; pr <= numProducers; pr++)
                                     {
-                                        if(cs % reportingMod == 0)
+                                        MessageProducer prod = producers[pr - 1];
+                                        for (int me = 1; me <= numMessage; me++)
                                         {
-                                            System.out.println(CLASS + ": Consuming Message " + cs);
+                                            int messageNumber = (iteration - 1) * numProducers * numMessage
+                                                                + (pr - 1) * numMessage + (me - 1);
+                                            if (messageNumber % reportingMod == 0)
+                                            {
+                                                System.out.println(CLASS + ": Sending Message " + messageNumber);
+                                            }
+                                            message.setIntProperty("index", me - 1);
+                                            prod.send(message, deliveryMode,
+                                                      Message.DEFAULT_PRIORITY,
+                                                      Message.DEFAULT_TIME_TO_LIVE);
+                                            if (sess.getTransacted() && me % txBatch == 0)
+                                            {
+                                                sess.commit();
+                                            }
                                         }
-                                        Message msg = consumer.receive(recieveTimeout);
+                                    }
 
-                                        if(sess.getTransacted() && cs % txBatch == 0)
+                                    if (numConsumers == 1 && consumeImmediately)
+                                    {
+                                        for (int cs = 1; cs <= consumerMessageCount; cs++)
                                         {
-                                            sess.commit();
+                                            if (cs % reportingMod == 0)
+                                            {
+                                                System.out.println(CLASS + ": Consuming Message " + cs);
+                                            }
+                                            Message msg = consumer.receive(recieveTimeout);
+
+                                            if (sess.getTransacted() && cs % txBatch == 0)
+                                            {
+                                                sess.commit();
+                                            }
+
+                                            if (msg == null)
+                                            {
+                                                throw new RuntimeException(
+                                                        "Expected message not received in allowed time: "
+                                                        + recieveTimeout);
+                                            }
+
+                                            if (messageSize > 0)
+                                            {
+                                                validateReceivedMessageContent(sentBytes,
+                                                                               (BytesMessage) msg, random, messageSize);
+                                            }
                                         }
+                                    }
+                                }
 
-                                        if(msg == null)
-                                        {
-                                            throw new RuntimeException("Expected message not received in allowed time: " + recieveTimeout);
-                                        }
+                                if (closeProducers)
+                                {
+                                    for (MessageProducer messageProducer : producers)
+                                    {
+                                        messageProducer.close();
+                                    }
+                                }
 
-                                        if (messageSize > 0)
-                                        {
-                                            validateReceivedMessageContent(sentBytes,
-                                                                           (BytesMessage) msg, random, messageSize);
-                                        }
+                                if (closeConsumers)
+                                {
+                                    for (MessageConsumer messageConsumer : consumers)
+                                    {
+                                        messageConsumer.close();
                                     }
                                 }
+
                             }
 
-                            if (closeConsumers)
+                            if (pauseBeforeSessionClose)
                             {
-                                for (MessageConsumer messageConsumer: consumers)
-                                {
-                                    messageConsumer.close();
-                                }
+                                System.out.println(String.format(
+                                        "Session %d on connection %d is about to be closed. Press any key to continue...",
+                                        se,
+                                        co));
+                                System.in.read();
                             }
 
-                            if(closeSession)
+                            if (closeSession)
                             {
                                 sess.close();
                             }
-
                         }
                         catch (Exception exp)
                         {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org