You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2019/03/15 16:53:34 UTC
[kafka] branch trunk updated: MINOR: Update Trogdor ConnectionStressWorker status at the end of execution (#6445)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f20f3c1 MINOR: Update Trogdor ConnectionStressWorker status at the end of execution (#6445)
f20f3c1 is described below
commit f20f3c1a97b8a31a2f211cd66506fb823f420c55
Author: Stanislav Kozlovski <st...@outlook.com>
AuthorDate: Fri Mar 15 18:53:21 2019 +0200
MINOR: Update Trogdor ConnectionStressWorker status at the end of execution (#6445)
Reviewers: Colin P. McCabe <cm...@apache.org>
---
tests/spec/connection_stress_test.json | 29 +++++++++++++++++
.../trogdor/workload/ConnectionStressWorker.java | 38 +++++++++++++---------
2 files changed, 51 insertions(+), 16 deletions(-)
diff --git a/tests/spec/connection_stress_test.json b/tests/spec/connection_stress_test.json
new file mode 100644
index 0000000..7b66985
--- /dev/null
+++ b/tests/spec/connection_stress_test.json
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//
+// An example task specification for running a connection stress test in Trogdor.
+// See TROGDOR.md for details.
+//
+
+{
+ "class": "org.apache.kafka.trogdor.workload.ConnectionStressSpec",
+ "durationMs": 60000,
+ "clientNode": "node0",
+ "bootstrapServers": "localhost:9092",
+ "targetConnectionsPerSec": 100,
+ "numThreads": 10,
+ "action": "CONNECT"
+}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
index d85effc..9a9439a 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
@@ -99,12 +99,14 @@ public class ConnectionStressWorker implements TaskWorker {
log.info("{}: Activating ConnectionStressWorker with {}", id, spec);
this.doneFuture = doneFuture;
this.status = status;
- this.totalConnections = 0;
- this.totalFailedConnections = 0;
- this.startTimeMs = TIME.milliseconds();
+ synchronized (ConnectionStressWorker.this) {
+ this.totalConnections = 0;
+ this.totalFailedConnections = 0;
+ this.nextReportTime = 0;
+ this.startTimeMs = TIME.milliseconds();
+ }
this.throttle = new ConnectStressThrottle(WorkerUtils.
perSecToPerPeriod(spec.targetConnectionsPerSec(), THROTTLE_PERIOD_MS));
- this.nextReportTime = 0;
this.workerExecutor = Executors.newFixedThreadPool(spec.numThreads(),
ThreadUtils.createThreadFactory("ConnectionStressWorkerThread%d", false));
for (int i = 0; i < spec.numThreads(); i++) {
@@ -112,6 +114,17 @@ public class ConnectionStressWorker implements TaskWorker {
}
}
+ /**
+ * Update the worker's status and next status report time.
+ */
+ private synchronized void updateStatus(long lastTimeMs) {
+ status.update(JsonUtil.JSON_SERDE.valueToTree(
+ new StatusData(totalConnections,
+ totalFailedConnections,
+ (totalConnections * 1000.0) / (lastTimeMs - startTimeMs))));
+ nextReportTime = lastTimeMs + REPORT_INTERVAL_MS;
+ }
+
private static class ConnectStressThrottle extends Throttle {
ConnectStressThrottle(int maxPerPeriod) {
super(maxPerPeriod, THROTTLE_PERIOD_MS);
@@ -130,10 +143,7 @@ public class ConnectionStressWorker implements TaskWorker {
conf.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
conf.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG));
ManualMetadataUpdater updater = new ManualMetadataUpdater(Cluster.bootstrap(addresses).nodes());
- while (true) {
- if (doneFuture.isDone()) {
- break;
- }
+ while (!doneFuture.isDone()) {
throttle.increment();
long lastTimeMs = throttle.lastTimeMs();
boolean success = false;
@@ -150,13 +160,8 @@ public class ConnectionStressWorker implements TaskWorker {
if (!success) {
totalFailedConnections++;
}
- if (lastTimeMs > nextReportTime) {
- status.update(JsonUtil.JSON_SERDE.valueToTree(
- new StatusData(totalConnections,
- totalFailedConnections,
- (totalConnections * 1000.0) / (lastTimeMs - startTimeMs))));
- nextReportTime = lastTimeMs + REPORT_INTERVAL_MS;
- }
+ if (lastTimeMs > nextReportTime)
+ updateStatus(lastTimeMs);
}
}
} catch (Exception e) {
@@ -165,7 +170,7 @@ public class ConnectionStressWorker implements TaskWorker {
}
private boolean attemptConnection(AdminClientConfig conf,
- ManualMetadataUpdater updater) throws Exception {
+ ManualMetadataUpdater updater) {
try {
List<Node> nodes = updater.fetchNodes();
Node targetNode = nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));
@@ -250,6 +255,7 @@ public class ConnectionStressWorker implements TaskWorker {
doneFuture.complete("");
workerExecutor.shutdownNow();
workerExecutor.awaitTermination(1, TimeUnit.DAYS);
+ updateStatus(throttle.lastTimeMs());
this.workerExecutor = null;
this.status = null;
}