You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/06/12 07:47:45 UTC
[kafka] branch 2.0 updated: KAFKA-7029: Update ReplicaVerificationTool not to use SimpleConsumer (#5188)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 35f3470 KAFKA-7029: Update ReplicaVerificationTool not to use SimpleConsumer (#5188)
35f3470 is described below
commit 35f34709ba865a38775308e17f97557e8cd12f69
Author: Manikumar Reddy O <ma...@gmail.com>
AuthorDate: Tue Jun 12 13:16:24 2018 +0530
KAFKA-7029: Update ReplicaVerificationTool not to use SimpleConsumer (#5188)
We need to send fetch requests directly to each replica's broker, so we have to use
NetworkClient instead of KafkaConsumer.
Reviewers: Ismael Juma <is...@juma.me.uk>
---
.../kafka/server/ReplicaFetcherBlockingSend.scala | 2 +-
.../kafka/tools/ReplicaVerificationTool.scala | 389 +++++++++++++--------
.../kafka/tools/ReplicaVerificationToolTest.scala | 22 +-
3 files changed, 255 insertions(+), 158 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
index 0bf2bd3..4c7adfb 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
@@ -86,7 +86,7 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
)
}
- override def sendRequest(requestBuilder: Builder[_ <: AbstractRequest]): ClientResponse = {
+ override def sendRequest(requestBuilder: Builder[_ <: AbstractRequest]): ClientResponse = {
try {
if (!NetworkClientUtils.awaitReady(networkClient, sourceNode, time, socketTimeout))
throw new SocketTimeoutException(s"Failed to connect within $socketTimeout ms")
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 0408e92..b1e6946 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -17,25 +17,35 @@
package kafka.tools
+import java.net.SocketTimeoutException
import java.text.SimpleDateFormat
-import java.util.Date
+import java.util
import java.util.concurrent.CountDownLatch
-import java.util.concurrent.atomic.AtomicReference
+import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import java.util.regex.{Pattern, PatternSyntaxException}
+import java.util.{Date, Properties}
import joptsimple.OptionParser
import kafka.api._
-import kafka.client.ClientUtils
-import kafka.cluster.BrokerEndPoint
-import kafka.common.TopicAndPartition
-import kafka.consumer.{ConsumerConfig, SimpleConsumer, Whitelist}
-import kafka.message.{ByteBufferMessageSet, MessageSet}
+import kafka.consumer.Whitelist
import kafka.utils._
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.clients._
+import org.apache.kafka.clients.admin.{ListTopicsOptions, TopicDescription}
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network.{NetworkReceive, Selectable, Selector}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests.AbstractRequest.Builder
+import org.apache.kafka.common.requests.{AbstractRequest, FetchResponse, ListOffsetRequest, FetchRequest => JFetchRequest}
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.common.{Node, TopicPartition}
+
+import scala.collection.JavaConverters._
/**
- * For verifying the consistency among replicas.
+ * For verifying the consistency among replicas.
*
* 1. start a fetcher on every broker.
* 2. each fetcher does the following
@@ -44,11 +54,11 @@ import org.apache.kafka.common.utils.Time
* 2.3 waits for all other fetchers to finish step 2.2
* 2.4 one of the fetchers verifies the consistency of fetched results among replicas
*
- * The consistency verification is up to the high watermark. The tool reports the
- * max lag between the verified offset and the high watermark among all partitions.
+ * The consistency verification is up to the high watermark. The tool reports the
+ * max lag between the verified offset and the high watermark among all partitions.
*
- * If a broker goes down, the verification of the partitions on that broker is delayed
- * until the broker is up again.
+ * If a broker goes down, the verification of the partitions on that broker is delayed
+ * until the broker is up again.
*
* Caveats:
* 1. The tools needs all brokers to be up at startup time.
@@ -56,7 +66,7 @@ import org.apache.kafka.common.utils.Time
*/
object ReplicaVerificationTool extends Logging {
- val clientId= "replicaVerificationTool"
+ val clientId = "replicaVerificationTool"
val dateFormatString = "yyyy-MM-dd HH:mm:ss,SSS"
val dateFormat = new SimpleDateFormat(dateFormatString)
@@ -74,7 +84,7 @@ object ReplicaVerificationTool extends Logging {
.withRequiredArg
.describedAs("bytes")
.ofType(classOf[java.lang.Integer])
- .defaultsTo(ConsumerConfig.FetchSize)
+ .defaultsTo(ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES)
val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
.withRequiredArg
.describedAs("ms")
@@ -96,18 +106,16 @@ object ReplicaVerificationTool extends Logging {
.ofType(classOf[java.lang.Long])
.defaultsTo(30 * 1000L)
- if(args.length == 0)
+ if (args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "Validate that all replicas for a set of topics have the same data.")
- val options = parser.parse(args : _*)
+ val options = parser.parse(args: _*)
CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt)
val regex = options.valueOf(topicWhiteListOpt)
val topicWhiteListFiler = new Whitelist(regex)
- try {
- Pattern.compile(regex)
- }
+ try Pattern.compile(regex)
catch {
case _: PatternSyntaxException =>
throw new RuntimeException(regex + " is an invalid regex.")
@@ -120,68 +128,68 @@ object ReplicaVerificationTool extends Logging {
// getting topic metadata
info("Getting topic metadata...")
val brokerList = options.valueOf(brokerListOpt)
- ToolsUtils.validatePortOrDie(parser,brokerList)
- val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
- val topicsMetadataResponse = ClientUtils.fetchTopicMetadata(Set[String](), metadataTargetBrokers, clientId, maxWaitMs)
- val brokerMap = topicsMetadataResponse.brokers.map(b => (b.id, b)).toMap
- val filteredTopicMetadata = topicsMetadataResponse.topicsMetadata.filter(
- topicMetadata => if (topicWhiteListFiler.isTopicAllowed(topicMetadata.topic, excludeInternalTopics = false))
- true
- else
- false
- )
+ ToolsUtils.validatePortOrDie(parser, brokerList)
+
+ val (topicsMetadata, brokerInfo) = {
+ val adminClient = createAdminClient(brokerList)
+ try ((listTopicsMetadata(adminClient), brokerDetails(adminClient)))
+ finally CoreUtils.swallow(adminClient.close(), this)
+ }
+
+ val filteredTopicMetadata = topicsMetadata.filter { topicMetaData =>
+ topicWhiteListFiler.isTopicAllowed(topicMetaData.name, excludeInternalTopics = false)
+ }
if (filteredTopicMetadata.isEmpty) {
- error("No topics found. " + topicWhiteListOpt + ", if specified, is either filtering out all topics or there is no topic.")
+ error(s"No topics found. $topicWhiteListOpt if specified, is either filtering out all topics or there is no topic.")
Exit.exit(1)
}
- val topicPartitionReplicaList: Seq[TopicPartitionReplica] = filteredTopicMetadata.flatMap(
- topicMetadataResponse =>
- topicMetadataResponse.partitionsMetadata.flatMap(
- partitionMetadata =>
- partitionMetadata.replicas.map(broker =>
- TopicPartitionReplica(topic = topicMetadataResponse.topic, partitionId = partitionMetadata.partitionId, replicaId = broker.id))
- )
- )
- debug("Selected topic partitions: " + topicPartitionReplicaList)
- val topicAndPartitionsPerBroker: Map[Int, Seq[TopicAndPartition]] = topicPartitionReplicaList.groupBy(_.replicaId)
- .map { case (brokerId, partitions) =>
- brokerId -> partitions.map { partition => TopicAndPartition(partition.topic, partition.partitionId) } }
- debug("Topic partitions per broker: " + topicAndPartitionsPerBroker)
- val expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int] =
- topicPartitionReplicaList.groupBy(replica => TopicAndPartition(replica.topic, replica.partitionId))
- .map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size }
- debug("Expected replicas per topic partition: " + expectedReplicasPerTopicAndPartition)
- val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap { topicMetadataResponse =>
- topicMetadataResponse.partitionsMetadata.map { partitionMetadata =>
- (TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id)
+ val topicPartitionReplicas = filteredTopicMetadata.flatMap { topicMetadata =>
+ topicMetadata.partitions.asScala.flatMap { partitionMetadata =>
+ partitionMetadata.replicas.asScala.map { node =>
+ TopicPartitionReplica(topic = topicMetadata.name, partitionId = partitionMetadata.partition, replicaId = node.id)
+ }
+ }
+ }
+ debug(s"Selected topic partitions: $topicPartitionReplicas")
+ val brokerToTopicPartitions = topicPartitionReplicas.groupBy(_.replicaId).map { case (brokerId, partitions) =>
+ brokerId -> partitions.map { partition => new TopicPartition(partition.topic, partition.partitionId) }
+ }
+ debug(s"Topic partitions per broker: $brokerToTopicPartitions")
+ val expectedReplicasPerTopicPartition = topicPartitionReplicas.groupBy { replica =>
+ new TopicPartition(replica.topic, replica.partitionId)
+ }.map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size }
+ debug(s"Expected replicas per topic partition: $expectedReplicasPerTopicPartition")
+
+ val topicPartitions = filteredTopicMetadata.flatMap { topicMetaData =>
+ topicMetaData.partitions.asScala.map { partitionMetadata =>
+ new TopicPartition(topicMetaData.name, partitionMetadata.partition)
}
- }.groupBy(_._2).mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map { case (topicAndPartition, _) =>
- topicAndPartition
- })
- debug("Leaders per broker: " + leadersPerBroker)
-
- val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition,
- leadersPerBroker,
- topicAndPartitionsPerBroker.size,
- brokerMap,
- initialOffsetTime,
- reportInterval)
+ }
+
+ val consumerProps = consumerConfig(brokerList)
+
+ val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicPartition,
+ initialOffsets(topicPartitions, consumerProps, initialOffsetTime),
+ brokerToTopicPartitions.size,
+ reportInterval)
// create all replica fetcher threads
- val verificationBrokerId = topicAndPartitionsPerBroker.head._1
- val fetcherThreads: Iterable[ReplicaFetcher] = topicAndPartitionsPerBroker.map {
- case (brokerId, topicAndPartitions) =>
- new ReplicaFetcher(name = "ReplicaFetcher-" + brokerId,
- sourceBroker = brokerMap(brokerId),
- topicAndPartitions = topicAndPartitions,
- replicaBuffer = replicaBuffer,
- socketTimeout = 30000,
- socketBufferSize = 256000,
- fetchSize = fetchSize,
- maxWait = maxWaitMs,
- minBytes = 1,
- doVerification = brokerId == verificationBrokerId)
+ val verificationBrokerId = brokerToTopicPartitions.head._1
+ val counter = new AtomicInteger(0)
+ val fetcherThreads: Iterable[ReplicaFetcher] = brokerToTopicPartitions.map { case (brokerId, topicPartitions) =>
+ new ReplicaFetcher(name = s"ReplicaFetcher-$brokerId",
+ sourceBroker = brokerInfo(brokerId),
+ topicPartitions = topicPartitions,
+ replicaBuffer = replicaBuffer,
+ socketTimeout = 30000,
+ socketBufferSize = 256000,
+ fetchSize = fetchSize,
+ maxWait = maxWaitMs,
+ minBytes = 1,
+ doVerification = brokerId == verificationBrokerId,
+ consumerProps,
+ fetcherId = counter.incrementAndGet())
}
Runtime.getRuntime.addShutdownHook(new Thread() {
@@ -194,87 +202,112 @@ object ReplicaVerificationTool extends Logging {
println(ReplicaVerificationTool.getCurrentTimeString() + ": verification process is started.")
}
+
+ private def listTopicsMetadata(adminClient: admin.AdminClient): Seq[TopicDescription] = {
+ val topics = adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names.get
+ adminClient.describeTopics(topics).all.get.values.asScala.toBuffer
+ }
+
+ private def brokerDetails(adminClient: admin.AdminClient): Map[Int, Node] = {
+ adminClient.describeCluster.nodes.get.asScala.map(n => (n.id, n)).toMap
+ }
+
+ private def createAdminClient(brokerUrl: String): admin.AdminClient = {
+ val props = new Properties()
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
+ admin.AdminClient.create(props)
+ }
+
+ private def initialOffsets(topicPartitions: Seq[TopicPartition], consumerConfig: Properties,
+ initialOffsetTime: Long): Map[TopicPartition, Long] = {
+ val consumer = createConsumer(consumerConfig)
+ try {
+ if (ListOffsetRequest.LATEST_TIMESTAMP == initialOffsetTime)
+ consumer.endOffsets(topicPartitions.asJava).asScala.mapValues(_.longValue).toMap
+ else if (ListOffsetRequest.EARLIEST_TIMESTAMP == initialOffsetTime)
+ consumer.beginningOffsets(topicPartitions.asJava).asScala.mapValues(_.longValue).toMap
+ else {
+ val timestampsToSearch = topicPartitions.map(tp => tp -> (initialOffsetTime: java.lang.Long)).toMap
+ consumer.offsetsForTimes(timestampsToSearch.asJava).asScala.mapValues(v => v.offset).toMap
+ }
+ } finally consumer.close()
+ }
+
+ private def consumerConfig(brokerUrl: String): Properties = {
+ val properties = new Properties()
+ properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
+ properties.put(ConsumerConfig.GROUP_ID_CONFIG, "ReplicaVerification")
+ properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
+ properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
+ properties
+ }
+
+ private def createConsumer(consumerConfig: Properties): KafkaConsumer[String, String] =
+ new KafkaConsumer(consumerConfig)
}
-private case class TopicPartitionReplica(topic: String, partitionId: Int, replicaId: Int)
+private case class TopicPartitionReplica(topic: String, partitionId: Int, replicaId: Int)
private case class MessageInfo(replicaId: Int, offset: Long, nextOffset: Long, checksum: Long)
-private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int],
- leadersPerBroker: Map[Int, Seq[TopicAndPartition]],
+private class ReplicaBuffer(expectedReplicasPerTopicPartition: Map[TopicPartition, Int],
+ initialOffsets: Map[TopicPartition, Long],
expectedNumFetchers: Int,
- brokerMap: Map[Int, BrokerEndPoint],
- initialOffsetTime: Long,
reportInterval: Long) extends Logging {
- private val fetchOffsetMap = new Pool[TopicAndPartition, Long]
- private val messageSetCache = new Pool[TopicAndPartition, Pool[Int, FetchResponsePartitionData]]
+ private val fetchOffsetMap = new Pool[TopicPartition, Long]
+ private val recordsCache = new Pool[TopicPartition, Pool[Int, FetchResponse.PartitionData[MemoryRecords]]]
private val fetcherBarrier = new AtomicReference(new CountDownLatch(expectedNumFetchers))
private val verificationBarrier = new AtomicReference(new CountDownLatch(1))
@volatile private var lastReportTime = Time.SYSTEM.milliseconds
private var maxLag: Long = -1L
private var offsetWithMaxLag: Long = -1L
- private var maxLagTopicAndPartition: TopicAndPartition = null
+ private var maxLagTopicAndPartition: TopicPartition = null
initialize()
def createNewFetcherBarrier() {
fetcherBarrier.set(new CountDownLatch(expectedNumFetchers))
}
- def getFetcherBarrier() = fetcherBarrier.get()
+ def getFetcherBarrier() = fetcherBarrier.get
def createNewVerificationBarrier() {
verificationBarrier.set(new CountDownLatch(1))
}
- def getVerificationBarrier() = verificationBarrier.get()
+ def getVerificationBarrier() = verificationBarrier.get
private def initialize() {
- for (topicAndPartition <- expectedReplicasPerTopicAndPartition.keySet)
- messageSetCache.put(topicAndPartition, new Pool[Int, FetchResponsePartitionData])
+ for (topicPartition <- expectedReplicasPerTopicPartition.keySet)
+ recordsCache.put(topicPartition, new Pool[Int, FetchResponse.PartitionData[MemoryRecords]])
setInitialOffsets()
}
- private def offsetResponseStringWithError(offsetResponse: OffsetResponse): String = {
- offsetResponse.partitionErrorAndOffsets.filter { case (_, partitionOffsetsResponse) =>
- partitionOffsetsResponse.error != Errors.NONE
- }.mkString
- }
private def setInitialOffsets() {
- for ((brokerId, topicAndPartitions) <- leadersPerBroker) {
- val broker = brokerMap(brokerId)
- val consumer = new SimpleConsumer(broker.host, broker.port, 10000, 100000, ReplicaVerificationTool.clientId)
- val initialOffsetMap: Map[TopicAndPartition, PartitionOffsetRequestInfo] =
- topicAndPartitions.map(topicAndPartition => topicAndPartition -> PartitionOffsetRequestInfo(initialOffsetTime, 1)).toMap
- val offsetRequest = OffsetRequest(initialOffsetMap)
- val offsetResponse = consumer.getOffsetsBefore(offsetRequest)
- assert(!offsetResponse.hasError, offsetResponseStringWithError(offsetResponse))
- offsetResponse.partitionErrorAndOffsets.foreach { case (topicAndPartition, partitionOffsetResponse) =>
- fetchOffsetMap.put(topicAndPartition, partitionOffsetResponse.offsets.head)
- }
- }
+ for ((tp, offset) <- initialOffsets)
+ fetchOffsetMap.put(tp, offset)
}
- def addFetchedData(topicAndPartition: TopicAndPartition, replicaId: Int, partitionData: FetchResponsePartitionData) {
- messageSetCache.get(topicAndPartition).put(replicaId, partitionData)
+ def addFetchedData(topicAndPartition: TopicPartition, replicaId: Int, partitionData: FetchResponse.PartitionData[MemoryRecords]) {
+ recordsCache.get(topicAndPartition).put(replicaId, partitionData)
}
- def getOffset(topicAndPartition: TopicAndPartition) = {
+ def getOffset(topicAndPartition: TopicPartition) = {
fetchOffsetMap.get(topicAndPartition)
}
def verifyCheckSum(println: String => Unit) {
debug("Begin verification")
maxLag = -1L
- for ((topicAndPartition, fetchResponsePerReplica) <- messageSetCache) {
- debug("Verifying " + topicAndPartition)
- assert(fetchResponsePerReplica.size == expectedReplicasPerTopicAndPartition(topicAndPartition),
- "fetched " + fetchResponsePerReplica.size + " replicas for " + topicAndPartition + ", but expected "
- + expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas")
+ for ((topicPartition, fetchResponsePerReplica) <- recordsCache) {
+ debug("Verifying " + topicPartition)
+ assert(fetchResponsePerReplica.size == expectedReplicasPerTopicPartition(topicPartition),
+ "fetched " + fetchResponsePerReplica.size + " replicas for " + topicPartition + ", but expected "
+ + expectedReplicasPerTopicPartition(topicPartition) + " replicas")
val recordBatchIteratorMap = fetchResponsePerReplica.map { case (replicaId, fetchResponse) =>
- replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].asRecords.batches.iterator
+ replicaId -> fetchResponse.records.batches.iterator
}
- val maxHw = fetchResponsePerReplica.values.map(_.hw).max
+ val maxHw = fetchResponsePerReplica.values.map(_.highWatermark).max
// Iterate one message at a time from every replica, until high watermark is reached.
var isMessageInAllReplicas = true
@@ -286,7 +319,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
val batch = recordBatchIterator.next()
// only verify up to the high watermark
- if (batch.lastOffset >= fetchResponsePerReplica.get(replicaId).hw)
+ if (batch.lastOffset >= fetchResponsePerReplica.get(replicaId).highWatermark)
isMessageInAllReplicas = false
else {
messageInfoFromFirstReplicaOpt match {
@@ -295,7 +328,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
MessageInfo(replicaId, batch.lastOffset, batch.nextOffset, batch.checksum))
case Some(messageInfoFromFirstReplica) =>
if (messageInfoFromFirstReplica.offset != batch.lastOffset) {
- println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + topicAndPartition
+ println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + topicPartition
+ ": replica " + messageInfoFromFirstReplica.replicaId + "'s offset "
+ messageInfoFromFirstReplica.offset + " doesn't match replica "
+ replicaId + "'s offset " + batch.lastOffset)
@@ -303,7 +336,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
}
if (messageInfoFromFirstReplica.checksum != batch.checksum)
println(ReplicaVerificationTool.getCurrentTimeString + ": partition "
- + topicAndPartition + " has unmatched checksum at offset " + batch.lastOffset + "; replica "
+ + topicPartition + " has unmatched checksum at offset " + batch.lastOffset + "; replica "
+ messageInfoFromFirstReplica.replicaId + "'s checksum " + messageInfoFromFirstReplica.checksum
+ "; replica " + replicaId + "'s checksum " + batch.checksum)
}
@@ -313,20 +346,20 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
} catch {
case t: Throwable =>
throw new RuntimeException("Error in processing replica %d in partition %s at offset %d."
- .format(replicaId, topicAndPartition, fetchOffsetMap.get(topicAndPartition)), t)
+ .format(replicaId, topicPartition, fetchOffsetMap.get(topicPartition)), t)
}
}
if (isMessageInAllReplicas) {
val nextOffset = messageInfoFromFirstReplicaOpt.get.nextOffset
- fetchOffsetMap.put(topicAndPartition, nextOffset)
- debug(expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas match at offset " +
- nextOffset + " for " + topicAndPartition)
+ fetchOffsetMap.put(topicPartition, nextOffset)
+ debug(expectedReplicasPerTopicPartition(topicPartition) + " replicas match at offset " +
+ nextOffset + " for " + topicPartition)
}
}
- if (maxHw - fetchOffsetMap.get(topicAndPartition) > maxLag) {
- offsetWithMaxLag = fetchOffsetMap.get(topicAndPartition)
+ if (maxHw - fetchOffsetMap.get(topicPartition) > maxLag) {
+ offsetWithMaxLag = fetchOffsetMap.get(topicPartition)
maxLag = maxHw - offsetWithMaxLag
- maxLagTopicAndPartition = topicAndPartition
+ maxLagTopicAndPartition = topicPartition
}
fetchResponsePerReplica.clear()
}
@@ -334,51 +367,54 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
if (currentTimeMs - lastReportTime > reportInterval) {
println(ReplicaVerificationTool.dateFormat.format(new Date(currentTimeMs)) + ": max lag is "
+ maxLag + " for partition " + maxLagTopicAndPartition + " at offset " + offsetWithMaxLag
- + " among " + messageSetCache.size + " partitions")
+ + " among " + recordsCache.size + " partitions")
lastReportTime = currentTimeMs
}
}
}
-private class ReplicaFetcher(name: String, sourceBroker: BrokerEndPoint, topicAndPartitions: Iterable[TopicAndPartition],
+private class ReplicaFetcher(name: String, sourceBroker: Node, topicPartitions: Iterable[TopicPartition],
replicaBuffer: ReplicaBuffer, socketTimeout: Int, socketBufferSize: Int,
- fetchSize: Int, maxWait: Int, minBytes: Int, doVerification: Boolean)
+ fetchSize: Int, maxWait: Int, minBytes: Int, doVerification: Boolean, consumerConfig: Properties,
+ fetcherId: Int)
extends ShutdownableThread(name) {
- val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, ReplicaVerificationTool.clientId)
- val fetchRequestBuilder = new FetchRequestBuilder().
- clientId(ReplicaVerificationTool.clientId).
- replicaId(Request.DebuggingConsumerId).
- maxWait(maxWait).
- minBytes(minBytes)
+
+ private val fetchEndpoint = new ReplicaFetcherBlockingSend(sourceBroker, new ConsumerConfig(consumerConfig), new Metrics(), Time.SYSTEM, fetcherId,
+ s"broker-${Request.DebuggingConsumerId}-fetcher-$fetcherId")
override def doWork() {
val fetcherBarrier = replicaBuffer.getFetcherBarrier()
val verificationBarrier = replicaBuffer.getVerificationBarrier()
- for (topicAndPartition <- topicAndPartitions)
- fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
- replicaBuffer.getOffset(topicAndPartition), fetchSize)
+ val requestMap = new util.LinkedHashMap[TopicPartition, JFetchRequest.PartitionData]
+ for (topicPartition <- topicPartitions)
+ requestMap.put(topicPartition, new JFetchRequest.PartitionData(replicaBuffer.getOffset(topicPartition), 0L, fetchSize))
+
+ val fetchRequestBuilder = JFetchRequest.Builder.
+ forReplica(ApiKeys.FETCH.latestVersion, Request.DebuggingConsumerId, maxWait, minBytes, requestMap)
- val fetchRequest = fetchRequestBuilder.build()
- debug("Issuing fetch request " + fetchRequest)
+ debug("Issuing fetch request ")
- var response: FetchResponse = null
+ var fetchResponse: FetchResponse[MemoryRecords] = null
try {
- response = simpleConsumer.fetch(fetchRequest)
+ val clientResponse = fetchEndpoint.sendRequest(fetchRequestBuilder)
+ fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse[MemoryRecords]]
} catch {
case t: Throwable =>
if (!isRunning)
throw t
}
- if (response != null) {
- response.data.foreach { case (topicAndPartition, partitionData) =>
- replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, partitionData)
+ if (fetchResponse != null) {
+ fetchResponse.responseData.asScala.foreach { case (tp, partitionData) =>
+ replicaBuffer.addFetchedData(tp, sourceBroker.id, partitionData)
}
} else {
- for (topicAndPartition <- topicAndPartitions)
- replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, new FetchResponsePartitionData(messages = MessageSet.Empty))
+ val emptyResponse = new FetchResponse.PartitionData(Errors.NONE, FetchResponse.INVALID_HIGHWATERMARK,
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
+ for (topicAndPartition <- topicPartitions)
+ replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, emptyResponse)
}
fetcherBarrier.countDown()
@@ -402,3 +438,64 @@ private class ReplicaFetcher(name: String, sourceBroker: BrokerEndPoint, topicAn
debug("Done verification")
}
}
+
+private class ReplicaFetcherBlockingSend(sourceNode: Node,
+ consumerConfig: ConsumerConfig,
+ metrics: Metrics,
+ time: Time,
+ fetcherId: Int,
+ clientId: String) {
+
+ private val socketTimeout: Int = consumerConfig.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG)
+
+ private val networkClient = {
+ val channelBuilder = org.apache.kafka.clients.ClientUtils.createChannelBuilder(consumerConfig)
+ val selector = new Selector(
+ NetworkReceive.UNLIMITED,
+ consumerConfig.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
+ metrics,
+ time,
+ "replica-fetcher",
+ Map("broker-id" -> sourceNode.id.toString, "fetcher-id" -> fetcherId.toString).asJava,
+ false,
+ channelBuilder,
+ new LogContext
+ )
+ new NetworkClient(
+ selector,
+ new ManualMetadataUpdater(),
+ clientId,
+ 1,
+ 0,
+ 0,
+ Selectable.USE_DEFAULT_BUFFER_SIZE,
+ consumerConfig.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
+ consumerConfig.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
+ time,
+ false,
+ new ApiVersions,
+ new LogContext
+ )
+ }
+
+ def sendRequest(requestBuilder: Builder[_ <: AbstractRequest]): ClientResponse = {
+ try {
+ if (!NetworkClientUtils.awaitReady(networkClient, sourceNode, time, socketTimeout))
+ throw new SocketTimeoutException(s"Failed to connect within $socketTimeout ms")
+ else {
+ val clientRequest = networkClient.newClientRequest(sourceNode.id.toString, requestBuilder,
+ time.milliseconds(), true)
+ NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)
+ }
+ }
+ catch {
+ case e: Throwable =>
+ networkClient.close(sourceNode.id.toString)
+ throw e
+ }
+ }
+
+ def close(): Unit = {
+ networkClient.close()
+ }
+}
diff --git a/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala b/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
index 211413a..f69c909 100644
--- a/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
+++ b/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
@@ -17,11 +17,10 @@
package kafka.tools
-import kafka.api.FetchResponsePartitionData
-import kafka.common.TopicAndPartition
-import kafka.message.ByteBufferMessageSet
+import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.{CompressionType, SimpleRecord, MemoryRecords}
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
+import org.apache.kafka.common.requests.FetchResponse
import org.junit.Test
import org.junit.Assert.assertTrue
@@ -32,12 +31,12 @@ class ReplicaVerificationToolTest {
val sb = new StringBuilder
val expectedReplicasPerTopicAndPartition = Map(
- TopicAndPartition("a", 0) -> 3,
- TopicAndPartition("a", 1) -> 3,
- TopicAndPartition("b", 0) -> 2
+ new TopicPartition("a", 0) -> 3,
+ new TopicPartition("a", 1) -> 3,
+ new TopicPartition("b", 0) -> 2
)
- val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition, Map.empty, 2, Map.empty, 0, 0)
+ val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition, Map.empty, 2, 0)
expectedReplicasPerTopicAndPartition.foreach { case (tp, numReplicas) =>
(0 until numReplicas).foreach { replicaId =>
val records = (0 to 5).map { index =>
@@ -45,8 +44,9 @@ class ReplicaVerificationToolTest {
}
val initialOffset = 4
val memoryRecords = MemoryRecords.withRecords(initialOffset, CompressionType.NONE, records: _*)
- replicaBuffer.addFetchedData(tp, replicaId, new FetchResponsePartitionData(Errors.NONE, hw = 20,
- new ByteBufferMessageSet(memoryRecords.buffer)))
+ val partitionData = new FetchResponse.PartitionData(Errors.NONE, 20, 20, 0L, null, memoryRecords)
+
+ replicaBuffer.addFetchedData(tp, replicaId, partitionData)
}
}
@@ -55,7 +55,7 @@ class ReplicaVerificationToolTest {
// If you change this assertion, you should verify that the replica_verification_test.py system test still passes
assertTrue(s"Max lag information should be in output: `$output`",
- output.endsWith(": max lag is 10 for partition a-0 at offset 10 among 3 partitions"))
+ output.endsWith(": max lag is 10 for partition a-1 at offset 10 among 3 partitions"))
}
}
--
To stop receiving notification emails like this one, please contact
ijuma@apache.org.