You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/08 17:00:08 UTC
kafka git commit: KAFKA-3353; Remove deprecated producer configs
Repository: kafka
Updated Branches:
refs/heads/trunk 2554a8dd4 -> 3bcadbfb4
KAFKA-3353; Remove deprecated producer configs
These configs have been deprecated since 0.9.0.0:
block.on.buffer.full, metadata.fetch.timeout.ms and timeout.ms
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jason Gustafson <ja...@confluent.io>
Closes #2987 from ijuma/kafka-3353-remove-deprecated-producer-configs
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3bcadbfb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3bcadbfb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3bcadbfb
Branch: refs/heads/trunk
Commit: 3bcadbfb474f6caccc939fb3775a6f969d136af7
Parents: 2554a8d
Author: Ismael Juma <is...@juma.me.uk>
Authored: Mon May 8 10:00:04 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon May 8 10:00:04 2017 -0700
----------------------------------------------------------------------
.../kafka/clients/producer/KafkaProducer.java | 45 +-------------------
.../kafka/clients/producer/ProducerConfig.java | 43 -------------------
docs/upgrade.html | 2 +
3 files changed, 4 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/3bcadbfb/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 286387b..f812389 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -270,8 +270,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
- this.maxBlockTimeMs = configureMaxBlockTime(config, userProvidedConfigs);
- this.requestTimeoutMs = configureRequestTimeout(config, userProvidedConfigs);
+ this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
+ this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.transactionManager = configureTransactionState(config);
int retries = configureRetries(config, transactionManager != null);
int maxInflightRequests = configureInflightRequests(config, transactionManager != null);
@@ -335,47 +335,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
return serializer instanceof ExtendedSerializer ? (ExtendedSerializer<T>) serializer : new ExtendedSerializer.Wrapper<>(serializer);
}
- private static long configureMaxBlockTime(ProducerConfig config, Map<String, Object> userProvidedConfigs) {
- /* check for user defined settings.
- * If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
- * This should be removed with release 0.9 when the deprecated configs are removed.
- */
- if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
- log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
- "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
- boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
- if (blockOnBufferFull) {
- return Long.MAX_VALUE;
- } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
- log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
- "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
- return config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
- } else {
- return config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
- }
- } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
- log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
- "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
- return config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
- } else {
- return config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
- }
- }
-
- private static int configureRequestTimeout(ProducerConfig config, Map<String, Object> userProvidedConfigs) {
- /* check for user defined settings.
- * If the TIME_OUT config is set use that for request timeout.
- * This should be removed with release 0.9
- */
- if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) {
- log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
- ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
- return config.getInt(ProducerConfig.TIMEOUT_CONFIG);
- } else {
- return config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
- }
- }
-
private static TransactionManager configureTransactionState(ProducerConfig config) {
TransactionManager transactionManager = null;
http://git-wip-us.apache.org/repos/asf/kafka/blob/3bcadbfb/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 4bceb95..12e8c64 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -49,16 +49,6 @@ public class ProducerConfig extends AbstractConfig {
/** <code>bootstrap.servers</code> */
public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
- /** <code>metadata.fetch.timeout.ms</code> */
- /**
- * @deprecated This config will be removed in a future release. Please use {@link #MAX_BLOCK_MS_CONFIG}
- */
- @Deprecated
- public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms";
- private static final String METADATA_FETCH_TIMEOUT_DOC = "The first time data is sent to a topic we must fetch metadata about that topic to know which servers "
- + "host the topic's partitions. This config specifies the maximum time, in milliseconds, for this fetch "
- + "to succeed before throwing an exception back to the client.";
-
/** <code>metadata.max.age.ms</code> */
public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC;
@@ -94,18 +84,6 @@ public class ProducerConfig extends AbstractConfig {
+ " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica"
+ " remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.";
- /** <code>timeout.ms</code> */
-
- /**
- * @deprecated This config will be removed in a future release. Please use {@link #REQUEST_TIMEOUT_MS_CONFIG}
- */
- @Deprecated
- public static final String TIMEOUT_CONFIG = "timeout.ms";
- private static final String TIMEOUT_DOC = "The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to "
- + "meet the acknowledgment requirements the producer has specified with the <code>acks</code> configuration. If the "
- + "requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout "
- + "is measured on the server side and does not include the network latency of the request.";
-
/** <code>linger.ms</code> */
public static final String LINGER_MS_CONFIG = "linger.ms";
private static final String LINGER_MS_DOC = "The producer groups together any records that arrive in between request transmissions into a single batched request. "
@@ -143,19 +121,6 @@ public class ProducerConfig extends AbstractConfig {
+ "These methods can be blocked either because the buffer is full or metadata unavailable."
+ "Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.";
- /** <code>block.on.buffer.full</code> */
- /**
- * @deprecated This config will be removed in a future release. Please use {@link #MAX_BLOCK_MS_CONFIG}.
- */
- @Deprecated
- public static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full";
- private static final String BLOCK_ON_BUFFER_FULL_DOC = "When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. "
- + "By default this setting is false and the producer will no longer throw a BufferExhaustException but instead will use the <code>" + MAX_BLOCK_MS_CONFIG + "</code> "
- + "value to block, after which it will throw a TimeoutException. Setting this property to true will set the <code>" + MAX_BLOCK_MS_CONFIG + "</code> to Long.MAX_VALUE. "
- + "<em>Also if this property is set to true, parameter <code>" + METADATA_FETCH_TIMEOUT_CONFIG + "</code> is no longer honored.</em>"
- + "<p>This parameter is deprecated and will be removed in a future release. "
- + "Parameter <code>" + MAX_BLOCK_MS_CONFIG + "</code> should be used instead.";
-
/** <code>buffer.memory</code> */
public static final String BUFFER_MEMORY_CONFIG = "buffer.memory";
private static final String BUFFER_MEMORY_DOC = "The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are "
@@ -256,7 +221,6 @@ public class ProducerConfig extends AbstractConfig {
ACKS_DOC)
.define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
.define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
- .define(TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.MEDIUM, TIMEOUT_DOC)
.define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC)
.define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
.define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
@@ -267,16 +231,9 @@ public class ProducerConfig extends AbstractConfig {
atLeast(0),
Importance.MEDIUM,
MAX_REQUEST_SIZE_DOC)
- .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, false, Importance.LOW, BLOCK_ON_BUFFER_FULL_DOC)
.define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 50L, atLeast(0L), Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
.define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
.define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
- .define(METADATA_FETCH_TIMEOUT_CONFIG,
- Type.LONG,
- 60 * 1000,
- atLeast(0),
- Importance.LOW,
- METADATA_FETCH_TIMEOUT_DOC)
.define(MAX_BLOCK_MS_CONFIG,
Type.LONG,
60 * 1000,
http://git-wip-us.apache.org/repos/asf/kafka/blob/3bcadbfb/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 227c728..116d2ff 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -55,6 +55,8 @@
<ul>
<li>Unclean leader election is now disabled by default. The new default favors durability over availability. Users who wish to
to retain the previous behavior should set the broker config <code>unclean.leader.election.enabled</code> to <code>false</code>.</li>
+ <li>Producer configs <code>block.on.buffer.full</code>, <code>metadata.fetch.timeout.ms</code> and <code>timeout.ms</code> have been
+ removed. They were initially deprecated in Kafka 0.9.0.0.</li>
<li>The <code>offsets.topic.replication.factor</code> broker config is now enforced upon auto topic creation. Internal
auto topic creation will fail with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets this
replication factor requirement.</li>