You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2015/10/29 19:57:09 UTC
kafka git commit: KAFKA-2663 KAFKA-2664;
Add quota-delay time to request processing time break-up;
avoid using copy-on-write map for metrics
Repository: kafka
Updated Branches:
refs/heads/trunk 1b5687b9e -> 217eb9044
KAFKA-2663 KAFKA-2664; Add quota-delay time to request processing time break-up; avoid using copy-on-write map for metrics
This has 2 fixes:
KAFKA-2664 - This patch changes the underlying map implementation in Metrics.java to a ConcurrentHashMap. Using a CopyOnWriteMap caused creation of new metrics to become extremely slow when the existing corpus of metrics was large. Using a ConcurrentHashMap speeds up metric creation significantly.
KAFKA-2663 - This patch splits the throttle time out of the remote time. Previously, on throttled requests, the quota delay was counted as part of the remote time, which made the remote time go up artificially.
Some stats on using a ConcurrentHashMap. Time to create:
- 100k sensors (1.5 seconds)
- 200k sensors (3 seconds)
- 400k sensors (9 seconds)
- 500k sensors (14 seconds)
Please refer to this test (originally written by Joel): http://pastebin.com/LnKjbY9a
Author: Aditya Auradkar <aa...@linkedin.com>
Reviewers: Joel Koshy <jj...@gmail.com>
Closes #369 from auradkar/K-2664
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/217eb904
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/217eb904
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/217eb904
Branch: refs/heads/trunk
Commit: 217eb90444ef6a6aacf95997666768ee849e6769
Parents: 1b5687b
Author: Aditya Auradkar <aa...@linkedin.com>
Authored: Thu Oct 29 11:56:55 2015 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Thu Oct 29 11:56:55 2015 -0700
----------------------------------------------------------------------
.../org/apache/kafka/common/metrics/Metrics.java | 8 ++++----
.../main/scala/kafka/network/RequestChannel.scala | 18 +++++++++++++++---
core/src/main/scala/kafka/server/KafkaApis.scala | 8 ++++++++
3 files changed, 27 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/217eb904/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 6281640..fdb7dac 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -17,13 +17,13 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.utils.CopyOnWriteMap;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -109,9 +109,9 @@ public class Metrics implements Closeable {
*/
public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time, boolean enableExpiration) {
this.config = defaultConfig;
- this.sensors = new CopyOnWriteMap<>();
- this.metrics = new CopyOnWriteMap<>();
- this.childrenSensors = new CopyOnWriteMap<>();
+ this.sensors = new ConcurrentHashMap<>();
+ this.metrics = new ConcurrentHashMap<>();
+ this.childrenSensors = new ConcurrentHashMap<>();
this.reporters = Utils.notNull(reporters);
this.time = time;
for (MetricsReporter reporter : reporters)
http://git-wip-us.apache.org/repos/asf/kafka/blob/217eb904/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 798e01d..6af62c7 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -53,6 +53,8 @@ object RequestChannel extends Logging {
@volatile var apiLocalCompleteTimeMs = -1L
@volatile var responseCompleteTimeMs = -1L
@volatile var responseDequeueTimeMs = -1L
+ @volatile var apiRemoteCompleteTimeMs = -1L
+
val requestId = buffer.getShort()
// for server-side request / response format
// TODO: this will be removed once we migrated to client-side format
@@ -88,13 +90,20 @@ object RequestChannel extends Logging {
def updateRequestMetrics() {
val endTimeMs = SystemTime.milliseconds
- // In some corner cases, apiLocalCompleteTimeMs may not be set when the request completes since the remote
- // processing time is really small. In this case, use responseCompleteTimeMs as apiLocalCompleteTimeMs.
+ // In some corner cases, apiLocalCompleteTimeMs may not be set when the request completes if the remote
+ // processing time is really small. This value is set in KafkaApis from a request handling thread.
+ // This may be read in a network thread before the actual update happens in KafkaApis which will cause us to
+ // see a negative value here. In that case, use responseCompleteTimeMs as apiLocalCompleteTimeMs.
if (apiLocalCompleteTimeMs < 0)
apiLocalCompleteTimeMs = responseCompleteTimeMs
+ // If the apiRemoteCompleteTimeMs is not set, then it is the same as responseCompleteTimeMs.
+ if (apiRemoteCompleteTimeMs < 0)
+ apiRemoteCompleteTimeMs = responseCompleteTimeMs
+
val requestQueueTime = (requestDequeueTimeMs - startTimeMs).max(0L)
val apiLocalTime = (apiLocalCompleteTimeMs - requestDequeueTimeMs).max(0L)
- val apiRemoteTime = (responseCompleteTimeMs - apiLocalCompleteTimeMs).max(0L)
+ val apiRemoteTime = (apiRemoteCompleteTimeMs - apiLocalCompleteTimeMs).max(0L)
+ val apiThrottleTime = (responseCompleteTimeMs - apiRemoteCompleteTimeMs).max(0L)
val responseQueueTime = (responseDequeueTimeMs - responseCompleteTimeMs).max(0L)
val responseSendTime = (endTimeMs - responseDequeueTimeMs).max(0L)
val totalTime = endTimeMs - startTimeMs
@@ -111,6 +120,7 @@ object RequestChannel extends Logging {
m.requestQueueTimeHist.update(requestQueueTime)
m.localTimeHist.update(apiLocalTime)
m.remoteTimeHist.update(apiRemoteTime)
+ m.throttleTimeHist.update(apiThrottleTime)
m.responseQueueTimeHist.update(responseQueueTime)
m.responseSendTimeHist.update(responseSendTime)
m.totalTimeHist.update(totalTime)
@@ -236,6 +246,8 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
val localTimeHist = newHistogram("LocalTimeMs", biased = true, tags)
// time a request takes to wait on remote brokers (only relevant to fetch and produce requests)
val remoteTimeHist = newHistogram("RemoteTimeMs", biased = true, tags)
+ // time a request is throttled (only relevant to fetch and produce requests)
+ val throttleTimeHist = newHistogram("ThrottleTimeMs", biased = true, tags)
// time a response spent in a response queue
val responseQueueTimeHist = newHistogram("ResponseQueueTimeMs", biased = true, tags)
// time to send the response to the requester
http://git-wip-us.apache.org/repos/asf/kafka/blob/217eb904/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index af6bb5e..dfcfa25 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -312,6 +312,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def produceResponseCallback(delayTimeMs: Int) {
+
if (produceRequest.requiredAcks == 0) {
// no operation needed if producer request.required.acks = 0; however, if there is any error in handling
// the request, since no response is expected by the producer, the server will close socket server so that
@@ -336,6 +337,9 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
+ // When this callback is triggered, the remote API call has completed
+ request.apiRemoteCompleteTimeMs = SystemTime.milliseconds
+
quotaManagers(RequestKeys.ProduceKey).recordAndMaybeThrottle(produceRequest.clientId,
numBytesAppended,
produceResponseCallback)
@@ -397,6 +401,10 @@ class KafkaApis(val requestChannel: RequestChannel,
requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response)))
}
+
+ // When this callback is triggered, the remote API call has completed
+ request.apiRemoteCompleteTimeMs = SystemTime.milliseconds
+
// Do not throttle replication traffic
if (fetchRequest.isFromFollower) {
fetchResponseCallback(0)