You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2022/04/13 04:25:10 UTC
[kafka] branch trunk updated: MINOR: Supplement the description of `Valid Values` in the documentation of `compression.type` (#11985)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1df232c839 MINOR: Supplement the description of `Valid Values` in the documentation of `compression.type` (#11985)
1df232c839 is described below
commit 1df232c839f4568718a52c04aad72b69beb52026
Author: RivenSun <91...@users.noreply.github.com>
AuthorDate: Wed Apr 13 12:24:57 2022 +0800
MINOR: Supplement the description of `Valid Values` in the documentation of `compression.type` (#11985)
Because a validator is added to ProducerConfig.COMPRESSION_TYPE_CONFIG and KafkaConfig.CompressionTypeProp, the corresponding test cases are improved to verify that an invalid value of compression.type will throw a ConfigException.
Reviewers: Mickael Maison <mi...@gmail.com>, Guozhang Wang <wa...@gmail.com>
---
.../org/apache/kafka/clients/producer/ProducerConfig.java | 4 +++-
.../java/org/apache/kafka/common/record/CompressionType.java | 6 ++++++
.../org/apache/kafka/clients/producer/ProducerConfigTest.java | 11 +++++++++++
core/src/main/scala/kafka/server/KafkaConfig.scala | 6 +++---
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala | 2 +-
5 files changed, 24 insertions(+), 5 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index afc1e55cdf..8fec07a297 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -26,7 +26,9 @@ import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -329,7 +331,7 @@ public class ProducerConfig extends AbstractConfig {
in("all", "-1", "0", "1"),
Importance.LOW,
ACKS_DOC)
- .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
+ .define(COMPRESSION_TYPE_CONFIG, Type.STRING, CompressionType.NONE.name, in(Utils.enumOptions(CompressionType.class)), Importance.HIGH, COMPRESSION_TYPE_DOC)
.define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
.define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
.define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 120 * 1000, atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC)
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 1b9754ffab..c526929b72 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -190,4 +190,10 @@ public enum CompressionType {
else
throw new IllegalArgumentException("Unknown compression name: " + name);
}
+
+ @Override
+ public String toString() {
+ return name;
+ }
+
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
index a2f318bebc..ae9de7b70a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
@@ -19,12 +19,14 @@ package org.apache.kafka.clients.producer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.config.ConfigException;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
public class ProducerConfigTest {
@@ -59,4 +61,13 @@ public class ProducerConfigTest {
assertEquals(newConfigs.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG), keySerializerClass);
assertEquals(newConfigs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG), valueSerializerClass);
}
+
+ @Test
+ public void testInvalidCompressionType() {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
+ configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
+ configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "abc");
+ assertThrows(ConfigException.class, () -> new ProducerConfig(configs));
+ }
}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 7dd9276d6d..6fe0acaa9d 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -25,7 +25,7 @@ import kafka.coordinator.group.OffsetConfig
import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager}
import kafka.log.LogConfig
import kafka.log.LogConfig.MessageFormatVersion
-import kafka.message.{BrokerCompressionCodec, CompressionCodec, ZStdCompressionCodec}
+import kafka.message.{BrokerCompressionCodec, CompressionCodec, ProducerCompressionCodec, ZStdCompressionCodec}
import kafka.security.authorizer.AuthorizerUtils
import kafka.server.KafkaConfig.{ControllerListenerNamesProp, ListenerSecurityProtocolMapProp}
import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole, ProcessRole}
@@ -227,7 +227,7 @@ object Defaults {
val DeleteTopicEnable = true
- val CompressionType = "producer"
+ val CompressionType = ProducerCompressionCodec.name
val MaxIdMapSnapshots = 2
/** ********* Kafka Metrics Configuration ***********/
@@ -1257,7 +1257,7 @@ object KafkaConfig {
.define(OffsetCommitTimeoutMsProp, INT, Defaults.OffsetCommitTimeoutMs, atLeast(1), HIGH, OffsetCommitTimeoutMsDoc)
.define(OffsetCommitRequiredAcksProp, SHORT, Defaults.OffsetCommitRequiredAcks, HIGH, OffsetCommitRequiredAcksDoc)
.define(DeleteTopicEnableProp, BOOLEAN, Defaults.DeleteTopicEnable, HIGH, DeleteTopicEnableDoc)
- .define(CompressionTypeProp, STRING, Defaults.CompressionType, HIGH, CompressionTypeDoc)
+ .define(CompressionTypeProp, STRING, Defaults.CompressionType, in(BrokerCompressionCodec.brokerCompressionOptions:_*), HIGH, CompressionTypeDoc)
/** ********* Transaction management configuration ***********/
.define(TransactionalIdExpirationMsProp, INT, Defaults.TransactionalIdExpirationMs, atLeast(1), HIGH, TransactionalIdExpirationMsDoc)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index a6597d8815..ed31dba41a 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -657,7 +657,7 @@ class KafkaConfigTest {
def testInvalidCompressionType(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
props.put(KafkaConfig.CompressionTypeProp, "abc")
- assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
+ assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
}
@Test