You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/11/18 03:35:57 UTC
[1/2] kafka git commit: kafka-1481;
Stop using dashes AND underscores as separators in MBean names;
patched by Vladimir Tretyakov; reviewed by Joel Koshy and Jun Rao
Repository: kafka
Updated Branches:
refs/heads/0.8.2 7e1907e32 -> 117a02de0
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
new file mode 100644
index 0000000..3cf23b3
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import com.yammer.metrics.Metrics
+import junit.framework.Assert._
+import kafka.integration.KafkaServerTestHarness
+import kafka.server._
+import scala.collection._
+import org.scalatest.junit.JUnit3Suite
+import kafka.message._
+import kafka.serializer._
+import kafka.utils._
+import kafka.utils.TestUtils._
+
+/** Verifies that repeatedly creating and shutting down producers and consumers leaves the metric count unchanged. */
+class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
+  val zookeeperConnect = TestZKUtils.zookeeperConnect
+  val numNodes = 2
+  val numParts = 2
+  val topic = "topic1"
+  val configs =
+    for (props <- TestUtils.createBrokerConfigs(numNodes))
+    yield new KafkaConfig(props) {
+      override val zkConnect = zookeeperConnect
+      override val numPartitions = numParts
+    }
+  val nMessages = 2
+
+  def testMetricsLeak() {
+    // create topic1 with a single partition and a single replica
+    createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
+    // warm-up round: forces creation of the metrics that are not client-specific, so they are part of the baseline
+    createAndShutdownStep("group0", "consumer0", "producer0")
+
+    val countOfStaticMetrics = Metrics.defaultRegistry().allMetrics().keySet().size
+
+    for (i <- 0 to 5) {
+      createAndShutdownStep("group" + i % 3, "consumer" + i % 2, "producer" + i % 2)
+      assertEquals(countOfStaticMetrics, Metrics.defaultRegistry().allMetrics().keySet().size)
+    }
+  }
+
+  /** Produces nMessages, consumes them with a fresh consumer connector, then shuts the connector down. */
+  def createAndShutdownStep(group: String, consumerId: String, producerId: String): Unit = {
+    sendMessages(configs, topic, producerId, nMessages, "batch1", NoCompressionCodec, 1)
+    val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumerId))
+    val connector = new ZookeeperConsumerConnector(consumerConfig, true)
+    try {
+      val streams = connector.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
+      getMessages(nMessages, streams)
+    } finally {
+      // always shut down, so a failed assertion in getMessages does not leave the connector running
+      connector.shutdown()
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index dd3640f..0da774d 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -26,7 +26,6 @@ import java.util.Properties
import org.apache.kafka.common.utils.Utils._
-import collection.mutable.Map
import collection.mutable.ListBuffer
import org.I0Itec.zkclient.ZkClient
@@ -36,7 +35,7 @@ import kafka.producer._
import kafka.message._
import kafka.api._
import kafka.cluster.Broker
-import kafka.consumer.ConsumerConfig
+import kafka.consumer.{KafkaStream, ConsumerConfig}
import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
import kafka.common.TopicAndPartition
import kafka.admin.AdminUtils
@@ -47,6 +46,8 @@ import junit.framework.AssertionFailedError
import junit.framework.Assert._
import org.apache.kafka.clients.producer.KafkaProducer
+import scala.collection.Map
+
/**
* Utility functions to help with testing
*/
@@ -483,7 +484,7 @@ object TestUtils extends Logging {
val data = topics.flatMap(topic =>
partitions.map(partition => (TopicAndPartition(topic, partition), message))
)
- new ProducerRequest(correlationId, clientId, acks.toShort, timeout, Map(data:_*))
+ new ProducerRequest(correlationId, clientId, acks.toShort, timeout, collection.mutable.Map(data:_*))
}
def makeLeaderForPartition(zkClient: ZkClient, topic: String,
@@ -720,6 +721,73 @@ object TestUtils extends Logging {
time = time,
brokerState = new BrokerState())
}
+
+  /** Sends numMessages messages "test-<partition>-<i>" to one partition and returns them in send order. */
+  def sendMessagesToPartition(configs: Seq[KafkaConfig],
+                              topic: String,
+                              partition: Int,
+                              numMessages: Int,
+                              compression: CompressionCodec = NoCompressionCodec): List[String] = {
+    val header = "test-%d".format(partition)
+    val props = new Properties()
+    props.put("compression.codec", compression.codec.toString)
+    val producer: Producer[Int, String] =
+      createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
+                     encoder = classOf[StringEncoder].getName,
+                     keyEncoder = classOf[IntEncoder].getName,
+                     partitioner = classOf[FixedValuePartitioner].getName,
+                     producerProps = props)
+    val ms = 0.until(numMessages).map(x => header + "-" + x).toList
+    try producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
+    finally producer.close() // also on a failed send, so the producer is not leaked
+    debug("Sent %d messages for partition [%s,%d]".format(ms.size, topic, partition))
+    ms
+  }
+
+  /** Sends messagesPerNode messages "<header>-<partition>-<i>" to each partition in [0, numParts). */
+  def sendMessages(configs: Seq[KafkaConfig],
+                   topic: String,
+                   producerId: String,
+                   messagesPerNode: Int,
+                   header: String,
+                   compression: CompressionCodec,
+                   numParts: Int): List[String]= {
+    val props = new Properties()
+    props.put("compression.codec", compression.codec.toString)
+    props.put("client.id", producerId)
+    val producer: Producer[Int, String] =
+      createProducer(brokerList = TestUtils.getBrokerListStrFromConfigs(configs),
+                     encoder = classOf[StringEncoder].getName,
+                     keyEncoder = classOf[IntEncoder].getName,
+                     partitioner = classOf[FixedValuePartitioner].getName,
+                     producerProps = props)
+    try {
+      // flatMap keeps partition order without re-copying the accumulated list on every append
+      (0 until numParts).toList.flatMap { partition =>
+        val ms = 0.until(messagesPerNode).map(x => header + "-" + partition + "-" + x)
+        producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
+        debug("Sent %d messages for partition [%s,%d]".format(ms.size, topic, partition))
+        ms
+      }
+    } finally producer.close() // also on a failed send, so the producer is not leaked
+  }
+
+  /** Reads nMessagesPerThread messages from each stream of each topic, asserting every one is available,
+    * and returns the payloads in the order they were read. */
+  def getMessages(nMessagesPerThread: Int,
+                  topicMessageStreams: Map[String, List[KafkaStream[String, String]]]): List[String] = {
+    val received = new ListBuffer[String]
+    for ((_, streams) <- topicMessageStreams; stream <- streams) {
+      val it = stream.iterator
+      (0 until nMessagesPerThread).foreach { _ =>
+        assertTrue(it.hasNext)
+        val payload = it.next.message
+        received += payload
+        debug("received message: " + payload)
+      }
+    }
+    received.toList
+  }
}
object TestZKUtils {
[2/2] kafka git commit: kafka-1481;
Stop using dashes AND underscores as separators in MBean names;
patched by Vladimir Tretyakov; reviewed by Joel Koshy and Jun Rao
Posted by ju...@apache.org.
kafka-1481; Stop using dashes AND underscores as separators in MBean names; patched by Vladimir Tretyakov; reviewed by Joel Koshy and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/117a02de
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/117a02de
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/117a02de
Branch: refs/heads/0.8.2
Commit: 117a02de06bab60e179345d6cd82d56e5b725189
Parents: 7e1907e
Author: Vladimir Tretyakov <vl...@sematext.com>
Authored: Mon Nov 17 18:35:45 2014 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Nov 17 18:35:45 2014 -0800
----------------------------------------------------------------------
build.gradle | 6 +
.../main/scala/kafka/cluster/Partition.scala | 13 +-
core/src/main/scala/kafka/common/AppInfo.scala | 66 +++++++++
.../scala/kafka/common/ClientIdAndBroker.scala | 14 +-
.../scala/kafka/common/ClientIdAndTopic.scala | 14 +-
.../kafka/consumer/ConsumerFetcherThread.scala | 2 +-
.../kafka/consumer/ConsumerTopicStats.scala | 17 ++-
.../consumer/FetchRequestAndResponseStats.scala | 28 ++--
.../scala/kafka/consumer/SimpleConsumer.scala | 5 +-
.../consumer/ZookeeperConsumerConnector.scala | 37 ++---
core/src/main/scala/kafka/log/Log.scala | 38 ++++--
.../scala/kafka/metrics/KafkaMetricsGroup.scala | 135 +++++++++++--------
.../scala/kafka/network/RequestChannel.scala | 27 ++--
.../main/scala/kafka/network/SocketServer.scala | 2 +-
.../main/scala/kafka/producer/Producer.scala | 3 +-
.../kafka/producer/ProducerRequestStats.scala | 23 ++--
.../scala/kafka/producer/ProducerStats.scala | 7 +-
.../kafka/producer/ProducerTopicStats.scala | 23 ++--
.../scala/kafka/producer/SyncProducer.scala | 12 +-
.../producer/async/ProducerSendThread.scala | 5 +-
.../kafka/server/AbstractFetcherManager.scala | 31 ++---
.../kafka/server/AbstractFetcherThread.scala | 36 ++---
.../main/scala/kafka/server/DelayedFetch.scala | 2 +-
.../scala/kafka/server/DelayedProduce.scala | 2 +-
.../scala/kafka/server/DelayedRequestKey.scala | 38 ------
.../src/main/scala/kafka/server/KafkaApis.scala | 7 +-
.../kafka/server/KafkaRequestHandler.scala | 25 ++--
.../main/scala/kafka/server/KafkaServer.scala | 1 -
.../kafka/server/KafkaServerStartable.scala | 2 +
.../kafka/server/ProducerRequestPurgatory.scala | 22 +--
.../scala/kafka/server/ReplicaManager.scala | 8 +-
.../scala/kafka/tools/ProducerPerformance.scala | 1 -
.../ZookeeperConsumerConnectorTest.scala | 101 +++-----------
.../scala/unit/kafka/metrics/MetricsTest.scala | 72 ++++++++++
.../test/scala/unit/kafka/utils/TestUtils.scala | 74 +++++++++-
35 files changed, 555 insertions(+), 344 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 17b9969..665771b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -263,6 +263,12 @@ project(':core') {
dependsOn 'copyDependantLibs'
}
+ jar.manifest {
+ attributes(
+ 'Version': "${version}"
+ )
+ }
+
task testJar(type: Jar) {
classifier = 'test'
from sourceSets.test.output
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index e88ecf2..419d336 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -21,7 +21,7 @@ import kafka.admin.AdminUtils
import kafka.utils._
import kafka.api.{PartitionStateInfo, LeaderAndIsr}
import kafka.log.LogConfig
-import kafka.server.{TopicPartitionRequestKey, LogOffsetMetadata, OffsetManager, ReplicaManager}
+import kafka.server.{LogOffsetMetadata, OffsetManager, ReplicaManager}
import kafka.metrics.KafkaMetricsGroup
import kafka.controller.KafkaController
import kafka.message.ByteBufferMessageSet
@@ -29,7 +29,6 @@ import kafka.message.ByteBufferMessageSet
import java.io.IOException
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.utils.Utils.{inReadLock,inWriteLock}
-import scala.Some
import scala.collection.immutable.Set
import com.yammer.metrics.core.Gauge
@@ -62,13 +61,13 @@ class Partition(val topic: String,
private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
- newGauge(
- topic + "-" + partitionId + "-UnderReplicated",
+ newGauge("UnderReplicated",
new Gauge[Int] {
def value = {
if (isUnderReplicated) 1 else 0
}
- }
+ },
+ Map("topic" -> topic, "partition" -> partitionId.toString)
)
def isUnderReplicated(): Boolean = {
@@ -309,7 +308,7 @@ class Partition(val topic: String,
leaderReplica.highWatermark = newHighWatermark
debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark))
// some delayed requests may be unblocked after HW changed
- val requestKey = new TopicPartitionRequestKey(this.topic, this.partitionId)
+ val requestKey = new TopicAndPartition(this.topic, this.partitionId)
replicaManager.unblockDelayedFetchRequests(requestKey)
replicaManager.unblockDelayedProduceRequests(requestKey)
} else {
@@ -379,7 +378,7 @@ class Partition(val topic: String,
val info = log.append(messages, assignOffsets = true)
// probably unblock some follower fetch requests since log end offset has been updated
- replicaManager.unblockDelayedFetchRequests(new TopicPartitionRequestKey(this.topic, this.partitionId))
+ replicaManager.unblockDelayedFetchRequests(new TopicAndPartition(this.topic, this.partitionId))
// we may need to increment high watermark since ISR could be down to 1
maybeIncrementLeaderHW(leaderReplica)
info
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/common/AppInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/AppInfo.scala b/core/src/main/scala/kafka/common/AppInfo.scala
new file mode 100644
index 0000000..d642ca5
--- /dev/null
+++ b/core/src/main/scala/kafka/common/AppInfo.scala
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+import java.net.URL
+import java.util.jar.{Attributes, Manifest}
+
+import com.yammer.metrics.core.Gauge
+import kafka.metrics.KafkaMetricsGroup
+
+/**
+ * Publishes the Kafka version found in the MANIFEST.MF of the jar this class was loaded from
+ * as a "Version" gauge.
+ */
+object AppInfo extends KafkaMetricsGroup {
+  private var isRegistered = false
+  private val lock = new Object()
+
+  /**
+   * Registers the "Version" gauge at most once. The whole check-and-register sequence runs under
+   * the lock, so concurrent callers cannot both pass the check and register twice. Failures are
+   * logged as a warning rather than thrown; a later call will then try again.
+   */
+  def registerInfo(): Unit = lock.synchronized {
+    if (!isRegistered) {
+      try {
+        val clazz = AppInfo.getClass
+        val classPath = clazz.getResource(clazz.getSimpleName + ".class").toString
+        // only a class loaded from a jar has a manifest to read
+        if (classPath.startsWith("jar")) {
+          val manifestPath = classPath.substring(0, classPath.lastIndexOf("!") + 1) + "/META-INF/MANIFEST.MF"
+          val mf = new Manifest
+          val in = new URL(manifestPath).openStream()
+          try mf.read(in) finally in.close() // close the stream even if parsing fails
+          // getValue returns null when the attribute is absent; Option turns that into a clear failure
+          val version = Option(mf.getMainAttributes.getValue(new Attributes.Name("Version")))
+            .getOrElse(throw new IllegalStateException("no Version attribute in " + manifestPath))
+
+          newGauge("Version",
+            new Gauge[String] {
+              def value = version
+            })
+          isRegistered = true
+        }
+      } catch {
+        case e: Exception =>
+          warn("Can't read Kafka version from MANIFEST.MF. Possible cause: %s".format(e))
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/common/ClientIdAndBroker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ClientIdAndBroker.scala b/core/src/main/scala/kafka/common/ClientIdAndBroker.scala
index 93223a9..3b09041 100644
--- a/core/src/main/scala/kafka/common/ClientIdAndBroker.scala
+++ b/core/src/main/scala/kafka/common/ClientIdAndBroker.scala
@@ -8,7 +8,7 @@ package kafka.common
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,6 +21,14 @@ package kafka.common
* Convenience case class since (clientId, brokerInfo) pairs are used to create
* SyncProducer Request Stats and SimpleConsumer Request and Response Stats.
*/
-case class ClientIdAndBroker(clientId: String, brokerInfo: String) {
- override def toString = "%s-%s".format(clientId, brokerInfo)
+
+trait ClientIdBroker {
+}
+
+case class ClientIdAndBroker(clientId: String, brokerHost: String, brokerPort: Int) extends ClientIdBroker {
+ override def toString = "%s-%s-%d".format(clientId, brokerHost, brokerPort)
+}
+
+case class ClientIdAllBrokers(clientId: String) extends ClientIdBroker {
+ override def toString = "%s-%s".format(clientId, "AllBrokers")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/common/ClientIdAndTopic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ClientIdAndTopic.scala b/core/src/main/scala/kafka/common/ClientIdAndTopic.scala
index 7acf9e7..5825aad 100644
--- a/core/src/main/scala/kafka/common/ClientIdAndTopic.scala
+++ b/core/src/main/scala/kafka/common/ClientIdAndTopic.scala
@@ -1,5 +1,3 @@
-package kafka.common
-
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -17,11 +15,21 @@ package kafka.common
* limitations under the License.
*/
+package kafka.common
+
/**
* Convenience case class since (clientId, topic) pairs are used in the creation
* of many Stats objects.
*/
-case class ClientIdAndTopic(clientId: String, topic: String) {
+trait ClientIdTopic {
+}
+
+case class ClientIdAndTopic(clientId: String, topic: String) extends ClientIdTopic {
override def toString = "%s-%s".format(clientId, topic)
}
+case class ClientIdAllTopics(clientId: String) extends ClientIdTopic {
+ override def toString = "%s-%s".format(clientId, "AllTopics")
+}
+
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index f8c1b4e..ee6139c 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -30,7 +30,7 @@ class ConsumerFetcherThread(name: String,
partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
val consumerFetcherManager: ConsumerFetcherManager)
extends AbstractFetcherThread(name = name,
- clientId = config.clientId + "-" + name,
+ clientId = config.clientId,
sourceBroker = sourceBroker,
socketTimeout = config.socketTimeoutMs,
socketBufferSize = config.socketReceiveBufferBytes,
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
index f63e6c5..01797ff 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
@@ -20,12 +20,17 @@ package kafka.consumer
import kafka.utils.{Pool, threadsafe, Logging}
import java.util.concurrent.TimeUnit
import kafka.metrics.KafkaMetricsGroup
-import kafka.common.ClientIdAndTopic
+import kafka.common.{ClientIdTopic, ClientIdAllTopics, ClientIdAndTopic}
@threadsafe
-class ConsumerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup {
- val messageRate = newMeter(metricId + "MessagesPerSec", "messages", TimeUnit.SECONDS)
- val byteRate = newMeter(metricId + "BytesPerSec", "bytes", TimeUnit.SECONDS)
+class ConsumerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup {
+ val tags = metricId match {
+ case ClientIdAndTopic(clientId, topic) => Map("clientId" -> clientId, "topic" -> topic)
+ case ClientIdAllTopics(clientId) => Map("clientId" -> clientId)
+ }
+
+ val messageRate = newMeter("MessagesPerSec", "messages", TimeUnit.SECONDS, tags)
+ val byteRate = newMeter("BytesPerSec", "bytes", TimeUnit.SECONDS, tags)
}
/**
@@ -35,12 +40,12 @@ class ConsumerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup
class ConsumerTopicStats(clientId: String) extends Logging {
private val valueFactory = (k: ClientIdAndTopic) => new ConsumerTopicMetrics(k)
private val stats = new Pool[ClientIdAndTopic, ConsumerTopicMetrics](Some(valueFactory))
- private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics")) // to differentiate from a topic named AllTopics
+ private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAllTopics(clientId)) // to differentiate from a topic named AllTopics
def getConsumerAllTopicStats(): ConsumerTopicMetrics = allTopicStats
def getConsumerTopicStats(topic: String): ConsumerTopicMetrics = {
- stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic + "-"))
+ stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
index 5243f41..3df55e1 100644
--- a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
+++ b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
@@ -19,13 +19,21 @@ package kafka.consumer
import java.util.concurrent.TimeUnit
-import kafka.common.ClientIdAndBroker
+import kafka.common.{ClientIdAllBrokers, ClientIdBroker, ClientIdAndBroker}
import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
import kafka.utils.Pool
-class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
- val requestTimer = new KafkaTimer(newTimer(metricId + "FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
- val requestSizeHist = newHistogram(metricId + "FetchResponseSize")
+class FetchRequestAndResponseMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup {
+ val tags = metricId match {
+ case ClientIdAndBroker(clientId, brokerHost, brokerPort) =>
+ Map("clientId" -> clientId, "brokerHost" -> brokerHost,
+ "brokerPort" -> brokerPort.toString)
+ case ClientIdAllBrokers(clientId) =>
+ Map("clientId" -> clientId)
+ }
+
+ val requestTimer = new KafkaTimer(newTimer("FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags))
+ val requestSizeHist = newHistogram("FetchResponseSize", biased = true, tags)
}
/**
@@ -33,14 +41,14 @@ class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker) extends KafkaM
* @param clientId ClientId of the given consumer
*/
class FetchRequestAndResponseStats(clientId: String) {
- private val valueFactory = (k: ClientIdAndBroker) => new FetchRequestAndResponseMetrics(k)
- private val stats = new Pool[ClientIdAndBroker, FetchRequestAndResponseMetrics](Some(valueFactory))
- private val allBrokersStats = new FetchRequestAndResponseMetrics(new ClientIdAndBroker(clientId, "AllBrokers"))
+ private val valueFactory = (k: ClientIdBroker) => new FetchRequestAndResponseMetrics(k)
+ private val stats = new Pool[ClientIdBroker, FetchRequestAndResponseMetrics](Some(valueFactory))
+ private val allBrokersStats = new FetchRequestAndResponseMetrics(new ClientIdAllBrokers(clientId))
def getFetchRequestAndResponseAllBrokersStats(): FetchRequestAndResponseMetrics = allBrokersStats
- def getFetchRequestAndResponseStats(brokerInfo: String): FetchRequestAndResponseMetrics = {
- stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo + "-"))
+ def getFetchRequestAndResponseStats(brokerHost: String, brokerPort: Int): FetchRequestAndResponseMetrics = {
+ stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerHost, brokerPort))
}
}
@@ -56,7 +64,7 @@ object FetchRequestAndResponseStatsRegistry {
}
def removeConsumerFetchRequestAndResponseStats(clientId: String) {
- val pattern = (clientId + "-ConsumerFetcherThread.*").r
+ val pattern = (".*" + clientId + ".*").r
val keys = globalStats.keys
for (key <- keys) {
pattern.findFirstIn(key) match {
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index d349a30..e53ee51 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -36,7 +36,6 @@ class SimpleConsumer(val host: String,
ConsumerConfig.validateClientId(clientId)
private val lock = new Object()
private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
- val brokerInfo = "host_%s-port_%s".format(host, port)
private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId)
private var isClosed = false
@@ -106,7 +105,7 @@ class SimpleConsumer(val host: String,
*/
def fetch(request: FetchRequest): FetchResponse = {
var response: Receive = null
- val specificTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseStats(brokerInfo).requestTimer
+ val specificTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestTimer
val aggregateTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestTimer
aggregateTimer.time {
specificTimer.time {
@@ -115,7 +114,7 @@ class SimpleConsumer(val host: String,
}
val fetchResponse = FetchResponse.readFrom(response.buffer)
val fetchedSize = fetchResponse.sizeInBytes
- fetchRequestAndResponseStats.getFetchRequestAndResponseStats(brokerInfo).requestSizeHist.update(fetchedSize)
+ fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestSizeHist.update(fetchedSize)
fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestSizeHist.update(fetchedSize)
fetchResponse
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index f476973..fe9d8e0 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -104,9 +104,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
// useful for tracking migration of consumers to store offsets in kafka
- private val kafkaCommitMeter = newMeter(config.clientId + "-KafkaCommitsPerSec", "commits", TimeUnit.SECONDS)
- private val zkCommitMeter = newMeter(config.clientId + "-ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS)
- private val rebalanceTimer = new KafkaTimer(newTimer(config.clientId + "-RebalanceRateAndTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+ private val kafkaCommitMeter = newMeter("KafkaCommitsPerSec", "commits", TimeUnit.SECONDS, Map("clientId" -> config.clientId))
+ private val zkCommitMeter = newMeter("ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS, Map("clientId" -> config.clientId))
+ private val rebalanceTimer = new KafkaTimer(newTimer("RebalanceRateAndTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, Map("clientId" -> config.clientId)))
val consumerIdString = {
var consumerUuid : String = null
@@ -138,6 +138,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
KafkaMetricsReporter.startReporters(config.props)
+ AppInfo.registerInfo()
def this(config: ConsumerConfig) = this(config, true)
@@ -516,14 +517,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private var isWatcherTriggered = false
private val lock = new ReentrantLock
private val cond = lock.newCondition()
-
+
@volatile private var allTopicsOwnedPartitionsCount = 0
- newGauge(config.clientId + "-" + config.groupId + "-AllTopicsOwnedPartitionsCount", new Gauge[Int] {
- def value() = allTopicsOwnedPartitionsCount
- })
+ newGauge("OwnedPartitionsCount",
+ new Gauge[Int] {
+ def value() = allTopicsOwnedPartitionsCount
+ },
+ Map("clientId" -> config.clientId, "groupId" -> config.groupId))
- private def ownedPartitionsCountMetricName(topic: String) =
- "%s-%s-%s-OwnedPartitionsCount".format(config.clientId, config.groupId, topic)
+ private def ownedPartitionsCountMetricTags(topic: String) = Map("clientId" -> config.clientId, "groupId" -> config.groupId, "topic" -> topic)
private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") {
override def run() {
@@ -576,7 +578,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
for(partition <- infos.keys) {
deletePartitionOwnershipFromZK(topic, partition)
}
- removeMetric(ownedPartitionsCountMetricName(topic))
+ removeMetric("OwnedPartitionsCount", ownedPartitionsCountMetricTags(topic))
localTopicRegistry.remove(topic)
}
allTopicsOwnedPartitionsCount = 0
@@ -679,9 +681,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
partitionOwnershipDecision.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic }
.foreach { case (topic, partitionThreadPairs) =>
- newGauge(ownedPartitionsCountMetricName(topic), new Gauge[Int] {
- def value() = partitionThreadPairs.size
- })
+ newGauge("OwnedPartitionsCount",
+ new Gauge[Int] {
+ def value() = partitionThreadPairs.size
+ },
+ ownedPartitionsCountMetricTags(topic))
}
topicRegistry = currentTopicRegistry
@@ -863,10 +867,13 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
topicThreadIdAndQueues.put(topicThreadId, q)
debug("Adding topicThreadId %s and queue %s to topicThreadIdAndQueues data structure".format(topicThreadId, q.toString))
newGauge(
- config.clientId + "-" + config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
+ "FetchQueueSize",
new Gauge[Int] {
def value = q.size
- }
+ },
+ Map("clientId" -> config.clientId,
+ "topic" -> topicThreadId._1,
+ "threadId" -> topicThreadId._2.threadId.toString)
)
})
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 157d673..ec19215 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -73,17 +73,31 @@ class Log(val dir: File,
info("Completed load of log %s with log end offset %d".format(name, logEndOffset))
- newGauge(name + "-" + "NumLogSegments",
- new Gauge[Int] { def value = numberOfSegments })
-
- newGauge(name + "-" + "LogStartOffset",
- new Gauge[Long] { def value = logStartOffset })
-
- newGauge(name + "-" + "LogEndOffset",
- new Gauge[Long] { def value = logEndOffset })
-
- newGauge(name + "-" + "Size",
- new Gauge[Long] {def value = size})
+ val tags = Map("topic" -> topicAndPartition.topic, "partition" -> topicAndPartition.partition.toString)
+
+ newGauge("NumLogSegments",
+ new Gauge[Int] {
+ def value = numberOfSegments
+ },
+ tags)
+
+ newGauge("LogStartOffset",
+ new Gauge[Long] {
+ def value = logStartOffset
+ },
+ tags)
+
+ newGauge("LogEndOffset",
+ new Gauge[Long] {
+ def value = logEndOffset
+ },
+ tags)
+
+ newGauge("Size",
+ new Gauge[Long] {
+ def value = size
+ },
+ tags)
/** The name of this log */
def name = dir.getName()
@@ -153,7 +167,7 @@ class Log(val dir: File,
if(logSegments.size == 0) {
// no existing segments, create a new mutable segment beginning at offset 0
- segments.put(0, new LogSegment(dir = dir,
+ segments.put(0L, new LogSegment(dir = dir,
startOffset = 0,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index 2313a57..e9e4918 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -35,29 +35,52 @@ trait KafkaMetricsGroup extends Logging {
* Creates a new MetricName object for gauges, meters, etc. created for this
* metrics group.
* @param name Descriptive name of the metric.
+ * @param tags Additional key/value attributes which the MBean will have.
* @return Sanitized metric name object.
*/
- private def metricName(name: String) = {
+ private def metricName(name: String, tags: scala.collection.Map[String, String] = Map.empty) = {
val klass = this.getClass
val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName
val simpleName = klass.getSimpleName.replaceAll("\\$$", "")
- new MetricName(pkg, simpleName, name)
+
+ explicitMetricName(pkg, simpleName, name, tags)
}
- def newGauge[T](name: String, metric: Gauge[T]) =
- Metrics.defaultRegistry().newGauge(metricName(name), metric)
- def newMeter(name: String, eventType: String, timeUnit: TimeUnit) =
- Metrics.defaultRegistry().newMeter(metricName(name), eventType, timeUnit)
+ private def explicitMetricName(group: String, typeName: String, name: String, tags: scala.collection.Map[String, String] = Map.empty) = {
+ val nameBuilder: StringBuilder = new StringBuilder
+
+ nameBuilder.append(group)
+
+ nameBuilder.append(":type=")
+
+ nameBuilder.append(typeName)
- def newHistogram(name: String, biased: Boolean = true) =
- Metrics.defaultRegistry().newHistogram(metricName(name), biased)
+ if (name.length > 0) {
+ nameBuilder.append(",name=")
+ nameBuilder.append(name)
+ }
+
+ KafkaMetricsGroup.toMBeanName(tags).map(mbeanName => nameBuilder.append(",").append(mbeanName))
+
+ new MetricName(group, typeName, name, null, nameBuilder.toString())
+ }
- def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit) =
- Metrics.defaultRegistry().newTimer(metricName(name), durationUnit, rateUnit)
+ def newGauge[T](name: String, metric: Gauge[T], tags: scala.collection.Map[String, String] = Map.empty) =
+ Metrics.defaultRegistry().newGauge(metricName(name, tags), metric)
+
+ def newMeter(name: String, eventType: String, timeUnit: TimeUnit, tags: scala.collection.Map[String, String] = Map.empty) =
+ Metrics.defaultRegistry().newMeter(metricName(name, tags), eventType, timeUnit)
+
+ def newHistogram(name: String, biased: Boolean = true, tags: scala.collection.Map[String, String] = Map.empty) =
+ Metrics.defaultRegistry().newHistogram(metricName(name, tags), biased)
+
+ def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit, tags: scala.collection.Map[String, String] = Map.empty) =
+ Metrics.defaultRegistry().newTimer(metricName(name, tags), durationUnit, rateUnit)
+
+ def removeMetric(name: String, tags: scala.collection.Map[String, String] = Map.empty) =
+ Metrics.defaultRegistry().removeMetric(metricName(name, tags))
- def removeMetric(name: String) =
- Metrics.defaultRegistry().removeMetric(metricName(name))
}
@@ -68,72 +91,75 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
*/
private val consumerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName](
// kafka.consumer.ZookeeperConsumerConnector
- new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-FetchQueueSize"),
- new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-KafkaCommitsPerSec"),
- new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-ZooKeeperCommitsPerSec"),
- new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-RebalanceRateAndTime"),
- new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-OwnedPartitionsCount"),
- new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "AllTopicsOwnedPartitionsCount"),
+ new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "FetchQueueSize"),
+ new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "KafkaCommitsPerSec"),
+ new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "ZooKeeperCommitsPerSec"),
+ new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "RebalanceRateAndTime"),
+ new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "OwnedPartitionsCount"),
// kafka.consumer.ConsumerFetcherManager
- new MetricName("kafka.consumer", "ConsumerFetcherManager", "-MaxLag"),
- new MetricName("kafka.consumer", "ConsumerFetcherManager", "-MinFetchRate"),
+ new MetricName("kafka.consumer", "ConsumerFetcherManager", "MaxLag"),
+ new MetricName("kafka.consumer", "ConsumerFetcherManager", "MinFetchRate"),
// kafka.server.AbstractFetcherThread <-- kafka.consumer.ConsumerFetcherThread
- new MetricName("kafka.server", "FetcherLagMetrics", "-ConsumerLag"),
+ new MetricName("kafka.server", "FetcherLagMetrics", "ConsumerLag"),
// kafka.consumer.ConsumerTopicStats <-- kafka.consumer.{ConsumerIterator, PartitionTopicInfo}
- new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-MessagesPerSec"),
- new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-AllTopicsMessagesPerSec"),
+ new MetricName("kafka.consumer", "ConsumerTopicMetrics", "MessagesPerSec"),
// kafka.consumer.ConsumerTopicStats
- new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-BytesPerSec"),
- new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-AllTopicsBytesPerSec"),
+ new MetricName("kafka.consumer", "ConsumerTopicMetrics", "BytesPerSec"),
// kafka.server.AbstractFetcherThread <-- kafka.consumer.ConsumerFetcherThread
- new MetricName("kafka.server", "FetcherStats", "-BytesPerSec"),
- new MetricName("kafka.server", "FetcherStats", "-RequestsPerSec"),
+ new MetricName("kafka.server", "FetcherStats", "BytesPerSec"),
+ new MetricName("kafka.server", "FetcherStats", "RequestsPerSec"),
// kafka.consumer.FetchRequestAndResponseStats <-- kafka.consumer.SimpleConsumer
- new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-FetchResponseSize"),
- new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-FetchRequestRateAndTimeMs"),
- new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-AllBrokersFetchResponseSize"),
- new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-AllBrokersFetchRequestRateAndTimeMs"),
+ new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchResponseSize"),
+ new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchRequestRateAndTimeMs"),
/**
* ProducerRequestStats <-- SyncProducer
* metric for SyncProducer in fetchTopicMetaData() needs to be removed when consumer is closed.
*/
- new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestRateAndTimeMs"),
- new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestSize"),
- new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestRateAndTimeMs"),
- new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestSize")
+ new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestRateAndTimeMs"),
+ new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestSize")
)
- private val producerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName] (
+ private val producerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName](
// kafka.producer.ProducerStats <-- DefaultEventHandler <-- Producer
- new MetricName("kafka.producer", "ProducerStats", "-SerializationErrorsPerSec"),
- new MetricName("kafka.producer", "ProducerStats", "-ResendsPerSec"),
- new MetricName("kafka.producer", "ProducerStats", "-FailedSendsPerSec"),
+ new MetricName("kafka.producer", "ProducerStats", "SerializationErrorsPerSec"),
+ new MetricName("kafka.producer", "ProducerStats", "ResendsPerSec"),
+ new MetricName("kafka.producer", "ProducerStats", "FailedSendsPerSec"),
// kafka.producer.ProducerSendThread
- new MetricName("kafka.producer.async", "ProducerSendThread", "-ProducerQueueSize"),
+ new MetricName("kafka.producer.async", "ProducerSendThread", "ProducerQueueSize"),
// kafka.producer.ProducerTopicStats <-- kafka.producer.{Producer, async.DefaultEventHandler}
- new MetricName("kafka.producer", "ProducerTopicMetrics", "-MessagesPerSec"),
- new MetricName("kafka.producer", "ProducerTopicMetrics", "-DroppedMessagesPerSec"),
- new MetricName("kafka.producer", "ProducerTopicMetrics", "-BytesPerSec"),
- new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsMessagesPerSec"),
- new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsDroppedMessagesPerSec"),
- new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsBytesPerSec"),
+ new MetricName("kafka.producer", "ProducerTopicMetrics", "MessagesPerSec"),
+ new MetricName("kafka.producer", "ProducerTopicMetrics", "DroppedMessagesPerSec"),
+ new MetricName("kafka.producer", "ProducerTopicMetrics", "BytesPerSec"),
// kafka.producer.ProducerRequestStats <-- SyncProducer
- new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestRateAndTimeMs"),
- new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestSize"),
- new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestRateAndTimeMs"),
- new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestSize")
+ new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestRateAndTimeMs"),
+ new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestSize")
)
+ private def toMBeanName(tags: collection.Map[String, String]): Option[String] = {
+ val filteredTags = tags
+ .filter { case (tagKey, tagValue) => tagValue != ""}
+ if (filteredTags.nonEmpty) {
+ val tagsString = filteredTags
+ .map { case (key, value) => "%s=%s".format(key, value)}
+ .mkString(",")
+
+ Some(tagsString)
+ }
+ else {
+ None
+ }
+ }
+
def removeAllConsumerMetrics(clientId: String) {
FetchRequestAndResponseStatsRegistry.removeConsumerFetchRequestAndResponseStats(clientId)
ConsumerTopicStatsRegistry.removeConsumerTopicStat(clientId)
@@ -150,18 +176,19 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
private def removeAllMetricsInList(metricNameList: immutable.List[MetricName], clientId: String) {
metricNameList.foreach(metric => {
- val pattern = (clientId + ".*" + metric.getName +".*").r
+ val pattern = (".*clientId=" + clientId + ".*").r
val registeredMetrics = scala.collection.JavaConversions.asScalaSet(Metrics.defaultRegistry().allMetrics().keySet())
for (registeredMetric <- registeredMetrics) {
if (registeredMetric.getGroup == metric.getGroup &&
+ registeredMetric.getName == metric.getName &&
registeredMetric.getType == metric.getType) {
- pattern.findFirstIn(registeredMetric.getName) match {
+ pattern.findFirstIn(registeredMetric.getMBeanName) match {
case Some(_) => {
val beforeRemovalSize = Metrics.defaultRegistry().allMetrics().keySet().size
Metrics.defaultRegistry().removeMetric(registeredMetric)
val afterRemovalSize = Metrics.defaultRegistry().allMetrics().keySet().size
trace("Removing metric %s. Metrics registry size reduced from %d to %d".format(
- registeredMetric, beforeRemovalSize, afterRemovalSize))
+ registeredMetric, beforeRemovalSize, afterRemovalSize))
}
case _ =>
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 4560d8f..7b1db3d 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -125,12 +125,12 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
def value = responseQueues.foldLeft(0) {(total, q) => total + q.size()}
})
- for(i <- 0 until numProcessors) {
- newGauge(
- "Processor-" + i + "-ResponseQueueSize",
+ for (i <- 0 until numProcessors) {
+ newGauge("ResponseQueueSize",
new Gauge[Int] {
def value = responseQueues(i).size()
- }
+ },
+ Map("processor" -> i.toString)
)
}
@@ -187,24 +187,25 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
object RequestMetrics {
val metricsMap = new scala.collection.mutable.HashMap[String, RequestMetrics]
- val consumerFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Consumer"
- val followFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Follower"
+ val consumerFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "Consumer"
+ val followFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "Follower"
(RequestKeys.keyToNameAndDeserializerMap.values.map(e => e._1)
++ List(consumerFetchMetricName, followFetchMetricName)).foreach(name => metricsMap.put(name, new RequestMetrics(name)))
}
class RequestMetrics(name: String) extends KafkaMetricsGroup {
- val requestRate = newMeter(name + "-RequestsPerSec", "requests", TimeUnit.SECONDS)
+ val tags = Map("request" -> name)
+ val requestRate = newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags)
// time a request spent in a request queue
- val requestQueueTimeHist = newHistogram(name + "-RequestQueueTimeMs")
+ val requestQueueTimeHist = newHistogram("RequestQueueTimeMs", biased = true, tags)
// time a request takes to be processed at the local broker
- val localTimeHist = newHistogram(name + "-LocalTimeMs")
+ val localTimeHist = newHistogram("LocalTimeMs", biased = true, tags)
// time a request takes to wait on remote brokers (only relevant to fetch and produce requests)
- val remoteTimeHist = newHistogram(name + "-RemoteTimeMs")
+ val remoteTimeHist = newHistogram("RemoteTimeMs", biased = true, tags)
// time a response spent in a response queue
- val responseQueueTimeHist = newHistogram(name + "-ResponseQueueTimeMs")
+ val responseQueueTimeHist = newHistogram("ResponseQueueTimeMs", biased = true, tags)
// time to send the response to the requester
- val responseSendTimeHist = newHistogram(name + "-ResponseSendTimeMs")
- val totalTimeHist = newHistogram(name + "-TotalTimeMs")
+ val responseSendTimeHist = newHistogram("ResponseSendTimeMs", biased = true, tags)
+ val totalTimeHist = newHistogram("TotalTimeMs", biased = true, tags)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index cee76b3..e451592 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -67,7 +67,7 @@ class SocketServer(val brokerId: Int,
time,
maxRequestSize,
aggregateIdleMeter,
- newMeter("NetworkProcessor-" + i + "-IdlePercent", "percent", TimeUnit.NANOSECONDS),
+ newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> i.toString)),
numProcessorThreads,
requestChannel,
quotas,
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index cd634f6..e38d2fa 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -19,7 +19,7 @@ package kafka.producer
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
-import kafka.common.QueueFullException
+import kafka.common.{AppInfo, QueueFullException}
import kafka.metrics._
import kafka.producer.async.{DefaultEventHandler, EventHandler, ProducerSendThread}
import kafka.serializer.Encoder
@@ -53,6 +53,7 @@ class Producer[K,V](val config: ProducerConfig,
private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)
KafkaMetricsReporter.startReporters(config.props)
+ AppInfo.registerInfo()
def this(config: ProducerConfig) =
this(config,
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
index 1c46d72..026e93a 100644
--- a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
@@ -19,11 +19,16 @@ package kafka.producer
import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
import java.util.concurrent.TimeUnit
import kafka.utils.Pool
-import kafka.common.ClientIdAndBroker
+import kafka.common.{ClientIdAllBrokers, ClientIdBroker, ClientIdAndBroker}
-class ProducerRequestMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
- val requestTimer = new KafkaTimer(newTimer(metricId + "ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
- val requestSizeHist = newHistogram(metricId + "ProducerRequestSize")
+class ProducerRequestMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup {
+ val tags = metricId match {
+ case ClientIdAndBroker(clientId, brokerHost, brokerPort) => Map("clientId" -> clientId, "brokerHost" -> brokerHost, "brokerPort" -> brokerPort.toString)
+ case ClientIdAllBrokers(clientId) => Map("clientId" -> clientId)
+ }
+
+ val requestTimer = new KafkaTimer(newTimer("ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags))
+ val requestSizeHist = newHistogram("ProducerRequestSize", biased = true, tags)
}
/**
@@ -31,14 +36,14 @@ class ProducerRequestMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGr
* @param clientId ClientId of the given producer
*/
class ProducerRequestStats(clientId: String) {
- private val valueFactory = (k: ClientIdAndBroker) => new ProducerRequestMetrics(k)
- private val stats = new Pool[ClientIdAndBroker, ProducerRequestMetrics](Some(valueFactory))
- private val allBrokersStats = new ProducerRequestMetrics(new ClientIdAndBroker(clientId, "AllBrokers"))
+ private val valueFactory = (k: ClientIdBroker) => new ProducerRequestMetrics(k)
+ private val stats = new Pool[ClientIdBroker, ProducerRequestMetrics](Some(valueFactory))
+ private val allBrokersStats = new ProducerRequestMetrics(new ClientIdAllBrokers(clientId))
def getProducerRequestAllBrokersStats(): ProducerRequestMetrics = allBrokersStats
- def getProducerRequestStats(brokerInfo: String): ProducerRequestMetrics = {
- stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo + "-"))
+ def getProducerRequestStats(brokerHost: String, brokerPort: Int): ProducerRequestMetrics = {
+ stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerHost, brokerPort))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/producer/ProducerStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerStats.scala b/core/src/main/scala/kafka/producer/ProducerStats.scala
index 35e3aae..1d0fa88 100644
--- a/core/src/main/scala/kafka/producer/ProducerStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerStats.scala
@@ -21,9 +21,10 @@ import java.util.concurrent.TimeUnit
import kafka.utils.Pool
class ProducerStats(clientId: String) extends KafkaMetricsGroup {
- val serializationErrorRate = newMeter(clientId + "-SerializationErrorsPerSec", "errors", TimeUnit.SECONDS)
- val resendRate = newMeter(clientId + "-ResendsPerSec", "resends", TimeUnit.SECONDS)
- val failedSendRate = newMeter(clientId + "-FailedSendsPerSec", "failed sends", TimeUnit.SECONDS)
+ val tags: Map[String, String] = Map("clientId" -> clientId)
+ val serializationErrorRate = newMeter("SerializationErrorsPerSec", "errors", TimeUnit.SECONDS, tags)
+ val resendRate = newMeter("ResendsPerSec", "resends", TimeUnit.SECONDS, tags)
+ val failedSendRate = newMeter("FailedSendsPerSec", "failed sends", TimeUnit.SECONDS, tags)
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
index 9bb1419..97594c8 100644
--- a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala
@@ -17,16 +17,21 @@
package kafka.producer
import kafka.metrics.KafkaMetricsGroup
-import kafka.common.ClientIdAndTopic
+import kafka.common.{ClientIdTopic, ClientIdAllTopics, ClientIdAndTopic}
import kafka.utils.{Pool, threadsafe}
import java.util.concurrent.TimeUnit
@threadsafe
-class ProducerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup {
- val messageRate = newMeter(metricId + "MessagesPerSec", "messages", TimeUnit.SECONDS)
- val byteRate = newMeter(metricId + "BytesPerSec", "bytes", TimeUnit.SECONDS)
- val droppedMessageRate = newMeter(metricId + "DroppedMessagesPerSec", "drops", TimeUnit.SECONDS)
+class ProducerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup {
+ val tags = metricId match {
+ case ClientIdAndTopic(clientId, topic) => Map("clientId" -> clientId, "topic" -> topic)
+ case ClientIdAllTopics(clientId) => Map("clientId" -> clientId)
+ }
+
+ val messageRate = newMeter("MessagesPerSec", "messages", TimeUnit.SECONDS, tags)
+ val byteRate = newMeter("BytesPerSec", "bytes", TimeUnit.SECONDS, tags)
+ val droppedMessageRate = newMeter("DroppedMessagesPerSec", "drops", TimeUnit.SECONDS, tags)
}
/**
@@ -34,14 +39,14 @@ class ProducerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup
* @param clientId The clientId of the given producer client.
*/
class ProducerTopicStats(clientId: String) {
- private val valueFactory = (k: ClientIdAndTopic) => new ProducerTopicMetrics(k)
- private val stats = new Pool[ClientIdAndTopic, ProducerTopicMetrics](Some(valueFactory))
- private val allTopicsStats = new ProducerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics")) // to differentiate from a topic named AllTopics
+ private val valueFactory = (k: ClientIdTopic) => new ProducerTopicMetrics(k)
+ private val stats = new Pool[ClientIdTopic, ProducerTopicMetrics](Some(valueFactory))
+ private val allTopicsStats = new ProducerTopicMetrics(new ClientIdAllTopics(clientId)) // to differentiate from a topic named AllTopics
def getProducerAllTopicsStats(): ProducerTopicMetrics = allTopicsStats
def getProducerTopicStats(topic: String): ProducerTopicMetrics = {
- stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic + "-"))
+ stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 35e9e8c..0f09951 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -39,7 +39,6 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
@volatile private var shutdown: Boolean = false
private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize,
config.sendBufferBytes, config.requestTimeoutMs)
- val brokerInfo = "host_%s-port_%s".format(config.host, config.port)
val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId)
trace("Instantiating Scala Sync Producer with properties: %s".format(config.props))
@@ -93,11 +92,11 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
*/
def send(producerRequest: ProducerRequest): ProducerResponse = {
val requestSize = producerRequest.sizeInBytes
- producerRequestStats.getProducerRequestStats(brokerInfo).requestSizeHist.update(requestSize)
+ producerRequestStats.getProducerRequestStats(config.host, config.port).requestSizeHist.update(requestSize)
producerRequestStats.getProducerRequestAllBrokersStats.requestSizeHist.update(requestSize)
var response: Receive = null
- val specificTimer = producerRequestStats.getProducerRequestStats(brokerInfo).requestTimer
+ val specificTimer = producerRequestStats.getProducerRequestStats(config.host, config.port).requestTimer
val aggregateTimer = producerRequestStats.getProducerRequestAllBrokersStats.requestTimer
aggregateTimer.time {
specificTimer.time {
@@ -134,7 +133,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
case e: Exception => error("Error on disconnect: ", e)
}
}
-
+
private def connect(): BlockingChannel = {
if (!blockingChannel.isConnected && !shutdown) {
try {
@@ -156,5 +155,4 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
connect()
}
}
-}
-
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index 42e9c74..2ccf82a 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -34,10 +34,11 @@ class ProducerSendThread[K,V](val threadName: String,
private val shutdownLatch = new CountDownLatch(1)
private val shutdownCommand = new KeyedMessage[K,V]("shutdown", null.asInstanceOf[K], null.asInstanceOf[V])
- newGauge(clientId + "-ProducerQueueSize",
+ newGauge("ProducerQueueSize",
new Gauge[Int] {
def value = queue.size
- })
+ },
+ Map("clientId" -> clientId))
override def run {
try {
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 9390edf..20c00cb 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -26,7 +26,7 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.common.TopicAndPartition
import com.yammer.metrics.core.Gauge
-abstract class AbstractFetcherManager(protected val name: String, metricPrefix: String, numFetchers: Int = 1)
+abstract class AbstractFetcherManager(protected val name: String, clientId: String, numFetchers: Int = 1)
extends Logging with KafkaMetricsGroup {
// map of (source broker_id, fetcher_id per source broker) => fetcher
private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread]
@@ -34,7 +34,7 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix:
this.logIdent = "[" + name + "] "
newGauge(
- metricPrefix + "-MaxLag",
+ "MaxLag",
new Gauge[Long] {
// current max lag across all fetchers/topics/partitions
def value = fetcherThreadMap.foldLeft(0L)((curMaxAll, fetcherThreadMapEntry) => {
@@ -42,24 +42,25 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix:
curMaxThread.max(fetcherLagStatsEntry._2.lag)
}).max(curMaxAll)
})
- }
+ },
+ Map("clientId" -> clientId)
)
newGauge(
- metricPrefix + "-MinFetchRate",
- {
- new Gauge[Double] {
- // current min fetch rate across all fetchers/topics/partitions
- def value = {
- val headRate: Double =
- fetcherThreadMap.headOption.map(_._2.fetcherStats.requestRate.oneMinuteRate).getOrElse(0)
-
- fetcherThreadMap.foldLeft(headRate)((curMinAll, fetcherThreadMapEntry) => {
- fetcherThreadMapEntry._2.fetcherStats.requestRate.oneMinuteRate.min(curMinAll)
- })
- }
+ "MinFetchRate", {
+ new Gauge[Double] {
+ // current min fetch rate across all fetchers/topics/partitions
+ def value = {
+ val headRate: Double =
+ fetcherThreadMap.headOption.map(_._2.fetcherStats.requestRate.oneMinuteRate).getOrElse(0)
+
+ fetcherThreadMap.foldLeft(headRate)((curMinAll, fetcherThreadMapEntry) => {
+ fetcherThreadMapEntry._2.fetcherStats.requestRate.oneMinuteRate.min(curMinAll)
+ })
}
}
+ },
+ Map("clientId" -> clientId)
)
private def getFetcherId(topic: String, partitionId: Int) : Int = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 2e9532e..8c281d4 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -26,9 +26,7 @@ import kafka.utils.Utils.inLock
import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset}
import kafka.metrics.KafkaMetricsGroup
-import scala.collection.mutable
-import scala.collection.Set
-import scala.collection.Map
+import scala.collection.{mutable, Set, Map}
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicLong
@@ -46,8 +44,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
private val partitionMapLock = new ReentrantLock
private val partitionMapCond = partitionMapLock.newCondition()
val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
- private val brokerInfo = "host_%s-port_%s".format(sourceBroker.host, sourceBroker.port)
- private val metricId = new ClientIdAndBroker(clientId, brokerInfo)
+ private val metricId = new ClientIdAndBroker(clientId, sourceBroker.host, sourceBroker.port)
val fetcherStats = new FetcherStats(metricId)
val fetcherLagStats = new FetcherLagStats(metricId)
val fetchRequestBuilder = new FetchRequestBuilder().
@@ -204,13 +201,15 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
}
}
-class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMetricsGroup {
+class FetcherLagMetrics(metricId: ClientIdTopicPartition) extends KafkaMetricsGroup {
private[this] val lagVal = new AtomicLong(-1L)
- newGauge(
- metricId + "-ConsumerLag",
+ newGauge("ConsumerLag",
new Gauge[Long] {
def value = lagVal.get
- }
+ },
+ Map("clientId" -> metricId.clientId,
+ "topic" -> metricId.topic,
+ "partition" -> metricId.partitionId.toString)
)
def lag_=(newLag: Long) {
@@ -221,20 +220,25 @@ class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMet
}
class FetcherLagStats(metricId: ClientIdAndBroker) {
- private val valueFactory = (k: ClientIdBrokerTopicPartition) => new FetcherLagMetrics(k)
- val stats = new Pool[ClientIdBrokerTopicPartition, FetcherLagMetrics](Some(valueFactory))
+ private val valueFactory = (k: ClientIdTopicPartition) => new FetcherLagMetrics(k)
+ val stats = new Pool[ClientIdTopicPartition, FetcherLagMetrics](Some(valueFactory))
def getFetcherLagStats(topic: String, partitionId: Int): FetcherLagMetrics = {
- stats.getAndMaybePut(new ClientIdBrokerTopicPartition(metricId.clientId, metricId.brokerInfo, topic, partitionId))
+ stats.getAndMaybePut(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
}
}
class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
- val requestRate = newMeter(metricId + "-RequestsPerSec", "requests", TimeUnit.SECONDS)
- val byteRate = newMeter(metricId + "-BytesPerSec", "bytes", TimeUnit.SECONDS)
+ val tags = Map("clientId" -> metricId.clientId,
+ "brokerHost" -> metricId.brokerHost,
+ "brokerPort" -> metricId.brokerPort.toString)
+
+ val requestRate = newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags)
+
+ val byteRate = newMeter("BytesPerSec", "bytes", TimeUnit.SECONDS, tags)
}
-case class ClientIdBrokerTopicPartition(clientId: String, brokerInfo: String, topic: String, partitionId: Int) {
- override def toString = "%s-%s-%s-%d".format(clientId, brokerInfo, topic, partitionId)
+case class ClientIdTopicPartition(clientId: String, topic: String, partitionId: Int) {
+ override def toString = "%s-%s-%d".format(clientId, topic, partitionId)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index e0f14e2..7861240 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -37,7 +37,7 @@ import scala.collection.Seq
* - should return whatever data is available.
*/
-class DelayedFetch(override val keys: Seq[TopicPartitionRequestKey],
+class DelayedFetch(override val keys: Seq[TopicAndPartition],
override val request: RequestChannel.Request,
override val delayMs: Long,
val fetch: FetchRequest,
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/server/DelayedProduce.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 9481508..a647761 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -35,7 +35,7 @@ import scala.collection.Seq
* B.2 - else, at least requiredAcks replicas should be caught up to this request.
*/
-class DelayedProduce(override val keys: Seq[TopicPartitionRequestKey],
+class DelayedProduce(override val keys: Seq[TopicAndPartition],
override val request: RequestChannel.Request,
override val delayMs: Long,
val produce: ProducerRequest,
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/server/DelayedRequestKey.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedRequestKey.scala b/core/src/main/scala/kafka/server/DelayedRequestKey.scala
deleted file mode 100644
index 628ef59..0000000
--- a/core/src/main/scala/kafka/server/DelayedRequestKey.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.common.TopicAndPartition
-
-/**
- * Keys used for delayed request metrics recording
- */
-trait DelayedRequestKey {
- def keyLabel: String
-}
-
-object DelayedRequestKey {
- val globalLabel = "All"
-}
-
-case class TopicPartitionRequestKey(topic: String, partition: Int) extends DelayedRequestKey {
-
- def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition)
-
- override def keyLabel = "%s-%d".format(topic, partition)
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 85498b4..2f00992 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -201,8 +201,7 @@ class KafkaApis(val requestChannel: RequestChannel,
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
} else {
// create a list of (topic, partition) pairs to use as keys for this delayed request
- val producerRequestKeys = produceRequest.data.keys.map(
- topicAndPartition => new TopicPartitionRequestKey(topicAndPartition)).toSeq
+ val producerRequestKeys = produceRequest.data.keys.toSeq
val statuses = localProduceResults.map(r =>
r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
val delayedRequest = new DelayedProduce(
@@ -330,7 +329,7 @@ class KafkaApis(val requestChannel: RequestChannel,
debug("Putting fetch request with correlation id %d from client %s into purgatory".format(fetchRequest.correlationId,
fetchRequest.clientId))
// create a list of (topic, partition) pairs to use as keys for this delayed request
- val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new TopicPartitionRequestKey(_))
+ val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq
val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest,
dataRead.mapValues(_.offset))
@@ -350,7 +349,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// for producer requests with ack > 1, we need to check
// if they can be unblocked after some follower's log end offsets have moved
- replicaManager.unblockDelayedProduceRequests(new TopicPartitionRequestKey(topicAndPartition))
+ replicaManager.unblockDelayedProduceRequests(topicAndPartition)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 00bcc06..e4053fb 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -93,23 +93,28 @@ class KafkaRequestHandlerPool(val brokerId: Int,
}
}
-class BrokerTopicMetrics(name: String) extends KafkaMetricsGroup {
- val messagesInRate = newMeter(name + "MessagesInPerSec", "messages", TimeUnit.SECONDS)
- val bytesInRate = newMeter(name + "BytesInPerSec", "bytes", TimeUnit.SECONDS)
- val bytesOutRate = newMeter(name + "BytesOutPerSec", "bytes", TimeUnit.SECONDS)
- val bytesRejectedRate = newMeter(name + "BytesRejectedPerSec", "bytes", TimeUnit.SECONDS)
- val failedProduceRequestRate = newMeter(name + "FailedProduceRequestsPerSec", "requests", TimeUnit.SECONDS)
- val failedFetchRequestRate = newMeter(name + "FailedFetchRequestsPerSec", "requests", TimeUnit.SECONDS)
+class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
+ val tags: scala.collection.Map[String, String] = name match {
+ case None => scala.collection.Map.empty
+ case Some(topic) => Map("topic" -> topic)
+ }
+
+ val messagesInRate = newMeter("MessagesInPerSec", "messages", TimeUnit.SECONDS, tags)
+ val bytesInRate = newMeter("BytesInPerSec", "bytes", TimeUnit.SECONDS, tags)
+ val bytesOutRate = newMeter("BytesOutPerSec", "bytes", TimeUnit.SECONDS, tags)
+ val bytesRejectedRate = newMeter("BytesRejectedPerSec", "bytes", TimeUnit.SECONDS, tags)
+ val failedProduceRequestRate = newMeter("FailedProduceRequestsPerSec", "requests", TimeUnit.SECONDS, tags)
+ val failedFetchRequestRate = newMeter("FailedFetchRequestsPerSec", "requests", TimeUnit.SECONDS, tags)
}
object BrokerTopicStats extends Logging {
- private val valueFactory = (k: String) => new BrokerTopicMetrics(k)
+ private val valueFactory = (k: String) => new BrokerTopicMetrics(Some(k))
private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory))
- private val allTopicsStats = new BrokerTopicMetrics("AllTopics")
+ private val allTopicsStats = new BrokerTopicMetrics(None)
def getBrokerAllTopicsStats(): BrokerTopicMetrics = allTopicsStats
def getBrokerTopicStats(topic: String): BrokerTopicMetrics = {
- stats.getAndMaybePut(topic + "-")
+ stats.getAndMaybePut(topic)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 4de8123..1bf7d10 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -25,7 +25,6 @@ import kafka.utils._
import java.util.concurrent._
import atomic.{AtomicInteger, AtomicBoolean}
import java.io.File
-import java.net.BindException
import org.I0Itec.zkclient.ZkClient
import kafka.controller.{ControllerStats, KafkaController}
import kafka.cluster.Broker
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/server/KafkaServerStartable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
index cd64bbe..1c1b75b 100644
--- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
+++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
@@ -17,6 +17,7 @@
package kafka.server
+import kafka.common.AppInfo
import kafka.utils.Logging
@@ -26,6 +27,7 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
def startup() {
try {
server.startup()
+ AppInfo.registerInfo()
}
catch {
case e: Throwable =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
index d4a7d4a..e7ff411 100644
--- a/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
@@ -17,6 +17,7 @@
package kafka.server
+import kafka.common.TopicAndPartition
import kafka.metrics.KafkaMetricsGroup
import kafka.utils.Pool
import kafka.network.{BoundedByteBufferSend, RequestChannel}
@@ -30,19 +31,24 @@ class ProducerRequestPurgatory(replicaManager: ReplicaManager, offsetManager: Of
extends RequestPurgatory[DelayedProduce](replicaManager.config.brokerId, replicaManager.config.producerPurgatoryPurgeIntervalRequests) {
this.logIdent = "[ProducerRequestPurgatory-%d] ".format(replicaManager.config.brokerId)
- private class DelayedProducerRequestMetrics(keyLabel: String = DelayedRequestKey.globalLabel) extends KafkaMetricsGroup {
- val expiredRequestMeter = newMeter(keyLabel + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
+ private class DelayedProducerRequestMetrics(metricId: Option[TopicAndPartition]) extends KafkaMetricsGroup {
+ val tags: scala.collection.Map[String, String] = metricId match {
+ case Some(topicAndPartition) => Map("topic" -> topicAndPartition.topic, "partition" -> topicAndPartition.partition.toString)
+ case None => Map.empty
+ }
+
+ val expiredRequestMeter = newMeter("ExpiresPerSecond", "requests", TimeUnit.SECONDS, tags)
}
private val producerRequestMetricsForKey = {
- val valueFactory = (k: DelayedRequestKey) => new DelayedProducerRequestMetrics(k.keyLabel + "-")
- new Pool[DelayedRequestKey, DelayedProducerRequestMetrics](Some(valueFactory))
+ val valueFactory = (k: TopicAndPartition) => new DelayedProducerRequestMetrics(Some(k))
+ new Pool[TopicAndPartition, DelayedProducerRequestMetrics](Some(valueFactory))
}
- private val aggregateProduceRequestMetrics = new DelayedProducerRequestMetrics
+ private val aggregateProduceRequestMetrics = new DelayedProducerRequestMetrics(None)
- private def recordDelayedProducerKeyExpired(key: DelayedRequestKey) {
- val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key)
+ private def recordDelayedProducerKeyExpired(metricId: TopicAndPartition) {
+ val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(metricId)
List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark())
}
@@ -57,7 +63,7 @@ class ProducerRequestPurgatory(replicaManager: ReplicaManager, offsetManager: Of
def expire(delayedProduce: DelayedProduce) {
debug("Expiring produce request %s.".format(delayedProduce.produce))
for ((topicPartition, responseStatus) <- delayedProduce.partitionStatus if responseStatus.acksPending)
- recordDelayedProducerKeyExpired(new TopicPartitionRequestKey(topicPartition))
+ recordDelayedProducerKeyExpired(topicPartition)
respond(delayedProduce)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 0637fab..1afb0cb 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -113,10 +113,10 @@ class ReplicaManager(val config: KafkaConfig,
/**
* Unblock some delayed produce requests with the request key
*/
- def unblockDelayedProduceRequests(key: DelayedRequestKey) {
+ def unblockDelayedProduceRequests(key: TopicAndPartition) {
val satisfied = producerRequestPurgatory.update(key)
debug("Request key %s unblocked %d producer requests."
- .format(key.keyLabel, satisfied.size))
+ .format(key, satisfied.size))
// send any newly unblocked responses
satisfied.foreach(producerRequestPurgatory.respond(_))
@@ -125,9 +125,9 @@ class ReplicaManager(val config: KafkaConfig,
/**
* Unblock some delayed fetch requests with the request key
*/
- def unblockDelayedFetchRequests(key: DelayedRequestKey) {
+ def unblockDelayedFetchRequests(key: TopicAndPartition) {
val satisfied = fetchRequestPurgatory.update(key)
- debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, satisfied.size))
+ debug("Request key %s unblocked %d fetch requests.".format(key, satisfied.size))
// send any newly unblocked responses
satisfied.foreach(fetchRequestPurgatory.respond(_))
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/main/scala/kafka/tools/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
index f61c7c7..f2dc4ed 100644
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicLong
import java.util._
import java.text.SimpleDateFormat
import java.math.BigInteger
-import scala.collection.immutable.List
import org.apache.log4j.Logger
http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index e1d8711..bad099a 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -26,8 +26,7 @@ import kafka.message._
import kafka.serializer._
import org.I0Itec.zkclient.ZkClient
import kafka.utils._
-import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
-import java.util.{Collections, Properties}
+import java.util.Collections
import org.apache.log4j.{Logger, Level}
import kafka.utils.TestUtils._
import kafka.common.MessageStreamsExistException
@@ -89,8 +88,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
zkConsumerConnector0.shutdown
// send some messages to each broker
- val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++
- sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
+ val sentMessages1 = sendMessagesToPartition(configs, topic, 0, nMessages) ++
+ sendMessagesToPartition(configs, topic, 1, nMessages)
// wait to make sure the topic and partition have a leader for the successful case
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
@@ -123,8 +122,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
// send some messages to each broker
- val sentMessages2 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++
- sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
+ val sentMessages2 = sendMessagesToPartition(configs, topic, 0, nMessages) ++
+ sendMessagesToPartition(configs, topic, 1, nMessages)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@@ -144,8 +143,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
// send some messages to each broker
- val sentMessages3 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++
- sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
+ val sentMessages3 = sendMessagesToPartition(configs, topic, 0, nMessages) ++
+ sendMessagesToPartition(configs, topic, 1, nMessages)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@@ -178,8 +177,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
requestHandlerLogger.setLevel(Level.FATAL)
// send some messages to each broker
- val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++
- sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
+ val sentMessages1 = sendMessagesToPartition(configs, topic, 0, nMessages, GZIPCompressionCodec) ++
+ sendMessagesToPartition(configs, topic, 1, nMessages, GZIPCompressionCodec)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@@ -211,8 +210,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
// send some messages to each broker
- val sentMessages2 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++
- sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
+ val sentMessages2 = sendMessagesToPartition(configs, topic, 0, nMessages, GZIPCompressionCodec) ++
+ sendMessagesToPartition(configs, topic, 1, nMessages, GZIPCompressionCodec)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@@ -232,8 +231,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int](), new StringDecoder(), new StringDecoder())
// send some messages to each broker
- val sentMessages3 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++
- sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
+ val sentMessages3 = sendMessagesToPartition(configs, topic, 0, nMessages, GZIPCompressionCodec) ++
+ sendMessagesToPartition(configs, topic, 1, nMessages, GZIPCompressionCodec)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@@ -254,8 +253,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
def testCompressionSetConsumption() {
// send some messages to each broker
- val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec) ++
- sendMessagesToBrokerPartition(configs.last, topic, 1, 200, DefaultCompressionCodec)
+ val sentMessages = sendMessagesToPartition(configs, topic, 0, 200, DefaultCompressionCodec) ++
+ sendMessagesToPartition(configs, topic, 1, 200, DefaultCompressionCodec)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
@@ -280,8 +279,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
requestHandlerLogger.setLevel(Level.FATAL)
// send some messages to each broker
- val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, NoCompressionCodec) ++
- sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, NoCompressionCodec)
+ val sentMessages = sendMessagesToPartition(configs, topic, 0, nMessages, NoCompressionCodec) ++
+ sendMessagesToPartition(configs, topic, 1, nMessages, NoCompressionCodec)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
@@ -321,7 +320,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
// send some messages to each broker
- val sentMessages1 = sendMessages(configs.head, nMessages, "batch1", NoCompressionCodec, 1)
+ val sentMessages1 = sendMessages(configs, topic, "producer1", nMessages, "batch1", NoCompressionCodec, 1)
// create a consumer
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
@@ -345,70 +344,6 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
zkClient.close()
}
- def sendMessagesToBrokerPartition(config: KafkaConfig,
- topic: String,
- partition: Int,
- numMessages: Int,
- compression: CompressionCodec = NoCompressionCodec): List[String] = {
- val header = "test-%d-%d".format(config.brokerId, partition)
- val props = new Properties()
- props.put("compression.codec", compression.codec.toString)
- val producer: Producer[Int, String] =
- createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
- encoder = classOf[StringEncoder].getName,
- keyEncoder = classOf[IntEncoder].getName,
- partitioner = classOf[FixedValuePartitioner].getName,
- producerProps = props)
-
- val ms = 0.until(numMessages).map(x => header + config.brokerId + "-" + partition + "-" + x)
- producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
- debug("Sent %d messages to broker %d for partition [%s,%d]".format(ms.size, config.brokerId, topic, partition))
- producer.close()
- ms.toList
- }
-
- def sendMessages(config: KafkaConfig,
- messagesPerNode: Int,
- header: String,
- compression: CompressionCodec,
- numParts: Int): List[String]= {
- var messages: List[String] = Nil
- val props = new Properties()
- props.put("compression.codec", compression.codec.toString)
- val producer: Producer[Int, String] =
- createProducer(brokerList = TestUtils.getBrokerListStrFromConfigs(configs),
- encoder = classOf[StringEncoder].getName,
- keyEncoder = classOf[IntEncoder].getName,
- partitioner = classOf[FixedValuePartitioner].getName,
- producerProps = props)
-
- for (partition <- 0 until numParts) {
- val ms = 0.until(messagesPerNode).map(x => header + config.brokerId + "-" + partition + "-" + x)
- producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
- messages ++= ms
- debug("Sent %d messages to broker %d for partition [%s,%d]".format(ms.size, config.brokerId, topic, partition))
- }
- producer.close()
- messages
- }
-
- def getMessages(nMessagesPerThread: Int,
- topicMessageStreams: Map[String,List[KafkaStream[String, String]]]): List[String]= {
- var messages: List[String] = Nil
- for((topic, messageStreams) <- topicMessageStreams) {
- for (messageStream <- messageStreams) {
- val iterator = messageStream.iterator
- for(i <- 0 until nMessagesPerThread) {
- assertTrue(iterator.hasNext)
- val message = iterator.next.message
- messages ::= message
- debug("received message: " + message)
- }
- }
- }
- messages.reverse
- }
-
def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = {
val children = zkClient.getChildren(path)
Collections.sort(children)