You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2019/03/15 16:53:34 UTC

[kafka] branch trunk updated: MINOR: Update Trogdor ConnectionStressWorker status at the end of execution (#6445)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f20f3c1  MINOR: Update Trogdor ConnectionStressWorker status at the end of execution (#6445)
f20f3c1 is described below

commit f20f3c1a97b8a31a2f211cd66506fb823f420c55
Author: Stanislav Kozlovski <st...@outlook.com>
AuthorDate: Fri Mar 15 18:53:21 2019 +0200

    MINOR: Update Trogdor ConnectionStressWorker status at the end of execution (#6445)
    
    Reviewers: Colin P. McCabe <cm...@apache.org>
---
 tests/spec/connection_stress_test.json             | 29 +++++++++++++++++
 .../trogdor/workload/ConnectionStressWorker.java   | 38 +++++++++++++---------
 2 files changed, 51 insertions(+), 16 deletions(-)

diff --git a/tests/spec/connection_stress_test.json b/tests/spec/connection_stress_test.json
new file mode 100644
index 0000000..7b66985
--- /dev/null
+++ b/tests/spec/connection_stress_test.json
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//
+// An example task specification for running a connection stress test in Trogdor.
+// See TROGDOR.md for details.
+//
+
+{
+  "class": "org.apache.kafka.trogdor.workload.ConnectionStressSpec",
+  "durationMs": 60000,
+  "clientNode": "node0",
+  "bootstrapServers": "localhost:9092",
+  "targetConnectionsPerSec": 100,
+  "numThreads": 10,
+  "action": "CONNECT"
+}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
index d85effc..9a9439a 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
@@ -99,12 +99,14 @@ public class ConnectionStressWorker implements TaskWorker {
         log.info("{}: Activating ConnectionStressWorker with {}", id, spec);
         this.doneFuture = doneFuture;
         this.status = status;
-        this.totalConnections = 0;
-        this.totalFailedConnections  = 0;
-        this.startTimeMs = TIME.milliseconds();
+        synchronized (ConnectionStressWorker.this) {
+            this.totalConnections = 0;
+            this.totalFailedConnections = 0;
+            this.nextReportTime = 0;
+            this.startTimeMs = TIME.milliseconds();
+        }
         this.throttle = new ConnectStressThrottle(WorkerUtils.
             perSecToPerPeriod(spec.targetConnectionsPerSec(), THROTTLE_PERIOD_MS));
-        this.nextReportTime = 0;
         this.workerExecutor = Executors.newFixedThreadPool(spec.numThreads(),
             ThreadUtils.createThreadFactory("ConnectionStressWorkerThread%d", false));
         for (int i = 0; i < spec.numThreads(); i++) {
@@ -112,6 +114,17 @@ public class ConnectionStressWorker implements TaskWorker {
         }
     }
 
+    /**
+     * Update the worker's status and next status report time.
+     */
+    private synchronized void updateStatus(long lastTimeMs) {
+        status.update(JsonUtil.JSON_SERDE.valueToTree(
+                new StatusData(totalConnections,
+                        totalFailedConnections,
+                        (totalConnections * 1000.0) / (lastTimeMs - startTimeMs))));
+        nextReportTime = lastTimeMs + REPORT_INTERVAL_MS;
+    }
+
     private static class ConnectStressThrottle extends Throttle {
         ConnectStressThrottle(int maxPerPeriod) {
             super(maxPerPeriod, THROTTLE_PERIOD_MS);
@@ -130,10 +143,7 @@ public class ConnectionStressWorker implements TaskWorker {
                         conf.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
                         conf.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG));
                 ManualMetadataUpdater updater = new ManualMetadataUpdater(Cluster.bootstrap(addresses).nodes());
-                while (true) {
-                    if (doneFuture.isDone()) {
-                        break;
-                    }
+                while (!doneFuture.isDone()) {
                     throttle.increment();
                     long lastTimeMs = throttle.lastTimeMs();
                     boolean success = false;
@@ -150,13 +160,8 @@ public class ConnectionStressWorker implements TaskWorker {
                         if (!success) {
                             totalFailedConnections++;
                         }
-                        if (lastTimeMs > nextReportTime) {
-                            status.update(JsonUtil.JSON_SERDE.valueToTree(
-                                new StatusData(totalConnections,
-                                    totalFailedConnections,
-                                    (totalConnections * 1000.0) / (lastTimeMs - startTimeMs))));
-                            nextReportTime = lastTimeMs + REPORT_INTERVAL_MS;
-                        }
+                        if (lastTimeMs > nextReportTime)
+                            updateStatus(lastTimeMs);
                     }
                 }
             } catch (Exception e) {
@@ -165,7 +170,7 @@ public class ConnectionStressWorker implements TaskWorker {
         }
 
         private boolean attemptConnection(AdminClientConfig conf,
-                                          ManualMetadataUpdater updater) throws Exception {
+                                          ManualMetadataUpdater updater) {
             try {
                 List<Node> nodes = updater.fetchNodes();
                 Node targetNode = nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));
@@ -250,6 +255,7 @@ public class ConnectionStressWorker implements TaskWorker {
         doneFuture.complete("");
         workerExecutor.shutdownNow();
         workerExecutor.awaitTermination(1, TimeUnit.DAYS);
+        updateStatus(throttle.lastTimeMs());
         this.workerExecutor = null;
         this.status = null;
     }