You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2019/04/04 21:17:34 UTC

[kafka] branch trunk updated: MINOR: fix throttling and status in ConnectionStressWorker

This is an automated email from the ASF dual-hosted git repository.

gwenshap pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a674ded  MINOR: fix throttling and status in ConnectionStressWorker
a674ded is described below

commit a674ded0b34b79e04b4242a3db19c90ae87b6f87
Author: Colin P. Mccabe <cm...@confluent.io>
AuthorDate: Thu Apr 4 14:16:56 2019 -0700

    MINOR: fix throttling and status in ConnectionStressWorker
    
    Each worker thread should have its own throttle, sized to that thread's
    share of the target connection rate, so that it can sleep for an
    appropriate amount of time when needed.
    
    ConnectionStressWorker should avoid recalculating the status after
    shutting down the runnables.  Otherwise, if one runnable is slow to
    stop, the elapsed time keeps growing while few or no new connections
    are made, which skews the reported average rate down in a way that
    doesn't reflect reality.  This change moves the status calculation
    into a separate periodic runnable that gets shut down cleanly before
    the other ones.
    
    Author: Colin P. Mccabe <cm...@confluent.io>
    
    Reviewers: Gwen Shapira, Stanislav Kozlovski
    
    Closes #6533 from cmccabe/fix_connection_stress_worker
---
 .../trogdor/workload/ConnectionStressWorker.java   | 66 ++++++++++++++--------
 1 file changed, 43 insertions(+), 23 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
index d42367e..e9bbf07 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
@@ -19,6 +19,7 @@ package org.apache.kafka.trogdor.workload;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.ClientUtils;
@@ -52,6 +53,8 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -62,7 +65,7 @@ public class ConnectionStressWorker implements TaskWorker {
 
     private static final int THROTTLE_PERIOD_MS = 100;
 
-    private static final int REPORT_INTERVAL_MS = 20000;
+    private static final int REPORT_INTERVAL_MS = 5000;
 
     private final String id;
 
@@ -80,12 +83,12 @@ public class ConnectionStressWorker implements TaskWorker {
 
     private long startTimeMs;
 
-    private Throttle throttle;
-
-    private long nextReportTime;
+    private Future<?> statusUpdaterFuture;
 
     private ExecutorService workerExecutor;
 
+    private ScheduledExecutorService statusUpdaterExecutor;
+
     public ConnectionStressWorker(String id, ConnectionStressSpec spec) {
         this.id = id;
         this.spec = spec;
@@ -103,11 +106,12 @@ public class ConnectionStressWorker implements TaskWorker {
         synchronized (ConnectionStressWorker.this) {
             this.totalConnections = 0;
             this.totalFailedConnections = 0;
-            this.nextReportTime = 0;
             this.startTimeMs = TIME.milliseconds();
         }
-        this.throttle = new ConnectStressThrottle(WorkerUtils.
-            perSecToPerPeriod(spec.targetConnectionsPerSec(), THROTTLE_PERIOD_MS));
+        this.statusUpdaterExecutor = Executors.newScheduledThreadPool(1,
+            ThreadUtils.createThreadFactory("StatusUpdaterWorkerThread%d", false));
+        this.statusUpdaterFuture = this.statusUpdaterExecutor.scheduleAtFixedRate(
+            new StatusUpdater(), 0, REPORT_INTERVAL_MS, TimeUnit.MILLISECONDS);
         this.workerExecutor = Executors.newFixedThreadPool(spec.numThreads(),
             ThreadUtils.createThreadFactory("ConnectionStressWorkerThread%d", false));
         for (int i = 0; i < spec.numThreads(); i++) {
@@ -115,17 +119,6 @@ public class ConnectionStressWorker implements TaskWorker {
         }
     }
 
-    /**
-     * Update the worker's status and next status report time.
-     */
-    private synchronized void updateStatus(long lastTimeMs) {
-        status.update(JsonUtil.JSON_SERDE.valueToTree(
-                new StatusData(totalConnections,
-                        totalFailedConnections,
-                        (totalConnections * 1000.0) / (lastTimeMs - startTimeMs))));
-        nextReportTime = lastTimeMs + REPORT_INTERVAL_MS;
-    }
-
     private static class ConnectStressThrottle extends Throttle {
         ConnectStressThrottle(int maxPerPeriod) {
             super(maxPerPeriod, THROTTLE_PERIOD_MS);
@@ -233,27 +226,45 @@ public class ConnectionStressWorker implements TaskWorker {
         @Override
         public void run() {
             Stressor stressor = Stressor.fromSpec(spec);
+            int rate = WorkerUtils.perSecToPerPeriod(
+                ((float) spec.targetConnectionsPerSec()) / spec.numThreads(),
+                THROTTLE_PERIOD_MS);
+            Throttle throttle = new ConnectStressThrottle(rate);
             try {
                 while (!doneFuture.isDone()) {
                     throttle.increment();
-                    long lastTimeMs = throttle.lastTimeMs();
                     boolean success = stressor.tryConnect();
                     synchronized (ConnectionStressWorker.this) {
                         totalConnections++;
                         if (!success) {
                             totalFailedConnections++;
                         }
-                        if (lastTimeMs > nextReportTime)
-                            updateStatus(lastTimeMs);
                     }
                 }
             } catch (Exception e) {
-                WorkerUtils.abort(log, "ConnectionStressRunnable", e, doneFuture);
+                WorkerUtils.abort(log, "ConnectLoop", e, doneFuture);
             } finally {
                 Utils.closeQuietly(stressor, "stressor");
             }
         }
+    }
 
+    private class StatusUpdater implements Runnable {
+        @Override
+        public void run() {
+            try {
+                long lastTimeMs = Time.SYSTEM.milliseconds();
+                JsonNode node = null;
+                synchronized (ConnectionStressWorker.this) {
+                    node = JsonUtil.JSON_SERDE.valueToTree(
+                        new StatusData(totalConnections, totalFailedConnections,
+                            (totalConnections * 1000.0) / (lastTimeMs - startTimeMs)));
+                }
+                status.update(node);
+            } catch (Exception e) {
+                WorkerUtils.abort(log, "StatusUpdater", e, doneFuture);
+            }
+        }
     }
 
     public static class StatusData {
@@ -292,10 +303,19 @@ public class ConnectionStressWorker implements TaskWorker {
             throw new IllegalStateException("ConnectionStressWorker is not running.");
         }
         log.info("{}: Deactivating ConnectionStressWorker.", id);
+
+        // Shut down the periodic status updater and perform a final update on the
+        // statistics.  We want to do this first, before deactivating any threads.
+        // Otherwise, if some threads take a while to terminate, this could lead
+        // to a misleading rate getting reported.
+        this.statusUpdaterFuture.cancel(false);
+        this.statusUpdaterExecutor.awaitTermination(1, TimeUnit.DAYS);
+        this.statusUpdaterExecutor = null;
+        new StatusUpdater().run();
+
         doneFuture.complete("");
         workerExecutor.shutdownNow();
         workerExecutor.awaitTermination(1, TimeUnit.DAYS);
-        updateStatus(throttle.lastTimeMs());
         this.workerExecutor = null;
         this.status = null;
     }