Posted to commits@kafka.apache.org by ju...@apache.org on 2013/11/20 18:21:23 UTC

git commit: kafka-1117; tool for checking the consistency among replicas; patched by Jun Rao; reviewed by Neha Narkhede, Joel Koshy, Swapnil Ghike, Guozhang Wang

Updated Branches:
  refs/heads/trunk 58789d7cb -> 440e45e51


kafka-1117; tool for checking the consistency among replicas; patched by Jun Rao; reviewed by Neha Narkhede, Joel Koshy, Swapnil Ghike, Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/440e45e5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/440e45e5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/440e45e5

Branch: refs/heads/trunk
Commit: 440e45e5131131a30987b991eabfee147061c42b
Parents: 58789d7
Author: Jun Rao <ju...@gmail.com>
Authored: Wed Nov 20 09:21:56 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Nov 20 09:21:56 2013 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/api/OffsetResponse.scala   |   6 +-
 .../kafka/tools/ReplicaVerificationTool.scala   | 387 +++++++++++++++++++
 2 files changed, 392 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
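
For reference, a typical invocation of the new tool might look like the
following (illustrative host and values; kafka-run-class.sh is the standard
class launcher, and --broker-list is the only required flag):

  bin/kafka-run-class.sh kafka.tools.ReplicaVerificationTool \
    --broker-list localhost:9092,localhost:9093 \
    --topic-white-list '.*' \
    --report-interval-ms 30000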


http://git-wip-us.apache.org/repos/asf/kafka/blob/440e45e5/core/src/main/scala/kafka/api/OffsetResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetResponse.scala b/core/src/main/scala/kafka/api/OffsetResponse.scala
index 08dc3cd..fca76a2 100644
--- a/core/src/main/scala/kafka/api/OffsetResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetResponse.scala
@@ -44,7 +44,11 @@ object OffsetResponse {
 }
 
 
-case class PartitionOffsetsResponse(error: Short, offsets: Seq[Long])
+case class PartitionOffsetsResponse(error: Short, offsets: Seq[Long]) {
+  override def toString(): String = {
+    "error: " + ErrorMapping.exceptionFor(error).getClass.getName + " offsets: " + offsets.mkString(" ")
+  }
+}
 
 
 case class OffsetResponse(override val correlationId: Int,
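
(With this change, a partition entry that carries an error renders along the
lines of "error: kafka.common.OffsetOutOfRangeException offsets: 100"; the
exception class name comes from ErrorMapping, and the values shown here are
illustrative.)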

http://git-wip-us.apache.org/repos/asf/kafka/blob/440e45e5/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
new file mode 100644
index 0000000..f1f139e
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import joptsimple.OptionParser
+import kafka.cluster.Broker
+import kafka.message.{MessageSet, MessageAndOffset, ByteBufferMessageSet}
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.atomic.AtomicReference
+import kafka.client.ClientUtils
+import java.util.regex.{PatternSyntaxException, Pattern}
+import kafka.api._
+import java.text.SimpleDateFormat
+import java.util.Date
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.utils._
+import kafka.consumer.{ConsumerConfig, Whitelist, SimpleConsumer}
+
+/**
+ *  For verifying the consistency among replicas.
+ *
+ *  1. start a fetcher thread for every broker.
+ *  2. each fetcher does the following
+ *    2.1 issues fetch request
+ *    2.2 puts the fetched result in a shared buffer
+ *    2.3 waits for all other fetchers to finish step 2.2
+ *    2.4 one of the fetchers verifies the consistency of fetched results among replicas
+ *
+ *  The consistency verification is up to the high watermark. The tool reports the
+ *  max lag between the verified offset and the high watermark among all partitions.
+ *
+ *  If a broker goes down, the verification of the partitions on that broker is delayed
+ *  until the broker is up again.
+ *
+ * Caveats:
+ * 1. The tool needs all brokers to be up at startup time.
+ * 2. The tool doesn't handle out-of-range offsets.
+ */
+
+object ReplicaVerificationTool extends Logging {
+  val clientId = "replicaVerificationTool"
+  val dateFormatString = "yyyy-MM-dd HH:mm:ss,SSS"
+  val dateFormat = new SimpleDateFormat(dateFormatString)
+
+  def getCurrentTimeString() = {
+    ReplicaVerificationTool.dateFormat.format(new Date(SystemTime.milliseconds))
+  }
+
+  def main(args: Array[String]): Unit = {
+    val parser = new OptionParser
+    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the servers to connect to.")
+                         .withRequiredArg
+                         .describedAs("hostname:port,...,hostname:port")
+                         .ofType(classOf[String])
+    val fetchSizeOpt = parser.accepts("fetch-size", "The fetch size of each request.")
+                         .withRequiredArg
+                         .describedAs("bytes")
+                         .ofType(classOf[java.lang.Integer])
+                         .defaultsTo(ConsumerConfig.FetchSize)
+    val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
+                         .withRequiredArg
+                         .describedAs("ms")
+                         .ofType(classOf[java.lang.Integer])
+                         .defaultsTo(1000)
+    val topicWhiteListOpt = parser.accepts("topic-white-list", "White list of topics to verify replica consistency. Defaults to all topics.")
+                         .withRequiredArg
+                         .describedAs("Java regex (String)")
+                         .ofType(classOf[String])
+                         .defaultsTo(".*")
+    val initialOffsetTimeOpt = parser.accepts("time", "Timestamp for getting the initial offsets.")
+                           .withRequiredArg
+                           .describedAs("timestamp/-1(latest)/-2(earliest)")
+                           .ofType(classOf[java.lang.Long])
+                           .defaultsTo(-1L)
+    val reportIntervalOpt = parser.accepts("report-interval-ms", "The reporting interval.")
+                         .withRequiredArg
+                         .describedAs("ms")
+                         .ofType(classOf[java.lang.Long])
+                         .defaultsTo(30 * 1000L)
+
+
+    val options = parser.parse(args : _*)
+    CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt)
+
+    val regex = options.valueOf(topicWhiteListOpt)
+    val topicWhiteListFilter = new Whitelist(regex)
+
+    try {
+      Pattern.compile(regex)
+    }
+    catch {
+      case e: PatternSyntaxException =>
+        throw new RuntimeException(regex + " is an invalid regex.")
+    }
+
+    val fetchSize = options.valueOf(fetchSizeOpt).intValue
+    val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue
+    val initialOffsetTime = options.valueOf(initialOffsetTimeOpt).longValue
+    val reportInterval = options.valueOf(reportIntervalOpt).longValue
+    // getting topic metadata
+    info("Getting topic metadata...")
+    val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
+    val topicsMetadataResponse = ClientUtils.fetchTopicMetadata(Set[String](), metadataTargetBrokers, clientId, maxWaitMs)
+    val brokerMap = topicsMetadataResponse.extractBrokers(topicsMetadataResponse.topicsMetadata)
+    val filteredTopicMetadata = topicsMetadataResponse.topicsMetadata.filter(
+        topicMetadata => topicWhiteListFilter.isTopicAllowed(topicMetadata.topic)
+    )
+    val topicPartitionReplicaList: Seq[TopicPartitionReplica] = filteredTopicMetadata.flatMap(
+      topicMetadataResponse =>
+        topicMetadataResponse.partitionsMetadata.flatMap(
+          partitionMetadata =>
+            partitionMetadata.replicas.map(broker =>
+              TopicPartitionReplica(topic = topicMetadataResponse.topic, partitionId = partitionMetadata.partitionId, replicaId = broker.id))
+        )
+    )
+    debug("Selected topic partitions: " + topicPartitionReplicaList)
+    val topicAndPartitionsPerBroker: Map[Int, Seq[TopicAndPartition]] = topicPartitionReplicaList.groupBy(_.replicaId)
+      .map { case (brokerId, partitions) =>
+               brokerId -> partitions.map { case partition => new TopicAndPartition(partition.topic, partition.partitionId) } }
+    debug("Topic partitions per broker: " + topicAndPartitionsPerBroker)
+    val expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int] =
+          topicPartitionReplicaList.groupBy(replica => new TopicAndPartition(replica.topic, replica.partitionId))
+          .map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size }
+    debug("Expected replicas per topic partition: " + expectedReplicasPerTopicAndPartition)
+    val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap(
+      topicMetadataResponse =>
+        topicMetadataResponse.partitionsMetadata.map(
+          partitionMetadata =>
+            (new TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id))
+    ).groupBy(_._2)
+     .mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map {
+        case(topicAndPartition, leaderId) => topicAndPartition })
+    debug("Leaders per broker: " + leadersPerBroker)
+
+    val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition,
+                                          leadersPerBroker,
+                                          topicAndPartitionsPerBroker.size,
+                                          brokerMap,
+                                          initialOffsetTime,
+                                          reportInterval)
+    // create all replica fetcher threads
+    val verificationBrokerId = topicAndPartitionsPerBroker.head._1
+    val fetcherThreads: Iterable[ReplicaFetcher] = topicAndPartitionsPerBroker.map {
+      case (brokerId, topicAndPartitions) =>
+        new ReplicaFetcher(name = "ReplicaFetcher-" + brokerId,
+                           sourceBroker = brokerMap(brokerId),
+                           topicAndPartitions = topicAndPartitions,
+                           replicaBuffer = replicaBuffer,
+                           socketTimeout = 30000,
+                           socketBufferSize = 256000,
+                           fetchSize = fetchSize,
+                           maxWait = maxWaitMs,
+                           minBytes = 1,
+                           doVerification = (brokerId == verificationBrokerId))
+    }
+
+    Runtime.getRuntime.addShutdownHook(new Thread() {
+      override def run() {
+        info("Stopping all fetchers")
+        fetcherThreads.foreach(_.shutdown())
+      }
+    })
+    fetcherThreads.foreach(_.start())
+    println(ReplicaVerificationTool.getCurrentTimeString() + ": verification process started.")
+
+  }
+}
+
+private case class TopicPartitionReplica(topic: String, partitionId: Int, replicaId: Int)
+
+private case class ReplicaAndMessageIterator(replicaId: Int, iterator: Iterator[MessageAndOffset])
+
+private case class MessageInfo(replicaId: Int, offset: Long, nextOffset: Long, checksum: Long)
+
+private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int],
+                            leadersPerBroker: Map[Int, Seq[TopicAndPartition]],
+                            expectedNumFetchers: Int,
+                            brokerMap: Map[Int, Broker],
+                            initialOffsetTime: Long,
+                            reportInterval: Long) extends Logging {
+  private val fetchOffsetMap = new Pool[TopicAndPartition, Long]
+  private val messageSetCache = new Pool[TopicAndPartition, Pool[Int, FetchResponsePartitionData]]
+  private val fetcherBarrier = new AtomicReference(new CountDownLatch(expectedNumFetchers))
+  private val verificationBarrier = new AtomicReference(new CountDownLatch(1))
+  @volatile private var lastReportTime = SystemTime.milliseconds
+  private var maxLag: Long = -1L
+  private var offsetWithMaxLag: Long = -1L
+  private var maxLagTopicAndPartition: TopicAndPartition = null
+  initialize()
+
+  def createNewFetcherBarrier() {
+    fetcherBarrier.set(new CountDownLatch(expectedNumFetchers))
+  }
+
+  def getFetcherBarrier() = fetcherBarrier.get()
+
+  def createNewVerificationBarrier() {
+    verificationBarrier.set(new CountDownLatch(1))
+  }
+
+  def getVerificationBarrier() = verificationBarrier.get()
+
+  private def initialize() {
+    for (topicAndPartition <- expectedReplicasPerTopicAndPartition.keySet)
+      messageSetCache.put(topicAndPartition, new Pool[Int, FetchResponsePartitionData])
+    setInitialOffsets()
+  }
+
+  private def offsetResponseStringWithError(offsetResponse: OffsetResponse): String = {
+    offsetResponse.partitionErrorAndOffsets.filter {
+      case (topicAndPartition, partitionOffsetsResponse) => partitionOffsetsResponse.error != ErrorMapping.NoError
+    }.mkString
+  }
+
+  private def setInitialOffsets() {
+    for ((brokerId, topicAndPartitions) <- leadersPerBroker) {
+      val broker = brokerMap(brokerId)
+      val consumer = new SimpleConsumer(broker.host, broker.port, 10000, 100000, ReplicaVerificationTool.clientId)
+      val initialOffsetMap: Map[TopicAndPartition, PartitionOffsetRequestInfo] =
+        topicAndPartitions.map(topicAndPartition => topicAndPartition -> PartitionOffsetRequestInfo(initialOffsetTime, 1)).toMap
+      val offsetRequest = OffsetRequest(initialOffsetMap)
+      val offsetResponse = consumer.getOffsetsBefore(offsetRequest)
+      assert(!offsetResponse.hasError, offsetResponseStringWithError(offsetResponse))
+      offsetResponse.partitionErrorAndOffsets.foreach{
+        case (topicAndPartition, partitionOffsetResponse) =>
+          fetchOffsetMap.put(topicAndPartition, partitionOffsetResponse.offsets.head)
+      }
+    }
+  }
+
+  def addFetchedData(topicAndPartition: TopicAndPartition, replicaId: Int, partitionData: FetchResponsePartitionData) {
+    messageSetCache.get(topicAndPartition).put(replicaId, partitionData)
+  }
+
+  def getOffset(topicAndPartition: TopicAndPartition) = {
+    fetchOffsetMap.get(topicAndPartition)
+  }
+
+  def verifyCheckSum() {
+    debug("Begin verification")
+    maxLag = -1L
+    for ((topicAndPartition, fetchResponsePerReplica) <- messageSetCache) {
+      debug("Verifying " + topicAndPartition)
+      assert(fetchResponsePerReplica.size == expectedReplicasPerTopicAndPartition(topicAndPartition),
+            "fetched " + fetchResponsePerReplica.size + " replicas for " + topicAndPartition + ", but expected "
+            + expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas")
+      val messageIteratorMap = fetchResponsePerReplica.map {
+        case(replicaId, fetchResponse) =>
+          replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].shallowIterator}
+      val maxHw = fetchResponsePerReplica.values.map(_.hw).max
+
+      // Iterate one message at a time from every replica, until high watermark is reached.
+      var isMessageInAllReplicas = true
+      while (isMessageInAllReplicas) {
+        var messageInfoFromFirstReplicaOpt: Option[MessageInfo] = None
+        for ((replicaId, messageIterator) <- messageIteratorMap) {
+          if (messageIterator.hasNext) {
+            val messageAndOffset = messageIterator.next()
+
+            // only verify up to the high watermark
+            if (messageAndOffset.offset >= fetchResponsePerReplica.get(replicaId).hw)
+              isMessageInAllReplicas = false
+            else {
+              messageInfoFromFirstReplicaOpt match {
+                case None =>
+                  messageInfoFromFirstReplicaOpt = Some(
+                    MessageInfo(replicaId, messageAndOffset.offset, messageAndOffset.nextOffset, messageAndOffset.message.checksum))
+                case Some(messageInfoFromFirstReplica) =>
+                  if (messageInfoFromFirstReplica.offset != messageAndOffset.offset) {
+                    println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + topicAndPartition
+                      + ": replica " + messageInfoFromFirstReplica.replicaId + "'s offset "
+                      + messageInfoFromFirstReplica.offset + " doesn't match replica "
+                      + replicaId + "'s offset " + messageAndOffset.offset)
+                    System.exit(1)
+                  }
+                  if (messageInfoFromFirstReplica.checksum != messageAndOffset.message.checksum)
+                    println(ReplicaVerificationTool.getCurrentTimeString + ": partition "
+                      + topicAndPartition + " has unmatched checksum at offset " + messageAndOffset.offset + "; replica "
+                      + messageInfoFromFirstReplica.replicaId + "'s checksum " + messageInfoFromFirstReplica.checksum
+                      + "; replica " + replicaId + "'s checksum " + messageAndOffset.message.checksum)
+              }
+            }
+          } else
+            isMessageInAllReplicas = false
+        }
+        if (isMessageInAllReplicas) {
+          val nextOffset = messageInfoFromFirstReplicaOpt.get.nextOffset
+          fetchOffsetMap.put(topicAndPartition, nextOffset)
+          debug(expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas match at offset " +
+                nextOffset + " for " + topicAndPartition)
+        }
+      }
+      if (maxHw - fetchOffsetMap.get(topicAndPartition) > maxLag) {
+        offsetWithMaxLag = fetchOffsetMap.get(topicAndPartition)
+        maxLag = maxHw - offsetWithMaxLag
+        maxLagTopicAndPartition = topicAndPartition
+      }
+      fetchResponsePerReplica.clear()
+    }
+    val currentTimeMs = SystemTime.milliseconds
+    if (currentTimeMs - lastReportTime > reportInterval) {
+      println(ReplicaVerificationTool.dateFormat.format(new Date(currentTimeMs)) + ": max lag is "
+        + maxLag + " for partition " + maxLagTopicAndPartition + " at offset " + offsetWithMaxLag
+        + " among " + messageSetCache.size + " partitions")
+      lastReportTime = currentTimeMs
+    }
+  }
+}
+
+private class ReplicaFetcher(name: String, sourceBroker: Broker, topicAndPartitions: Iterable[TopicAndPartition],
+                             replicaBuffer: ReplicaBuffer, socketTimeout: Int, socketBufferSize: Int,
+                             fetchSize: Int, maxWait: Int, minBytes: Int, doVerification: Boolean)
+  extends ShutdownableThread(name) {
+  val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, ReplicaVerificationTool.clientId)
+  val fetchRequestBuilder = new FetchRequestBuilder().
+          clientId(ReplicaVerificationTool.clientId).
+          replicaId(Request.DebuggingConsumerId).
+          maxWait(maxWait).
+          minBytes(minBytes)
+
+  override def doWork() {
+
+    val fetcherBarrier = replicaBuffer.getFetcherBarrier()
+    val verificationBarrier = replicaBuffer.getVerificationBarrier()
+
+    for (topicAndPartition <- topicAndPartitions)
+      fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
+        replicaBuffer.getOffset(topicAndPartition), fetchSize)
+
+    val fetchRequest = fetchRequestBuilder.build()
+    debug("Issuing fetch request " + fetchRequest)
+
+    var response: FetchResponse = null
+    try {
+      response = simpleConsumer.fetch(fetchRequest)
+    } catch {
+      case t: Throwable =>
+        if (!isRunning.get)
+          throw t
+    }
+
+    if (response != null) {
+      response.data.foreach {
+        case(topicAndPartition, partitionData) =>
+          replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, partitionData)
+      }
+    } else {
+      for (topicAndPartition <- topicAndPartitions)
+        replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, new FetchResponsePartitionData(messages = MessageSet.Empty))
+    }
+
+    fetcherBarrier.countDown()
+    debug("Done fetching")
+
+    // wait for all fetchers to finish
+    fetcherBarrier.await()
+    debug("Ready for verification")
+
+    // one of the fetchers will do the verification
+    if (doVerification) {
+      debug("Do verification")
+      replicaBuffer.verifyCheckSum()
+      replicaBuffer.createNewFetcherBarrier()
+      replicaBuffer.createNewVerificationBarrier()
+      debug("Created new barrier")
+      verificationBarrier.countDown()
+    }
+
+    verificationBarrier.await()
+    debug("Done verification")
+  }
+}
\ No newline at end of file
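
----------------------------------------------------------------------

Editorial note: the core of the tool is the two-barrier handoff between
fetcher threads (steps 2.2-2.4 in the class comment above). The
self-contained Scala sketch below, which is an illustration and not part
of the patch, shows that pattern in isolation: every fetcher counts down
the shared fetch latch, the one designated verifier re-arms both latches
before releasing the others, and the cycle repeats.

  import java.util.concurrent.CountDownLatch
  import java.util.concurrent.atomic.AtomicReference

  object BarrierSketch {
    val numFetchers = 3
    // Re-armed every round, like fetcherBarrier and verificationBarrier
    // in ReplicaBuffer.
    val fetcherBarrier = new AtomicReference(new CountDownLatch(numFetchers))
    val verificationBarrier = new AtomicReference(new CountDownLatch(1))

    def fetchOnce(id: Int, isVerifier: Boolean) {
      // Grab this round's latches up front, as ReplicaFetcher.doWork does.
      val fetchLatch = fetcherBarrier.get()
      val verifyLatch = verificationBarrier.get()
      println("fetcher " + id + ": buffered one fetch")  // stands in for a real fetch
      fetchLatch.countDown()
      fetchLatch.await()   // wait until every fetcher has buffered its data
      if (isVerifier) {
        println("fetcher " + id + ": verifying buffered data")
        // Re-arm BEFORE releasing the others, so that the next round's
        // get() calls observe fresh latches.
        fetcherBarrier.set(new CountDownLatch(numFetchers))
        verificationBarrier.set(new CountDownLatch(1))
        verifyLatch.countDown()
      }
      verifyLatch.await()  // everyone waits for verification to complete
    }

    def main(args: Array[String]) {
      val threads = (0 until numFetchers).map { id =>
        new Thread(new Runnable {
          override def run() {
            for (_ <- 1 to 2) fetchOnce(id, isVerifier = (id == 0))
          }
        })
      }
      threads.foreach(_.start())
      threads.foreach(_.join())
    }
  }

The ordering is the point: the latches are re-armed before the verifier
releases the others, so a fast fetcher entering the next round can never
pick up an already-released latch.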