You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/18 20:07:19 UTC
kafka git commit: KAFKA-5033;
Set default retries for the idempotent producer to be infinite
Repository: kafka
Updated Branches:
refs/heads/trunk 34e379f10 -> c1fdf575d
KAFKA-5033; Set default retries for the idempotent producer to be infinite
Author: Apurva Mehta <ap...@confluent.io>
Reviewers: Jason Gustafson <ja...@confluent.io>
Closes #3091 from apurvam/KAFKA-5033-bump-retries-for-idempotent-producer
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c1fdf575
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c1fdf575
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c1fdf575
Branch: refs/heads/trunk
Commit: c1fdf575deed0398e1692be16cacf24308afd5d5
Parents: 34e379f
Author: Apurva Mehta <ap...@confluent.io>
Authored: Thu May 18 13:06:56 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu May 18 13:07:06 2017 -0700
----------------------------------------------------------------------
.../apache/kafka/clients/producer/KafkaProducer.java | 13 ++++++++-----
1 file changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c1fdf575/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index aeef92f..71fb077 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -374,8 +374,11 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
userConfiguredRetries = true;
}
if (idempotenceEnabled && !userConfiguredRetries) {
- log.info("Overriding the default retries config to " + 3 + " since the idempotent producer is enabled.");
- return 3;
+ // We recommend setting infinite retries when the idempotent producer is enabled, so it makes sense to make
+ // this the default.
+ log.info("Overriding the default retries config to the recommended value of {} since the idempotent " +
+ "producer is enabled.", Integer.MAX_VALUE);
+ return Integer.MAX_VALUE;
}
if (idempotenceEnabled && config.getInt(ProducerConfig.RETRIES_CONFIG) == 0) {
throw new ConfigException("Must set " + ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent producer.");
@@ -389,7 +392,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
userConfiguredInflights = true;
}
if (idempotenceEnabled && !userConfiguredInflights) {
- log.info("Overriding the default " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to 1 since idempontence is enabled.");
+ log.info("Overriding the default {} to 1 since idempontence is enabled.", ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
return 1;
}
if (idempotenceEnabled && config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) != 1) {
@@ -407,13 +410,13 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
if (idempotenceEnabled && !userConfiguredAcks) {
- log.info("Overriding the default " + ProducerConfig.ACKS_CONFIG + " to all since idempotence is enabled");
+ log.info("Overriding the default {} to all since idempotence is enabled.", ProducerConfig.ACKS_CONFIG);
return -1;
}
if (idempotenceEnabled && acks != -1) {
throw new ConfigException("Must set " + ProducerConfig.ACKS_CONFIG + " to all in order to use the idempotent " +
- "producer. Otherwise we cannot guarantee idempotence");
+ "producer. Otherwise we cannot guarantee idempotence.");
}
return acks;
}