You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2023/07/26 10:04:41 UTC

[kafka] branch trunk updated: KAFKA-14583: Move ReplicaVerificationTool to tools (#14059)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bb677c4959e KAFKA-14583: Move ReplicaVerificationTool to tools (#14059)
bb677c4959e is described below

commit bb677c4959e18efbb3dae0b4bde123fca7c9ba33
Author: Federico Valeri <fe...@gmail.com>
AuthorDate: Wed Jul 26 12:04:34 2023 +0200

    KAFKA-14583: Move ReplicaVerificationTool to tools (#14059)
    
    
    Reviewers: Mickael Maison <mi...@gmail.com>
---
 bin/kafka-replica-verification.sh                  |   2 +-
 bin/windows/kafka-replica-verification.bat         |   2 +-
 checkstyle/suppressions.xml                        |   2 +-
 .../kafka/tools/ReplicaVerificationTool.scala      | 524 ---------------
 .../kafka/tools/ReplicaVerificationToolTest.scala  |  65 --
 .../services/replica_verification_tool.py          |  12 +-
 .../kafka/tools/ReplicaVerificationTool.java       | 744 +++++++++++++++++++++
 .../kafka/tools/ReplicaVerificationToolTest.java   |  72 ++
 8 files changed, 823 insertions(+), 600 deletions(-)

diff --git a/bin/kafka-replica-verification.sh b/bin/kafka-replica-verification.sh
index 4960836c0d0..1df563975a8 100755
--- a/bin/kafka-replica-verification.sh
+++ b/bin/kafka-replica-verification.sh
@@ -14,4 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-exec $(dirname $0)/kafka-run-class.sh kafka.tools.ReplicaVerificationTool "$@"
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.ReplicaVerificationTool "$@"
diff --git a/bin/windows/kafka-replica-verification.bat b/bin/windows/kafka-replica-verification.bat
index bf4805d7f64..a64d6f0ad75 100644
--- a/bin/windows/kafka-replica-verification.bat
+++ b/bin/windows/kafka-replica-verification.bat
@@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 rem See the License for the specific language governing permissions and
 rem limitations under the License.
 
-"%~dp0kafka-run-class.bat" kafka.tools.ReplicaVerificationTool %*
+"%~dp0kafka-run-class.bat" org.apache.kafka.tools.ReplicaVerificationTool %*
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index a2b07913356..547a8e4b24a 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -266,7 +266,7 @@
     <suppress checks="BooleanExpressionComplexity"
               files="StreamsResetter.java"/>
     <suppress checks="NPathComplexity"
-              files="(ProducerPerformance|StreamsResetter|Agent|TransactionalMessageCopier).java"/>
+              files="(ProducerPerformance|StreamsResetter|Agent|TransactionalMessageCopier|ReplicaVerificationTool).java"/>
     <suppress checks="ImportControl"
               files="SignalLogger.java"/>
     <suppress checks="IllegalImport"
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
deleted file mode 100644
index 3f66eaa8b7d..00000000000
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ /dev/null
@@ -1,524 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import joptsimple.OptionParser
-import kafka.utils._
-import org.apache.kafka.clients._
-import org.apache.kafka.clients.admin.{Admin, ListTopicsOptions, TopicDescription}
-import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
-import org.apache.kafka.common.message.FetchResponseData
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.{NetworkReceive, Selectable, Selector}
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.AbstractRequest.Builder
-import org.apache.kafka.common.requests.{AbstractRequest, FetchRequest, FetchResponse, ListOffsetsRequest}
-import org.apache.kafka.common.serialization.StringDeserializer
-import org.apache.kafka.server.util.{CommandLineUtils, ShutdownableThread}
-import org.apache.kafka.common.utils.{LogContext, Time}
-import org.apache.kafka.common.{Node, TopicPartition, Uuid}
-
-import java.net.SocketTimeoutException
-import java.text.SimpleDateFormat
-import java.util
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
-import java.util.regex.{Pattern, PatternSyntaxException}
-import java.util.{Date, Optional, Properties}
-import scala.collection.Seq
-import scala.jdk.CollectionConverters._
-import org.apache.kafka.server.util.TopicFilter.IncludeList
-
-/**
- * For verifying the consistency among replicas.
- *
- *  1. start a fetcher on every broker.
- *  2. each fetcher does the following
- *    2.1 issues fetch request
- *    2.2 puts the fetched result in a shared buffer
- *    2.3 waits for all other fetchers to finish step 2.2
- *    2.4 one of the fetchers verifies the consistency of fetched results among replicas
- *
- * The consistency verification is up to the high watermark. The tool reports the
- * max lag between the verified offset and the high watermark among all partitions.
- *
- * If a broker goes down, the verification of the partitions on that broker is delayed
- * until the broker is up again.
- *
- * Caveats:
- * 1. The tools needs all brokers to be up at startup time.
- * 2. The tool doesn't handle out of range offsets.
- */
-
-object ReplicaVerificationTool extends Logging {
-  val clientId = "replicaVerificationTool"
-  val dateFormatString = "yyyy-MM-dd HH:mm:ss,SSS"
-  val dateFormat = new SimpleDateFormat(dateFormatString)
-
-  def getCurrentTimeString() = {
-    ReplicaVerificationTool.dateFormat.format(new Date(Time.SYSTEM.milliseconds))
-  }
-
-  def main(args: Array[String]): Unit = {
-    val parser = new OptionParser(false)
-    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
-                         .withRequiredArg
-                         .describedAs("hostname:port,...,hostname:port")
-                         .ofType(classOf[String])
-    val fetchSizeOpt = parser.accepts("fetch-size", "The fetch size of each request.")
-                         .withRequiredArg
-                         .describedAs("bytes")
-                         .ofType(classOf[java.lang.Integer])
-                         .defaultsTo(ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES)
-    val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
-                         .withRequiredArg
-                         .describedAs("ms")
-                         .ofType(classOf[java.lang.Integer])
-                         .defaultsTo(1000)
-    val topicWhiteListOpt = parser.accepts("topic-white-list", "DEPRECATED use --topics-include instead; ignored if --topics-include specified. List of topics to verify replica consistency. Defaults to '.*' (all topics)")
-                         .withRequiredArg
-                         .describedAs("Java regex (String)")
-                         .ofType(classOf[String])
-                         .defaultsTo(".*")
-    val topicsIncludeOpt = parser.accepts("topics-include", "List of topics to verify replica consistency. Defaults to '.*' (all topics)")
-                        .withRequiredArg
-                        .describedAs("Java regex (String)")
-                        .ofType(classOf[String])
-                        .defaultsTo(".*")
-    val initialOffsetTimeOpt = parser.accepts("time", "Timestamp for getting the initial offsets.")
-                           .withRequiredArg
-                           .describedAs("timestamp/-1(latest)/-2(earliest)")
-                           .ofType(classOf[java.lang.Long])
-                           .defaultsTo(-1L)
-    val reportIntervalOpt = parser.accepts("report-interval-ms", "The reporting interval.")
-                         .withRequiredArg
-                         .describedAs("ms")
-                         .ofType(classOf[java.lang.Long])
-                         .defaultsTo(30 * 1000L)
-    val helpOpt = parser.accepts("help", "Print usage information.").forHelp()
-    val versionOpt = parser.accepts("version", "Print version information and exit.").forHelp()
-
-    val options = parser.parse(args: _*)
-
-    if (args.isEmpty || options.has(helpOpt)) {
-      CommandLineUtils.printUsageAndExit(parser, "Validate that all replicas for a set of topics have the same data.")
-    }
-
-    if (options.has(versionOpt)) {
-      CommandLineUtils.printVersionAndExit()
-    }
-    CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt)
-
-    val regex = if (options.has(topicsIncludeOpt))
-      options.valueOf(topicsIncludeOpt)
-    else
-      options.valueOf(topicWhiteListOpt)
-
-    val topicsIncludeFilter = new IncludeList(regex)
-
-    try Pattern.compile(regex)
-    catch {
-      case _: PatternSyntaxException =>
-        throw new RuntimeException(s"$regex is an invalid regex.")
-    }
-
-    val fetchSize = options.valueOf(fetchSizeOpt).intValue
-    val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue
-    val initialOffsetTime = options.valueOf(initialOffsetTimeOpt).longValue
-    val reportInterval = options.valueOf(reportIntervalOpt).longValue
-    // getting topic metadata
-    info("Getting topic metadata...")
-    val brokerList = options.valueOf(brokerListOpt)
-    ToolsUtils.validatePortOrDie(parser, brokerList)
-
-    val (topicsMetadata, brokerInfo) = {
-      val adminClient = createAdminClient(brokerList)
-      try ((listTopicsMetadata(adminClient), brokerDetails(adminClient)))
-      finally CoreUtils.swallow(adminClient.close(), this)
-    }
-
-    val topicIds = topicsMetadata.map( metadata => metadata.name() -> metadata.topicId()).toMap
-
-    val filteredTopicMetadata = topicsMetadata.filter { topicMetaData =>
-      topicsIncludeFilter.isTopicAllowed(topicMetaData.name, false)
-    }
-
-    if (filteredTopicMetadata.isEmpty) {
-      error(s"No topics found. $topicsIncludeOpt if specified, is either filtering out all topics or there is no topic.")
-      Exit.exit(1)
-    }
-
-    val topicPartitionReplicas = filteredTopicMetadata.flatMap { topicMetadata =>
-      topicMetadata.partitions.asScala.flatMap { partitionMetadata =>
-        partitionMetadata.replicas.asScala.map { node =>
-          TopicPartitionReplica(topic = topicMetadata.name, partitionId = partitionMetadata.partition, replicaId = node.id)
-        }
-      }
-    }
-    debug(s"Selected topic partitions: $topicPartitionReplicas")
-    val brokerToTopicPartitions = topicPartitionReplicas.groupBy(_.replicaId).map { case (brokerId, partitions) =>
-      brokerId -> partitions.map { partition => new TopicPartition(partition.topic, partition.partitionId) }
-    }
-    debug(s"Topic partitions per broker: $brokerToTopicPartitions")
-    val expectedReplicasPerTopicPartition = topicPartitionReplicas.groupBy { replica =>
-      new TopicPartition(replica.topic, replica.partitionId)
-    }.map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size }
-    debug(s"Expected replicas per topic partition: $expectedReplicasPerTopicPartition")
-
-    val topicPartitions = filteredTopicMetadata.flatMap { topicMetaData =>
-      topicMetaData.partitions.asScala.map { partitionMetadata =>
-        new TopicPartition(topicMetaData.name, partitionMetadata.partition)
-      }
-    }
-
-    val consumerProps = consumerConfig(brokerList)
-
-    val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicPartition,
-      initialOffsets(topicPartitions, consumerProps, initialOffsetTime),
-      brokerToTopicPartitions.size,
-      reportInterval)
-    // create all replica fetcher threads
-    val verificationBrokerId = brokerToTopicPartitions.head._1
-    val counter = new AtomicInteger(0)
-    val fetcherThreads = brokerToTopicPartitions.map { case (brokerId, topicPartitions) =>
-      new ReplicaFetcher(name = s"ReplicaFetcher-$brokerId",
-        sourceBroker = brokerInfo(brokerId),
-        topicPartitions = topicPartitions,
-        topicIds = topicIds,
-        replicaBuffer = replicaBuffer,
-        socketTimeout = 30000,
-        socketBufferSize = 256000,
-        fetchSize = fetchSize,
-        maxWait = maxWaitMs,
-        minBytes = 1,
-        doVerification = brokerId == verificationBrokerId,
-        consumerProps,
-        fetcherId = counter.incrementAndGet())
-    }
-
-    Exit.addShutdownHook("ReplicaVerificationToolShutdownHook", {
-        info("Stopping all fetchers")
-        fetcherThreads.foreach(_.shutdown())
-    })
-    fetcherThreads.foreach(_.start())
-    println(s"${ReplicaVerificationTool.getCurrentTimeString()}: verification process is started.")
-
-  }
-
-  private def listTopicsMetadata(adminClient: Admin): Seq[TopicDescription] = {
-    val topics = adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names.get
-    adminClient.describeTopics(topics).allTopicNames.get.values.asScala.toBuffer
-  }
-
-  private def brokerDetails(adminClient: Admin): Map[Int, Node] = {
-    adminClient.describeCluster.nodes.get.asScala.map(n => (n.id, n)).toMap
-  }
-
-  private def createAdminClient(brokerUrl: String): Admin = {
-    val props = new Properties()
-    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
-    Admin.create(props)
-  }
-
-  private def initialOffsets(topicPartitions: Seq[TopicPartition], consumerConfig: Properties,
-                             initialOffsetTime: Long): collection.Map[TopicPartition, Long] = {
-    val consumer = createConsumer(consumerConfig)
-    try {
-      if (ListOffsetsRequest.LATEST_TIMESTAMP == initialOffsetTime)
-        consumer.endOffsets(topicPartitions.asJava).asScala.map { case (k, v) => k -> v.longValue }
-      else if (ListOffsetsRequest.EARLIEST_TIMESTAMP == initialOffsetTime)
-        consumer.beginningOffsets(topicPartitions.asJava).asScala.map { case (k, v) => k -> v.longValue }
-      else {
-        val timestampsToSearch = topicPartitions.map(tp => tp -> (initialOffsetTime: java.lang.Long)).toMap
-        consumer.offsetsForTimes(timestampsToSearch.asJava).asScala.map { case (k, v) => k -> v.offset }
-      }
-    } finally consumer.close()
-  }
-
-  private def consumerConfig(brokerUrl: String): Properties = {
-    val properties = new Properties()
-    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
-    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "ReplicaVerification")
-    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
-    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
-    properties
-  }
-
-  private def createConsumer(consumerConfig: Properties): KafkaConsumer[String, String] =
-    new KafkaConsumer(consumerConfig)
-}
-
-private case class TopicPartitionReplica(topic: String, partitionId: Int, replicaId: Int)
-
-private case class MessageInfo(replicaId: Int, offset: Long, nextOffset: Long, checksum: Long)
-
-private class ReplicaBuffer(expectedReplicasPerTopicPartition: collection.Map[TopicPartition, Int],
-                            initialOffsets: collection.Map[TopicPartition, Long],
-                            expectedNumFetchers: Int,
-                            reportInterval: Long) extends Logging {
-  private val fetchOffsetMap = new Pool[TopicPartition, Long]
-  private val recordsCache = new Pool[TopicPartition, Pool[Int, FetchResponseData.PartitionData]]
-  private val fetcherBarrier = new AtomicReference(new CountDownLatch(expectedNumFetchers))
-  private val verificationBarrier = new AtomicReference(new CountDownLatch(1))
-  @volatile private var lastReportTime = Time.SYSTEM.milliseconds
-  private var maxLag: Long = -1L
-  private var offsetWithMaxLag: Long = -1L
-  private var maxLagTopicAndPartition: TopicPartition = _
-  initialize()
-
-  def createNewFetcherBarrier(): Unit = {
-    fetcherBarrier.set(new CountDownLatch(expectedNumFetchers))
-  }
-
-  def getFetcherBarrier() = fetcherBarrier.get
-
-  def createNewVerificationBarrier(): Unit = {
-    verificationBarrier.set(new CountDownLatch(1))
-  }
-
-  def getVerificationBarrier() = verificationBarrier.get
-
-  private def initialize(): Unit = {
-    for (topicPartition <- expectedReplicasPerTopicPartition.keySet)
-      recordsCache.put(topicPartition, new Pool[Int, FetchResponseData.PartitionData])
-    setInitialOffsets()
-  }
-
-
-  private def setInitialOffsets(): Unit = {
-    for ((tp, offset) <- initialOffsets)
-      fetchOffsetMap.put(tp, offset)
-  }
-
-  def addFetchedData(topicAndPartition: TopicPartition, replicaId: Int, partitionData: FetchResponseData.PartitionData): Unit = {
-    recordsCache.get(topicAndPartition).put(replicaId, partitionData)
-  }
-
-  def getOffset(topicAndPartition: TopicPartition) = {
-    fetchOffsetMap.get(topicAndPartition)
-  }
-
-  def verifyCheckSum(println: String => Unit): Unit = {
-    debug("Begin verification")
-    maxLag = -1L
-    for ((topicPartition, fetchResponsePerReplica) <- recordsCache) {
-      debug(s"Verifying $topicPartition")
-      assert(fetchResponsePerReplica.size == expectedReplicasPerTopicPartition(topicPartition),
-        "fetched " + fetchResponsePerReplica.size + " replicas for " + topicPartition + ", but expected "
-          + expectedReplicasPerTopicPartition(topicPartition) + " replicas")
-      val recordBatchIteratorMap = fetchResponsePerReplica.map { case (replicaId, fetchResponse) =>
-        replicaId -> FetchResponse.recordsOrFail(fetchResponse).batches.iterator
-      }
-      val maxHw = fetchResponsePerReplica.values.map(_.highWatermark).max
-
-      // Iterate one message at a time from every replica, until high watermark is reached.
-      var isMessageInAllReplicas = true
-      while (isMessageInAllReplicas) {
-        var messageInfoFromFirstReplicaOpt: Option[MessageInfo] = None
-        for ((replicaId, recordBatchIterator) <- recordBatchIteratorMap) {
-          try {
-            if (recordBatchIterator.hasNext) {
-              val batch = recordBatchIterator.next()
-
-              // only verify up to the high watermark
-              if (batch.lastOffset >= fetchResponsePerReplica.get(replicaId).highWatermark)
-                isMessageInAllReplicas = false
-              else {
-                messageInfoFromFirstReplicaOpt match {
-                  case None =>
-                    messageInfoFromFirstReplicaOpt = Some(
-                      MessageInfo(replicaId, batch.lastOffset, batch.nextOffset, batch.checksum))
-                  case Some(messageInfoFromFirstReplica) =>
-                    if (messageInfoFromFirstReplica.offset != batch.lastOffset) {
-                      println(ReplicaVerificationTool.getCurrentTimeString() + ": partition " + topicPartition
-                        + ": replica " + messageInfoFromFirstReplica.replicaId + "'s offset "
-                        + messageInfoFromFirstReplica.offset + " doesn't match replica "
-                        + replicaId + "'s offset " + batch.lastOffset)
-                      Exit.exit(1)
-                    }
-                    if (messageInfoFromFirstReplica.checksum != batch.checksum)
-                      println(ReplicaVerificationTool.getCurrentTimeString() + ": partition "
-                        + topicPartition + " has unmatched checksum at offset " + batch.lastOffset + "; replica "
-                        + messageInfoFromFirstReplica.replicaId + "'s checksum " + messageInfoFromFirstReplica.checksum
-                        + "; replica " + replicaId + "'s checksum " + batch.checksum)
-                }
-              }
-            } else
-              isMessageInAllReplicas = false
-          } catch {
-            case t: Throwable =>
-              throw new RuntimeException("Error in processing replica %d in partition %s at offset %d."
-                .format(replicaId, topicPartition, fetchOffsetMap.get(topicPartition)), t)
-          }
-        }
-        if (isMessageInAllReplicas) {
-          val nextOffset = messageInfoFromFirstReplicaOpt.get.nextOffset
-          fetchOffsetMap.put(topicPartition, nextOffset)
-          debug(s"${expectedReplicasPerTopicPartition(topicPartition)} replicas match at offset " +
-            s"$nextOffset for $topicPartition")
-        }
-      }
-      if (maxHw - fetchOffsetMap.get(topicPartition) > maxLag) {
-        offsetWithMaxLag = fetchOffsetMap.get(topicPartition)
-        maxLag = maxHw - offsetWithMaxLag
-        maxLagTopicAndPartition = topicPartition
-      }
-      fetchResponsePerReplica.clear()
-    }
-    val currentTimeMs = Time.SYSTEM.milliseconds
-    if (currentTimeMs - lastReportTime > reportInterval) {
-      println(ReplicaVerificationTool.dateFormat.format(new Date(currentTimeMs)) + ": max lag is "
-        + maxLag + " for partition " + maxLagTopicAndPartition + " at offset " + offsetWithMaxLag
-        + " among " + recordsCache.size + " partitions")
-      lastReportTime = currentTimeMs
-    }
-  }
-}
-
-private class ReplicaFetcher(name: String, sourceBroker: Node, topicPartitions: Iterable[TopicPartition],
-                             topicIds: Map[String, Uuid], replicaBuffer: ReplicaBuffer, socketTimeout: Int, socketBufferSize: Int,
-                             fetchSize: Int, maxWait: Int, minBytes: Int, doVerification: Boolean, consumerConfig: Properties,
-                             fetcherId: Int)
-  extends ShutdownableThread(name) with Logging {
-
-  this.logIdent = logPrefix
-
-  private val fetchEndpoint = new ReplicaFetcherBlockingSend(sourceBroker, new ConsumerConfig(consumerConfig), new Metrics(), Time.SYSTEM, fetcherId,
-    s"broker-${FetchRequest.DEBUGGING_CONSUMER_ID}-fetcher-$fetcherId")
-
-  private val topicNames = topicIds.map(_.swap)
-
-  override def doWork(): Unit = {
-
-    val fetcherBarrier = replicaBuffer.getFetcherBarrier()
-    val verificationBarrier = replicaBuffer.getVerificationBarrier()
-
-    val requestMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
-    for (topicPartition <- topicPartitions)
-      requestMap.put(topicPartition, new FetchRequest.PartitionData(topicIds.getOrElse(topicPartition.topic, Uuid.ZERO_UUID), replicaBuffer.getOffset(topicPartition),
-        0L, fetchSize, Optional.empty()))
-
-    val fetchRequestBuilder = FetchRequest.Builder.
-      forReplica(ApiKeys.FETCH.latestVersion, FetchRequest.DEBUGGING_CONSUMER_ID, -1, maxWait, minBytes, requestMap)
-
-    debug("Issuing fetch request ")
-
-    var fetchResponse: FetchResponse = null
-    try {
-      val clientResponse = fetchEndpoint.sendRequest(fetchRequestBuilder)
-      fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse]
-    } catch {
-      case t: Throwable =>
-        if (!isRunning)
-          throw t
-    }
-
-    if (fetchResponse != null) {
-      fetchResponse.responseData(topicNames.asJava, ApiKeys.FETCH.latestVersion()).forEach { (tp, partitionData) =>
-        replicaBuffer.addFetchedData(tp, sourceBroker.id, partitionData)
-      }
-    } else {
-      for (topicAndPartition <- topicPartitions)
-        replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, FetchResponse.partitionResponse(topicAndPartition.partition, Errors.NONE))
-    }
-
-    fetcherBarrier.countDown()
-    debug("Done fetching")
-
-    // wait for all fetchers to finish
-    fetcherBarrier.await()
-    debug("Ready for verification")
-
-    // one of the fetchers will do the verification
-    if (doVerification) {
-      debug("Do verification")
-      replicaBuffer.verifyCheckSum(println)
-      replicaBuffer.createNewFetcherBarrier()
-      replicaBuffer.createNewVerificationBarrier()
-      debug("Created new barrier")
-      verificationBarrier.countDown()
-    }
-
-    verificationBarrier.await()
-    debug("Done verification")
-  }
-}
-
-private class ReplicaFetcherBlockingSend(sourceNode: Node,
-                                         consumerConfig: ConsumerConfig,
-                                         metrics: Metrics,
-                                         time: Time,
-                                         fetcherId: Int,
-                                         clientId: String) {
-
-  private val socketTimeout: Int = consumerConfig.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG)
-
-  private val networkClient = {
-    val logContext = new LogContext()
-    val channelBuilder = org.apache.kafka.clients.ClientUtils.createChannelBuilder(consumerConfig, time, logContext)
-    val selector = new Selector(
-      NetworkReceive.UNLIMITED,
-      consumerConfig.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
-      metrics,
-      time,
-      "replica-fetcher",
-      Map("broker-id" -> sourceNode.id.toString, "fetcher-id" -> fetcherId.toString).asJava,
-      false,
-      channelBuilder,
-      logContext
-    )
-    new NetworkClient(
-      selector,
-      new ManualMetadataUpdater(),
-      clientId,
-      1,
-      0,
-      0,
-      Selectable.USE_DEFAULT_BUFFER_SIZE,
-      consumerConfig.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
-      consumerConfig.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
-      consumerConfig.getLong(ConsumerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
-      consumerConfig.getLong(ConsumerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
-      time,
-      false,
-      new ApiVersions,
-      logContext
-    )
-  }
-
-  def sendRequest(requestBuilder: Builder[_ <: AbstractRequest]): ClientResponse = {
-    try {
-      if (!NetworkClientUtils.awaitReady(networkClient, sourceNode, time, socketTimeout))
-        throw new SocketTimeoutException(s"Failed to connect within $socketTimeout ms")
-      else {
-        val clientRequest = networkClient.newClientRequest(sourceNode.id.toString, requestBuilder,
-          time.milliseconds(), true)
-        NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)
-      }
-    }
-    catch {
-      case e: Throwable =>
-        networkClient.close(sourceNode.id.toString)
-        throw e
-    }
-  }
-
-  def close(): Unit = {
-    networkClient.close()
-  }
-}
diff --git a/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala b/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
deleted file mode 100644
index 217260403df..00000000000
--- a/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.message.FetchResponseData
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.Assertions.assertTrue
-
-class ReplicaVerificationToolTest {
-
-  @Test
-  def testReplicaBufferVerifyChecksum(): Unit = {
-    val sb = new StringBuilder
-
-    val expectedReplicasPerTopicAndPartition = Map(
-      new TopicPartition("a", 0) -> 3,
-      new TopicPartition("a", 1) -> 3,
-      new TopicPartition("b", 0) -> 2
-    )
-
-    val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition, Map.empty, 2, 0)
-    expectedReplicasPerTopicAndPartition.foreach { case (tp, numReplicas) =>
-      (0 until numReplicas).foreach { replicaId =>
-        val records = (0 to 5).map { index =>
-          new SimpleRecord(s"key $index".getBytes, s"value $index".getBytes)
-        }
-        val initialOffset = 4
-        val memoryRecords = MemoryRecords.withRecords(initialOffset, CompressionType.NONE, records: _*)
-        val partitionData = new FetchResponseData.PartitionData()
-          .setPartitionIndex(tp.partition)
-          .setHighWatermark(20)
-          .setLastStableOffset(20)
-          .setLogStartOffset(0)
-          .setRecords(memoryRecords)
-
-        replicaBuffer.addFetchedData(tp, replicaId, partitionData)
-      }
-    }
-
-    replicaBuffer.verifyCheckSum(line => sb.append(s"$line\n"))
-    val output = sb.toString.trim
-
-    // If you change this assertion, you should verify that the replica_verification_test.py system test still passes
-    assertTrue(output.endsWith(": max lag is 10 for partition a-1 at offset 10 among 3 partitions"),
-      s"Max lag information should be in output: `$output`")
-  }
-
-}
diff --git a/tests/kafkatest/services/replica_verification_tool.py b/tests/kafkatest/services/replica_verification_tool.py
index 13a1288001f..ecc47b2a6df 100644
--- a/tests/kafkatest/services/replica_verification_tool.py
+++ b/tests/kafkatest/services/replica_verification_tool.py
@@ -71,15 +71,14 @@ class ReplicaVerificationTool(KafkaPathResolverMixin, BackgroundThreadService):
         return lag
 
     def start_cmd(self, node):
-        cmd = self.path.script("kafka-run-class.sh", node)
-        cmd += " %s" % self.java_class_name()
-        cmd += " --broker-list %s --topic-white-list %s --time -2 --report-interval-ms %s" % (self.kafka.bootstrap_servers(self.security_protocol), self.topic, self.report_interval_ms)
+        cmd = self.path.script("kafka-replica-verification.sh", node)
+        cmd += " --broker-list %s --topics-include %s --time -2 --report-interval-ms %s" % (self.kafka.bootstrap_servers(self.security_protocol), self.topic, self.report_interval_ms)
 
         cmd += " 2>> /mnt/replica_verification_tool.log | tee -a /mnt/replica_verification_tool.log &"
         return cmd
 
     def stop_node(self, node):
-        node.account.kill_java_processes(self.java_class_name(), clean_shutdown=True,
+        node.account.kill_java_processes("ReplicaVerificationTool", clean_shutdown=True,
                                          allow_fail=True)
 
         stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
@@ -87,9 +86,6 @@ class ReplicaVerificationTool(KafkaPathResolverMixin, BackgroundThreadService):
                         (str(node.account), str(self.stop_timeout_sec))
 
     def clean_node(self, node):
-        node.account.kill_java_processes(self.java_class_name(), clean_shutdown=False,
+        node.account.kill_java_processes("ReplicaVerificationTool", clean_shutdown=False,
                                          allow_fail=True)
         node.account.ssh("rm -rf /mnt/replica_verification_tool.log", allow_fail=False)
-
-    def java_class_name(self):
-        return "kafka.tools.ReplicaVerificationTool"
diff --git a/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
new file mode 100644
index 00000000000..446c5fd67bf
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
@@ -0,0 +1,744 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.ManualMetadataUpdater;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NetworkClientUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.network.Selectable;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.apache.kafka.server.util.TopicFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.SocketTimeoutException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+
+/**
+ * For verifying the consistency among replicas.
+ * <p>
+ *  1. start a fetcher on every broker
+ *  2. each fetcher does the following
+ *    2.1 issues fetch request
+ *    2.2 puts the fetched result in a shared buffer
+ *    2.3 waits for all other fetchers to finish step 2.2
+ *    2.4 one of the fetchers verifies the consistency of fetched results among replicas
+ * <p>
+ * The consistency verification is up to the high watermark. The tool reports the
+ * max lag between the verified offset and the high watermark among all partitions.
+ * <p>
+ * If a broker goes down, the verification of the partitions on that broker is delayed
+ * until the broker is up again.
+ * <p>
+ * Caveats:
+ * 1. The tool needs all brokers to be up at startup time.
+ * 2. The tool doesn't handle out of range offsets.
+ */
+public class ReplicaVerificationTool {
+    private static final Logger LOG = LoggerFactory.getLogger(ReplicaVerificationTool.class);
+    private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+
+    public static void main(String[] args) {
+        try {
+            ReplicaVerificationToolOptions options = new ReplicaVerificationToolOptions(args);
+            // getting topic metadata
+            LOG.info("Getting topic metadata...");
+            String brokerList = options.brokerHostsAndPorts();
+
+            try (Admin adminClient = createAdminClient(brokerList)) {
+                Collection<TopicDescription> topicsMetadata = listTopicsMetadata(adminClient);
+                Map<Integer, Node> brokerInfo = brokerDetails(adminClient);
+
+                Map<String, Uuid> topicIds = topicsMetadata.stream().collect(Collectors.toMap(TopicDescription::name, TopicDescription::topicId));
+
+                List<TopicDescription> filteredTopicMetadata = topicsMetadata.stream().filter(
+                    topicMetadata -> options.topicsIncludeFilter().isTopicAllowed(topicMetadata.name(), false)
+                ).collect(Collectors.toList());
+
+                if (filteredTopicMetadata.isEmpty()) {
+                    LOG.error("No topics found. {} if specified, is either filtering out all topics or there is no topic.", options.topicsIncludeOpt);
+                    Exit.exit(1);
+                }
+
+                List<TopicPartitionReplica> topicPartitionReplicas = filteredTopicMetadata.stream().flatMap(
+                    topicMetadata -> topicMetadata.partitions().stream().flatMap(
+                        partitionMetadata -> partitionMetadata.replicas().stream().map(
+                            node -> new TopicPartitionReplica(topicMetadata.name(), partitionMetadata.partition(), node.id())
+                        )
+                    )
+                ).collect(Collectors.toList());
+                LOG.debug("Selected topic partitions: {}", topicPartitionReplicas);
+
+                Map<Integer, List<TopicPartition>> brokerToTopicPartitions = topicPartitionReplicas.stream()
+                    .collect(Collectors.groupingBy(
+                        TopicPartitionReplica::brokerId,
+                        Collectors.mapping(
+                            replica -> new TopicPartition(replica.topic(), replica.partition()),
+                            Collectors.toList()
+                        )
+                    ));
+                LOG.debug("Topic partitions per broker: {}", brokerToTopicPartitions);
+
+                Map<TopicPartition, Integer> expectedReplicasPerTopicPartition = topicPartitionReplicas.stream()
+                    .collect(Collectors.groupingBy(
+                        replica -> new TopicPartition(replica.topic(), replica.partition()),
+                        Collectors.collectingAndThen(
+                            Collectors.toList(),
+                            List::size
+                        )
+                    ));
+                LOG.debug("Expected replicas per topic partition: {}", expectedReplicasPerTopicPartition);
+
+                List<TopicPartition> topicPartitions = filteredTopicMetadata.stream()
+                    .flatMap(topicMetadata -> topicMetadata.partitions().stream()
+                        .map(partitionMetadata -> new TopicPartition(topicMetadata.name(), partitionMetadata.partition()))
+                    )
+                    .collect(Collectors.toList());
+
+                Properties consumerProps = consumerConfig(brokerList);
+
+                ReplicaBuffer replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicPartition,
+                    initialOffsets(topicPartitions, consumerProps, options.initialOffsetTime()),
+                    brokerToTopicPartitions.size(), options.reportInterval());
+
+                // create all replica fetcher threads
+                int verificationBrokerId = brokerToTopicPartitions.entrySet().iterator().next().getKey();
+                AtomicInteger counter = new AtomicInteger(0);
+                List<ReplicaFetcher> fetcherThreads = brokerToTopicPartitions.entrySet().stream()
+                    .map(entry -> {
+                        int brokerId = entry.getKey();
+                        Iterable<TopicPartition> partitions = entry.getValue();
+                        return new ReplicaFetcher(
+                            "ReplicaFetcher-" + brokerId,
+                            brokerInfo.get(brokerId),
+                            partitions,
+                            topicIds,
+                            replicaBuffer,
+                            options.fetchSize(),
+                            options.maxWaitMs(),
+                            1,
+                            brokerId == verificationBrokerId,
+                            consumerProps,
+                            counter.incrementAndGet()
+                        );
+                    })
+                    .collect(Collectors.toList());
+
+                Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+                    LOG.info("Stopping all fetchers");
+                    fetcherThreads.forEach(replicaFetcher -> {
+                        try {
+                            replicaFetcher.shutdown();
+                        } catch (InterruptedException ignored) {
+                        }
+                    });
+                }, "ReplicaVerificationToolShutdownHook"));
+
+                fetcherThreads.forEach(Thread::start);
+                System.out.printf("%s: verification process is started%n",
+                    DATE_FORMAT.format(new Date(Time.SYSTEM.milliseconds())));
+            }
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            Exit.exit(1);
+        }
+    }
+
+    private static Map<TopicPartition, Long> initialOffsets(List<TopicPartition> topicPartitions, Properties consumerConfig, long initialOffsetTime) {
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig)) {
+            if (ListOffsetsRequest.LATEST_TIMESTAMP == initialOffsetTime) {
+                return consumer.endOffsets(topicPartitions).entrySet().stream()
+                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+            } else if (ListOffsetsRequest.EARLIEST_TIMESTAMP == initialOffsetTime) {
+                return consumer.beginningOffsets(topicPartitions).entrySet().stream()
+                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+            } else {
+                Map<TopicPartition, Long> timestampsToSearch = topicPartitions.stream()
+                    .collect(Collectors.toMap(Function.identity(), tp -> initialOffsetTime));
+                return consumer.offsetsForTimes(timestampsToSearch).entrySet().stream()
+                    .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
+            }
+        }
+    }
+
+    private static Properties consumerConfig(String brokerUrl) {
+        Properties properties = new Properties();
+        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
+        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "ReplicaVerification");
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        return properties;
+    }
+
+    private static Map<Integer, Node> brokerDetails(Admin adminClient) throws ExecutionException, InterruptedException {
+        return adminClient.describeCluster().nodes().get().stream().collect(Collectors.toMap(Node::id, Function.identity()));
+    }
+
+    private static Collection<TopicDescription> listTopicsMetadata(Admin adminClient) throws ExecutionException, InterruptedException {
+        Set<String> topics = adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get();
+        return adminClient.describeTopics(topics).allTopicNames().get().values();
+    }
+
+    private static Admin createAdminClient(String brokerList) {
+        Properties props = new Properties();
+        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+        return Admin.create(props);
+    }
+
+    private static class ReplicaVerificationToolOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> brokerListOpt;
+        private final OptionSpec<Integer> fetchSizeOpt;
+        private final OptionSpec<Integer> maxWaitMsOpt;
+        private final OptionSpec<String> topicWhiteListOpt;
+        private final OptionSpec<String> topicsIncludeOpt;
+        private final OptionSpec<Long> initialOffsetTimeOpt;
+        private final OptionSpec<Long> reportIntervalOpt;
+
+        ReplicaVerificationToolOptions(String[] args) {
+            super(args);
+            brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
+                .withRequiredArg()
+                .describedAs("hostname:port,...,hostname:port")
+                .ofType(String.class);
+            fetchSizeOpt = parser.accepts("fetch-size", "The fetch size of each request.")
+                .withRequiredArg()
+                .describedAs("bytes")
+                .ofType(Integer.class)
+                .defaultsTo(ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES);
+            maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
+                .withRequiredArg()
+                .describedAs("ms")
+                .ofType(Integer.class)
+                .defaultsTo(1_000);
+            topicWhiteListOpt = parser.accepts("topic-white-list", "DEPRECATED use --topics-include instead; " +
+                    "ignored if --topics-include specified. List of topics to verify replica consistency.")
+                .withRequiredArg()
+                .describedAs("Java regex (String)")
+                .ofType(String.class)
+                .defaultsTo(".*");
+            topicsIncludeOpt = parser.accepts("topics-include", "List of topics to verify replica consistency.")
+                .withRequiredArg()
+                .describedAs("Java regex (String)")
+                .ofType(String.class)
+                .defaultsTo(".*");
+            initialOffsetTimeOpt = parser.accepts("time", "Timestamp for getting the initial offsets.")
+                .withRequiredArg()
+                .describedAs("timestamp/-1(latest)/-2(earliest)")
+                .ofType(Long.class)
+                .defaultsTo(-1L);
+            reportIntervalOpt = parser.accepts("report-interval-ms", "The reporting interval.")
+                .withRequiredArg()
+                .describedAs("ms")
+                .ofType(Long.class)
+                .defaultsTo(30_000L);
+            options = parser.parse(args);
+            if (args.length == 0 || options.has(helpOpt)) {
+                CommandLineUtils.printUsageAndExit(parser, "Validate that all replicas for a set of topics have the same data.");
+            }
+            if (options.has(versionOpt)) {
+                CommandLineUtils.printVersionAndExit();
+            }
+            CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt);
+            CommandLineUtils.checkInvalidArgs(parser, options, topicsIncludeOpt, topicWhiteListOpt);
+        }
+
+        String brokerHostsAndPorts() {
+            String brokerList = options.valueOf(brokerListOpt);
+            validateBrokerList(parser, brokerList);
+            return brokerList;
+        }
+
+        void validateBrokerList(OptionParser parser, String brokerList) {
+            if (parser == null || brokerList == null) {
+                throw new RuntimeException("No option parser or broker list found");
+            }
+            if (brokerList.isEmpty()) {
+                CommandLineUtils.printUsageAndExit(parser, "Empty broker list option");
+            }
+
+            String[] hostPorts;
+            if (brokerList.contains(",")) hostPorts = brokerList.split(",");
+            else hostPorts = new String[]{brokerList};
+
+            String[] validHostPort = Arrays.stream(hostPorts)
+                .filter(hostPortData -> Utils.getPort(hostPortData) != null)
+                .toArray(String[]::new);
+
+            if (validHostPort.length == 0 || validHostPort.length != hostPorts.length) {
+                CommandLineUtils.printUsageAndExit(parser, "Invalid broker list option");
+            }
+        }
+
+        TopicFilter.IncludeList topicsIncludeFilter() {
+            String regex = options.valueOf(options.has(topicsIncludeOpt) ? topicsIncludeOpt : topicWhiteListOpt);
+            try {
+                Pattern.compile(regex);
+            } catch (PatternSyntaxException e) {
+                throw new RuntimeException(format("%s is an invalid regex", regex));
+            }
+            return new TopicFilter.IncludeList(regex);
+        }
+
+        int fetchSize() {
+            return options.valueOf(fetchSizeOpt);
+        }
+
+        int maxWaitMs() {
+            return options.valueOf(maxWaitMsOpt);
+        }
+
+        long initialOffsetTime() {
+            return options.valueOf(initialOffsetTimeOpt);
+        }
+
+        long reportInterval() {
+            return options.valueOf(reportIntervalOpt);
+        }
+    }
+
+    private static class MessageInfo {
+        final int replicaId;
+        final long offset;
+        final long nextOffset;
+        final long checksum;
+
+        MessageInfo(int replicaId, long offset, long nextOffset, long checksum) {
+            this.replicaId = replicaId;
+            this.offset = offset;
+            this.nextOffset = nextOffset;
+            this.checksum = checksum;
+        }
+    }
+
+    protected static class ReplicaBuffer {
+        private final Map<TopicPartition, Integer> expectedReplicasPerTopicPartition;
+        private final int expectedNumFetchers;
+        private final long reportInterval;
+        private final Map<TopicPartition, Long> fetchOffsetMap;
+        private final Map<TopicPartition, Map<Integer, FetchResponseData.PartitionData>> recordsCache;
+        private final AtomicReference<CountDownLatch> fetcherBarrier;
+        private final AtomicReference<CountDownLatch> verificationBarrier;
+
+        private volatile long lastReportTime;
+        private long maxLag;
+        private long offsetWithMaxLag;
+        private TopicPartition maxLagTopicAndPartition;
+
+        ReplicaBuffer(Map<TopicPartition, Integer> expectedReplicasPerTopicPartition,
+                      Map<TopicPartition, Long> initialOffsets,
+                      int expectedNumFetchers,
+                      long reportInterval) {
+            this.expectedReplicasPerTopicPartition = expectedReplicasPerTopicPartition;
+            this.expectedNumFetchers = expectedNumFetchers;
+            this.reportInterval = reportInterval;
+            this.fetchOffsetMap = new HashMap<>();
+            this.recordsCache = new HashMap<>();
+            this.fetcherBarrier = new AtomicReference<>(new CountDownLatch(expectedNumFetchers));
+            this.verificationBarrier = new AtomicReference<>(new CountDownLatch(1));
+            this.lastReportTime = Time.SYSTEM.milliseconds();
+            this.maxLag = -1L;
+            this.offsetWithMaxLag = -1L;
+
+            for (TopicPartition topicPartition : expectedReplicasPerTopicPartition.keySet()) {
+                recordsCache.put(topicPartition, new HashMap<>());
+            }
+            // set initial offsets
+            for (Map.Entry<TopicPartition, Long> entry : initialOffsets.entrySet()) {
+                TopicPartition tp = entry.getKey();
+                Long offset = entry.getValue();
+                fetchOffsetMap.put(tp, offset);
+            }
+        }
+
+        void createNewFetcherBarrier() {
+            fetcherBarrier.set(new CountDownLatch(expectedNumFetchers));
+        }
+
+        CountDownLatch getFetcherBarrier() {
+            return fetcherBarrier.get();
+        }
+
+        void createNewVerificationBarrier() {
+            verificationBarrier.set(new CountDownLatch(1));
+        }
+
+        CountDownLatch getVerificationBarrier() {
+            return verificationBarrier.get();
+        }
+
+        void addFetchedData(TopicPartition topicPartition,
+                            int replicaId,
+                            FetchResponseData.PartitionData partitionData) {
+            recordsCache.get(topicPartition).put(replicaId, partitionData);
+        }
+
+        long getOffset(TopicPartition topicPartition) {
+            return fetchOffsetMap.get(topicPartition);
+        }
+
+        void verifyCheckSum(Consumer<String> println) {
+            LOG.debug("Begin verification");
+            maxLag = -1L;
+
+            for (Map.Entry<TopicPartition, Map<Integer, FetchResponseData.PartitionData>> cacheEntry : recordsCache.entrySet()) {
+                TopicPartition topicPartition = cacheEntry.getKey();
+                Map<Integer, FetchResponseData.PartitionData> fetchResponsePerReplica = cacheEntry.getValue();
+
+                LOG.debug("Verifying {}", topicPartition);
+                assert fetchResponsePerReplica.size() == expectedReplicasPerTopicPartition.get(topicPartition) :
+                    "fetched " + fetchResponsePerReplica.size() + " replicas for " + topicPartition +
+                        ", but expected " + expectedReplicasPerTopicPartition.get(topicPartition) + " replicas";
+
+                Map<Integer, Iterator<? extends RecordBatch>> recordBatchIteratorMap = new HashMap<>();
+                for (Map.Entry<Integer, FetchResponseData.PartitionData> fetchResEntry : fetchResponsePerReplica.entrySet()) {
+                    int replicaId = fetchResEntry.getKey();
+                    FetchResponseData.PartitionData fetchResponse = fetchResEntry.getValue();
+                    Iterator<? extends RecordBatch> recordIterator =
+                        FetchResponse.recordsOrFail(fetchResponse).batches().iterator();
+                    recordBatchIteratorMap.put(replicaId, recordIterator);
+                }
+
+                long maxHw = fetchResponsePerReplica.values().stream()
+                    .mapToLong(FetchResponseData.PartitionData::highWatermark)
+                    .max().orElse(-1L);
+
+                boolean isMessageInAllReplicas = true;
+
+                // iterate one message at a time from every replica, until high watermark is reached
+                while (isMessageInAllReplicas) {
+                    Optional<MessageInfo> messageInfoFromFirstReplicaOpt = Optional.empty();
+
+                    for (Map.Entry<Integer, Iterator<? extends RecordBatch>> batchEntry : recordBatchIteratorMap.entrySet()) {
+                        int replicaId = batchEntry.getKey();
+                        Iterator<? extends RecordBatch> recordBatchIterator = batchEntry.getValue();
+
+                        try {
+                            if (recordBatchIterator.hasNext()) {
+                                RecordBatch batch = recordBatchIterator.next();
+
+                                // only verify up to the high watermark
+                                if (batch.lastOffset() >= fetchResponsePerReplica.get(replicaId).highWatermark()) {
+                                    isMessageInAllReplicas = false;
+                                } else {
+                                    if (!messageInfoFromFirstReplicaOpt.isPresent()) {
+                                        messageInfoFromFirstReplicaOpt = Optional.of(
+                                            new MessageInfo(replicaId, batch.lastOffset(), batch.nextOffset(), batch.checksum())
+                                        );
+                                    } else {
+                                        MessageInfo messageInfoFromFirstReplica = messageInfoFromFirstReplicaOpt.get();
+
+                                        if (messageInfoFromFirstReplica.offset != batch.lastOffset()) {
+                                            println.accept(DATE_FORMAT.format(new Date(Time.SYSTEM.milliseconds())) +
+                                                ": partition " + topicPartition +
+                                                ": replica " + messageInfoFromFirstReplica.replicaId +
+                                                "'s offset " + messageInfoFromFirstReplica.offset +
+                                                " doesn't match replica " + replicaId +
+                                                "'s offset " + batch.lastOffset());
+                                            Exit.exit(1);
+                                        }
+
+                                        if (messageInfoFromFirstReplica.checksum != batch.checksum())
+                                            println.accept(DATE_FORMAT.format(new Date(Time.SYSTEM.milliseconds())) +
+                                                ": partition " + topicPartition +
+                                                " has unmatched checksum at offset " + batch.lastOffset() +
+                                                "; replica " + messageInfoFromFirstReplica.replicaId +
+                                                "'s checksum " + messageInfoFromFirstReplica.checksum +
+                                                "; replica " + replicaId + "'s checksum " + batch.checksum());
+                                    }
+                                }
+                            } else {
+                                isMessageInAllReplicas = false;
+                            }
+                        } catch (Throwable t) {
+                            throw new RuntimeException("Error in processing replica " + replicaId +
+                                " in partition " + topicPartition + " at offset " +
+                                fetchOffsetMap.get(topicPartition), t);
+                        }
+                    }
+
+                    if (isMessageInAllReplicas) {
+                        long nextOffset = messageInfoFromFirstReplicaOpt.map(messageInfo -> messageInfo.nextOffset).orElse(-1L);
+                        fetchOffsetMap.put(topicPartition, nextOffset);
+                        LOG.debug("{} replicas match at offset {} for {}",
+                            expectedReplicasPerTopicPartition.get(topicPartition), nextOffset, topicPartition);
+                    }
+                }
+
+                if (maxHw - fetchOffsetMap.get(topicPartition) > maxLag) {
+                    offsetWithMaxLag = fetchOffsetMap.get(topicPartition);
+                    maxLag = maxHw - offsetWithMaxLag;
+                    maxLagTopicAndPartition = topicPartition;
+                }
+
+                fetchResponsePerReplica.clear();
+            }
+
+            long currentTimeMs = Time.SYSTEM.milliseconds();
+            if (currentTimeMs - lastReportTime > reportInterval) {
+                println.accept(DATE_FORMAT.format(new Date(currentTimeMs)) +
+                    ": max lag is " + maxLag + " for partition " +
+                    maxLagTopicAndPartition + " at offset " + offsetWithMaxLag +
+                    " among " + recordsCache.size() + " partitions");
+                lastReportTime = currentTimeMs;
+            }
+        }
+    }
+
+    private static class ReplicaFetcher extends ShutdownableThread {
+        private final Node sourceBroker;
+        private final Iterable<TopicPartition> topicPartitions;
+        private final Map<String, Uuid> topicIds;
+        private final ReplicaBuffer replicaBuffer;
+        private final int fetchSize;
+        private final int maxWait;
+        private final int minBytes;
+        private final boolean doVerification;
+        private final ReplicaFetcherBlockingSend fetchEndpoint;
+        private final Map<Uuid, String> topicNames;
+
+        public ReplicaFetcher(String name,
+                              Node sourceBroker,
+                              Iterable<TopicPartition> topicPartitions,
+                              Map<String, Uuid> topicIds,
+                              ReplicaBuffer replicaBuffer,
+                              int fetchSize,
+                              int maxWait,
+                              int minBytes,
+                              boolean doVerification,
+                              Properties consumerConfig,
+                              int fetcherId) {
+            super(name);
+            this.sourceBroker = sourceBroker;
+            this.topicPartitions = topicPartitions;
+            this.topicIds = topicIds;
+            this.replicaBuffer = replicaBuffer;
+            this.fetchSize = fetchSize;
+            this.maxWait = maxWait;
+            this.minBytes = minBytes;
+            this.doVerification = doVerification;
+            this.fetchEndpoint = new ReplicaFetcherBlockingSend(sourceBroker, new ConsumerConfig(consumerConfig), new Metrics(),
+                Time.SYSTEM, fetcherId, "broker-" + FetchRequest.DEBUGGING_CONSUMER_ID + "-fetcher-" + fetcherId);
+            this.topicNames = topicIds.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
+        }
+
+        @Override
+        public void doWork() {
+            CountDownLatch fetcherBarrier = replicaBuffer.getFetcherBarrier();
+            CountDownLatch verificationBarrier = replicaBuffer.getVerificationBarrier();
+
+            Map<TopicPartition, FetchRequest.PartitionData> requestMap = new LinkedHashMap<>();
+            for (TopicPartition topicPartition : topicPartitions) {
+                requestMap.put(topicPartition, new FetchRequest.PartitionData(
+                    topicIds.getOrDefault(topicPartition.topic(), Uuid.ZERO_UUID),
+                    replicaBuffer.getOffset(topicPartition),
+                    0L,
+                    fetchSize,
+                    Optional.empty()
+                ));
+            }
+
+            FetchRequest.Builder fetchRequestBuilder = FetchRequest.Builder.forReplica(
+                ApiKeys.FETCH.latestVersion(),
+                FetchRequest.DEBUGGING_CONSUMER_ID,
+                -1,
+                maxWait,
+                minBytes,
+                requestMap
+            );
+
+            LOG.debug("Issuing fetch request");
+
+            FetchResponse fetchResponse = null;
+            try {
+                ClientResponse clientResponse = fetchEndpoint.sendRequest(fetchRequestBuilder);
+                fetchResponse = (FetchResponse) clientResponse.responseBody();
+            } catch (Throwable t) {
+                if (!isRunning())
+                    throw new RuntimeException(t);
+            }
+
+            if (fetchResponse != null) {
+                fetchResponse.responseData(topicNames, ApiKeys.FETCH.latestVersion()).forEach((tp, partitionData) ->
+                    replicaBuffer.addFetchedData(tp, sourceBroker.id(), partitionData));
+            } else {
+                for (TopicPartition topicAndPartition : topicPartitions) {
+                    replicaBuffer.addFetchedData(
+                        topicAndPartition,
+                        sourceBroker.id(),
+                        FetchResponse.partitionResponse(topicAndPartition.partition(), Errors.NONE)
+                    );
+                }
+            }
+
+            fetcherBarrier.countDown();
+            LOG.debug("Done fetching");
+
+            // wait for all fetchers to finish
+            try {
+                fetcherBarrier.await();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return;
+            }
+            LOG.debug("Ready for verification");
+
+            // one of the fetchers will do the verification
+            if (doVerification) {
+                LOG.debug("Do verification");
+                replicaBuffer.verifyCheckSum(System.out::println);
+                replicaBuffer.createNewFetcherBarrier();
+                replicaBuffer.createNewVerificationBarrier();
+                LOG.debug("Created new barrier");
+                verificationBarrier.countDown();
+            }
+
+            try {
+                verificationBarrier.await();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return;
+            }
+            LOG.debug("Done verification");
+        }
+    }
+
+    private static class ReplicaFetcherBlockingSend {
+        private final Node sourceNode;
+        private final Time time;
+        private final int socketTimeout;
+        private final NetworkClient networkClient;
+
+        ReplicaFetcherBlockingSend(Node sourceNode,
+                                   ConsumerConfig consumerConfig,
+                                   Metrics metrics,
+                                   Time time,
+                                   int fetcherId,
+                                   String clientId) {
+            this.sourceNode = sourceNode;
+            this.time = time;
+            this.socketTimeout = consumerConfig.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+
+            LogContext logContext = new LogContext();
+            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(consumerConfig, time, logContext);
+            Selector selector = new Selector(
+                NetworkReceive.UNLIMITED,
+                consumerConfig.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
+                metrics,
+                time,
+                "replica-fetcher",
+                new HashMap<String, String>() {{
+                        put("broker-id", sourceNode.idString());
+                        put("fetcher-id", String.valueOf(fetcherId));
+                    }},
+                false,
+                channelBuilder,
+                logContext
+            );
+            this.networkClient = new NetworkClient(
+                selector,
+                new ManualMetadataUpdater(),
+                clientId,
+                1,
+                0,
+                0,
+                Selectable.USE_DEFAULT_BUFFER_SIZE,
+                consumerConfig.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
+                consumerConfig.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
+                consumerConfig.getLong(ConsumerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
+                consumerConfig.getLong(ConsumerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
+                time,
+                false,
+                new ApiVersions(),
+                logContext
+            );
+        }
+
+        ClientResponse sendRequest(AbstractRequest.Builder<? extends AbstractRequest> requestBuilder) {
+            try {
+                if (!NetworkClientUtils.awaitReady(networkClient, sourceNode, time, socketTimeout))
+                    throw new SocketTimeoutException("Failed to connect within " + socketTimeout + " ms");
+                else {
+                    ClientRequest clientRequest = networkClient.newClientRequest(sourceNode.idString(),
+                        requestBuilder, time.milliseconds(), true);
+                    return NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time);
+                }
+            } catch (Throwable e) {
+                networkClient.close(sourceNode.idString());
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}
diff --git a/tools/src/test/java/org/apache/kafka/tools/ReplicaVerificationToolTest.java b/tools/src/test/java/org/apache/kafka/tools/ReplicaVerificationToolTest.java
new file mode 100644
index 00000000000..b5eb121ba13
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/ReplicaVerificationToolTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+import static java.lang.String.format;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ReplicaVerificationToolTest {
+    @Test
+    void testReplicaBufferVerifyChecksum() {
+        StringBuilder sb = new StringBuilder();
+        final Map<TopicPartition, Integer> expectedReplicasPerTopicAndPartition = new HashMap<TopicPartition, Integer>() {{
+                put(new TopicPartition("a", 0), 3);
+                put(new TopicPartition("a", 1), 3);
+                put(new TopicPartition("b", 0), 2);
+            }};
+
+        ReplicaVerificationTool.ReplicaBuffer replicaBuffer =
+            new ReplicaVerificationTool.ReplicaBuffer(expectedReplicasPerTopicAndPartition, Collections.emptyMap(), 2, 0);
+        expectedReplicasPerTopicAndPartition.forEach((tp, numReplicas) -> {
+            IntStream.range(0, numReplicas).forEach(replicaId -> {
+                SimpleRecord[] records = IntStream.rangeClosed(0, 5)
+                    .mapToObj(index -> new SimpleRecord(("key " + index).getBytes(), ("value " + index).getBytes()))
+                    .toArray(SimpleRecord[]::new);
+
+                long initialOffset = 4L;
+                MemoryRecords memoryRecords = MemoryRecords.withRecords(initialOffset, CompressionType.NONE, records);
+                FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
+                    .setPartitionIndex(tp.partition())
+                    .setHighWatermark(20)
+                    .setLastStableOffset(20)
+                    .setLogStartOffset(0)
+                    .setRecords(memoryRecords);
+
+                replicaBuffer.addFetchedData(tp, replicaId, partitionData);
+            });
+        });
+
+        replicaBuffer.verifyCheckSum(line -> sb.append(format("%s%n", line)));
+        String output = sb.toString().trim();
+
+        // if you change this assertion, you should verify that the replica_verification_test.py system test still passes
+        assertTrue(output.endsWith(": max lag is 10 for partition a-1 at offset 10 among 3 partitions"),
+            format("Max lag information should be in output: %s", output));
+    }
+}