You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2023/09/27 11:02:16 UTC

[kafka] branch 3.6 updated: KAFKA-15498: bump snappy-java version to 1.1.10.4 (#14434)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new 4fdac6136b6 KAFKA-15498: bump snappy-java version to 1.1.10.4 (#14434)
4fdac6136b6 is described below

commit 4fdac6136b63264808e08970a35796272ac3abf4
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Wed Sep 27 19:00:50 2023 +0800

    KAFKA-15498: bump snappy-java version to 1.1.10.4 (#14434)
    
    Bump snappy-java version to 1.1.10.4, and add more tests to verify that the compressed data can be correctly decompressed and read.
    
    For LogCleanerParameterizedIntegrationTest, we increased the message size for snappy decompression because, in the new version of snappy, the decompressed size has increased compared with the previous version. Since the compression algorithm itself is outside Kafka's scope, all we need to do is make sure the compressed data can be successfully decompressed and parsed/read.
    
    Reviewers: Divij Vaidya <di...@amazon.com>, Ismael Juma <is...@juma.me.uk>, Josep Prat <jo...@aiven.io>, Kamal Chandraprakash <ka...@gmail.com>
---
 LICENSE-binary                                     |  2 +-
 .../kafka/api/ProducerCompressionTest.scala        | 63 ++++++++++++++++++----
 .../LogCleanerParameterizedIntegrationTest.scala   |  4 +-
 gradle/dependencies.gradle                         |  2 +-
 4 files changed, 58 insertions(+), 13 deletions(-)

diff --git a/LICENSE-binary b/LICENSE-binary
index f031ebecf02..5a0642b4d94 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -261,7 +261,7 @@ scala-library-2.13.11
 scala-logging_2.13-3.9.4
 scala-reflect-2.13.11
 scala-java8-compat_2.13-1.0.2
-snappy-java-1.1.10.3
+snappy-java-1.1.10.4
 swagger-annotations-2.2.8
 zookeeper-3.8.2
 zookeeper-jute-3.8.2
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index b1e39ebde49..6135ec952ca 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -19,8 +19,10 @@ package kafka.api.test
 
 import kafka.server.{KafkaBroker, KafkaConfig, QuorumTestHarness}
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.header.Header
+import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.serialization.ByteArraySerializer
@@ -29,7 +31,10 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource
 
+import java.util.concurrent.Future
 import java.util.{Collections, Properties}
+import scala.collection.mutable.ListBuffer
+import scala.util.Random
 
 class ProducerCompressionTest extends QuorumTestHarness {
 
@@ -64,10 +69,10 @@ class ProducerCompressionTest extends QuorumTestHarness {
     "kraft,snappy",
     "kraft,lz4",
     "kraft,zstd",
-    "zk,gzip"
+    "zk,gzip",
+    "zk,snappy"
   ))
   def testCompression(quorum: String, compression: String): Unit = {
-
     val producerProps = new Properties()
     val bootstrapServers = TestUtils.plaintextBootstrapServers(Seq(broker))
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
@@ -88,13 +93,28 @@ class ProducerCompressionTest extends QuorumTestHarness {
       }
       val partition = 0
 
+      def messageValue(length: Int): String = {
+        val random = new Random(0)
+        new String(random.alphanumeric.take(length).toArray)
+      }
+
       // prepare the messages
-      val messageValues = (0 until numRecords).map(i => "value" + i)
+      val messageValues = (0 until numRecords).map(i => messageValue(i))
+      val headerArr = Array[Header](new RecordHeader("key", "value".getBytes))
+      val headers = new RecordHeaders(headerArr)
 
       // make sure the returned messages are correct
       val now = System.currentTimeMillis()
-      val responses = for (message <- messageValues)
-        yield producer.send(new ProducerRecord(topic, null, now, null, message.getBytes))
+      val responses: ListBuffer[Future[RecordMetadata]] = new ListBuffer[Future[RecordMetadata]]()
+
+      for (message <- messageValues) {
+        // 1. send message without key and header
+        responses += producer.send(new ProducerRecord(topic, null, now, null, message.getBytes))
+        // 2. send message with key, without header
+        responses += producer.send(new ProducerRecord(topic, null, now, message.length.toString.getBytes, message.getBytes))
+        // 3. send message with key and header
+        responses += producer.send(new ProducerRecord(topic, null, now, message.length.toString.getBytes, message.getBytes, headers))
+      }
       for ((future, offset) <- responses.zipWithIndex) {
         assertEquals(offset.toLong, future.get.offset)
       }
@@ -103,12 +123,37 @@ class ProducerCompressionTest extends QuorumTestHarness {
       // make sure the fetched message count match
       consumer.assign(Collections.singleton(tp))
       consumer.seek(tp, 0)
-      val records = TestUtils.consumeRecords(consumer, numRecords)
+      val records = TestUtils.consumeRecords(consumer, numRecords*3)
+
+      for (i <- 0 until numRecords) {
+        val messageValue = messageValues(i)
+        // 1. verify message without key and header
+        var offset = i * 3
+        var record = records(offset)
+        assertNull(record.key())
+        assertEquals(messageValue, new String(record.value))
+        assertEquals(0, record.headers().toArray.length)
+        assertEquals(now, record.timestamp)
+        assertEquals(offset.toLong, record.offset)
+
+        // 2. verify message with key, without header
+        offset = i * 3 + 1
+        record = records(offset)
+        assertEquals(messageValue.length.toString, new String(record.key()))
+        assertEquals(messageValue, new String(record.value))
+        assertEquals(0, record.headers().toArray.length)
+        assertEquals(now, record.timestamp)
+        assertEquals(offset.toLong, record.offset)
 
-      for (((messageValue, record), index) <- messageValues.zip(records).zipWithIndex) {
+        // 3. verify message with key and header
+        offset = i * 3 + 2
+        record = records(offset)
+        assertEquals(messageValue.length.toString, new String(record.key()))
         assertEquals(messageValue, new String(record.value))
+        assertEquals(1, record.headers().toArray.length)
+        assertEquals(headerArr.apply(0), record.headers().toArray.apply(0))
         assertEquals(now, record.timestamp)
-        assertEquals(index.toLong, record.offset)
+        assertEquals(offset.toLong, record.offset)
       }
     } finally {
       producer.close()
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
index 5ab53699cd4..132a77ff97b 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
@@ -144,9 +144,9 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati
       case _ =>
         // the broker assigns absolute offsets for message format 0 which potentially causes the compressed size to
         // increase because the broker offsets are larger than the ones assigned by the client
-        // adding `5` to the message set size is good enough for this test: it covers the increased message size while
+        // adding `6` to the message set size is good enough for this test: it covers the increased message size while
         // still being less than the overhead introduced by the conversion from message format version 0 to 1
-        largeMessageSet.sizeInBytes + 5
+        largeMessageSet.sizeInBytes + 6
     }
 
     cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize)
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 92199c036db..6994f994e43 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -142,7 +142,7 @@ versions += [
   scalaJava8Compat : "1.0.2",
   scoverage: "1.9.3",
   slf4j: "1.7.36",
-  snappy: "1.1.10.3",
+  snappy: "1.1.10.4",
   spotbugs: "4.7.3",
   // New version of Swagger 2.2.14 requires minimum JDK 11.
   swaggerAnnotations: "2.2.8",