You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2022/04/13 04:25:10 UTC

[kafka] branch trunk updated: MINOR: Supplement the description of `Valid Values` in the documentation of `compression.type` (#11985)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1df232c839 MINOR: Supplement the description of `Valid Values` in the documentation of `compression.type` (#11985)
1df232c839 is described below

commit 1df232c839f4568718a52c04aad72b69beb52026
Author: RivenSun <91...@users.noreply.github.com>
AuthorDate: Wed Apr 13 12:24:57 2022 +0800

    MINOR: Supplement the description of `Valid Values` in the documentation of `compression.type` (#11985)
    
    Because a validator has been added to ProducerConfig.COMPRESSION_TYPE_CONFIG and KafkaConfig.CompressionTypeProp, the corresponding test cases have been updated to verify that an invalid value of compression.type throws a ConfigException.
    
    Reviewers: Mickael Maison <mi...@gmail.com>, Guozhang Wang <wa...@gmail.com>
---
 .../org/apache/kafka/clients/producer/ProducerConfig.java     |  4 +++-
 .../java/org/apache/kafka/common/record/CompressionType.java  |  6 ++++++
 .../org/apache/kafka/clients/producer/ProducerConfigTest.java | 11 +++++++++++
 core/src/main/scala/kafka/server/KafkaConfig.scala            |  6 +++---
 core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala   |  2 +-
 5 files changed, 24 insertions(+), 5 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index afc1e55cdf..8fec07a297 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -26,7 +26,9 @@ import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.SecurityConfig;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -329,7 +331,7 @@ public class ProducerConfig extends AbstractConfig {
                                         in("all", "-1", "0", "1"),
                                         Importance.LOW,
                                         ACKS_DOC)
-                                .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
+                                .define(COMPRESSION_TYPE_CONFIG, Type.STRING, CompressionType.NONE.name, in(Utils.enumOptions(CompressionType.class)), Importance.HIGH, COMPRESSION_TYPE_DOC)
                                 .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
                                 .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
                                 .define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 120 * 1000, atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC)
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 1b9754ffab..c526929b72 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -190,4 +190,10 @@ public enum CompressionType {
         else
             throw new IllegalArgumentException("Unknown compression name: " + name);
     }
+
+    @Override
+    public String toString() {
+        return name;
+    }
+
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
index a2f318bebc..ae9de7b70a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
@@ -19,12 +19,14 @@ package org.apache.kafka.clients.producer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.config.ConfigException;
 import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class ProducerConfigTest {
 
@@ -59,4 +61,13 @@ public class ProducerConfigTest {
         assertEquals(newConfigs.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG), keySerializerClass);
         assertEquals(newConfigs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG), valueSerializerClass);
     }
+
+    @Test
+    public void testInvalidCompressionType() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
+        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
+        configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "abc");
+        assertThrows(ConfigException.class, () -> new ProducerConfig(configs));
+    }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 7dd9276d6d..6fe0acaa9d 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -25,7 +25,7 @@ import kafka.coordinator.group.OffsetConfig
 import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager}
 import kafka.log.LogConfig
 import kafka.log.LogConfig.MessageFormatVersion
-import kafka.message.{BrokerCompressionCodec, CompressionCodec, ZStdCompressionCodec}
+import kafka.message.{BrokerCompressionCodec, CompressionCodec, ProducerCompressionCodec, ZStdCompressionCodec}
 import kafka.security.authorizer.AuthorizerUtils
 import kafka.server.KafkaConfig.{ControllerListenerNamesProp, ListenerSecurityProtocolMapProp}
 import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole, ProcessRole}
@@ -227,7 +227,7 @@ object Defaults {
 
   val DeleteTopicEnable = true
 
-  val CompressionType = "producer"
+  val CompressionType = ProducerCompressionCodec.name
 
   val MaxIdMapSnapshots = 2
   /** ********* Kafka Metrics Configuration ***********/
@@ -1257,7 +1257,7 @@ object KafkaConfig {
       .define(OffsetCommitTimeoutMsProp, INT, Defaults.OffsetCommitTimeoutMs, atLeast(1), HIGH, OffsetCommitTimeoutMsDoc)
       .define(OffsetCommitRequiredAcksProp, SHORT, Defaults.OffsetCommitRequiredAcks, HIGH, OffsetCommitRequiredAcksDoc)
       .define(DeleteTopicEnableProp, BOOLEAN, Defaults.DeleteTopicEnable, HIGH, DeleteTopicEnableDoc)
-      .define(CompressionTypeProp, STRING, Defaults.CompressionType, HIGH, CompressionTypeDoc)
+      .define(CompressionTypeProp, STRING, Defaults.CompressionType, in(BrokerCompressionCodec.brokerCompressionOptions:_*), HIGH, CompressionTypeDoc)
 
       /** ********* Transaction management configuration ***********/
       .define(TransactionalIdExpirationMsProp, INT, Defaults.TransactionalIdExpirationMs, atLeast(1), HIGH, TransactionalIdExpirationMsDoc)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index a6597d8815..ed31dba41a 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -657,7 +657,7 @@ class KafkaConfigTest {
   def testInvalidCompressionType(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
     props.put(KafkaConfig.CompressionTypeProp, "abc")
-    assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
+    assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
   }
 
   @Test