You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/05/16 07:51:11 UTC
[kafka] branch trunk updated: KAFKA-13851: Add integration tests for DeleteRecords API (#12087)
This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5fae84e4d13 KAFKA-13851: Add integration tests for DeleteRecords API (#12087)
5fae84e4d13 is described below
commit 5fae84e4d138fd11d07cdf419c9fa2b3164570ea
Author: Divij Vaidya <di...@amazon.com>
AuthorDate: Mon May 16 10:50:50 2022 +0300
KAFKA-13851: Add integration tests for DeleteRecords API (#12087)
Reviewers: Luke Chen <sh...@gmail.com>, dengziming <de...@gmail.com>
---
.../kafka/server/DeleteRecordsRequestTest.scala | 170 +++++++++++++++++++++
1 file changed, 170 insertions(+)
diff --git a/core/src/test/scala/unit/kafka/server/DeleteRecordsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteRecordsRequestTest.scala
new file mode 100644
index 00000000000..d43c5c7dfa4
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/DeleteRecordsRequestTest.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.TestInfoUtils
+import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.DeleteRecordsRequestData
+import org.apache.kafka.common.message.DeleteRecordsRequestData.{DeleteRecordsPartition, DeleteRecordsTopic}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{DeleteRecordsRequest, DeleteRecordsResponse}
+import org.apache.kafka.common.serialization.StringSerializer
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import java.util.Collections
+import java.util.concurrent.TimeUnit
+import scala.collection.Seq
+
+class DeleteRecordsRequestTest extends BaseRequestTest {
+ private val TIMEOUT_MS = 1000
+ private val MESSAGES_PRODUCED_PER_PARTITION = 10
+
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteRecordsHappyCase(quorum: String): Unit = {
+ val (topicPartition: TopicPartition, leaderId: Int) = createTopicAndSendRecords
+
+ // Create the DeleteRecord request requesting deletion of offset which is not present
+ val offsetToDelete = Math.max(MESSAGES_PRODUCED_PER_PARTITION - 8, 0)
+ val request: DeleteRecordsRequest = createDeleteRecordsRequestForTopicPartition(topicPartition, offsetToDelete)
+
+ // call the API
+ val response = sendDeleteRecordsRequest(request, leaderId)
+ val partitionResult = response.data.topics.find(topicPartition.topic).partitions.find(topicPartition.partition)
+
+ // Validate the expected error code in the response
+ assertEquals(Errors.NONE.code(), partitionResult.errorCode(),
+ s"Unexpected error code received: ${Errors.forCode(partitionResult.errorCode).name()}")
+
+ // Validate the expected lowWaterMark in the response
+ assertEquals(offsetToDelete, partitionResult.lowWatermark(),
+ s"Unexpected lowWatermark received: ${partitionResult.lowWatermark}")
+
+ // Validate that the records have actually deleted
+ validateLogStartOffsetForTopic(topicPartition, offsetToDelete)
+ }
+
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testErrorWhenDeletingRecordsWithInvalidOffset(quorum: String): Unit = {
+ val (topicPartition: TopicPartition, leaderId: Int) = createTopicAndSendRecords
+
+ // Create the DeleteRecord request requesting deletion of offset which is not present
+ val offsetToDelete = MESSAGES_PRODUCED_PER_PARTITION + 5
+ val request: DeleteRecordsRequest = createDeleteRecordsRequestForTopicPartition(topicPartition, offsetToDelete)
+
+ // call the API
+ val response = sendDeleteRecordsRequest(request, leaderId)
+ val partitionResult = response.data.topics.find(topicPartition.topic).partitions.find(topicPartition.partition)
+
+ // Validate the expected error code in the response
+ assertEquals(Errors.OFFSET_OUT_OF_RANGE.code(), partitionResult.errorCode(),
+ s"Unexpected error code received: ${Errors.forCode(partitionResult.errorCode()).name()}")
+
+ // Validate the expected value for low watermark
+ assertEquals(DeleteRecordsResponse.INVALID_LOW_WATERMARK, partitionResult.lowWatermark())
+
+ // After error, the offset of the topic should have been the original i.e. delete record should not have deleted
+ // records.
+ validateLogStartOffsetForTopic(topicPartition, 0)
+ }
+
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testErrorWhenDeletingRecordsWithInvalidTopic(quorum: String): Unit = {
+ val invalidTopicPartition = new TopicPartition("invalid-topic", 0)
+ // Create the DeleteRecord request requesting deletion of offset which is not present
+ val offsetToDelete = 1
+ val request: DeleteRecordsRequest = createDeleteRecordsRequestForTopicPartition(invalidTopicPartition, offsetToDelete)
+
+ // call the API
+ val response = sendDeleteRecordsRequest(request)
+ val partitionResult = response.data.topics.find(invalidTopicPartition.topic).partitions.find(invalidTopicPartition.partition)
+
+ // Validate the expected error code in the response
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), partitionResult.errorCode(),
+ s"Unexpected error code received: ${Errors.forCode(partitionResult.errorCode()).name()}")
+
+ // Validate the expected value for low watermark
+ assertEquals(DeleteRecordsResponse.INVALID_LOW_WATERMARK, partitionResult.lowWatermark())
+ }
+
+ private def createTopicAndSendRecords = {
+ // Single topic
+ val topic1 = "topic-1"
+ val topicPartition = new TopicPartition(topic1, 0)
+ val partitionToLeader = createTopic(topic1)
+ assertTrue(partitionToLeader.contains(topicPartition.partition), "Topic creation did not succeed.")
+ // Write records
+ produceData(Seq(topicPartition), MESSAGES_PRODUCED_PER_PARTITION)
+ (topicPartition, partitionToLeader(topicPartition.partition))
+ }
+
+ private def createDeleteRecordsRequestForTopicPartition(topicPartition: TopicPartition, offsetToDelete: Int) = {
+ val requestData = new DeleteRecordsRequestData()
+ .setTopics(Collections.singletonList(new DeleteRecordsTopic()
+ .setName(topicPartition.topic())
+ .setPartitions(Collections.singletonList(new DeleteRecordsPartition()
+ .setOffset(offsetToDelete)
+ .setPartitionIndex(topicPartition.partition())))))
+ .setTimeoutMs(TIMEOUT_MS)
+ val request = new DeleteRecordsRequest.Builder(requestData).build()
+ request
+ }
+
+ private def sendDeleteRecordsRequest(request: DeleteRecordsRequest): DeleteRecordsResponse = {
+ connectAndReceive[DeleteRecordsResponse](request, destination = anySocketServer)
+ }
+
+ private def sendDeleteRecordsRequest(request: DeleteRecordsRequest, leaderId: Int): DeleteRecordsResponse = {
+ connectAndReceive[DeleteRecordsResponse](request, destination = brokerSocketServer(leaderId))
+ }
+
+ private def produceData(topicPartitions: Iterable[TopicPartition], numMessagesPerPartition: Int): Seq[RecordMetadata] = {
+ val producer = createProducer(keySerializer = new StringSerializer, valueSerializer = new StringSerializer)
+ val records = for {
+ tp <- topicPartitions.toSeq
+ messageIndex <- 0 until numMessagesPerPartition
+ } yield {
+ val suffix = s"$tp-$messageIndex"
+ new ProducerRecord(tp.topic, tp.partition, s"key $suffix", s"value $suffix")
+ }
+
+ val sendfutureList = records.map(producer.send)
+
+ // ensure that records are flushed to server
+ producer.flush()
+
+ val recordMetadataList = sendfutureList.map(_.get(10, TimeUnit.SECONDS))
+ recordMetadataList
+ .foreach(recordMetadata => assertTrue(recordMetadata.offset >= 0, s"Invalid offset $recordMetadata"))
+
+ recordMetadataList
+ }
+
+ private def validateLogStartOffsetForTopic(topicPartition: TopicPartition, expectedStartOffset: Long): Unit = {
+ val logForTopicPartition = brokers.flatMap(_.replicaManager.logManager.getLog(topicPartition)).headOption
+ // logManager should exist for the provided partition
+ assertTrue(logForTopicPartition.isDefined)
+ // assert that log start offset is equal to the expectedStartOffset after DeleteRecords has been called.
+ assertEquals(expectedStartOffset, logForTopicPartition.get.logStartOffset)
+ }
+}