Posted to commits@kafka.apache.org by ij...@apache.org on 2018/06/12 07:47:45 UTC

[kafka] branch 2.0 updated: KAFKA-7029: Update ReplicaVerificationTool not to use SimpleConsumer (#5188)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 35f3470  KAFKA-7029: Update ReplicaVerificationTool not to use SimpleConsumer (#5188)
35f3470 is described below

commit 35f34709ba865a38775308e17f97557e8cd12f69
Author: Manikumar Reddy O <ma...@gmail.com>
AuthorDate: Tue Jun 12 13:16:24 2018 +0530

    KAFKA-7029: Update ReplicaVerificationTool not to use SimpleConsumer (#5188)
    
    We need to send fetch requests to replicas so we have to use
    NetworkClient instead of KafkaConsumer.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 .../kafka/server/ReplicaFetcherBlockingSend.scala  |   2 +-
 .../kafka/tools/ReplicaVerificationTool.scala      | 389 +++++++++++++--------
 .../kafka/tools/ReplicaVerificationToolTest.scala  |  22 +-
 3 files changed, 255 insertions(+), 158 deletions(-)
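
The core of the change: a KafkaConsumer can only fetch from partition
leaders, while this tool needs to issue identical fetches against every
replica, so the tool now builds replica-style fetch requests and sends
them over a plain NetworkClient. A minimal sketch of that pattern, using
the APIs this patch switches to (the topic name, offsets, and timeout
values are placeholders, and the NetworkClient is assumed to be wired up
as in the ReplicaFetcherBlockingSend class added below):

    import java.util.LinkedHashMap
    import kafka.api.Request
    import org.apache.kafka.clients.{NetworkClient, NetworkClientUtils}
    import org.apache.kafka.common.{Node, TopicPartition}
    import org.apache.kafka.common.protocol.ApiKeys
    import org.apache.kafka.common.record.MemoryRecords
    import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
    import org.apache.kafka.common.utils.Time
    import scala.collection.JavaConverters._

    def fetchFromReplica(networkClient: NetworkClient, replica: Node, time: Time): Unit = {
      // One partition, starting at offset 0 with a 1 MB cap (placeholder values).
      val partitions = new LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
      partitions.put(new TopicPartition("topic", 0),
        new FetchRequest.PartitionData(0L, 0L, 1024 * 1024))

      // DebuggingConsumerId marks this as a debug fetch, which a broker will
      // serve even when it is not the leader for the partition.
      val builder = FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion,
        Request.DebuggingConsumerId, 500 /* maxWait ms */, 1 /* minBytes */, partitions)

      if (NetworkClientUtils.awaitReady(networkClient, replica, time, 30000)) {
        val request = networkClient.newClientRequest(replica.id.toString, builder,
          time.milliseconds(), true)
        val clientResponse = NetworkClientUtils.sendAndReceive(networkClient, request, time)
        val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse[MemoryRecords]]
        fetchResponse.responseData.asScala.foreach { case (tp, data) =>
          println(s"$tp: high watermark ${data.highWatermark}")
        }
      }
    }
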

diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
index 0bf2bd3..4c7adfb 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
@@ -86,7 +86,7 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
     )
   }
 
-  override def sendRequest(requestBuilder: Builder[_ <: AbstractRequest]): ClientResponse =  {
+  override def sendRequest(requestBuilder: Builder[_ <: AbstractRequest]): ClientResponse = {
     try {
       if (!NetworkClientUtils.awaitReady(networkClient, sourceNode, time, socketTimeout))
         throw new SocketTimeoutException(s"Failed to connect within $socketTimeout ms")
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 0408e92..b1e6946 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -17,25 +17,35 @@
 
 package kafka.tools
 
+import java.net.SocketTimeoutException
 import java.text.SimpleDateFormat
-import java.util.Date
+import java.util
 import java.util.concurrent.CountDownLatch
-import java.util.concurrent.atomic.AtomicReference
+import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
 import java.util.regex.{Pattern, PatternSyntaxException}
+import java.util.{Date, Properties}
 
 import joptsimple.OptionParser
 import kafka.api._
-import kafka.client.ClientUtils
-import kafka.cluster.BrokerEndPoint
-import kafka.common.TopicAndPartition
-import kafka.consumer.{ConsumerConfig, SimpleConsumer, Whitelist}
-import kafka.message.{ByteBufferMessageSet, MessageSet}
+import kafka.consumer.Whitelist
 import kafka.utils._
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.clients._
+import org.apache.kafka.clients.admin.{ListTopicsOptions, TopicDescription}
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network.{NetworkReceive, Selectable, Selector}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests.AbstractRequest.Builder
+import org.apache.kafka.common.requests.{AbstractRequest, FetchResponse, ListOffsetRequest, FetchRequest => JFetchRequest}
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.common.{Node, TopicPartition}
+
+import scala.collection.JavaConverters._
 
 /**
- *  For verifying the consistency among replicas.
+ * For verifying the consistency among replicas.
  *
  *  1. start a fetcher on every broker.
  *  2. each fetcher does the following
@@ -44,11 +54,11 @@ import org.apache.kafka.common.utils.Time
  *    2.3 waits for all other fetchers to finish step 2.2
  *    2.4 one of the fetchers verifies the consistency of fetched results among replicas
  *
- *  The consistency verification is up to the high watermark. The tool reports the
- *  max lag between the verified offset and the high watermark among all partitions.
+ * The consistency verification is up to the high watermark. The tool reports the
+ * max lag between the verified offset and the high watermark among all partitions.
  *
- *  If a broker goes down, the verification of the partitions on that broker is delayed
- *  until the broker is up again.
+ * If a broker goes down, the verification of the partitions on that broker is delayed
+ * until the broker is up again.
  *
  * Caveats:
  * 1. The tools needs all brokers to be up at startup time.
@@ -56,7 +66,7 @@ import org.apache.kafka.common.utils.Time
  */
 
 object ReplicaVerificationTool extends Logging {
-  val clientId= "replicaVerificationTool"
+  val clientId = "replicaVerificationTool"
   val dateFormatString = "yyyy-MM-dd HH:mm:ss,SSS"
   val dateFormat = new SimpleDateFormat(dateFormatString)
 
@@ -74,7 +84,7 @@ object ReplicaVerificationTool extends Logging {
                          .withRequiredArg
                          .describedAs("bytes")
                          .ofType(classOf[java.lang.Integer])
-                         .defaultsTo(ConsumerConfig.FetchSize)
+                         .defaultsTo(ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES)
     val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
                          .withRequiredArg
                          .describedAs("ms")
@@ -96,18 +106,16 @@ object ReplicaVerificationTool extends Logging {
                          .ofType(classOf[java.lang.Long])
                          .defaultsTo(30 * 1000L)
 
-   if(args.length == 0)
+    if (args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "Validate that all replicas for a set of topics have the same data.")
 
-    val options = parser.parse(args : _*)
+    val options = parser.parse(args: _*)
     CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt)
 
     val regex = options.valueOf(topicWhiteListOpt)
     val topicWhiteListFiler = new Whitelist(regex)
 
-    try {
-      Pattern.compile(regex)
-    }
+    try Pattern.compile(regex)
     catch {
       case _: PatternSyntaxException =>
         throw new RuntimeException(regex + " is an invalid regex.")
@@ -120,68 +128,68 @@ object ReplicaVerificationTool extends Logging {
     // getting topic metadata
     info("Getting topic metadata...")
     val brokerList = options.valueOf(brokerListOpt)
-    ToolsUtils.validatePortOrDie(parser,brokerList)
-    val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
-    val topicsMetadataResponse = ClientUtils.fetchTopicMetadata(Set[String](), metadataTargetBrokers, clientId, maxWaitMs)
-    val brokerMap = topicsMetadataResponse.brokers.map(b => (b.id, b)).toMap
-    val filteredTopicMetadata = topicsMetadataResponse.topicsMetadata.filter(
-        topicMetadata => if (topicWhiteListFiler.isTopicAllowed(topicMetadata.topic, excludeInternalTopics = false))
-          true
-        else
-          false
-    )
+    ToolsUtils.validatePortOrDie(parser, brokerList)
+
+    val (topicsMetadata, brokerInfo) = {
+      val adminClient = createAdminClient(brokerList)
+      try ((listTopicsMetadata(adminClient), brokerDetails(adminClient)))
+      finally CoreUtils.swallow(adminClient.close(), this)
+    }
+
+    val filteredTopicMetadata = topicsMetadata.filter { topicMetaData =>
+      topicWhiteListFiler.isTopicAllowed(topicMetaData.name, excludeInternalTopics = false)
+    }
 
     if (filteredTopicMetadata.isEmpty) {
-      error("No topics found. " + topicWhiteListOpt + ", if specified, is either filtering out all topics or there is no topic.")
+      error(s"No topics found. $topicWhiteListOpt if specified, is either filtering out all topics or there is no topic.")
       Exit.exit(1)
     }
 
-    val topicPartitionReplicaList: Seq[TopicPartitionReplica] = filteredTopicMetadata.flatMap(
-      topicMetadataResponse =>
-        topicMetadataResponse.partitionsMetadata.flatMap(
-          partitionMetadata =>
-            partitionMetadata.replicas.map(broker =>
-              TopicPartitionReplica(topic = topicMetadataResponse.topic, partitionId = partitionMetadata.partitionId, replicaId = broker.id))
-        )
-    )
-    debug("Selected topic partitions: " + topicPartitionReplicaList)
-    val topicAndPartitionsPerBroker: Map[Int, Seq[TopicAndPartition]] = topicPartitionReplicaList.groupBy(_.replicaId)
-      .map { case (brokerId, partitions) =>
-               brokerId -> partitions.map { partition => TopicAndPartition(partition.topic, partition.partitionId) } }
-    debug("Topic partitions per broker: " + topicAndPartitionsPerBroker)
-    val expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int] =
-          topicPartitionReplicaList.groupBy(replica => TopicAndPartition(replica.topic, replica.partitionId))
-          .map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size }
-    debug("Expected replicas per topic partition: " + expectedReplicasPerTopicAndPartition)
-    val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap { topicMetadataResponse =>
-      topicMetadataResponse.partitionsMetadata.map { partitionMetadata =>
-        (TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id)
+    val topicPartitionReplicas = filteredTopicMetadata.flatMap { topicMetadata =>
+      topicMetadata.partitions.asScala.flatMap { partitionMetadata =>
+        partitionMetadata.replicas.asScala.map { node =>
+          TopicPartitionReplica(topic = topicMetadata.name, partitionId = partitionMetadata.partition, replicaId = node.id)
+        }
+      }
+    }
+    debug(s"Selected topic partitions: $topicPartitionReplicas")
+    val brokerToTopicPartitions = topicPartitionReplicas.groupBy(_.replicaId).map { case (brokerId, partitions) =>
+      brokerId -> partitions.map { partition => new TopicPartition(partition.topic, partition.partitionId) }
+    }
+    debug(s"Topic partitions per broker: $brokerToTopicPartitions")
+    val expectedReplicasPerTopicPartition = topicPartitionReplicas.groupBy { replica =>
+      new TopicPartition(replica.topic, replica.partitionId)
+    }.map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size }
+    debug(s"Expected replicas per topic partition: $expectedReplicasPerTopicPartition")
+
+    val topicPartitions = filteredTopicMetadata.flatMap { topicMetaData =>
+      topicMetaData.partitions.asScala.map { partitionMetadata =>
+        new TopicPartition(topicMetaData.name, partitionMetadata.partition)
       }
-    }.groupBy(_._2).mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map { case (topicAndPartition, _) =>
-       topicAndPartition
-     })
-    debug("Leaders per broker: " + leadersPerBroker)
-
-    val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition,
-                                          leadersPerBroker,
-                                          topicAndPartitionsPerBroker.size,
-                                          brokerMap,
-                                          initialOffsetTime,
-                                          reportInterval)
+    }
+
+    val consumerProps = consumerConfig(brokerList)
+
+    val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicPartition,
+      initialOffsets(topicPartitions, consumerProps, initialOffsetTime),
+      brokerToTopicPartitions.size,
+      reportInterval)
     // create all replica fetcher threads
-    val verificationBrokerId = topicAndPartitionsPerBroker.head._1
-    val fetcherThreads: Iterable[ReplicaFetcher] = topicAndPartitionsPerBroker.map {
-      case (brokerId, topicAndPartitions) =>
-        new ReplicaFetcher(name = "ReplicaFetcher-" + brokerId,
-                           sourceBroker = brokerMap(brokerId),
-                           topicAndPartitions = topicAndPartitions,
-                           replicaBuffer = replicaBuffer,
-                           socketTimeout = 30000,
-                           socketBufferSize = 256000,
-                           fetchSize = fetchSize,
-                           maxWait = maxWaitMs,
-                           minBytes = 1,
-                           doVerification = brokerId == verificationBrokerId)
+    val verificationBrokerId = brokerToTopicPartitions.head._1
+    val counter = new AtomicInteger(0)
+    val fetcherThreads: Iterable[ReplicaFetcher] = brokerToTopicPartitions.map { case (brokerId, topicPartitions) =>
+      new ReplicaFetcher(name = s"ReplicaFetcher-$brokerId",
+        sourceBroker = brokerInfo(brokerId),
+        topicPartitions = topicPartitions,
+        replicaBuffer = replicaBuffer,
+        socketTimeout = 30000,
+        socketBufferSize = 256000,
+        fetchSize = fetchSize,
+        maxWait = maxWaitMs,
+        minBytes = 1,
+        doVerification = brokerId == verificationBrokerId,
+        consumerProps,
+        fetcherId = counter.incrementAndGet())
     }
 
     Runtime.getRuntime.addShutdownHook(new Thread() {
@@ -194,87 +202,112 @@ object ReplicaVerificationTool extends Logging {
     println(ReplicaVerificationTool.getCurrentTimeString() + ": verification process is started.")
 
   }
+
+  private def listTopicsMetadata(adminClient: admin.AdminClient): Seq[TopicDescription] = {
+    val topics = adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names.get
+    adminClient.describeTopics(topics).all.get.values.asScala.toBuffer
+  }
+
+  private def brokerDetails(adminClient: admin.AdminClient): Map[Int, Node] = {
+    adminClient.describeCluster.nodes.get.asScala.map(n => (n.id, n)).toMap
+  }
+
+  private def createAdminClient(brokerUrl: String): admin.AdminClient = {
+    val props = new Properties()
+    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
+    admin.AdminClient.create(props)
+  }
+
+  private def initialOffsets(topicPartitions: Seq[TopicPartition], consumerConfig: Properties,
+                             initialOffsetTime: Long): Map[TopicPartition, Long] = {
+    val consumer = createConsumer(consumerConfig)
+    try {
+      if (ListOffsetRequest.LATEST_TIMESTAMP == initialOffsetTime)
+        consumer.endOffsets(topicPartitions.asJava).asScala.mapValues(_.longValue).toMap
+      else if (ListOffsetRequest.EARLIEST_TIMESTAMP == initialOffsetTime)
+        consumer.beginningOffsets(topicPartitions.asJava).asScala.mapValues(_.longValue).toMap
+      else {
+        val timestampsToSearch = topicPartitions.map(tp => tp -> (initialOffsetTime: java.lang.Long)).toMap
+        consumer.offsetsForTimes(timestampsToSearch.asJava).asScala.mapValues(v => v.offset).toMap
+      }
+    } finally consumer.close()
+  }
+
+  private def consumerConfig(brokerUrl: String): Properties = {
+    val properties = new Properties()
+    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
+    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "ReplicaVerification")
+    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
+    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
+    properties
+  }
+
+  private def createConsumer(consumerConfig: Properties): KafkaConsumer[String, String] =
+    new KafkaConsumer(consumerConfig)
 }
 
-private case class TopicPartitionReplica(topic: String,  partitionId: Int,  replicaId: Int)
+private case class TopicPartitionReplica(topic: String, partitionId: Int, replicaId: Int)
 
 private case class MessageInfo(replicaId: Int, offset: Long, nextOffset: Long, checksum: Long)
 
-private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int],
-                            leadersPerBroker: Map[Int, Seq[TopicAndPartition]],
+private class ReplicaBuffer(expectedReplicasPerTopicPartition: Map[TopicPartition, Int],
+                            initialOffsets: Map[TopicPartition, Long],
                             expectedNumFetchers: Int,
-                            brokerMap: Map[Int, BrokerEndPoint],
-                            initialOffsetTime: Long,
                             reportInterval: Long) extends Logging {
-  private val fetchOffsetMap = new Pool[TopicAndPartition, Long]
-  private val messageSetCache = new Pool[TopicAndPartition, Pool[Int, FetchResponsePartitionData]]
+  private val fetchOffsetMap = new Pool[TopicPartition, Long]
+  private val recordsCache = new Pool[TopicPartition, Pool[Int, FetchResponse.PartitionData[MemoryRecords]]]
   private val fetcherBarrier = new AtomicReference(new CountDownLatch(expectedNumFetchers))
   private val verificationBarrier = new AtomicReference(new CountDownLatch(1))
   @volatile private var lastReportTime = Time.SYSTEM.milliseconds
   private var maxLag: Long = -1L
   private var offsetWithMaxLag: Long = -1L
-  private var maxLagTopicAndPartition: TopicAndPartition = null
+  private var maxLagTopicAndPartition: TopicPartition = null
   initialize()
 
   def createNewFetcherBarrier() {
     fetcherBarrier.set(new CountDownLatch(expectedNumFetchers))
   }
 
-  def getFetcherBarrier() = fetcherBarrier.get()
+  def getFetcherBarrier() = fetcherBarrier.get
 
   def createNewVerificationBarrier() {
     verificationBarrier.set(new CountDownLatch(1))
   }
 
-  def getVerificationBarrier() = verificationBarrier.get()
+  def getVerificationBarrier() = verificationBarrier.get
 
   private def initialize() {
-    for (topicAndPartition <- expectedReplicasPerTopicAndPartition.keySet)
-      messageSetCache.put(topicAndPartition, new Pool[Int, FetchResponsePartitionData])
+    for (topicPartition <- expectedReplicasPerTopicPartition.keySet)
+      recordsCache.put(topicPartition, new Pool[Int, FetchResponse.PartitionData[MemoryRecords]])
     setInitialOffsets()
   }
 
-  private def offsetResponseStringWithError(offsetResponse: OffsetResponse): String = {
-    offsetResponse.partitionErrorAndOffsets.filter { case (_, partitionOffsetsResponse) =>
-      partitionOffsetsResponse.error != Errors.NONE
-    }.mkString
-  }
 
   private def setInitialOffsets() {
-    for ((brokerId, topicAndPartitions) <- leadersPerBroker) {
-      val broker = brokerMap(brokerId)
-      val consumer = new SimpleConsumer(broker.host, broker.port, 10000, 100000, ReplicaVerificationTool.clientId)
-      val initialOffsetMap: Map[TopicAndPartition, PartitionOffsetRequestInfo] =
-        topicAndPartitions.map(topicAndPartition => topicAndPartition -> PartitionOffsetRequestInfo(initialOffsetTime, 1)).toMap
-      val offsetRequest = OffsetRequest(initialOffsetMap)
-      val offsetResponse = consumer.getOffsetsBefore(offsetRequest)
-      assert(!offsetResponse.hasError, offsetResponseStringWithError(offsetResponse))
-      offsetResponse.partitionErrorAndOffsets.foreach { case (topicAndPartition, partitionOffsetResponse) =>
-        fetchOffsetMap.put(topicAndPartition, partitionOffsetResponse.offsets.head)
-      }
-    }
+    for ((tp, offset) <- initialOffsets)
+      fetchOffsetMap.put(tp, offset)
   }
 
-  def addFetchedData(topicAndPartition: TopicAndPartition, replicaId: Int, partitionData: FetchResponsePartitionData) {
-    messageSetCache.get(topicAndPartition).put(replicaId, partitionData)
+  def addFetchedData(topicAndPartition: TopicPartition, replicaId: Int, partitionData: FetchResponse.PartitionData[MemoryRecords]) {
+    recordsCache.get(topicAndPartition).put(replicaId, partitionData)
   }
 
-  def getOffset(topicAndPartition: TopicAndPartition) = {
+  def getOffset(topicAndPartition: TopicPartition) = {
     fetchOffsetMap.get(topicAndPartition)
   }
 
   def verifyCheckSum(println: String => Unit) {
     debug("Begin verification")
     maxLag = -1L
-    for ((topicAndPartition, fetchResponsePerReplica) <- messageSetCache) {
-      debug("Verifying " + topicAndPartition)
-      assert(fetchResponsePerReplica.size == expectedReplicasPerTopicAndPartition(topicAndPartition),
-            "fetched " + fetchResponsePerReplica.size + " replicas for " + topicAndPartition + ", but expected "
-            + expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas")
+    for ((topicPartition, fetchResponsePerReplica) <- recordsCache) {
+      debug("Verifying " + topicPartition)
+      assert(fetchResponsePerReplica.size == expectedReplicasPerTopicPartition(topicPartition),
+        "fetched " + fetchResponsePerReplica.size + " replicas for " + topicPartition + ", but expected "
+          + expectedReplicasPerTopicPartition(topicPartition) + " replicas")
       val recordBatchIteratorMap = fetchResponsePerReplica.map { case (replicaId, fetchResponse) =>
-        replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].asRecords.batches.iterator
+        replicaId -> fetchResponse.records.batches.iterator
       }
-      val maxHw = fetchResponsePerReplica.values.map(_.hw).max
+      val maxHw = fetchResponsePerReplica.values.map(_.highWatermark).max
 
       // Iterate one message at a time from every replica, until high watermark is reached.
       var isMessageInAllReplicas = true
@@ -286,7 +319,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
               val batch = recordBatchIterator.next()
 
               // only verify up to the high watermark
-              if (batch.lastOffset >= fetchResponsePerReplica.get(replicaId).hw)
+              if (batch.lastOffset >= fetchResponsePerReplica.get(replicaId).highWatermark)
                 isMessageInAllReplicas = false
               else {
                 messageInfoFromFirstReplicaOpt match {
@@ -295,7 +328,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
                       MessageInfo(replicaId, batch.lastOffset, batch.nextOffset, batch.checksum))
                   case Some(messageInfoFromFirstReplica) =>
                     if (messageInfoFromFirstReplica.offset != batch.lastOffset) {
-                      println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + topicAndPartition
+                      println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + topicPartition
                         + ": replica " + messageInfoFromFirstReplica.replicaId + "'s offset "
                         + messageInfoFromFirstReplica.offset + " doesn't match replica "
                         + replicaId + "'s offset " + batch.lastOffset)
@@ -303,7 +336,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
                     }
                     if (messageInfoFromFirstReplica.checksum != batch.checksum)
                       println(ReplicaVerificationTool.getCurrentTimeString + ": partition "
-                        + topicAndPartition + " has unmatched checksum at offset " + batch.lastOffset + "; replica "
+                        + topicPartition + " has unmatched checksum at offset " + batch.lastOffset + "; replica "
                         + messageInfoFromFirstReplica.replicaId + "'s checksum " + messageInfoFromFirstReplica.checksum
                         + "; replica " + replicaId + "'s checksum " + batch.checksum)
                 }
@@ -313,20 +346,20 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
           } catch {
             case t: Throwable =>
               throw new RuntimeException("Error in processing replica %d in partition %s at offset %d."
-              .format(replicaId, topicAndPartition, fetchOffsetMap.get(topicAndPartition)), t)
+                .format(replicaId, topicPartition, fetchOffsetMap.get(topicPartition)), t)
           }
         }
         if (isMessageInAllReplicas) {
           val nextOffset = messageInfoFromFirstReplicaOpt.get.nextOffset
-          fetchOffsetMap.put(topicAndPartition, nextOffset)
-          debug(expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas match at offset " +
-                nextOffset + " for " + topicAndPartition)
+          fetchOffsetMap.put(topicPartition, nextOffset)
+          debug(expectedReplicasPerTopicPartition(topicPartition) + " replicas match at offset " +
+            nextOffset + " for " + topicPartition)
         }
       }
-      if (maxHw - fetchOffsetMap.get(topicAndPartition) > maxLag) {
-        offsetWithMaxLag = fetchOffsetMap.get(topicAndPartition)
+      if (maxHw - fetchOffsetMap.get(topicPartition) > maxLag) {
+        offsetWithMaxLag = fetchOffsetMap.get(topicPartition)
         maxLag = maxHw - offsetWithMaxLag
-        maxLagTopicAndPartition = topicAndPartition
+        maxLagTopicAndPartition = topicPartition
       }
       fetchResponsePerReplica.clear()
     }
@@ -334,51 +367,54 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
     if (currentTimeMs - lastReportTime > reportInterval) {
       println(ReplicaVerificationTool.dateFormat.format(new Date(currentTimeMs)) + ": max lag is "
         + maxLag + " for partition " + maxLagTopicAndPartition + " at offset " + offsetWithMaxLag
-        + " among " + messageSetCache.size + " partitions")
+        + " among " + recordsCache.size + " partitions")
       lastReportTime = currentTimeMs
     }
   }
 }
 
-private class ReplicaFetcher(name: String, sourceBroker: BrokerEndPoint, topicAndPartitions: Iterable[TopicAndPartition],
+private class ReplicaFetcher(name: String, sourceBroker: Node, topicPartitions: Iterable[TopicPartition],
                              replicaBuffer: ReplicaBuffer, socketTimeout: Int, socketBufferSize: Int,
-                             fetchSize: Int, maxWait: Int, minBytes: Int, doVerification: Boolean)
+                             fetchSize: Int, maxWait: Int, minBytes: Int, doVerification: Boolean, consumerConfig: Properties,
+                             fetcherId: Int)
   extends ShutdownableThread(name) {
-  val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, ReplicaVerificationTool.clientId)
-  val fetchRequestBuilder = new FetchRequestBuilder().
-          clientId(ReplicaVerificationTool.clientId).
-          replicaId(Request.DebuggingConsumerId).
-          maxWait(maxWait).
-          minBytes(minBytes)
+
+  private val fetchEndpoint = new ReplicaFetcherBlockingSend(sourceBroker, new ConsumerConfig(consumerConfig), new Metrics(), Time.SYSTEM, fetcherId,
+    s"broker-${Request.DebuggingConsumerId}-fetcher-$fetcherId")
 
   override def doWork() {
 
     val fetcherBarrier = replicaBuffer.getFetcherBarrier()
     val verificationBarrier = replicaBuffer.getVerificationBarrier()
 
-    for (topicAndPartition <- topicAndPartitions)
-      fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
-        replicaBuffer.getOffset(topicAndPartition), fetchSize)
+    val requestMap = new util.LinkedHashMap[TopicPartition, JFetchRequest.PartitionData]
+    for (topicPartition <- topicPartitions)
+      requestMap.put(topicPartition, new JFetchRequest.PartitionData(replicaBuffer.getOffset(topicPartition), 0L, fetchSize))
+
+    val fetchRequestBuilder = JFetchRequest.Builder.
+      forReplica(ApiKeys.FETCH.latestVersion, Request.DebuggingConsumerId, maxWait, minBytes, requestMap)
 
-    val fetchRequest = fetchRequestBuilder.build()
-    debug("Issuing fetch request " + fetchRequest)
+    debug("Issuing fetch request ")
 
-    var response: FetchResponse = null
+    var fetchResponse: FetchResponse[MemoryRecords] = null
     try {
-      response = simpleConsumer.fetch(fetchRequest)
+      val clientResponse = fetchEndpoint.sendRequest(fetchRequestBuilder)
+      fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse[MemoryRecords]]
     } catch {
       case t: Throwable =>
         if (!isRunning)
           throw t
     }
 
-    if (response != null) {
-      response.data.foreach { case (topicAndPartition, partitionData) =>
-        replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, partitionData)
+    if (fetchResponse != null) {
+      fetchResponse.responseData.asScala.foreach { case (tp, partitionData) =>
+        replicaBuffer.addFetchedData(tp, sourceBroker.id, partitionData)
       }
     } else {
-      for (topicAndPartition <- topicAndPartitions)
-        replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, new FetchResponsePartitionData(messages = MessageSet.Empty))
+      val emptyResponse = new FetchResponse.PartitionData(Errors.NONE, FetchResponse.INVALID_HIGHWATERMARK,
+        FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
+      for (topicAndPartition <- topicPartitions)
+        replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, emptyResponse)
     }
 
     fetcherBarrier.countDown()
@@ -402,3 +438,64 @@ private class ReplicaFetcher(name: String, sourceBroker: BrokerEndPoint, topicAn
     debug("Done verification")
   }
 }
+
+private class ReplicaFetcherBlockingSend(sourceNode: Node,
+                                         consumerConfig: ConsumerConfig,
+                                         metrics: Metrics,
+                                         time: Time,
+                                         fetcherId: Int,
+                                         clientId: String) {
+
+  private val socketTimeout: Int = consumerConfig.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG)
+
+  private val networkClient = {
+    val channelBuilder = org.apache.kafka.clients.ClientUtils.createChannelBuilder(consumerConfig)
+    val selector = new Selector(
+      NetworkReceive.UNLIMITED,
+      consumerConfig.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
+      metrics,
+      time,
+      "replica-fetcher",
+      Map("broker-id" -> sourceNode.id.toString, "fetcher-id" -> fetcherId.toString).asJava,
+      false,
+      channelBuilder,
+      new LogContext
+    )
+    new NetworkClient(
+      selector,
+      new ManualMetadataUpdater(),
+      clientId,
+      1,
+      0,
+      0,
+      Selectable.USE_DEFAULT_BUFFER_SIZE,
+      consumerConfig.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
+      consumerConfig.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
+      time,
+      false,
+      new ApiVersions,
+      new LogContext
+    )
+  }
+
+  def sendRequest(requestBuilder: Builder[_ <: AbstractRequest]): ClientResponse = {
+    try {
+      if (!NetworkClientUtils.awaitReady(networkClient, sourceNode, time, socketTimeout))
+        throw new SocketTimeoutException(s"Failed to connect within $socketTimeout ms")
+      else {
+        val clientRequest = networkClient.newClientRequest(sourceNode.id.toString, requestBuilder,
+          time.milliseconds(), true)
+        NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)
+      }
+    }
+    catch {
+      case e: Throwable =>
+        networkClient.close(sourceNode.id.toString)
+        throw e
+    }
+  }
+
+  def close(): Unit = {
+    networkClient.close()
+  }
+}
diff --git a/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala b/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
index 211413a..f69c909 100644
--- a/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
+++ b/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
@@ -17,11 +17,10 @@
 
 package kafka.tools
 
-import kafka.api.FetchResponsePartitionData
-import kafka.common.TopicAndPartition
-import kafka.message.ByteBufferMessageSet
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.{CompressionType, SimpleRecord, MemoryRecords}
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
+import org.apache.kafka.common.requests.FetchResponse
 import org.junit.Test
 import org.junit.Assert.assertTrue
 
@@ -32,12 +31,12 @@ class ReplicaVerificationToolTest {
     val sb = new StringBuilder
 
     val expectedReplicasPerTopicAndPartition = Map(
-      TopicAndPartition("a", 0) -> 3,
-      TopicAndPartition("a", 1) -> 3,
-      TopicAndPartition("b", 0) -> 2
+      new TopicPartition("a", 0) -> 3,
+      new TopicPartition("a", 1) -> 3,
+      new TopicPartition("b", 0) -> 2
     )
 
-    val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition, Map.empty, 2, Map.empty, 0, 0)
+    val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition, Map.empty, 2, 0)
     expectedReplicasPerTopicAndPartition.foreach { case (tp, numReplicas) =>
       (0 until numReplicas).foreach { replicaId =>
         val records = (0 to 5).map { index =>
@@ -45,8 +44,9 @@ class ReplicaVerificationToolTest {
         }
         val initialOffset = 4
         val memoryRecords = MemoryRecords.withRecords(initialOffset, CompressionType.NONE, records: _*)
-        replicaBuffer.addFetchedData(tp, replicaId, new FetchResponsePartitionData(Errors.NONE, hw = 20,
-          new ByteBufferMessageSet(memoryRecords.buffer)))
+        val partitionData = new FetchResponse.PartitionData(Errors.NONE, 20, 20, 0L, null, memoryRecords)
+
+        replicaBuffer.addFetchedData(tp, replicaId, partitionData)
       }
     }
 
@@ -55,7 +55,7 @@ class ReplicaVerificationToolTest {
 
     // If you change this assertion, you should verify that the replica_verification_test.py system test still passes
     assertTrue(s"Max lag information should be in output: `$output`",
-      output.endsWith(": max lag is 10 for partition a-0 at offset 10 among 3 partitions"))
+      output.endsWith(": max lag is 10 for partition a-1 at offset 10 among 3 partitions"))
   }
 
 }
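
For bootstrapping the fetch offsets, the patch keeps an ordinary
KafkaConsumer, since beginning and end offsets are served by partition
leaders. A minimal sketch of that part, mirroring the consumerConfig and
initialOffsets helpers above (the bootstrap address and topic are
placeholders):

    import java.util.Properties
    import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer
    import scala.collection.JavaConverters._

    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "ReplicaVerification")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])

    val consumer = new KafkaConsumer[String, String](props)
    try {
      val partitions = Seq(new TopicPartition("topic", 0)).asJava // placeholder
      // Latest (-1) maps to endOffsets, earliest (-2) to beginningOffsets,
      // and any other initial-offset timestamp to offsetsForTimes.
      val startOffsets = consumer.endOffsets(partitions).asScala.mapValues(_.longValue).toMap
      println(startOffsets)
    } finally consumer.close()
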
