You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/29 14:11:44 UTC
kafka git commit: KAFKA-5968; Create/remove request metrics during broker startup/shutdown
Repository: kafka
Updated Branches:
refs/heads/trunk 3107a6c5c -> 0ef960304
KAFKA-5968; Create/remove request metrics during broker startup/shutdown
Replaces the static `RequestMetrics` object with a class so that metrics
are created and removed during broker startup and shutdown to avoid metrics
tests being affected by metrics left behind by previous tests.
Also reinstates `kafka.api.MetricsTest` which was failing frequently earlier
due to tests removing the static request metrics.
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #3991 from rajinisivaram/KAFKA-5968
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0ef96030
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0ef96030
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0ef96030
Branch: refs/heads/trunk
Commit: 0ef960304b6b81c1b725bf8e036f20d295045c4a
Parents: 3107a6c
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Fri Sep 29 15:09:55 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Sep 29 15:11:29 2017 +0100
----------------------------------------------------------------------
.../scala/kafka/network/RequestChannel.scala | 111 ++++++++++++-------
.../main/scala/kafka/network/SocketServer.scala | 3 +-
.../integration/kafka/api/MetricsTest.scala | 18 +--
.../unit/kafka/network/SocketServerTest.scala | 4 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 3 +-
5 files changed, 89 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0ef96030/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index e5f115c..c97e3af 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -49,11 +49,28 @@ object RequestChannel extends Logging {
val sanitizedUser = Sanitizer.sanitize(principal.getName)
}
+ class Metrics {
+
+ private val metricsMap = mutable.Map[String, RequestMetrics]()
+
+ (ApiKeys.values.toSeq.map(_.name) ++
+ Seq(RequestMetrics.consumerFetchMetricName, RequestMetrics.followFetchMetricName)).foreach { name =>
+ metricsMap.put(name, new RequestMetrics(name))
+ }
+
+ def apply(metricName: String) = metricsMap(metricName)
+
+ def shutdown(): Unit = {
+ metricsMap.values.foreach(_.removeMetrics())
+ }
+ }
+
class Request(val processor: Int,
val context: RequestContext,
val startTimeNanos: Long,
memoryPool: MemoryPool,
- @volatile private var buffer: ByteBuffer) extends BaseRequest {
+ @volatile private var buffer: ByteBuffer,
+ metrics: RequestChannel.Metrics) extends BaseRequest {
// These need to be volatile because the readers are in the network thread and the writers are in the request
// handler threads or the purgatory threads
@volatile var requestDequeueTimeNanos = -1L
@@ -137,7 +154,7 @@ object RequestChannel extends Logging {
else Seq.empty
val metricNames = fetchMetricNames :+ header.apiKey.name
metricNames.foreach { metricName =>
- val m = RequestMetrics(metricName)
+ val m = metrics(metricName)
m.requestRate.mark()
m.requestQueueTimeHist.update(Math.round(requestQueueTimeMs))
m.localTimeHist.update(Math.round(apiLocalTimeMs))
@@ -221,6 +238,7 @@ object RequestChannel extends Logging {
}
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
+ val metrics = new RequestChannel.Metrics
private var responseListeners: List[(Int) => Unit] = Nil
private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
@@ -294,74 +312,67 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
def updateErrorMetrics(apiKey: ApiKeys, errors: collection.Map[Errors, Integer]) {
errors.foreach { case (error, count) =>
- RequestMetrics.markErrorMeter(apiKey.name, error, count)
+ metrics(apiKey.name).markErrorMeter(error, count)
}
}
def shutdown() {
requestQueue.clear()
+ metrics.shutdown()
}
def sendShutdownRequest(): Unit = requestQueue.put(ShutdownRequest)
-
}
object RequestMetrics {
-
- private val metricsMap = mutable.Map[String, RequestMetrics]()
-
val consumerFetchMetricName = ApiKeys.FETCH.name + "Consumer"
val followFetchMetricName = ApiKeys.FETCH.name + "Follower"
- (ApiKeys.values.toSeq.map(_.name) ++ Seq(consumerFetchMetricName, followFetchMetricName)).foreach { name =>
- metricsMap.put(name, new RequestMetrics(name))
- }
-
- def apply(metricName: String) = metricsMap(metricName)
-
- def markErrorMeter(name: String, error: Errors, count: Int) {
- val errorMeter = metricsMap(name).errorMeters(error)
- errorMeter.getOrCreateMeter().mark(count.toLong)
- }
-
- // Used for testing until these metrics are moved to a class
- private[kafka] def clearErrorMeters(): Unit = {
- metricsMap.values.foreach { requestMetrics =>
- requestMetrics.errorMeters.values.foreach(_.removeMeter())
- }
- }
-
+ val RequestsPerSec = "RequestsPerSec"
+ val RequestQueueTimeMs = "RequestQueueTimeMs"
+ val LocalTimeMs = "LocalTimeMs"
+ val RemoteTimeMs = "RemoteTimeMs"
+ val ThrottleTimeMs = "ThrottleTimeMs"
+ val ResponseQueueTimeMs = "ResponseQueueTimeMs"
+ val ResponseSendTimeMs = "ResponseSendTimeMs"
+ val TotalTimeMs = "TotalTimeMs"
+ val RequestBytes = "RequestBytes"
+ val MessageConversionsTimeMs = "MessageConversionsTimeMs"
+ val TemporaryMemoryBytes = "TemporaryMemoryBytes"
+ val ErrorsPerSec = "ErrorsPerSec"
}
class RequestMetrics(name: String) extends KafkaMetricsGroup {
+ import RequestMetrics._
+
val tags = Map("request" -> name)
- val requestRate = newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags)
+ val requestRate = newMeter(RequestsPerSec, "requests", TimeUnit.SECONDS, tags)
// time a request spent in a request queue
- val requestQueueTimeHist = newHistogram("RequestQueueTimeMs", biased = true, tags)
+ val requestQueueTimeHist = newHistogram(RequestQueueTimeMs, biased = true, tags)
// time a request takes to be processed at the local broker
- val localTimeHist = newHistogram("LocalTimeMs", biased = true, tags)
+ val localTimeHist = newHistogram(LocalTimeMs, biased = true, tags)
// time a request takes to wait on remote brokers (currently only relevant to fetch and produce requests)
- val remoteTimeHist = newHistogram("RemoteTimeMs", biased = true, tags)
+ val remoteTimeHist = newHistogram(RemoteTimeMs, biased = true, tags)
// time a request is throttled
- val throttleTimeHist = newHistogram("ThrottleTimeMs", biased = true, tags)
+ val throttleTimeHist = newHistogram(ThrottleTimeMs, biased = true, tags)
// time a response spent in a response queue
- val responseQueueTimeHist = newHistogram("ResponseQueueTimeMs", biased = true, tags)
+ val responseQueueTimeHist = newHistogram(ResponseQueueTimeMs, biased = true, tags)
// time to send the response to the requester
- val responseSendTimeHist = newHistogram("ResponseSendTimeMs", biased = true, tags)
- val totalTimeHist = newHistogram("TotalTimeMs", biased = true, tags)
+ val responseSendTimeHist = newHistogram(ResponseSendTimeMs, biased = true, tags)
+ val totalTimeHist = newHistogram(TotalTimeMs, biased = true, tags)
// request size in bytes
- val requestBytesHist = newHistogram("RequestBytes", biased = true, tags)
+ val requestBytesHist = newHistogram(RequestBytes, biased = true, tags)
// time for message conversions (only relevant to fetch and produce requests)
val messageConversionsTimeHist =
if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name)
- Some(newHistogram("MessageConversionsTimeMs", biased = true, tags))
+ Some(newHistogram(MessageConversionsTimeMs, biased = true, tags))
else
None
// Temporary memory allocated for processing request (only populated for fetch and produce requests)
// This shows the memory allocated for compression/conversions excluding the actual request size
val tempMemoryBytesHist =
if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name)
- Some(newHistogram("TemporaryMemoryBytes", biased = true, tags))
+ Some(newHistogram(TemporaryMemoryBytes, biased = true, tags))
else
None
@@ -379,21 +390,43 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
else {
synchronized {
if (meter == null)
- meter = newMeter("ErrorsPerSec", "requests", TimeUnit.SECONDS, tags)
+ meter = newMeter(ErrorsPerSec, "requests", TimeUnit.SECONDS, tags)
meter
}
}
}
- // This is currently used only in tests.
def removeMeter(): Unit = {
synchronized {
if (meter != null) {
- removeMetric("ErrorsPerSec", tags)
+ removeMetric(ErrorsPerSec, tags)
meter = null
}
}
}
+ }
+ def markErrorMeter(error: Errors, count: Int) {
+ errorMeters(error).getOrCreateMeter().mark(count.toLong)
+ }
+
+ def removeMetrics(): Unit = {
+ removeMetric(RequestsPerSec, tags)
+ removeMetric(RequestQueueTimeMs, tags)
+ removeMetric(LocalTimeMs, tags)
+ removeMetric(RemoteTimeMs, tags)
+ removeMetric(RequestsPerSec, tags)
+ removeMetric(ThrottleTimeMs, tags)
+ removeMetric(ResponseQueueTimeMs, tags)
+ removeMetric(TotalTimeMs, tags)
+ removeMetric(ResponseSendTimeMs, tags)
+ removeMetric(RequestBytes, tags)
+ removeMetric(ResponseSendTimeMs, tags)
+ if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name) {
+ removeMetric(MessageConversionsTimeMs, tags)
+ removeMetric(TemporaryMemoryBytes, tags)
+ }
+ errorMeters.values.foreach(_.removeMeter())
+ errorMeters.clear()
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0ef96030/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 875652d..fa792fb 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -139,6 +139,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
this.synchronized {
acceptors.values.foreach(_.shutdown)
processors.foreach(_.shutdown)
+ requestChannel.shutdown()
}
info("Shutdown completed")
}
@@ -549,7 +550,7 @@ private[kafka] class Processor(val id: Int,
val context = new RequestContext(header, receive.source, channel.socketAddress,
channel.principal, listenerName, securityProtocol)
val req = new RequestChannel.Request(processor = id, context = context,
- startTimeNanos = time.nanoseconds, memoryPool, receive.payload)
+ startTimeNanos = time.nanoseconds, memoryPool, receive.payload, requestChannel.metrics)
requestChannel.sendRequest(req)
selector.mute(receive.source)
case None =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/0ef96030/core/src/test/scala/integration/kafka/api/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index 0bd5075..f1bedfd 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.errors.InvalidTopicException
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.SecurityProtocol
-import org.junit.{After, Before, Ignore, Test}
+import org.junit.{After, Before, Test}
import org.junit.Assert._
import scala.collection.JavaConverters._
@@ -52,6 +52,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
@Before
override def setUp(): Unit = {
+ verifyNoRequestMetrics("Request metrics not removed in a previous test")
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, kafkaServerJaasEntryName))
super.setUp()
}
@@ -60,13 +61,13 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
override def tearDown(): Unit = {
super.tearDown()
closeSasl()
+ verifyNoRequestMetrics("Request metrics not removed in this test")
}
/**
* Verifies some of the metrics of producer, consumer as well as server.
*/
@Test
- @Ignore
def testMetrics(): Unit = {
val topic = "topicWithOldMessageFormat"
val props = new Properties
@@ -74,9 +75,6 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
TestUtils.createTopic(this.zkUtils, topic, numPartitions = 1, replicationFactor = 1, this.servers, props)
val tp = new TopicPartition(topic, 0)
- // Clear static state
- RequestMetrics.clearErrorMeters()
-
// Produce and consume some records
val numRecords = 10
val recordSize = 1000
@@ -201,8 +199,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=MessageConversionsTimeMs,request=Produce", value => value > 0.0)
verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Fetch")
- // Temporary size for fetch should be zero after KAFKA-5968 is fixed
- verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Fetch", value => value >= 0.0)
+ verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Fetch", value => value == 0.0)
// request size recorded for all request types, check one
verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Metadata")
@@ -285,4 +282,11 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
assertTrue(s"Broker metric not recorded correctly for $name value $metricValue", verify(metricValue))
metricValue
}
+
+ private def verifyNoRequestMetrics(errorMessage: String): Unit = {
+ val metrics = Metrics.defaultRegistry.allMetrics.asScala.filter { case (n, _) =>
+ n.getMBeanName.startsWith("kafka.network:type=RequestMetrics")
+ }
+ assertTrue(s"$errorMessage: ${metrics.keys}", metrics.isEmpty)
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0ef96030/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 077950d..46eb4ce 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -519,7 +519,7 @@ class SocketServerTest extends JUnitSuite {
val channel = overrideServer.requestChannel
val request = receiveRequest(channel)
- val requestMetrics = RequestMetrics(request.header.apiKey.name)
+ val requestMetrics = channel.metrics(request.header.apiKey.name)
def totalTimeHistCount(): Long = requestMetrics.totalTimeHist.count
val expectedTotalTimeCount = totalTimeHistCount() + 1
@@ -561,7 +561,7 @@ class SocketServerTest extends JUnitSuite {
TestUtils.waitUntilTrue(() => overrideServer.processor(request.processor).channel(request.context.connectionId).isEmpty,
s"Idle connection `${request.context.connectionId}` was not closed by selector")
- val requestMetrics = RequestMetrics(request.header.apiKey.name)
+ val requestMetrics = channel.metrics(request.header.apiKey.name)
def totalTimeHistCount(): Long = requestMetrics.totalTimeHist.count
val expectedTotalTimeCount = totalTimeHistCount() + 1
http://git-wip-us.apache.org/repos/asf/kafka/blob/0ef96030/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index fde2ae1..c3f492d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -54,6 +54,7 @@ import scala.collection.Map
class KafkaApisTest {
private val requestChannel = EasyMock.createNiceMock(classOf[RequestChannel])
+ private val requestChannelMetrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
private val replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
private val groupCoordinator = EasyMock.createNiceMock(classOf[GroupCoordinator])
private val adminManager = EasyMock.createNiceMock(classOf[AdminManager])
@@ -402,7 +403,7 @@ class KafkaApisTest {
val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS,
new ListenerName(""), SecurityProtocol.PLAINTEXT)
(request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos = 0,
- MemoryPool.NONE, buffer))
+ MemoryPool.NONE, buffer, requestChannelMetrics))
}
private def readResponse(api: ApiKeys, request: AbstractRequest, capturedResponse: Capture[RequestChannel.Response]): AbstractResponse = {