You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/12/07 06:22:50 UTC

[pulsar] branch branch-2.9 updated: [fix][cli] Quit PerformanceConsumer after receiving numMessages messages (#17750)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 0c1aab843cf [fix][cli] Quit PerformanceConsumer after receiving numMessages messages (#17750)
0c1aab843cf is described below

commit 0c1aab843cf2daac68894c73f77b7b63e2382e82
Author: Andras Beni <an...@streamnative.io>
AuthorDate: Tue Sep 27 10:46:52 2022 +0200

    [fix][cli] Quit PerformanceConsumer after receiving numMessages messages (#17750)
    
    (cherry picked from commit 59ce90ce6b438717e4c23d3d2960354c10c2cb72)
---
 .../pulsar/testclient/PerformanceConsumer.java     |  6 ++
 .../pulsar/tests/integration/cli/PerfToolTest.java | 91 ++++++++++++++++++++++
 .../integration/src/test/resources/pulsar-cli.xml  |  1 +
 3 files changed, 98 insertions(+)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 77bef8c52b3..826b8fac3b9 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -395,6 +395,12 @@ public class PerformanceConsumer {
                 totalMessagesReceived.increment();
                 totalBytesReceived.add(msg.size());
 
+                if (arguments.numMessages > 0 && totalMessagesReceived.sum() >= arguments.numMessages) {
+                    log.info("------------------- DONE -----------------------");
+                    PerfClientUtils.exit(0);
+                    thread.interrupt();
+                }
+
                 if (limiter != null) {
                     limiter.acquire();
                 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java
new file mode 100644
index 00000000000..55af57d3b52
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.cli;
+
+import static org.testng.Assert.fail;
+import org.apache.pulsar.tests.integration.containers.ChaosContainer;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.containers.ZKContainer;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.messaging.TopicMessagingBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class PerfToolTest extends TopicMessagingBase {
+
+    private static final int MESSAGE_COUNT = 50;
+
+    @Test
+    private void testProduce() throws Exception {
+        String serviceUrl = "pulsar://" + pulsarCluster.getProxy().getContainerName() + ":" + PulsarContainer.BROKER_PORT;
+        final String topicName = getNonPartitionedTopic("testProduce", true);
+        // Using the ZK container as it is separate from brokers, so its environment resembles real world usage more
+        ZKContainer<?> clientToolContainer = pulsarCluster.getZooKeeper();
+        ContainerExecResult produceResult = produceWithPerfTool(clientToolContainer, serviceUrl, topicName);
+        checkOutputForLogs(produceResult,"PerformanceProducer - Aggregated throughput stats",
+                "PerformanceProducer - Aggregated latency stats");
+    }
+
+    @Test
+    private void testConsume() throws Exception {
+        String serviceUrl = "pulsar://" + pulsarCluster.getProxy().getContainerName() + ":" + PulsarContainer.BROKER_PORT;
+        final String topicName = getNonPartitionedTopic("testConsume", true);
+        // Using the ZK container as it is separate from brokers, so its environment resembles real world usage more
+        ZKContainer<?> clientToolContainer = pulsarCluster.getZooKeeper();
+        ContainerExecResult consumeResult = consumeWithPerfTool(clientToolContainer, serviceUrl, topicName);
+        checkOutputForLogs(consumeResult,"PerformanceConsumer - Aggregated throughput stats",
+                "PerformanceConsumer - Aggregated latency stats");
+    }
+
+    private ContainerExecResult produceWithPerfTool(ChaosContainer<?> container, String url, String topic) throws Exception {
+        ContainerExecResult result = container.execCmd("bin/pulsar-perf", "produce", "-u", url, "-m", String.valueOf(MESSAGE_COUNT), topic);
+
+        return failOnError("Performance producer", result);
+    }
+
+    private ContainerExecResult consumeWithPerfTool(ChaosContainer<?> container, String url, String topic) throws Exception {
+        CompletableFuture<ContainerExecResult> resultFuture =
+                container.execCmdAsync("bin/pulsar-perf", "consume", "-u", url, "-m", String.valueOf(MESSAGE_COUNT), topic);
+        produceWithPerfTool(container, url, topic);
+
+        ContainerExecResult result = resultFuture.get(5, TimeUnit.SECONDS);
+        return failOnError("Performance consumer", result);
+    }
+
+    private static ContainerExecResult failOnError(String processDesc, ContainerExecResult result) {
+        if (result.getExitCode() != 0) {
+            fail(processDesc + " failed. Command output:\n" + result.getStdout()
+                    + "\nError output:\n" + result.getStderr());
+        }
+        return result;
+    }
+
+    private static void checkOutputForLogs(ContainerExecResult result, String... logs) {
+        String output = result.getStdout();
+        for (String log : logs) {
+            Assert.assertTrue(output.contains(log),
+                    "command output did not contain log message '" + log + "'.\nFull stdout is:\n" + output);
+        }
+    }
+
+}
diff --git a/tests/integration/src/test/resources/pulsar-cli.xml b/tests/integration/src/test/resources/pulsar-cli.xml
index 847f492d032..35e8e720b07 100644
--- a/tests/integration/src/test/resources/pulsar-cli.xml
+++ b/tests/integration/src/test/resources/pulsar-cli.xml
@@ -30,6 +30,7 @@
             <class name="org.apache.pulsar.tests.integration.cli.FunctionsCLITest"/>
             <class name="org.apache.pulsar.tests.integration.cli.PackagesCliTest"/>
             <class name="org.apache.pulsar.tests.integration.cli.PulsarVersionTest"/>
+            <class name="org.apache.pulsar.tests.integration.cli.PerfToolTest"/>
         </classes>
     </test>
 </suite>