You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/10/05 16:25:51 UTC
kafka git commit: KAFKA-6012; Close request metrics only after closing request handlers
Repository: kafka
Updated Branches:
refs/heads/trunk e61002e2a -> e40b3a2e7
KAFKA-6012; Close request metrics only after closing request handlers
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #4024 from rajinisivaram/KAFKA-6012-error-metric
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e40b3a2e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e40b3a2e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e40b3a2e
Branch: refs/heads/trunk
Commit: e40b3a2e74133de6d60599beefb65407ca4cc7dd
Parents: e61002e
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Thu Oct 5 12:25:34 2017 -0400
Committer: Rajini Sivaram <ra...@googlemail.com>
Committed: Thu Oct 5 12:25:34 2017 -0400
----------------------------------------------------------------------
.../scala/kafka/network/RequestChannel.scala | 3 +-
.../kafka/server/KafkaRequestHandler.scala | 1 +
.../unit/kafka/network/SocketServerTest.scala | 55 +++++++++++++-------
3 files changed, 38 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/e40b3a2e/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index c97e3af..ec16ab0 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -60,7 +60,7 @@ object RequestChannel extends Logging {
def apply(metricName: String) = metricsMap(metricName)
- def shutdown(): Unit = {
+ def close(): Unit = {
metricsMap.values.foreach(_.removeMetrics())
}
}
@@ -318,7 +318,6 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
def shutdown() {
requestQueue.clear()
- metrics.shutdown()
}
def sendShutdownRequest(): Unit = requestQueue.put(ShutdownRequest)
http://git-wip-us.apache.org/repos/asf/kafka/blob/e40b3a2e/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index a498781..3d8dbd9 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -105,6 +105,7 @@ class KafkaRequestHandlerPool(val brokerId: Int,
handler.initiateShutdown()
for (handler <- runnables)
handler.awaitShutdown()
+ requestChannel.metrics.close()
info("shut down completely")
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e40b3a2e/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 633138b..aebbf5c 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -24,7 +24,7 @@ import java.nio.channels.SocketChannel
import java.util.{HashMap, Random}
import javax.net.ssl._
-import com.yammer.metrics.core.Gauge
+import com.yammer.metrics.core.{Gauge, Meter}
import com.yammer.metrics.{Metrics => YammerMetrics}
import kafka.network.RequestChannel.SendAction
import kafka.security.CredentialProvider
@@ -34,7 +34,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ChannelBuilder, ChannelState, KafkaChannel, ListenerName, NetworkReceive, NetworkSend, Selector, Send}
-import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.{AbstractRequest, ProduceRequest, RequestHeader}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
@@ -133,10 +133,15 @@ class SocketServerTest extends JUnitSuite {
receiveRequest(server.requestChannel)
}
+ def shutdownServerAndMetrics(server: SocketServer): Unit = {
+ server.shutdown()
+ server.metrics.close()
+ server.requestChannel.metrics.close()
+ }
+
@After
def tearDown() {
- metrics.close()
- server.shutdown()
+ shutdownServerAndMetrics(server)
sockets.foreach(_.close())
sockets.clear()
}
@@ -260,8 +265,7 @@ class SocketServerTest extends JUnitSuite {
assertNull("Received request after failed send", overrideServer.requestChannel.receiveRequest(200))
} finally {
- overrideServer.shutdown()
- serverMetrics.close()
+ shutdownServerAndMetrics(overrideServer)
}
}
@@ -342,8 +346,7 @@ class SocketServerTest extends JUnitSuite {
newChannel.disconnect()
} finally {
- overrideServer.shutdown()
- serverMetrics.close()
+ shutdownServerAndMetrics(overrideServer)
}
}
@@ -380,7 +383,7 @@ class SocketServerTest extends JUnitSuite {
// make sure the sockets are open
server.acceptors.values.foreach(acceptor => assertFalse(acceptor.serverChannel.socket.isClosed))
// then shutdown the server
- server.shutdown()
+ shutdownServerAndMetrics(server)
val largeChunkOfBytes = new Array[Byte](1000000)
// doing a subsequent send should throw an exception as the connection should be closed.
@@ -438,8 +441,7 @@ class SocketServerTest extends JUnitSuite {
conn.setSoTimeout(3000)
assertEquals(-1, conn.getInputStream.read())
} finally {
- overrideServer.shutdown()
- serverMetrics.close()
+ shutdownServerAndMetrics(overrideServer)
}
}
@@ -479,8 +481,7 @@ class SocketServerTest extends JUnitSuite {
assertEquals(serializedBytes.toSeq, receiveResponse(sslSocket).toSeq)
sslSocket.close()
} finally {
- overrideServer.shutdown()
- serverMetrics.close()
+ shutdownServerAndMetrics(overrideServer)
}
}
@@ -534,8 +535,7 @@ class SocketServerTest extends JUnitSuite {
s"request metrics not updated, expected: $expectedTotalTimeCount, actual: ${totalTimeHistCount()}")
} finally {
- overrideServer.shutdown()
- serverMetrics.close()
+ shutdownServerAndMetrics(overrideServer)
}
}
@@ -571,10 +571,28 @@ class SocketServerTest extends JUnitSuite {
s"request metrics not updated, expected: $expectedTotalTimeCount, actual: ${totalTimeHistCount()}")
} finally {
- overrideServer.shutdown()
- serverMetrics.close()
+ shutdownServerAndMetrics(overrideServer)
}
+ }
+
+ @Test
+ def testRequestMetricsAfterShutdown(): Unit = {
+ server.shutdown()
+
+ server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate.mark()
+ server.requestChannel.updateErrorMetrics(ApiKeys.PRODUCE, Map(Errors.NONE -> 1))
+ val nonZeroMeters = Map("kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce" -> 1,
+ "kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=Produce,error=NONE" -> 1)
+
+ def requestMetricMeters = YammerMetrics
+ .defaultRegistry
+ .allMetrics.asScala
+ .filterKeys(k => k.getType == "RequestMetrics")
+ .collect { case (k, metric: Meter) => (k.toString, metric.count) }
+ assertEquals(nonZeroMeters, requestMetricMeters.filter { case (_, value) => value != 0 })
+ server.requestChannel.metrics.close()
+ assertEquals(Map.empty, requestMetricMeters)
}
@Test
@@ -844,8 +862,7 @@ class SocketServerTest extends JUnitSuite {
try {
testWithServer(testableServer)
} finally {
- testableServer.shutdown()
- testableServer.metrics.close()
+ shutdownServerAndMetrics(testableServer)
}
}