You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/08 17:00:08 UTC

kafka git commit: KAFKA-3353; Remove deprecated producer configs

Repository: kafka
Updated Branches:
  refs/heads/trunk 2554a8dd4 -> 3bcadbfb4


KAFKA-3353; Remove deprecated producer configs

These configs have been deprecated since 0.9.0.0:
block.on.buffer.full, metadata.fetch.timeout.ms and timeout.ms

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #2987 from ijuma/kafka-3353-remove-deprecated-producer-configs


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3bcadbfb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3bcadbfb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3bcadbfb

Branch: refs/heads/trunk
Commit: 3bcadbfb474f6caccc939fb3775a6f969d136af7
Parents: 2554a8d
Author: Ismael Juma <is...@juma.me.uk>
Authored: Mon May 8 10:00:04 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon May 8 10:00:04 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   | 45 +-------------------
 .../kafka/clients/producer/ProducerConfig.java  | 43 -------------------
 docs/upgrade.html                               |  2 +
 3 files changed, 4 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3bcadbfb/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 286387b..f812389 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -270,8 +270,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
             this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
 
-            this.maxBlockTimeMs = configureMaxBlockTime(config, userProvidedConfigs);
-            this.requestTimeoutMs = configureRequestTimeout(config, userProvidedConfigs);
+            this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
+            this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
             this.transactionManager = configureTransactionState(config);
             int retries = configureRetries(config, transactionManager != null);
             int maxInflightRequests = configureInflightRequests(config, transactionManager != null);
@@ -335,47 +335,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         return serializer instanceof ExtendedSerializer ? (ExtendedSerializer<T>) serializer : new ExtendedSerializer.Wrapper<>(serializer);
     }
 
-    private static long configureMaxBlockTime(ProducerConfig config, Map<String, Object> userProvidedConfigs) {
-        /* check for user defined settings.
-         * If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
-         * This should be removed with release 0.9 when the deprecated configs are removed.
-         */
-        if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
-            log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
-                    "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
-            boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
-            if (blockOnBufferFull) {
-                return Long.MAX_VALUE;
-            } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
-                log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
-                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
-                return config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
-            } else {
-                return config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
-            }
-        } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
-            log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
-                    "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
-            return config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
-        } else {
-            return config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
-        }
-    }
-
-    private static int configureRequestTimeout(ProducerConfig config, Map<String, Object> userProvidedConfigs) {
-        /* check for user defined settings.
-         * If the TIME_OUT config is set use that for request timeout.
-         * This should be removed with release 0.9
-         */
-        if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) {
-            log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
-                    ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
-            return config.getInt(ProducerConfig.TIMEOUT_CONFIG);
-        } else {
-            return config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
-        }
-    }
-
     private static TransactionManager configureTransactionState(ProducerConfig config) {
 
         TransactionManager transactionManager = null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/3bcadbfb/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 4bceb95..12e8c64 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -49,16 +49,6 @@ public class ProducerConfig extends AbstractConfig {
     /** <code>bootstrap.servers</code> */
     public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
 
-    /** <code>metadata.fetch.timeout.ms</code> */
-    /**
-     * @deprecated This config will be removed in a future release. Please use {@link #MAX_BLOCK_MS_CONFIG}
-     */
-    @Deprecated
-    public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms";
-    private static final String METADATA_FETCH_TIMEOUT_DOC = "The first time data is sent to a topic we must fetch metadata about that topic to know which servers "
-                                                             + "host the topic's partitions. This config specifies the maximum time, in milliseconds, for this fetch "
-                                                             + "to succeed before throwing an exception back to the client.";
-
     /** <code>metadata.max.age.ms</code> */
     public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
     private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC;
@@ -94,18 +84,6 @@ public class ProducerConfig extends AbstractConfig {
                                            + " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica"
                                            + " remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.";
 
-    /** <code>timeout.ms</code> */
-
-    /**
-     * @deprecated This config will be removed in a future release. Please use {@link #REQUEST_TIMEOUT_MS_CONFIG}
-     */
-    @Deprecated
-    public static final String TIMEOUT_CONFIG = "timeout.ms";
-    private static final String TIMEOUT_DOC = "The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to "
-                                              + "meet the acknowledgment requirements the producer has specified with the <code>acks</code> configuration. If the "
-                                              + "requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout "
-                                              + "is measured on the server side and does not include the network latency of the request.";
-
     /** <code>linger.ms</code> */
     public static final String LINGER_MS_CONFIG = "linger.ms";
     private static final String LINGER_MS_DOC = "The producer groups together any records that arrive in between request transmissions into a single batched request. "
@@ -143,19 +121,6 @@ public class ProducerConfig extends AbstractConfig {
                                                     + "These methods can be blocked either because the buffer is full or metadata unavailable."
                                                     + "Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.";
 
-    /** <code>block.on.buffer.full</code> */
-    /**
-     * @deprecated This config will be removed in a future release. Please use {@link #MAX_BLOCK_MS_CONFIG}.
-     */
-    @Deprecated
-    public static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full";
-    private static final String BLOCK_ON_BUFFER_FULL_DOC = "When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. "
-                                                           + "By default this setting is false and the producer will no longer throw a BufferExhaustException but instead will use the <code>" + MAX_BLOCK_MS_CONFIG + "</code> "
-                                                           + "value to block, after which it will throw a TimeoutException. Setting this property to true will set the <code>" + MAX_BLOCK_MS_CONFIG + "</code> to Long.MAX_VALUE. "
-                                                           + "<em>Also if this property is set to true, parameter <code>" + METADATA_FETCH_TIMEOUT_CONFIG + "</code> is no longer honored.</em>"
-                                                           + "<p>This parameter is deprecated and will be removed in a future release. "
-                                                           + "Parameter <code>" + MAX_BLOCK_MS_CONFIG + "</code> should be used instead.";
-
     /** <code>buffer.memory</code> */
     public static final String BUFFER_MEMORY_CONFIG = "buffer.memory";
     private static final String BUFFER_MEMORY_DOC = "The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are "
@@ -256,7 +221,6 @@ public class ProducerConfig extends AbstractConfig {
                                         ACKS_DOC)
                                 .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
                                 .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
-                                .define(TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.MEDIUM, TIMEOUT_DOC)
                                 .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC)
                                 .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
                                 .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
@@ -267,16 +231,9 @@ public class ProducerConfig extends AbstractConfig {
                                         atLeast(0),
                                         Importance.MEDIUM,
                                         MAX_REQUEST_SIZE_DOC)
-                                .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, false, Importance.LOW, BLOCK_ON_BUFFER_FULL_DOC)
                                 .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 50L, atLeast(0L), Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
                                 .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
                                 .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
-                                .define(METADATA_FETCH_TIMEOUT_CONFIG,
-                                        Type.LONG,
-                                        60 * 1000,
-                                        atLeast(0),
-                                        Importance.LOW,
-                                        METADATA_FETCH_TIMEOUT_DOC)
                                 .define(MAX_BLOCK_MS_CONFIG,
                                         Type.LONG,
                                         60 * 1000,

http://git-wip-us.apache.org/repos/asf/kafka/blob/3bcadbfb/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 227c728..116d2ff 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -55,6 +55,8 @@
 <ul>
     <li>Unclean leader election is now disabled by default. The new default favors durability over availability. Users who wish to
         to retain the previous behavior should set the broker config <code>unclean.leader.election.enabled</code> to <code>false</code>.</li>
+    <li>Producer configs <code>block.on.buffer.full</code>, <code>metadata.fetch.timeout.ms</code> and <code>timeout.ms</code> have been
+        removed. They were initially deprecated in Kafka 0.9.0.0.</li>
     <li>The <code>offsets.topic.replication.factor</code> broker config is now enforced upon auto topic creation. Internal
         auto topic creation will fail with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets this
         replication factor requirement.</li>