You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2019/04/04 21:17:34 UTC
[kafka] branch trunk updated: MINOR: fix throttling and status in ConnectionStressWorker
This is an automated email from the ASF dual-hosted git repository.
gwenshap pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a674ded MINOR: fix throttling and status in ConnectionStressWorker
a674ded is described below
commit a674ded0b34b79e04b4242a3db19c90ae87b6f87
Author: Colin P. Mccabe <cm...@confluent.io>
AuthorDate: Thu Apr 4 14:16:56 2019 -0700
MINOR: fix throttling and status in ConnectionStressWorker
Each separate thread should have its own throttle, so that it can sleep
for an appropriate amount of time when needed.
ConnectionStressWorker should avoid recalculating the status after
shutting down the runnables. Otherwise, if one runnable is slow to
stop, it will skew the average down in a way that doesn't reflect
reality. This change moves the status calculation into a separate
periodic runnable that gets shut down cleanly before the other ones.
Author: Colin P. Mccabe <cm...@confluent.io>
Reviewers: Gwen Shapira, Stanislav Kozlovski
Closes #6533 from cmccabe/fix_connection_stress_worker
---
.../trogdor/workload/ConnectionStressWorker.java | 66 ++++++++++++++--------
1 file changed, 43 insertions(+), 23 deletions(-)
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
index d42367e..e9bbf07 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
@@ -19,6 +19,7 @@ package org.apache.kafka.trogdor.workload;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
@@ -52,6 +53,8 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -62,7 +65,7 @@ public class ConnectionStressWorker implements TaskWorker {
private static final int THROTTLE_PERIOD_MS = 100;
- private static final int REPORT_INTERVAL_MS = 20000;
+ private static final int REPORT_INTERVAL_MS = 5000;
private final String id;
@@ -80,12 +83,12 @@ public class ConnectionStressWorker implements TaskWorker {
private long startTimeMs;
- private Throttle throttle;
-
- private long nextReportTime;
+ private Future<?> statusUpdaterFuture;
private ExecutorService workerExecutor;
+ private ScheduledExecutorService statusUpdaterExecutor;
+
public ConnectionStressWorker(String id, ConnectionStressSpec spec) {
this.id = id;
this.spec = spec;
@@ -103,11 +106,12 @@ public class ConnectionStressWorker implements TaskWorker {
synchronized (ConnectionStressWorker.this) {
this.totalConnections = 0;
this.totalFailedConnections = 0;
- this.nextReportTime = 0;
this.startTimeMs = TIME.milliseconds();
}
- this.throttle = new ConnectStressThrottle(WorkerUtils.
- perSecToPerPeriod(spec.targetConnectionsPerSec(), THROTTLE_PERIOD_MS));
+ this.statusUpdaterExecutor = Executors.newScheduledThreadPool(1,
+ ThreadUtils.createThreadFactory("StatusUpdaterWorkerThread%d", false));
+ this.statusUpdaterFuture = this.statusUpdaterExecutor.scheduleAtFixedRate(
+ new StatusUpdater(), 0, REPORT_INTERVAL_MS, TimeUnit.MILLISECONDS);
this.workerExecutor = Executors.newFixedThreadPool(spec.numThreads(),
ThreadUtils.createThreadFactory("ConnectionStressWorkerThread%d", false));
for (int i = 0; i < spec.numThreads(); i++) {
@@ -115,17 +119,6 @@ public class ConnectionStressWorker implements TaskWorker {
}
}
- /**
- * Update the worker's status and next status report time.
- */
- private synchronized void updateStatus(long lastTimeMs) {
- status.update(JsonUtil.JSON_SERDE.valueToTree(
- new StatusData(totalConnections,
- totalFailedConnections,
- (totalConnections * 1000.0) / (lastTimeMs - startTimeMs))));
- nextReportTime = lastTimeMs + REPORT_INTERVAL_MS;
- }
-
private static class ConnectStressThrottle extends Throttle {
ConnectStressThrottle(int maxPerPeriod) {
super(maxPerPeriod, THROTTLE_PERIOD_MS);
@@ -233,27 +226,45 @@ public class ConnectionStressWorker implements TaskWorker {
@Override
public void run() {
Stressor stressor = Stressor.fromSpec(spec);
+ int rate = WorkerUtils.perSecToPerPeriod(
+ ((float) spec.targetConnectionsPerSec()) / spec.numThreads(),
+ THROTTLE_PERIOD_MS);
+ Throttle throttle = new ConnectStressThrottle(rate);
try {
while (!doneFuture.isDone()) {
throttle.increment();
- long lastTimeMs = throttle.lastTimeMs();
boolean success = stressor.tryConnect();
synchronized (ConnectionStressWorker.this) {
totalConnections++;
if (!success) {
totalFailedConnections++;
}
- if (lastTimeMs > nextReportTime)
- updateStatus(lastTimeMs);
}
}
} catch (Exception e) {
- WorkerUtils.abort(log, "ConnectionStressRunnable", e, doneFuture);
+ WorkerUtils.abort(log, "ConnectLoop", e, doneFuture);
} finally {
Utils.closeQuietly(stressor, "stressor");
}
}
+ }
+ private class StatusUpdater implements Runnable {
+ @Override
+ public void run() {
+ try {
+ long lastTimeMs = Time.SYSTEM.milliseconds();
+ JsonNode node = null;
+ synchronized (ConnectionStressWorker.this) {
+ node = JsonUtil.JSON_SERDE.valueToTree(
+ new StatusData(totalConnections, totalFailedConnections,
+ (totalConnections * 1000.0) / (lastTimeMs - startTimeMs)));
+ }
+ status.update(node);
+ } catch (Exception e) {
+ WorkerUtils.abort(log, "StatusUpdater", e, doneFuture);
+ }
+ }
}
public static class StatusData {
@@ -292,10 +303,19 @@ public class ConnectionStressWorker implements TaskWorker {
throw new IllegalStateException("ConnectionStressWorker is not running.");
}
log.info("{}: Deactivating ConnectionStressWorker.", id);
+
+ // Shut down the periodic status updater and perform a final update on the
+ // statistics. We want to do this first, before deactivating any threads.
+ // Otherwise, if some threads take a while to terminate, this could lead
+ // to a misleading rate getting reported.
+ this.statusUpdaterFuture.cancel(false);
+ this.statusUpdaterExecutor.awaitTermination(1, TimeUnit.DAYS);
+ this.statusUpdaterExecutor = null;
+ new StatusUpdater().run();
+
doneFuture.complete("");
workerExecutor.shutdownNow();
workerExecutor.awaitTermination(1, TimeUnit.DAYS);
- updateStatus(throttle.lastTimeMs());
this.workerExecutor = null;
this.status = null;
}