You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2012/08/16 23:26:40 UTC
svn commit: r1374069 - in /incubator/kafka/branches/0.8:
core/src/main/scala/kafka/ core/src/main/scala/kafka/api/
core/src/main/scala/kafka/metrics/ core/src/main/scala/kafka/server/
core/src/main/scala/kafka/utils/ core/src/test/scala/unit/kafka/inte...
Author: jjkoshy
Date: Thu Aug 16 21:26:40 2012
New Revision: 1374069
URL: http://svn.apache.org/viewvc?rev=1374069&view=rev
Log:
KAFKA-385 Fix race condition between checkSatisfied and expire in RequestPurgatory; fixed constant expiration of follower fetch requests as checkSatisfied was not getting called; add metrics to the RequestPurgatory; add a KafkaTimer convenience class; patched by Joel Koshy; reviewed by Jun Rao and Jay Kreps.
Added:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/
incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaTimer.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/metrics/
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Pool.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
incubator/kafka/branches/0.8/project/build/KafkaProject.scala
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala?rev=1374069&r1=1374068&r2=1374069&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala Thu Aug 16 21:26:40 2012
@@ -17,6 +17,8 @@
package kafka
+
+import metrics.{KafkaMetricsReporterMBean, KafkaMetricsReporter, KafkaMetricsConfig}
import server.{KafkaConfig, KafkaServerStartable, KafkaServer}
import utils.{Utils, Logging}
import org.apache.log4j.jmx.LoggerDynamicMBean
@@ -36,6 +38,13 @@ object Kafka extends Logging {
try {
val props = Utils.loadProps(args(0))
val serverConfig = new KafkaConfig(props)
+ val metricsConfig = new KafkaMetricsConfig(props)
+ metricsConfig.reporters.foreach(reporterType => {
+ val reporter = Utils.getObject[KafkaMetricsReporter](reporterType)
+ reporter.init(props)
+ if (reporter.isInstanceOf[KafkaMetricsReporterMBean])
+ Utils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName)
+ })
val kafkaServerStartble = new KafkaServerStartable(serverConfig)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala?rev=1374069&r1=1374068&r2=1374069&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala Thu Aug 16 21:26:40 2012
@@ -73,7 +73,6 @@ class PartitionDataSend(val partitionDat
}
}
-
object TopicData {
def readFrom(buffer: ByteBuffer): TopicData = {
val topic = Utils.readShortString(buffer, "UTF-8")
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala?rev=1374069&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala Thu Aug 16 21:26:40 2012
@@ -0,0 +1,85 @@
+/**
+ *
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.metrics
+
+import java.util.Properties
+import com.yammer.metrics.Metrics
+import java.io.File
+import com.yammer.metrics.reporting.CsvReporter
+import kafka.utils.{Logging, Utils}
+import java.util.concurrent.TimeUnit
+
+
+/**
+ * MBean interface for the CSV reporter. It declares no operations of its own;
+ * it only extends [[kafka.metrics.KafkaMetricsReporterMBean]] under a name
+ * derived from the implementing class.
+ */
+private trait KafkaCSVMetricsReporterMBean extends KafkaMetricsReporterMBean
+
+/**
+ * Metrics reporter that dumps the default metrics registry to CSV files in a
+ * configurable directory by delegating to a yammer CsvReporter.
+ *
+ * Every state transition (init / start / stop) runs inside `synchronized` on
+ * this object.
+ */
+private class KafkaCSVMetricsReporter extends KafkaMetricsReporter
+                              with KafkaCSVMetricsReporterMBean
+                              with Logging {
+
+  private var csvDir: File = null
+  private var underlying: CsvReporter = null
+  private var running = false
+  private var initialized = false
+
+
+  override def getMBeanName = "kafka:type=kafka.metrics.KafkaCSVMetricsReporter"
+
+
+  /**
+   * Creates the CSV output directory (if missing) and the underlying reporter,
+   * then starts reporting when "kafka.csv.metrics.reporter.enabled" is true.
+   * Calls after the first successful one are ignored.
+   *
+   * @param props Properties read for "kafka.csv.metrics.dir",
+   *              "kafka.csv.metrics.reporter.enabled" and the settings
+   *              consumed by KafkaMetricsConfig.
+   */
+  override def init(props: Properties) {
+    synchronized {
+      if (!initialized) {
+        val metricsConfig = new KafkaMetricsConfig(props)
+        csvDir = new File(Utils.getString(props, "kafka.csv.metrics.dir", "kafka_metrics"))
+        if (!csvDir.exists())
+          csvDir.mkdirs()
+        underlying = new CsvReporter(Metrics.defaultRegistry(), csvDir)
+        // The flag must be set BEFORE the conditional start below:
+        // startReporter is a no-op while `initialized` is false, so setting it
+        // afterwards meant an enabled reporter was never actually started.
+        initialized = true
+        if (Utils.getBoolean(props, "kafka.csv.metrics.reporter.enabled", false))
+          startReporter(metricsConfig.pollingIntervalSecs)
+      }
+    }
+  }
+
+
+  /**
+   * Starts periodic CSV reporting. Does nothing unless the reporter has been
+   * initialized and is not already running.
+   *
+   * @param pollingPeriodSecs Reporting period in seconds.
+   */
+  override def startReporter(pollingPeriodSecs: Long) {
+    synchronized {
+      if (initialized && !running) {
+        underlying.start(pollingPeriodSecs, TimeUnit.SECONDS)
+        running = true
+        info("Started Kafka CSV metrics reporter with polling period %d seconds".format(pollingPeriodSecs))
+      }
+    }
+  }
+
+
+  /**
+   * Stops reporting if it is currently running and prepares a new underlying
+   * reporter for a later start.
+   */
+  override def stopReporter() {
+    synchronized {
+      if (initialized && running) {
+        underlying.shutdown()
+        running = false
+        info("Stopped Kafka CSV metrics reporter")
+        // A fresh reporter replaces the one just shut down, presumably because
+        // a shut-down CsvReporter cannot be started again - TODO confirm.
+        underlying = new CsvReporter(Metrics.defaultRegistry(), csvDir)
+      }
+    }
+  }
+
+}
+
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala?rev=1374069&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala Thu Aug 16 21:26:40 2012
@@ -0,0 +1,38 @@
+/**
+ *
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.metrics
+
+import java.util.Properties
+import kafka.utils.Utils
+
+class KafkaMetricsConfig(props: Properties) {
+
+  /**
+   * Reporter types to load, taken from the comma-separated
+   * "kafka.metrics.reporters" property. The named classes are expected to be
+   * on the classpath; they are instantiated at run-time.
+   */
+  val reporters = {
+    val reporterTypes = Utils.getString(props, "kafka.metrics.reporters", "")
+    Utils.getCSVList(reporterTypes)
+  }
+
+  /**
+   * How often metrics are polled, in seconds
+   * ("kafka.metrics.polling.interval.secs").
+   */
+  val pollingIntervalSecs = {
+    val fallbackIntervalSecs = 10
+    Utils.getInt(props, "kafka.metrics.polling.interval.secs", fallbackIntervalSecs)
+  }
+}
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala?rev=1374069&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala Thu Aug 16 21:26:40 2012
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.metrics
+
+
+import com.yammer.metrics.core.{Gauge, MetricName}
+import kafka.utils.Logging
+import java.util.concurrent.TimeUnit
+import com.yammer.metrics.Metrics
+
+
+/**
+ * Mix-in that creates yammer metrics whose names are derived from the
+ * implementing class and an optional sub-group identifier.
+ */
+trait KafkaMetricsGroup extends Logging {
+
+  /**
+   * This method enables the user to form logical sub-groups of this
+   * KafkaMetricsGroup by inserting a sub-group identifier in the package
+   * string.
+   *
+   * @return The sub-group identifier.
+   */
+  def metricsGroupIdent: String
+
+  /**
+   * Creates a new MetricName object for gauges, meters, etc. created for this
+   * metrics group. It uses the metricsGroupIdent to create logical sub-groups.
+   * This is currently specifically of use to classes under kafka, with
+   * broker-id being the most common metrics grouping strategy.
+   *
+   * @param name Descriptive name of the metric.
+   * @return Sanitized metric name object.
+   */
+  private def metricName(name: String) = {
+    val ident = metricsGroupIdent
+    val klass = this.getClass
+    val pkg = {
+      val actualPkg = if (klass.getPackage == null) "" else klass.getPackage.getName
+      if (ident.nonEmpty) {
+        // Insert the sub-group identifier after the top-level package. The
+        // splice is done with plain substring operations: String.replaceFirst
+        // would interpret '$' and '\' inside the identifier as group
+        // references / escapes and either throw or mangle the name.
+        val firstDot = actualPkg.indexOf('.')
+        if (firstDot >= 0)
+          actualPkg.substring(0, firstDot) + "." + ident + "." + actualPkg.substring(firstDot + 1)
+        else
+          actualPkg + "." + ident
+      }
+      else
+        actualPkg
+    }
+    // drop a single trailing '$' from the simple class name
+    val simpleName = klass.getSimpleName.replaceAll("\\$$", "")
+    new MetricName(pkg, simpleName, name)
+  }
+
+  /** Registers `metric` as a gauge under this group's name for `name`. */
+  def newGauge[T](name: String, metric: Gauge[T]) =
+    Metrics.newGauge(metricName(name), metric)
+
+  /** Creates a meter under this group's name for `name`. */
+  def newMeter(name: String, eventType: String, timeUnit: TimeUnit) =
+    Metrics.newMeter(metricName(name), eventType, timeUnit)
+
+  /** Creates a histogram under this group's name for `name`. */
+  def newHistogram(name: String, biased: Boolean = false) = Metrics.newHistogram(metricName(name), biased)
+
+  /** Creates a timer under this group's name for `name`. */
+  def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit) =
+    Metrics.newTimer(metricName(name), durationUnit, rateUnit)
+
+}
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala?rev=1374069&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala Thu Aug 16 21:26:40 2012
@@ -0,0 +1,47 @@
+/**
+ *
+ *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.metrics
+
+import java.util.Properties
+
+/**
+ * Root trait for the JMX view of a metrics reporter. A custom reporter
+ * (an implementation of [[kafka.metrics.KafkaMetricsReporter]]) that wants
+ * these operations exposed over JMX must also implement its own MBean trait
+ * extending this one, so that the registered object follows the standard
+ * MBean naming convention.
+ */
+trait KafkaMetricsReporterMBean {
+
+  /** Starts reporting with the given polling period (in seconds). */
+  def startReporter(pollingPeriodInSeconds: Long): Unit
+
+  /** Stops reporting. */
+  def stopReporter(): Unit
+
+  /**
+   * @return The name under which the MBean is registered.
+   */
+  def getMBeanName: String
+}
+
+
+/**
+ * Interface implemented by metrics reporters.
+ */
+trait KafkaMetricsReporter {
+
+  /** Initializes the reporter from the supplied properties. */
+  def init(props: Properties): Unit
+}
+
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaTimer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaTimer.scala?rev=1374069&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaTimer.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaTimer.scala Thu Aug 16 21:26:40 2012
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.metrics
+
+import com.yammer.metrics.core.Timer
+
+/**
+ * Thin convenience layer over a metrics timer that makes it easy to time an
+ * arbitrary block of code. The pattern comes from the metrics-scala_2.9.1
+ * package.
+ * @param metric The underlying timer object.
+ */
+class KafkaTimer(metric: Timer) {
+
+  /**
+   * Evaluates `f` while the timer is running and returns its result. The
+   * timing context is stopped whether `f` completes normally or throws.
+   */
+  def time[A](f: => A): A = {
+    val timing = metric.time
+    try f
+    finally timing.stop()
+  }
+}
+
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1374069&r1=1374068&r2=1374069&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Thu Aug 16 21:26:40 2012
@@ -18,21 +18,23 @@
package kafka.server
import java.io.IOException
-import java.util.concurrent.atomic._
import kafka.admin.{CreateTopicCommand, AdminUtils}
import kafka.api._
import kafka.common._
import kafka.log._
import kafka.message._
import kafka.network._
+import kafka.utils.{ZkUtils, Pool, SystemTime, Logging}
import org.apache.log4j.Logger
import scala.collection._
import mutable.HashMap
import scala.math._
import kafka.network.RequestChannel.Response
-import kafka.utils.{ZkUtils, SystemTime, Logging}
+import java.util.concurrent.TimeUnit
+import kafka.metrics.KafkaMetricsGroup
import kafka.cluster.Replica
+
/**
* Logic to handle the various Kafka requests
*/
@@ -44,10 +46,13 @@ class KafkaApis(val requestChannel: Requ
becomeFollower: (Replica, LeaderAndISR) => Short,
brokerId: Int) extends Logging {
- private val produceRequestPurgatory = new ProducerRequestPurgatory(brokerId)
- private val fetchRequestPurgatory = new FetchRequestPurgatory(brokerId, requestChannel)
+ private val metricsGroup = brokerId.toString
+ private val producerRequestPurgatory = new ProducerRequestPurgatory
+ private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel)
+ private val delayedRequestMetrics = new DelayedRequestMetrics
+
private val requestLogger = Logger.getLogger("kafka.request.logger")
- this.logIdent = "KafkaApi on Broker " + brokerId + ", "
+ this.logIdent = "KafkaApis-%d ".format(brokerId)
/**
* Top-level method that handles all requests and multiplexes to the right api
@@ -69,7 +74,9 @@ class KafkaApis(val requestChannel: Requ
def handleLeaderAndISRRequest(request: RequestChannel.Request){
val responseMap = new HashMap[(String, Int), Short]
val leaderAndISRRequest = LeaderAndISRRequest.readFrom(request.request.buffer)
- info("handling leader and isr request " + leaderAndISRRequest)
+ if(requestLogger.isTraceEnabled)
+ requestLogger.trace("Handling leader and isr request " + leaderAndISRRequest)
+ trace("Handling leader and isr request " + leaderAndISRRequest)
for((partitionInfo, leaderAndISR) <- leaderAndISRRequest.leaderAndISRInfos){
var errorCode = ErrorMapping.NoError
@@ -78,12 +85,12 @@ class KafkaApis(val requestChannel: Requ
// If the partition does not exist locally, create it
if(replicaManager.getPartition(topic, partition) == None){
- trace("the partition (%s, %d) does not exist locally, check if current broker is in assigned replicas, if so, start the local replica".format(topic, partition))
+ trace("The partition (%s, %d) does not exist locally, check if current broker is in assigned replicas, if so, start the local replica".format(topic, partition))
val assignedReplicas = ZkUtils.getReplicasForPartition(kafkaZookeeper.getZookeeperClient, topic, partition)
- trace("assigned replicas list for topic [%s] partition [%d] is [%s]".format(topic, partition, assignedReplicas.toString))
+ trace("Assigned replicas list for topic [%s] partition [%d] is [%s]".format(topic, partition, assignedReplicas.toString))
if(assignedReplicas.contains(brokerId)) {
val replica = addReplicaCbk(topic, partition, assignedReplicas.toSet)
- info("starting replica for topic [%s] partition [%d]".format(replica.topic, replica.partition.partitionId))
+ info("Starting replica for topic [%s] partition [%d]".format(replica.topic, replica.partition.partitionId))
}
}
val replica = replicaManager.getReplica(topic, partition).get
@@ -91,11 +98,11 @@ class KafkaApis(val requestChannel: Requ
val requestedLeaderId = leaderAndISR.leader
// If the broker is requested to be the leader and it's not current the leader (the leader id is set and not equal to broker id)
if(requestedLeaderId == brokerId && (!replica.partition.leaderId().isDefined || replica.partition.leaderId().get != brokerId)){
- info("becoming the leader for partition [%s, %d] at the leader and isr request %s".format(topic, partition, leaderAndISRRequest))
+ info("Becoming the leader for partition [%s, %d] at the leader and isr request %s".format(topic, partition, leaderAndISRRequest))
errorCode = becomeLeader(replica, leaderAndISR)
}
else if (requestedLeaderId != brokerId) {
- info("becoming the follower for partition [%s, %d] at the leader and isr request %s".format(topic, partition, leaderAndISRRequest))
+ info("Becoming the follower for partition [%s, %d] at the leader and isr request %s".format(topic, partition, leaderAndISRRequest))
errorCode = becomeFollower(replica, leaderAndISR)
}
@@ -105,7 +112,7 @@ class KafkaApis(val requestChannel: Requ
if(leaderAndISRRequest.isInit == LeaderAndISRRequest.IsInit){
replicaManager.startHighWaterMarksCheckPointThread
val partitionsToRemove = replicaManager.allPartitions.filter(p => !leaderAndISRRequest.leaderAndISRInfos.contains(p._1)).keySet
- info("init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove))
+ info("Init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove))
partitionsToRemove.foreach(p => stopReplicaCbk(p._1, p._2))
}
@@ -116,6 +123,10 @@ class KafkaApis(val requestChannel: Requ
def handleStopReplicaRequest(request: RequestChannel.Request){
val stopReplicaRequest = StopReplicaRequest.readFrom(request.request.buffer)
+ if(requestLogger.isTraceEnabled)
+ requestLogger.trace("Handling stop replica request " + stopReplicaRequest)
+ trace("Handling stop replica request " + stopReplicaRequest)
+
val responseMap = new HashMap[(String, Int), Short]
for((topic, partition) <- stopReplicaRequest.stopReplicaSet){
@@ -133,12 +144,17 @@ class KafkaApis(val requestChannel: Requ
def maybeUnblockDelayedFetchRequests(topic: String, partitionDatas: Array[PartitionData]) {
var satisfied = new mutable.ArrayBuffer[DelayedFetch]
for(partitionData <- partitionDatas)
- satisfied ++= fetchRequestPurgatory.update((topic, partitionData.partition), partitionData)
- trace("produce request to %s unblocked %d DelayedFetchRequests.".format(topic, satisfied.size))
+ satisfied ++= fetchRequestPurgatory.update(RequestKey(topic, partitionData.partition), null)
+ trace("Producer request to %s unblocked %d fetch requests.".format(topic, satisfied.size))
// send any newly unblocked responses
for(fetchReq <- satisfied) {
val topicData = readMessageSets(fetchReq.fetch)
val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
+
+ val fromFollower = fetchReq.fetch.replicaId != FetchRequest.NonFollowerId
+ delayedRequestMetrics.recordDelayedFetchSatisfied(
+ fromFollower, SystemTime.nanoseconds - fetchReq.creationTimeNs, response)
+
requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response)))
}
}
@@ -150,43 +166,45 @@ class KafkaApis(val requestChannel: Requ
val produceRequest = ProducerRequest.readFrom(request.request.buffer)
val sTime = SystemTime.milliseconds
if(requestLogger.isTraceEnabled)
- requestLogger.trace("producer request %s".format(produceRequest.toString))
- trace("Broker %s received produce request %s".format(brokerId, produceRequest.toString))
+ requestLogger.trace("Handling producer request " + request.toString)
+ trace("Handling producer request " + request.toString)
val response = produceToLocalLog(produceRequest)
- debug("produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
-
+ debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
+
+ for (topicData <- produceRequest.data)
+ maybeUnblockDelayedFetchRequests(topicData.topic, topicData.partitionDataArray)
+
if (produceRequest.requiredAcks == 0 ||
produceRequest.requiredAcks == 1 ||
- produceRequest.data.size <= 0) {
+ produceRequest.data.size <= 0)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
-
- for (topicData <- produceRequest.data)
- maybeUnblockDelayedFetchRequests(topicData.topic, topicData.partitionDataArray)
- }
else {
// create a list of (topic, partition) pairs to use as keys for this delayed request
- val topicPartitionPairs = produceRequest.data.flatMap(topicData => {
+ val producerRequestKeys = produceRequest.data.flatMap(topicData => {
val topic = topicData.topic
topicData.partitionDataArray.map(partitionData => {
- (topic, partitionData.partition)
+ RequestKey(topic, partitionData.partition)
})
})
+
val delayedProduce = new DelayedProduce(
- topicPartitionPairs, request,
+ producerRequestKeys, request,
response.errors, response.offsets,
produceRequest, produceRequest.ackTimeoutMs.toLong)
- produceRequestPurgatory.watch(delayedProduce)
+ producerRequestPurgatory.watch(delayedProduce)
+
/*
* Replica fetch requests may have arrived (and potentially satisfied)
- * delayedProduce requests before they even made it to the purgatory.
+ * delayedProduce requests while they were being added to the purgatory.
* Here, we explicitly check if any of them can be satisfied.
*/
var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
- topicPartitionPairs.foreach(topicPartition =>
- satisfiedProduceRequests ++=
- produceRequestPurgatory.update(topicPartition, topicPartition))
- debug("%d DelayedProduce requests unblocked after produce to local log.".format(satisfiedProduceRequests.size))
+ producerRequestKeys.foreach(key =>
+ satisfiedProduceRequests ++=
+ producerRequestPurgatory.update(key, key))
+ debug(satisfiedProduceRequests.size +
+ " producer requests unblocked during produce to local log.")
satisfiedProduceRequests.foreach(_.respond())
}
}
@@ -195,10 +213,11 @@ class KafkaApis(val requestChannel: Requ
* Helper method for handling a parsed producer request
*/
private def produceToLocalLog(request: ProducerRequest): ProducerResponse = {
- trace("produce [%s] to local log ".format(request.toString))
+ trace("Produce [%s] to local log ".format(request.toString))
val requestSize = request.topicPartitionCount
val errors = new Array[Short](requestSize)
val offsets = new Array[Long](requestSize)
+
var msgIndex = -1
for(topicData <- request.data) {
for(partitionData <- topicData.partitionDataArray) {
@@ -212,12 +231,13 @@ class KafkaApis(val requestChannel: Requ
replicaManager.recordLeaderLogEndOffset(topicData.topic, partitionData.partition, log.logEndOffset)
offsets(msgIndex) = log.logEndOffset
errors(msgIndex) = ErrorMapping.NoError.toShort
- trace(partitionData.messages.sizeInBytes + " bytes written to logs.")
+ trace("%d bytes written to logs, nextAppendOffset = %d"
+ .format(partitionData.messages.sizeInBytes, offsets(msgIndex)))
} catch {
case e =>
BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordFailedProduceRequest
BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
- error("error processing ProducerRequest on " + topicData.topic + ":" + partitionData.partition, e)
+ error("Error processing ProducerRequest on %s:%d".format(topicData.topic, partitionData.partition), e)
e match {
case _: IOException =>
fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
@@ -229,8 +249,7 @@ class KafkaApis(val requestChannel: Requ
}
}
}
- val ret = new ProducerResponse(request.versionId, request.correlationId, errors, offsets)
- ret
+ new ProducerResponse(request.versionId, request.correlationId, errors, offsets)
}
/**
@@ -238,7 +257,10 @@ class KafkaApis(val requestChannel: Requ
*/
def handleFetchRequest(request: RequestChannel.Request) {
val fetchRequest = FetchRequest.readFrom(request.request.buffer)
- trace("handling fetch request: " + fetchRequest.toString)
+ if(requestLogger.isTraceEnabled)
+ requestLogger.trace("Handling fetch request " + fetchRequest.toString)
+ trace("Handling fetch request " + fetchRequest.toString)
+
// validate the request
try {
fetchRequest.validate()
@@ -255,12 +277,12 @@ class KafkaApis(val requestChannel: Requ
var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
fetchRequest.offsetInfo.foreach(topicOffsetInfo => {
topicOffsetInfo.partitions.foreach(partition => {
- satisfiedProduceRequests ++= produceRequestPurgatory.update(
- (topicOffsetInfo.topic, partition), (topicOffsetInfo.topic, partition)
- )
+ val key = RequestKey(topicOffsetInfo.topic, partition)
+ satisfiedProduceRequests ++= producerRequestPurgatory.update(key, key)
})
})
- debug("replica %d fetch unblocked %d DelayedProduce requests.".format(fetchRequest.replicaId, satisfiedProduceRequests.size))
+ debug("Replica %d fetch unblocked %d producer requests."
+ .format(fetchRequest.replicaId, satisfiedProduceRequests.size))
satisfiedProduceRequests.foreach(_.respond())
}
@@ -270,14 +292,15 @@ class KafkaApis(val requestChannel: Requ
availableBytes >= fetchRequest.minBytes ||
fetchRequest.numPartitions <= 0) {
val topicData = readMessageSets(fetchRequest)
- debug("returning fetch response %s for fetch request with correlation id %d".format(topicData.map(_.partitionDataArray.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))
+ debug("Returning fetch response %s for fetch request with correlation id %d".format(
+ topicData.map(_.partitionDataArray.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))
val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData)
requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
} else {
- debug("putting fetch request into purgatory")
+ debug("Putting fetch request into purgatory")
// create a list of (topic, partition) pairs to use as keys for this delayed request
- val topicPartitionPairs: Seq[Any] = fetchRequest.offsetInfo.flatMap(o => o.partitions.map((o.topic, _)))
- val delayedFetch = new DelayedFetch(topicPartitionPairs, request, fetchRequest, fetchRequest.maxWait, availableBytes)
+ val delayedFetchKeys = fetchRequest.offsetInfo.flatMap(o => o.partitions.map(RequestKey(o.topic, _)))
+ val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest, fetchRequest.maxWait)
fetchRequestPurgatory.watch(delayedFetch)
}
}
@@ -298,16 +321,18 @@ class KafkaApis(val requestChannel: Requ
totalBytes += math.min(offsetDetail.fetchSizes(i), available)
} catch {
case e: InvalidPartitionException =>
- info("invalid partition " + offsetDetail.partitions(i) + "in fetch request from client '" + fetchRequest.clientId + "'")
+ // clientId is formatted with %s, which accepts any value; %d throws
+ // IllegalFormatConversionException for a non-integral argument (the message
+ // this replaces printed clientId in quotes, like a string).
+ info("Invalid partition %d in fetch request from client %s."
+ .format(offsetDetail.partitions(i), fetchRequest.clientId))
}
}
}
+ trace(totalBytes + " available bytes for fetch request.")
totalBytes
}
private def maybeUpdatePartitionHW(fetchRequest: FetchRequest) {
val offsets = fetchRequest.offsetInfo
- debug("act on update partition HW, check offset detail: %s ".format(offsets))
+ debug("Act on update partition HW, check offset detail: %s ".format(offsets))
for(offsetDetail <- offsets) {
val topic = offsetDetail.topic
val (partitions, offsets) = (offsetDetail.partitions, offsetDetail.offsets)
@@ -343,17 +368,20 @@ class KafkaApis(val requestChannel: Requ
BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes)
BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(messages.sizeInBytes)
val leaderReplicaOpt = replicaManager.getReplica(topic, partition, brokerId)
- assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d".format(topic, partition) + " must exist on leader broker %d".format(brokerId))
+ assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d must exist on leader broker %d".format(topic, partition, brokerId))
val leaderReplica = leaderReplicaOpt.get
fetchRequest.replicaId match {
- case FetchRequest.NonFollowerId => // replica id value of -1 signifies a fetch request from an external client, not from one of the replicas
+ case FetchRequest.NonFollowerId =>
+ // replica id value of -1 signifies a fetch request from an external client, not from one of the replicas
new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
case _ => // fetch request from a follower
val replicaOpt = replicaManager.getReplica(topic, partition, fetchRequest.replicaId)
assert(replicaOpt.isDefined, "No replica %d in replica manager on %d".format(fetchRequest.replicaId, brokerId))
val replica = replicaOpt.get
- debug("leader [%d] for topic [%s] partition [%d] received fetch request from follower [%d]".format(brokerId, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
- debug("Leader %d returning %d messages for topic %s partition %d to follower %d".format(brokerId, messages.sizeInBytes, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
+ debug("Leader for topic [%s] partition [%d] received fetch request from follower [%d]"
+ .format(replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
+ debug("Leader returning %d messages for topic %s partition %d to follower %d"
+ .format(messages.sizeInBytes, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
}
}
@@ -372,7 +400,7 @@ class KafkaApis(val requestChannel: Requ
try {
// check if the current broker is the leader for the partitions
kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topic, partition)
- trace("fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
+ trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
val log = logManager.getLog(topic, partition)
response = Right(log match { case Some(l) => l.read(offset, maxSize) case None => MessageSet.Empty })
} catch {
@@ -389,7 +417,9 @@ class KafkaApis(val requestChannel: Requ
def handleOffsetRequest(request: RequestChannel.Request) {
val offsetRequest = OffsetRequest.readFrom(request.request.buffer)
if(requestLogger.isTraceEnabled)
- requestLogger.trace("offset request " + offsetRequest.toString)
+ requestLogger.trace("Handling offset request " + offsetRequest.toString)
+ trace("Handling offset request " + offsetRequest.toString)
+
var response: OffsetResponse = null
try {
kafkaZookeeper.ensurePartitionLeaderOnThisBroker(offsetRequest.topic, offsetRequest.partition)
@@ -412,11 +442,14 @@ class KafkaApis(val requestChannel: Requ
def handleTopicMetadataRequest(request: RequestChannel.Request) {
val metadataRequest = TopicMetadataRequest.readFrom(request.request.buffer)
if(requestLogger.isTraceEnabled)
- requestLogger.trace("topic metadata request " + metadataRequest.toString())
+ requestLogger.trace("Handling topic metadata request " + metadataRequest.toString())
+ trace("Handling topic metadata request " + metadataRequest.toString())
+
val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
val zkClient = kafkaZookeeper.getZookeeperClient
var errorCode = ErrorMapping.NoError
val config = logManager.config
+
try {
val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
metadataRequest.topics.zip(topicMetadataList).foreach(
@@ -452,33 +485,43 @@ class KafkaApis(val requestChannel: Requ
}
def close() {
- debug("shut down")
+ debug("Shutting down.")
fetchRequestPurgatory.shutdown()
- produceRequestPurgatory.shutdown()
- debug("shutted down completely")
+ producerRequestPurgatory.shutdown()
+ debug("Shut down complete.")
+ }
+
+ private [kafka] trait MetricKey {
+ def keyLabel: String
+ }
+ private [kafka] object MetricKey {
+ val globalLabel = "all"
}
+ private [kafka] case class RequestKey(topic: String, partition: Int)
+ extends MetricKey {
+ override def keyLabel = "%s-%d".format(topic, partition)
+ }
/**
* A delayed fetch request
*/
- class DelayedFetch(keys: Seq[Any], request: RequestChannel.Request, val fetch: FetchRequest, delayMs: Long, initialSize: Long) extends DelayedRequest(keys, request, delayMs) {
- val bytesAccumulated = new AtomicLong(initialSize)
- }
+ class DelayedFetch(keys: Seq[RequestKey], request: RequestChannel.Request, val fetch: FetchRequest, delayMs: Long)
+ extends DelayedRequest(keys, request, delayMs)
/**
* A holding pen for fetch requests waiting to be satisfied
*/
- class FetchRequestPurgatory(brokerId: Int, requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, PartitionData]("Fetch Request Purgatory on Broker " + brokerId + ", ") {
+ class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, Null](brokerId) {
+
+ this.logIdent = "FetchRequestPurgatory-%d ".format(brokerId)
+
+ override def metricsGroupIdent = metricsGroup
/**
* A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
*/
- def checkSatisfied(partitionData: PartitionData, delayedFetch: DelayedFetch): Boolean = {
- val messageDataSize = partitionData.messages.sizeInBytes
- val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageDataSize)
- debug("fetch request check, accm size: " + accumulatedSize + " delay fetch min bytes: " + delayedFetch.fetch.minBytes)
- accumulatedSize >= delayedFetch.fetch.minBytes
- }
+ def checkSatisfied(n: Null, delayedFetch: DelayedFetch): Boolean =
+ availableFetchBytes(delayedFetch.fetch) >= delayedFetch.fetch.minBytes
/**
* When a request expires just answer it with whatever data is present
@@ -486,11 +529,13 @@ class KafkaApis(val requestChannel: Requ
def expire(delayed: DelayedFetch) {
val topicData = readMessageSets(delayed.fetch)
val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
+ val fromFollower = delayed.fetch.replicaId != FetchRequest.NonFollowerId
+ delayedRequestMetrics.recordDelayedFetchExpired(fromFollower, response)
requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response)))
}
}
- class DelayedProduce(keys: Seq[Any],
+ class DelayedProduce(keys: Seq[RequestKey],
request: RequestChannel.Request,
localErrors: Array[Short],
requiredOffsets: Array[Long],
@@ -504,7 +549,7 @@ class KafkaApis(val requestChannel: Requ
* values are effectively synchronized by the ProducerRequestPurgatory's
* update method
*/
- private val partitionStatus = keys.map(key => {
+ private [kafka] val partitionStatus = keys.map(key => {
val keyIndex = keys.indexOf(key)
// if there was an error in writing to the local replica's log, then don't
// wait for acks on this partition
@@ -525,13 +570,13 @@ class KafkaApis(val requestChannel: Requ
def respond() {
val errorsAndOffsets: (List[Short], List[Long]) = (
- keys.foldRight
- ((List[Short](), List[Long]()))
- ((key: Any, result: (List[Short], List[Long])) => {
- val status = partitionStatus(key)
- (status.error :: result._1, status.requiredOffset :: result._2)
- })
- )
+ keys.foldRight
+ ((List[Short](), List[Long]()))
+ ((key: RequestKey, result: (List[Short], List[Long])) => {
+ val status = partitionStatus(key)
+ (status.error :: result._1, status.requiredOffset :: result._2)
+ })
+ )
val response = new ProducerResponse(produce.versionId, produce.correlationId,
errorsAndOffsets._1.toArray, errorsAndOffsets._2.toArray)
@@ -550,9 +595,14 @@ class KafkaApis(val requestChannel: Requ
* As partitions become acknowledged, we may be able to unblock
* DelayedFetchRequests that are pending on those partitions.
*/
- def isSatisfied(followerFetchPartition: (String, Int)) = {
- val (topic, partitionId) = followerFetchPartition
- val fetchPartitionStatus = partitionStatus(followerFetchPartition)
+ def isSatisfied(followerFetchRequestKey: RequestKey) = {
+ val topic = followerFetchRequestKey.topic
+ val partitionId = followerFetchRequestKey.partition
+ val key = RequestKey(topic, partitionId)
+ val fetchPartitionStatus = partitionStatus(key)
+ val durationNs = SystemTime.nanoseconds - creationTimeNs
+ trace("Checking producer request satisfaction for %s-%d, acksPending = %b"
+ .format(topic, partitionId, fetchPartitionStatus.acksPending))
if (fetchPartitionStatus.acksPending) {
val leaderReplica = replicaManager.getLeaderReplica(topic, partitionId)
leaderReplica match {
@@ -560,14 +610,16 @@ class KafkaApis(val requestChannel: Requ
if (leader.isLocal) {
val isr = leader.partition.inSyncReplicas
val numAcks = isr.count(r => {
- if (!r.isLocal)
- r.logEndOffset() >= partitionStatus(followerFetchPartition).requiredOffset
+ if (!r.isLocal) {
+ r.logEndOffset() >= partitionStatus(key).requiredOffset
+ }
else
true /* also count the local (leader) replica */
})
- trace("Received %d/%d acks for produce request to %s-%d".format(
+
+ trace("Received %d/%d acks for producer request to %s-%d; isr size = %d".format(
numAcks, produce.requiredAcks,
- topic, partitionId))
+ topic, partitionId, isr.size))
if ((produce.requiredAcks < 0 && numAcks >= isr.size) ||
(produce.requiredAcks > 0 && numAcks >= produce.requiredAcks)) {
/*
@@ -575,12 +627,16 @@ class KafkaApis(val requestChannel: Requ
* are fully caught up to the (local) leader's offset
* corresponding to this produce request.
*/
+
fetchPartitionStatus.acksPending = false
fetchPartitionStatus.error = ErrorMapping.NoError
val topicData =
produce.data.find(_.topic == topic).get
val partitionData =
topicData.partitionDataArray.find(_.partition == partitionId).get
+ delayedRequestMetrics.recordDelayedProducerKeyCaughtUp(key,
+ durationNs,
+ partitionData.sizeInBytes)
maybeUnblockDelayedFetchRequests(
topic, Array(partitionData))
}
@@ -597,7 +653,10 @@ class KafkaApis(val requestChannel: Requ
}
// unblocked if there are no partitions with pending acks
- ! partitionStatus.exists(p => p._2.acksPending)
+ val satisfied = ! partitionStatus.exists(p => p._2.acksPending)
+ if (satisfied)
+ delayedRequestMetrics.recordDelayedProduceSatisfied(durationNs)
+ satisfied
}
class PartitionStatus(var acksPending: Boolean,
@@ -618,18 +677,159 @@ class KafkaApis(val requestChannel: Requ
/**
* A holding pen for produce requests waiting to be satisfied.
*/
- private [kafka] class ProducerRequestPurgatory(brokerId: Int) extends RequestPurgatory[DelayedProduce, (String, Int)]("Producer Request Purgatory on Broker " + brokerId + ", ") {
+ private [kafka] class ProducerRequestPurgatory extends RequestPurgatory[DelayedProduce, RequestKey](brokerId) {
+
- protected def checkSatisfied(fetchRequestPartition: (String, Int),
+ this.logIdent = "ProducerRequestPurgatory-%d ".format(brokerId)
+
+ override def metricsGroupIdent = metricsGroup
+
+ protected def checkSatisfied(followerFetchRequestKey: RequestKey,
delayedProduce: DelayedProduce) =
- delayedProduce.isSatisfied(fetchRequestPartition)
+ delayedProduce.isSatisfied(followerFetchRequestKey)
/**
* Handle an expired delayed request
*/
protected def expire(delayedProduce: DelayedProduce) {
+ for (partitionStatus <- delayedProduce.partitionStatus if partitionStatus._2.acksPending)
+ delayedRequestMetrics.recordDelayedProducerKeyExpired(partitionStatus._1)
+
delayedProduce.respond()
}
}
+
+ private class DelayedRequestMetrics {
+
+
+ private class DelayedProducerRequestMetrics(keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup {
+ override def metricsGroupIdent = metricsGroup
+ val caughtUpFollowerFetchRequestMeter =
+ newMeter("CaughtUpFollowerFetchRequestsPerSecond-" + keyLabel, "requests", TimeUnit.SECONDS)
+ val followerCatchUpTimeHistogram = if (keyLabel == MetricKey.globalLabel)
+ Some(newHistogram("FollowerCatchUpTimeInNs", biased = true))
+ else None
+
+ /*
+ * Note that throughput is marked whenever an individual key is caught up.
+ * It is therefore an upper bound on actual throughput, since the enclosing
+ * DelayedProduce request may still expire after some of its keys are satisfied.
+ */
+ val throughputMeter = newMeter("Throughput-" + keyLabel, "bytes", TimeUnit.SECONDS)
+ val expiredRequestMeter = newMeter("ExpiredRequestsPerSecond-" + keyLabel, "requests", TimeUnit.SECONDS)
+
+ val satisfiedRequestMeter = if (keyLabel == MetricKey.globalLabel)
+ Some(newMeter("SatisfiedRequestsPerSecond", "requests", TimeUnit.SECONDS))
+ else None
+ val satisfactionTimeHistogram = if (keyLabel == MetricKey.globalLabel)
+ Some(newHistogram("SatisfactionTimeInNs", biased = true))
+ else None
+ }
+
+
+ private class DelayedFetchRequestMetrics(forFollower: Boolean,
+ keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup {
+ private val metricPrefix = if (forFollower) "Follower" else "NonFollower"
+
+ override def metricsGroupIdent = metricsGroup
+ val satisfiedRequestMeter = if (keyLabel == MetricKey.globalLabel)
+ Some(newMeter(metricPrefix + "-SatisfiedRequestsPerSecond",
+ "requests", TimeUnit.SECONDS))
+ else None
+
+ val satisfactionTimeHistogram = if (keyLabel == MetricKey.globalLabel)
+ Some(newHistogram(metricPrefix + "-SatisfactionTimeInNs", biased = true))
+ else None
+
+ val expiredRequestMeter = if (keyLabel == MetricKey.globalLabel)
+ Some(newMeter(metricPrefix + "-ExpiredRequestsPerSecond",
+ "requests", TimeUnit.SECONDS))
+ else None
+
+ val throughputMeter = newMeter("%s-Throughput-%s".format(metricPrefix, keyLabel),
+ "bytes", TimeUnit.SECONDS)
+ }
+
+ private val producerRequestMetricsForKey = {
+ val valueFactory = (k: MetricKey) => new DelayedProducerRequestMetrics(k.keyLabel)
+ new Pool[MetricKey, DelayedProducerRequestMetrics](Some(valueFactory))
+ }
+
+ private val aggregateProduceRequestMetrics = new DelayedProducerRequestMetrics
+
+ private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true)
+ private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false)
+
+ private val followerFetchRequestMetricsForKey = {
+ val valueFactory = (k: MetricKey) => new DelayedFetchRequestMetrics(forFollower = true, k.keyLabel)
+ new Pool[MetricKey, DelayedFetchRequestMetrics](Some(valueFactory))
+ }
+
+ private val nonFollowerFetchRequestMetricsForKey = {
+ val valueFactory = (k: MetricKey) => new DelayedFetchRequestMetrics(forFollower = false, k.keyLabel)
+ new Pool[MetricKey, DelayedFetchRequestMetrics](Some(valueFactory))
+ }
+
+ def recordDelayedProducerKeyExpired(key: MetricKey) {
+ val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key)
+ List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark())
+ }
+
+
+ def recordDelayedProducerKeyCaughtUp(key: MetricKey, timeToCatchUpNs: Long, bytes: Int) {
+ val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key)
+ List(keyMetrics, aggregateProduceRequestMetrics).foreach(m => {
+ m.caughtUpFollowerFetchRequestMeter.mark()
+ m.followerCatchUpTimeHistogram.foreach(_.update(timeToCatchUpNs))
+ m.throughputMeter.mark(bytes)
+ })
+ }
+
+
+ def recordDelayedProduceSatisfied(timeToSatisfyNs: Long) {
+ aggregateProduceRequestMetrics.satisfiedRequestMeter.foreach(_.mark())
+ aggregateProduceRequestMetrics.satisfactionTimeHistogram.foreach(_.update(timeToSatisfyNs))
+ }
+
+
+ private def recordDelayedFetchThroughput(forFollower: Boolean, response: FetchResponse) {
+ val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
+ else aggregateNonFollowerFetchRequestMetrics
+ metrics.throughputMeter.mark(response.sizeInBytes)
+
+ response.topicMap.foreach(topicAndData => {
+ val topic = topicAndData._1
+ topicAndData._2.partitionDataArray.foreach(partitionData => {
+ val key = RequestKey(topic, partitionData.partition)
+ val keyMetrics = if (forFollower)
+ followerFetchRequestMetricsForKey.getAndMaybePut(key)
+ else
+ nonFollowerFetchRequestMetricsForKey.getAndMaybePut(key)
+ keyMetrics.throughputMeter.mark(partitionData.sizeInBytes)
+ })
+ })
+ }
+
+
+ def recordDelayedFetchExpired(forFollower: Boolean, response: FetchResponse) {
+ val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
+ else aggregateNonFollowerFetchRequestMetrics
+
+ metrics.expiredRequestMeter.foreach(_.mark())
+
+ recordDelayedFetchThroughput(forFollower, response)
+ }
+
+
+ def recordDelayedFetchSatisfied(forFollower: Boolean, timeToSatisfyNs: Long, response: FetchResponse) {
+ val aggregateMetrics = if (forFollower) aggregateFollowerFetchRequestMetrics
+ else aggregateNonFollowerFetchRequestMetrics
+
+ aggregateMetrics.satisfactionTimeHistogram.foreach(_.update(timeToSatisfyNs))
+ aggregateMetrics.satisfiedRequestMeter.foreach(_.mark())
+
+ recordDelayedFetchThroughput(forFollower, response)
+ }
+ }
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1374069&r1=1374068&r2=1374069&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Thu Aug 16 21:26:40 2012
@@ -24,6 +24,7 @@ import kafka.consumer.ConsumerConfig
import java.net.InetAddress
+
/**
* Configuration settings for the kafka server
*/
@@ -138,7 +139,7 @@ class KafkaConfig(props: Properties) ext
val replicaMaxWaitTimeMs = Utils.getInt(props, "replica.fetch.wait.time.ms", 500)
/** minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs */
- val replicaMinBytes = Utils.getInt(props, "replica.fetch.min.bytes", 4086)
+ val replicaMinBytes = Utils.getInt(props, "replica.fetch.min.bytes", 4096)
/* number of fetcher threads used to replicate messages from a source broker.
* Increasing this value can increase the degree of I/O parallelism in the follower broker. */
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala?rev=1374069&r1=1374068&r2=1374069&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala Thu Aug 16 21:26:40 2012
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -23,6 +23,9 @@ import java.util.concurrent._
import java.util.concurrent.atomic._
import kafka.network._
import kafka.utils._
+import com.yammer.metrics.core.Gauge
+import kafka.metrics.KafkaMetricsGroup
+
/**
* A request whose processing needs to be delayed for at most the given delayMs
@@ -30,6 +33,7 @@ import kafka.utils._
* for example a key could be a (topic, partition) pair.
*/
class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, delayMs: Long) extends DelayedItem[RequestChannel.Request](request, delayMs) {
+ val creationTimeNs = SystemTime.nanoseconds
val satisfied = new AtomicBoolean(false)
}
@@ -58,13 +62,41 @@ class DelayedRequest(val keys: Seq[Any],
* this function handles delayed requests that have hit their time limit without being satisfied.
*
*/
-abstract class RequestPurgatory[T <: DelayedRequest, R](logPrefix: String) extends Logging{
- this.logIdent = logPrefix
+abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) extends Logging with KafkaMetricsGroup {
+
/* a list of requests watching each key */
- private val watchersForKey = new ConcurrentHashMap[Any, Watchers]
+ private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers))
+
+ private val numDelayedRequestsBeanName = "NumDelayedRequests"
+ private val timeToSatisfyHistogramBeanName = "TimeToSatisfyInNs"
+ private val satisfactionRateBeanName = "SatisfactionRate"
+ private val expirationRateBeanName = "ExpirationRate"
+
+ override def metricsGroupIdent = ""
+
+ val satisfactionRateMeter = newMeter(
+ satisfactionRateBeanName,
+ "requests",
+ TimeUnit.SECONDS
+ )
+
+ val timeToSatisfyHistogram = newHistogram(timeToSatisfyHistogramBeanName, biased = true)
+
+ newGauge(
+ numDelayedRequestsBeanName,
+ new Gauge[Int] {
+ def value() = expiredRequestReaper.unsatisfied.get()
+ }
+ )
+
+ val expirationRateMeter = newMeter(
+ expirationRateBeanName,
+ "requests",
+ TimeUnit.SECONDS
+ )
/* background thread expiring requests that have been waiting too long */
- private val expiredRequestReaper = new ExpiredRequestReaper(logPrefix)
+ private val expiredRequestReaper = new ExpiredRequestReaper
private val expirationThread = Utils.daemonThread("request-expiration-task", expiredRequestReaper)
expirationThread.start()
@@ -90,15 +122,8 @@ abstract class RequestPurgatory[T <: Del
w.collectSatisfiedRequests(request)
}
- private def watchersFor(key: Any): Watchers = {
- var lst = watchersForKey.get(key)
- if(lst == null) {
- watchersForKey.putIfAbsent(key, new Watchers)
- lst = watchersForKey.get(key)
- }
- lst
- }
-
+ private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key)
+
/**
* Check if this request satisfied this delayed request
*/
@@ -117,7 +142,8 @@ abstract class RequestPurgatory[T <: Del
}
/**
- * A linked list of DelayedRequests watching some key with some associated bookeeping logic
+ * A linked list of DelayedRequests watching some key with some associated
+ * bookkeeping logic.
*/
private class Watchers {
@@ -132,10 +158,10 @@ abstract class RequestPurgatory[T <: Del
def add(t: T) {
synchronized {
- requests.add(t)
- liveCount += 1
- maybePurge()
- }
+ requests.add(t)
+ liveCount += 1
+ maybePurge()
+ }
}
private def maybePurge() {
@@ -151,32 +177,39 @@ abstract class RequestPurgatory[T <: Del
def decLiveCount() {
synchronized {
- liveCount -= 1
- }
+ liveCount -= 1
+ }
}
def collectSatisfiedRequests(request: R): Seq[T] = {
val response = new mutable.ArrayBuffer[T]
synchronized {
- val iter = requests.iterator()
- while(iter.hasNext) {
- val curr = iter.next
- if(curr.satisfied.get) {
- // another thread has satisfied this request, remove it
- iter.remove()
- } else {
- if(checkSatisfied(request, curr)) {
- iter.remove()
- val updated = curr.satisfied.compareAndSet(false, true)
- if(updated == true) {
- response += curr
- liveCount -= 1
- expiredRequestReaper.satisfyRequest()
- }
- }
- }
- }
- }
+ val iter = requests.iterator()
+ while(iter.hasNext) {
+ val curr = iter.next
+ if(curr.satisfied.get) {
+ // another thread has satisfied this request, remove it
+ iter.remove()
+ } else {
+ // Synchronize on curr so that checkSatisfied cannot race with expire,
+ // which the reaper thread also invokes while holding curr's monitor.
+ val satisfied = curr synchronized checkSatisfied(request, curr)
+ if(satisfied) {
+ iter.remove()
+ val updated = curr.satisfied.compareAndSet(false, true)
+ if(updated == true) {
+ val requestNs = SystemTime.nanoseconds - curr.creationTimeNs
+ satisfactionRateMeter.mark()
+ timeToSatisfyHistogram.update(requestNs)
+
+ response += curr
+ liveCount -= 1
+ expiredRequestReaper.satisfyRequest()
+ }
+ }
+ }
+ }
+ }
response
}
}
@@ -184,9 +217,8 @@ abstract class RequestPurgatory[T <: Del
/**
* Runnable to expire requests that have sat unfullfilled past their deadline
*/
- private class ExpiredRequestReaper(logPrefix: String) extends Runnable with Logging {
- this.logIdent = "ExpiredRequestReaper for " + logPrefix
-
+ private class ExpiredRequestReaper extends Runnable with Logging {
+ this.logIdent = "ExpiredRequestReaper-%d ".format(brokerId)
/* a few magic parameters to help do cleanup to avoid accumulating old watchers */
private val CleanupThresholdSize = 100
private val CleanupThresholdPrct = 0.5
@@ -196,14 +228,16 @@ abstract class RequestPurgatory[T <: Del
private val shutdownLatch = new CountDownLatch(1)
private val needsPurge = new AtomicBoolean(false)
/* The count of elements in the delay queue that are unsatisfied */
- private val unsatisfied = new AtomicInteger(0)
+ private [kafka] val unsatisfied = new AtomicInteger(0)
/** Main loop for the expiry thread */
def run() {
while(running.get) {
try {
val curr = pollExpired()
- expire(curr)
+ curr synchronized {
+ expire(curr)
+ }
} catch {
case ie: InterruptedException =>
if(needsPurge.getAndSet(false)) {
@@ -232,11 +266,11 @@ abstract class RequestPurgatory[T <: Del
/** Shutdown the expiry thread*/
def shutdown() {
- debug("shutting down")
+ debug("Shutting down.")
running.set(false)
expirationThread.interrupt()
shutdownLatch.await()
- debug("shut down completely")
+ debug("Shut down complete.")
}
/** Record the fact that we satisfied a request in the stats for the expiry queue */
@@ -250,6 +284,7 @@ abstract class RequestPurgatory[T <: Del
val curr = delayed.take()
val updated = curr.satisfied.compareAndSet(false, true)
if(updated) {
+ expirationRateMeter.mark()
unsatisfied.getAndDecrement()
for(key <- curr.keys)
watchersFor(key).decLiveCount()
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Pool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Pool.scala?rev=1374069&r1=1374068&r2=1374069&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Pool.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Pool.scala Thu Aug 16 21:26:40 2012
@@ -20,21 +20,45 @@ package kafka.utils
import java.util.ArrayList
import java.util.concurrent._
import collection.JavaConversions
+import kafka.common.KafkaException
-class Pool[K,V] extends Iterable[(K, V)] {
+
+class Pool[K,V](valueFactory: Option[(K) => V] = None) extends Iterable[(K, V)] {
private val pool = new ConcurrentHashMap[K, V]
def this(m: collection.Map[K, V]) {
this()
- for((k,v) <- m.elements)
- pool.put(k, v)
+ m.foreach(kv => pool.put(kv._1, kv._2))
}
def put(k: K, v: V) = pool.put(k, v)
def putIfNotExists(k: K, v: V) = pool.putIfAbsent(k, v)
-
+
+ /**
+ * Gets the value associated with the given key. If there is no associated
+ * value, then create the value using the pool's value factory and return the
+ * value associated with the key. The factory may run even when another thread
+ * wins the race to insert, so it should be side-effect free or build its value lazily.
+ *
+ * @param key The key to lookup.
+ * @return The final value associated with the key. This may be different from
+ * the value created by the factory if another thread successfully
+ * put a value.
+ */
+ def getAndMaybePut(key: K) = {
+ if (valueFactory.isEmpty)
+ throw new KafkaException("Empty value factory in pool.")
+ val curr = pool.get(key)
+ if (curr == null) {
+ pool.putIfAbsent(key, valueFactory.get(key))
+ pool.get(key)
+ }
+ else
+ curr
+ }
+
def contains(id: K) = pool.containsKey(id)
def get(key: K): V = pool.get(key)
@@ -46,7 +70,7 @@ class Pool[K,V] extends Iterable[(K, V)]
def values: Iterable[V] =
JavaConversions.asIterable(new ArrayList[V](pool.values()))
- def clear: Unit = pool.clear()
+ def clear() { pool.clear() }
override def size = pool.size
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala?rev=1374069&r1=1374068&r2=1374069&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala Thu Aug 16 21:26:40 2012
@@ -76,7 +76,7 @@ class LogCorruptionTest extends JUnit3Su
fail("shouldn't reach here in SimpleConsumer since log file is corrupted.")
fail("shouldn't reach here in SimpleConsumer since log file is corrupted.")
} catch {
- case e: InvalidMessageSizeException => println("This is good")
+ case e: InvalidMessageSizeException => "This is good"
}
// test ZookeeperConsumer
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala?rev=1374069&r1=1374068&r2=1374069&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala Thu Aug 16 21:26:40 2012
@@ -103,8 +103,10 @@ class TopicMetadataTest extends JUnit3Su
val logManager = EasyMock.createMock(classOf[LogManager])
val kafkaZookeeper = EasyMock.createMock(classOf[KafkaZooKeeper])
val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
+ EasyMock.expect(replicaManager.config).andReturn(configs.head)
EasyMock.expect(kafkaZookeeper.getZookeeperClient).andReturn(zkClient)
EasyMock.expect(logManager.config).andReturn(configs.head)
+ EasyMock.replay(replicaManager)
EasyMock.replay(logManager)
EasyMock.replay(kafkaZookeeper)
Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala?rev=1374069&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala Thu Aug 16 21:26:40 2012
@@ -0,0 +1,60 @@
+package unit.kafka.metrics
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.junit.Test
+import org.scalatest.junit.JUnit3Suite
+import java.util.concurrent.TimeUnit
+import junit.framework.Assert._
+import kafka.metrics.KafkaTimer
+import com.yammer.metrics.core.{MetricsRegistry, Clock}
+
+class KafkaTimerTest extends JUnit3Suite {
+
+ @Test
+ def testKafkaTimer() {
+ val clock = new ManualClock
+ val testRegistry = new MetricsRegistry(clock)
+ val metric = testRegistry.newTimer(this.getClass, "TestTimer")
+
+ val timer = new KafkaTimer(metric)
+ timer.time {
+ clock.addMillis(1000)
+ }
+ assertEquals(1, metric.count())
+ assertTrue((metric.max() - 1000).abs <= Double.Epsilon)
+ assertTrue((metric.min() - 1000).abs <= Double.Epsilon)
+ }
+
+ private class ManualClock extends Clock {
+
+ private var ticksInNanos = 0L
+
+ override def tick() = {
+ ticksInNanos
+ }
+
+ override def time() = {
+ TimeUnit.NANOSECONDS.toMillis(ticksInNanos)
+ }
+
+ def addMillis(millis: Long) {
+ ticksInNanos += TimeUnit.MILLISECONDS.toNanos(millis)
+ }
+ }
+}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala?rev=1374069&r1=1374068&r2=1374069&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala Thu Aug 16 21:26:40 2012
@@ -18,26 +18,28 @@
package kafka.server
import scala.collection._
-import org.junit.{After, Before, Test}
+import org.junit.Test
import junit.framework.Assert._
import kafka.message._
import kafka.api._
import kafka.utils.TestUtils
+import org.scalatest.junit.JUnit3Suite
-class RequestPurgatoryTest {
+
+class RequestPurgatoryTest extends JUnit3Suite {
val producerRequest1 = TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(new Message("hello1".getBytes)))
val producerRequest2 = TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(new Message("hello2".getBytes)))
var purgatory: MockRequestPurgatory = null
- @Before
- def setup() {
+ override def setUp() {
+ super.setUp()
purgatory = new MockRequestPurgatory()
}
- @After
- def teardown() {
+ override def tearDown() {
purgatory.shutdown()
+ super.tearDown()
}
@Test
@@ -54,7 +56,7 @@ class RequestPurgatoryTest {
assertEquals("Nothing satisfied", 0, purgatory.update("test1", producerRequest2).size)
purgatory.satisfied += r2
assertEquals("r2 satisfied", mutable.ArrayBuffer(r2), purgatory.update("test2", producerRequest2))
- assertEquals("Nothing satisfied", 0, purgatory.update("test2", producerRequest2).size)
+ assertEquals("Nothing satisfied", 0, purgatory.update("test2", producerRequest2).size)
}
@Test
@@ -73,7 +75,7 @@ class RequestPurgatoryTest {
assertTrue("Time for expiration was about 20ms", (elapsed - expiration).abs < 10L)
}
- class MockRequestPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest]("Mock Request Purgatory") {
+ class MockRequestPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest] {
val satisfied = mutable.Set[DelayedRequest]()
val expired = mutable.Set[DelayedRequest]()
def awaitExpiration(delayed: DelayedRequest) = {
Modified: incubator/kafka/branches/0.8/project/build/KafkaProject.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/project/build/KafkaProject.scala?rev=1374069&r1=1374068&r2=1374069&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/project/build/KafkaProject.scala (original)
+++ incubator/kafka/branches/0.8/project/build/KafkaProject.scala Thu Aug 16 21:26:40 2012
@@ -16,7 +16,7 @@
*/
import sbt._
-import scala.xml.{Node, Elem, NodeSeq}
+import scala.xml.{Node, Elem}
import scala.xml.transform.{RewriteRule, RuleTransformer}
class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProject {
@@ -60,11 +60,11 @@ class KafkaProject(info: ProjectInfo) ex
def zkClientDep =
<dependency>
- <groupId>zkclient</groupId>
- <artifactId>zkclient</artifactId>
- <version>20120522</version>
- <scope>compile</scope>
- </dependency>
+ <groupId>zkclient</groupId>
+ <artifactId>zkclient</artifactId>
+ <version>20120522</version>
+ <scope>compile</scope>
+ </dependency>
object ZkClientDepAdder extends RuleTransformer(new RewriteRule() {
override def transform(node: Node): Seq[Node] = node match {
@@ -251,6 +251,8 @@ class KafkaProject(info: ProjectInfo) ex
trait CoreDependencies {
val log4j = "log4j" % "log4j" % "1.2.15"
val jopt = "net.sf.jopt-simple" % "jopt-simple" % "3.2"
+ val metricsCore = "com.yammer.metrics" % "metrics-core" % "latest.release"
+ val slf4jSimple = "org.slf4j" % "slf4j-simple" % "latest.release"
}
trait HadoopDependencies {
@@ -264,5 +266,4 @@ class KafkaProject(info: ProjectInfo) ex
trait CompressionDependencies {
val snappy = "org.xerial.snappy" % "snappy-java" % "1.0.4.1"
}
-
}