Posted to commits@kafka.apache.org by jj...@apache.org on 2015/10/29 19:57:09 UTC

kafka git commit: KAFKA-2663 KAFKA-2664; Add quota-delay time to request processing time break-up; avoid using copy-on-write map for metrics

Repository: kafka
Updated Branches:
  refs/heads/trunk 1b5687b9e -> 217eb9044


KAFKA-2663 KAFKA-2664; Add quota-delay time to request processing time break-up; avoid using copy-on-write map for metrics

This has 2 fixes:
KAFKA-2664 - This patch changes the underlying map implementation in Metrics.java to a ConcurrentHashMap. Using a CopyOnWriteMap caused new metric creation to become extremely slow when the existing corpus of metrics is large; switching to a ConcurrentHashMap speeds up metric creation significantly.
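
For context on why the old map was slow: a copy-on-write map gives lock-free reads but copies the entire backing map on every write, so registering N sensors costs O(N^2) work overall. A minimal sketch of that write path (an illustration only, not Kafka's actual CopyOnWriteMap):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Illustration only: the essence of a copy-on-write put. Every insertion copies
// the whole backing map, so the cost of creating a new metric grows linearly with
// the number of metrics already registered; ConcurrentHashMap.put() does not.
public class CopyOnWritePutSketch<K, V> {

    private volatile Map<K, V> map = Collections.emptyMap();

    public synchronized V put(K key, V value) {
        Map<K, V> copy = new HashMap<>(this.map); // O(size) copy on every write
        V previous = copy.put(key, value);
        this.map = Collections.unmodifiableMap(copy);
        return previous;
    }

    public V get(K key) {
        return map.get(key); // lock-free read against the current snapshot
    }
}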

KAFKA-2663 - Split the throttle time out of the remote time. On throttled requests, the remote time was artificially inflated by the quota-induced delay.
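
To make the effect concrete, here is a hedged worked example with made-up timestamps; the variable names mirror the fields in RequestChannel.Request, but this is an illustration rather than Kafka code:

public class ThrottleTimeBreakupExample {
    public static void main(String[] args) {
        // Hypothetical timestamps (ms) for a throttled produce request.
        long requestDequeueTimeMs    = 1;
        long apiLocalCompleteTimeMs  = 6;    // 5 ms of local processing
        long apiRemoteCompleteTimeMs = 26;   // 20 ms waiting on remote brokers
        long responseCompleteTimeMs  = 126;  // 100 ms of quota-induced delay

        long apiLocalTime    = Math.max(apiLocalCompleteTimeMs - requestDequeueTimeMs, 0L);
        long apiRemoteTime   = Math.max(apiRemoteCompleteTimeMs - apiLocalCompleteTimeMs, 0L);
        long apiThrottleTime = Math.max(responseCompleteTimeMs - apiRemoteCompleteTimeMs, 0L);

        // Before this change RemoteTimeMs would have reported 120 ms (remote wait plus
        // throttle delay); now RemoteTimeMs is 20 ms and ThrottleTimeMs is 100 ms.
        System.out.printf("local=%dms remote=%dms throttle=%dms%n",
                          apiLocalTime, apiRemoteTime, apiThrottleTime);
    }
}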

Some measurements of sensor creation time with the ConcurrentHashMap in place. Time to create:
- 100k sensors (1.5 seconds)
- 200k sensors (3 seconds)
- 400k sensors (9 seconds)
- 500k sensors (14 seconds)

Please refer to this test (originally written by Joel): http://pastebin.com/LnKjbY9a
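
For reference, a rough sketch of what such a sensor-creation micro-benchmark might look like against the public Metrics API (this is not the pastebin test; the sensor and metric names are made up, and absolute timings will vary by machine):

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;

public class SensorCreationBench {
    public static void main(String[] args) throws Exception {
        int count = 100_000;
        Metrics metrics = new Metrics();
        long start = System.currentTimeMillis();
        for (int i = 0; i < count; i++) {
            // Each iteration registers a distinct sensor with one stat, so the map of
            // sensors/metrics keeps growing as it would on a broker with many clients.
            Sensor sensor = metrics.sensor("bench-sensor-" + i);
            sensor.add(new MetricName("bench-metric-" + i, "bench-group"), new Avg());
        }
        System.out.println(count + " sensors created in "
                           + (System.currentTimeMillis() - start) + " ms");
        metrics.close();
    }
}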

Author: Aditya Auradkar <aa...@linkedin.com>

Reviewers: Joel Koshy <jj...@gmail.com>

Closes #369 from auradkar/K-2664


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/217eb904
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/217eb904
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/217eb904

Branch: refs/heads/trunk
Commit: 217eb90444ef6a6aacf95997666768ee849e6769
Parents: 1b5687b
Author: Aditya Auradkar <aa...@linkedin.com>
Authored: Thu Oct 29 11:56:55 2015 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Thu Oct 29 11:56:55 2015 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/common/metrics/Metrics.java  |  8 ++++----
 .../main/scala/kafka/network/RequestChannel.scala | 18 +++++++++++++++---
 core/src/main/scala/kafka/server/KafkaApis.scala  |  8 ++++++++
 3 files changed, 27 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/217eb904/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 6281640..fdb7dac 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -17,13 +17,13 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.utils.CopyOnWriteMap;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -109,9 +109,9 @@ public class Metrics implements Closeable {
      */
     public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time, boolean enableExpiration) {
         this.config = defaultConfig;
-        this.sensors = new CopyOnWriteMap<>();
-        this.metrics = new CopyOnWriteMap<>();
-        this.childrenSensors = new CopyOnWriteMap<>();
+        this.sensors = new ConcurrentHashMap<>();
+        this.metrics = new ConcurrentHashMap<>();
+        this.childrenSensors = new ConcurrentHashMap<>();
         this.reporters = Utils.notNull(reporters);
         this.time = time;
         for (MetricsReporter reporter : reporters)

http://git-wip-us.apache.org/repos/asf/kafka/blob/217eb904/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 798e01d..6af62c7 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -53,6 +53,8 @@ object RequestChannel extends Logging {
     @volatile var apiLocalCompleteTimeMs = -1L
     @volatile var responseCompleteTimeMs = -1L
     @volatile var responseDequeueTimeMs = -1L
+    @volatile var apiRemoteCompleteTimeMs = -1L
+
     val requestId = buffer.getShort()
     // for server-side request / response format
     // TODO: this will be removed once we migrated to client-side format
@@ -88,13 +90,20 @@ object RequestChannel extends Logging {
 
     def updateRequestMetrics() {
       val endTimeMs = SystemTime.milliseconds
-      // In some corner cases, apiLocalCompleteTimeMs may not be set when the request completes since the remote
-      // processing time is really small. In this case, use responseCompleteTimeMs as apiLocalCompleteTimeMs.
+      // In some corner cases, apiLocalCompleteTimeMs may not be set when the request completes if the remote
+      // processing time is really small. This value is set in KafkaApis from a request handling thread.
+      // This may be read in a network thread before the actual update happens in KafkaApis which will cause us to
+      // see a negative value here. In that case, use responseCompleteTimeMs as apiLocalCompleteTimeMs.
       if (apiLocalCompleteTimeMs < 0)
         apiLocalCompleteTimeMs = responseCompleteTimeMs
+      // If the apiRemoteCompleteTimeMs is not set, then it is the same as responseCompleteTimeMs.
+      if (apiRemoteCompleteTimeMs < 0)
+        apiRemoteCompleteTimeMs = responseCompleteTimeMs
+
       val requestQueueTime = (requestDequeueTimeMs - startTimeMs).max(0L)
       val apiLocalTime = (apiLocalCompleteTimeMs - requestDequeueTimeMs).max(0L)
-      val apiRemoteTime = (responseCompleteTimeMs - apiLocalCompleteTimeMs).max(0L)
+      val apiRemoteTime = (apiRemoteCompleteTimeMs - apiLocalCompleteTimeMs).max(0L)
+      val apiThrottleTime = (responseCompleteTimeMs - apiRemoteCompleteTimeMs).max(0L)
       val responseQueueTime = (responseDequeueTimeMs - responseCompleteTimeMs).max(0L)
       val responseSendTime = (endTimeMs - responseDequeueTimeMs).max(0L)
       val totalTime = endTimeMs - startTimeMs
@@ -111,6 +120,7 @@ object RequestChannel extends Logging {
              m.requestQueueTimeHist.update(requestQueueTime)
              m.localTimeHist.update(apiLocalTime)
              m.remoteTimeHist.update(apiRemoteTime)
+             m.throttleTimeHist.update(apiThrottleTime)
              m.responseQueueTimeHist.update(responseQueueTime)
              m.responseSendTimeHist.update(responseSendTime)
              m.totalTimeHist.update(totalTime)
@@ -236,6 +246,8 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
   val localTimeHist = newHistogram("LocalTimeMs", biased = true, tags)
   // time a request takes to wait on remote brokers (only relevant to fetch and produce requests)
   val remoteTimeHist = newHistogram("RemoteTimeMs", biased = true, tags)
+  // time a request is throttled (only relevant to fetch and produce requests)
+  val throttleTimeHist = newHistogram("ThrottleTimeMs", biased = true, tags)
   // time a response spent in a response queue
   val responseQueueTimeHist = newHistogram("ResponseQueueTimeMs", biased = true, tags)
   // time to send the response to the requester

http://git-wip-us.apache.org/repos/asf/kafka/blob/217eb904/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index af6bb5e..dfcfa25 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -312,6 +312,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
 
       def produceResponseCallback(delayTimeMs: Int) {
+
         if (produceRequest.requiredAcks == 0) {
           // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
           // the request, since no response is expected by the producer, the server will close socket server so that
@@ -336,6 +337,9 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
       }
 
+      // When this callback is triggered, the remote API call has completed
+      request.apiRemoteCompleteTimeMs = SystemTime.milliseconds
+
       quotaManagers(RequestKeys.ProduceKey).recordAndMaybeThrottle(produceRequest.clientId,
                                                                    numBytesAppended,
                                                                    produceResponseCallback)
@@ -397,6 +401,10 @@ class KafkaApis(val requestChannel: RequestChannel,
         requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response)))
       }
 
+
+      // When this callback is triggered, the remote API call has completed
+      request.apiRemoteCompleteTimeMs = SystemTime.milliseconds
+
       // Do not throttle replication traffic
       if (fetchRequest.isFromFollower) {
         fetchResponseCallback(0)