You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2022/03/03 13:55:44 UTC

[kafka] branch 3.0 updated: KAFKA-13673: disable idempotence when config conflicts (#11788)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new af1f68a  KAFKA-13673: disable idempotence when config conflicts (#11788)
af1f68a is described below

commit af1f68ab487a0e9cb17cd2112b139b85b8a69d7c
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Thu Mar 3 21:40:41 2022 +0800

    KAFKA-13673: disable idempotence when config conflicts (#11788)
    
    Disable idempotence when conflicting config values for acks, retries
    and max.in.flight.requests.per.connection are set by the user. For the
    former two configs, we log at info level when we disable idempotence
    due to conflicting configs. For the latter, we log at warn level since
    it's due to an implementation detail that is likely to be surprising.
    
    This mitigates compatibility impact of enabling idempotence by default.
    
    Added unit tests to verify the change in behavior.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>, Mickael Maison <mi...@gmail.com>
---
 .../kafka/clients/producer/KafkaProducer.java      |   2 +-
 .../kafka/clients/producer/ProducerConfig.java     |  97 ++++++++++++-------
 .../kafka/clients/producer/KafkaProducerTest.java  | 106 +++++++++++++--------
 docs/upgrade.html                                  |   3 +-
 4 files changed, 134 insertions(+), 74 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 2d53b18..dc4f415 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -514,7 +514,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                                                          LogContext logContext) {
         TransactionManager transactionManager = null;
 
-        if (config.idempotenceEnabled()) {
+        if (config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) {
             final String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
             final int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
             final long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 57d3a8b..e3d09f2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -27,6 +27,8 @@ import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.SecurityConfig;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -44,6 +46,7 @@ import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
  * href="http://kafka.apache.org/documentation.html#producerconfigs">Kafka documentation</a>
  */
 public class ProducerConfig extends AbstractConfig {
+    private static final Logger log = LoggerFactory.getLogger(ProducerConfig.class);
 
     /*
      * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS THESE ARE PART OF THE PUBLIC API AND
@@ -99,7 +102,10 @@ public class ProducerConfig extends AbstractConfig {
                                            + " <li><code>acks=all</code> This means the leader will wait for the full set of in-sync replicas to"
                                            + " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica"
                                            + " remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting."
-                                           + "</ul>";
+                                           + "</ul>"
+                                           + "<p>"
+                                           + "Note that enabling idempotence requires this config value to be 'all'."
+                                           + " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.";
 
     /** <code>linger.ms</code> */
     public static final String LINGER_MS_CONFIG = "linger.ms";
@@ -197,25 +203,32 @@ public class ProducerConfig extends AbstractConfig {
     /** <code>metric.reporters</code> */
     public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
 
+    // max.in.flight.requests.per.connection should be less than or equal to 5 when the idempotent producer is enabled, to ensure message ordering
+    private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE = 5;
+
     /** <code>max.in.flight.requests.per.connection</code> */
     public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
     private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."
                                                                             + " Note that if this config is set to be greater than 1 and <code>enable.idempotence</code> is set to false, there is a risk of"
-                                                                            + " message re-ordering after a failed send due to retries (i.e., if retries are enabled).";
-    // max.in.flight.requests.per.connection should be less than or equal to 5 when idempotence producer enabled to ensure message ordering
-    private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE = 5;
+                                                                            + " message re-ordering after a failed send due to retries (i.e., if retries are enabled)."
+                                                                            + " Additionally, enabling idempotence requires this config value to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "."
+                                                                            + " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.";
 
     /** <code>retries</code> */
     public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
     private static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error."
             + " Note that this retry is no different than if the client resent the record upon receiving the error."
-            + " Allowing retries without setting <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to 1 will potentially change the"
-            + " ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second"
-            + " succeeds, then the records in the second batch may appear first. Note additionally that produce requests will be"
-            + " failed before the number of retries has been exhausted if the timeout configured by"
+            + " Produce requests will be failed before the number of retries has been exhausted if the timeout configured by"
             + " <code>" + DELIVERY_TIMEOUT_MS_CONFIG + "</code> expires first before successful acknowledgement. Users should generally"
             + " prefer to leave this config unset and instead use <code>" + DELIVERY_TIMEOUT_MS_CONFIG + "</code> to control"
-            + " retry behavior.";
+            + " retry behavior."
+            + "<p>"
+            + "Enabling idempotence requires this config value to be greater than 0."
+            + " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled."
+            + "<p>"
+            + "Allowing retries while setting <code>enable.idempotence</code> to <code>false</code> and <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to greater than 1 will potentially change the"
+            + " ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second"
+            + " succeeds, then the records in the second batch may appear first.";
 
     /** <code>key.serializer</code> */
     public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
@@ -250,8 +263,11 @@ public class ProducerConfig extends AbstractConfig {
                                                         + "retries due to broker failures, etc., may write duplicates of the retried message in the stream. "
                                                         + "Note that enabling idempotence requires <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE
                                                         + " (with message ordering preserved for any allowable value), <code>" + RETRIES_CONFIG + "</code> to be greater than 0, and <code>"
-                                                        + ACKS_CONFIG + "</code> must be 'all'. If these values are not explicitly set by the user, suitable values will be chosen. If incompatible "
-                                                        + "values are set, a <code>ConfigException</code> will be thrown.";
+                                                        + ACKS_CONFIG + "</code> must be 'all'. "
+                                                        + "<p>"
+                                                        + "Idempotence is enabled by default if no conflicting configurations are set. "
+                                                        + "If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. "
+                                                        + "If idempotence is explicitly enabled and conflicting configurations are set, a <code>ConfigException</code> is thrown.";
 
     /** <code> transaction.timeout.ms </code> */
     public static final String TRANSACTION_TIMEOUT_CONFIG = "transaction.timeout.ms";
@@ -438,27 +454,53 @@ public class ProducerConfig extends AbstractConfig {
         final Map<String, Object> originalConfigs = this.originals();
         final String acksStr = parseAcks(this.getString(ACKS_CONFIG));
         configs.put(ACKS_CONFIG, acksStr);
-
-        // For idempotence producers, values for `RETRIES_CONFIG` and `ACKS_CONFIG` need validation
-        if (idempotenceEnabled()) {
-            boolean userConfiguredRetries = originalConfigs.containsKey(RETRIES_CONFIG);
-            if (userConfiguredRetries && this.getInt(RETRIES_CONFIG) == 0) {
-                throw new ConfigException("Must set " + ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent producer.");
+        final boolean userConfiguredIdempotence = this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG);
+        boolean idempotenceEnabled = this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
+        boolean shouldDisableIdempotence = false;
+
+        // For idempotence producers, values for `retries` and `acks` and `max.in.flight.requests.per.connection` need validation
+        if (idempotenceEnabled) {
+            final int retries = this.getInt(RETRIES_CONFIG);
+            if (retries == 0) {
+                if (userConfiguredIdempotence) {
+                    throw new ConfigException("Must set " + RETRIES_CONFIG + " to non-zero when using the idempotent producer.");
+                }
+                log.info("Idempotence will be disabled because {} is set to 0.", RETRIES_CONFIG, retries);
+                shouldDisableIdempotence = true;
             }
 
-            boolean userConfiguredAcks = originalConfigs.containsKey(ACKS_CONFIG);
             final short acks = Short.valueOf(acksStr);
-            if (userConfiguredAcks && acks != (short) -1) {
-                throw new ConfigException("Must set " + ACKS_CONFIG + " to all in order to use the idempotent " +
+            if (acks != (short) -1) {
+                if (userConfiguredIdempotence) {
+                    throw new ConfigException("Must set " + ACKS_CONFIG + " to all in order to use the idempotent " +
                         "producer. Otherwise we cannot guarantee idempotence.");
+                }
+                log.info("Idempotence will be disabled because {} is set to {}, not set to 'all'.", ACKS_CONFIG, acks);
+                shouldDisableIdempotence = true;
             }
 
-            boolean userConfiguredInflightRequests = originalConfigs.containsKey(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
-            if (userConfiguredInflightRequests && MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE < this.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
-                throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
+            final int inFlightConnection = this.getInt(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
+            if (MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE < inFlightConnection) {
+                if (userConfiguredIdempotence) {
+                    throw new ConfigException("Must set " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
                         " to use the idempotent producer.");
+                }
+                log.warn("Idempotence will be disabled because {} is set to {}, which is greater than 5. " +
+                    "Please note that in v4.0.0 and onward, this will become an error.", MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, inFlightConnection);
+                shouldDisableIdempotence = true;
             }
         }
+
+        if (shouldDisableIdempotence) {
+            configs.put(ENABLE_IDEMPOTENCE_CONFIG, false);
+            idempotenceEnabled = false;
+        }
+
+        // validate `transactional.id` after validating idempotence dependent configs because `enable.idempotence` config might be overridden
+        boolean userConfiguredTransactions = originalConfigs.containsKey(TRANSACTIONAL_ID_CONFIG);
+        if (!idempotenceEnabled && userConfiguredTransactions) {
+            throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
+        }
     }
 
     private static String parseAcks(String acksString) {
@@ -488,15 +530,6 @@ public class ProducerConfig extends AbstractConfig {
         super(CONFIG, props);
     }
 
-    boolean idempotenceEnabled() {
-        boolean userConfiguredTransactions = this.originals().containsKey(TRANSACTIONAL_ID_CONFIG);
-        boolean idempotenceEnabled = this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
-        if (!idempotenceEnabled && userConfiguredTransactions)
-            throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
-
-        return idempotenceEnabled;
-    }
-
     ProducerConfig(Map<?, ?> props, boolean doLog) {
         super(CONFIG, props, doLog);
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index dbc966d..d707bcb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -227,6 +227,32 @@ public class KafkaProducerTest {
             config.getString(ProducerConfig.ACKS_CONFIG),
             "acks should be overwritten");
 
+        Properties validProps4 = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.ACKS_CONFIG, "0");
+            }};
+        config = new ProducerConfig(validProps4);
+        assertFalse(
+            config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
+            "idempotence should be disabled when acks not set to all and `enable.idempotence` config is unset.");
+        assertEquals(
+            "0",
+            config.getString(ProducerConfig.ACKS_CONFIG),
+            "acks should be set with overridden value");
+
+        Properties validProps5 = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.ACKS_CONFIG, "1");
+            }};
+        config = new ProducerConfig(validProps5);
+        assertFalse(
+            config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
+            "idempotence should be disabled when acks not set to all and `enable.idempotence` config is unset.");
+        assertEquals(
+            "1",
+            config.getString(ProducerConfig.ACKS_CONFIG),
+            "acks should be set with overridden value");
+
         Properties invalidProps = new Properties() {{
                 putAll(baseProps);
                 setProperty(ProducerConfig.ACKS_CONFIG, "0");
@@ -252,21 +278,12 @@ public class KafkaProducerTest {
         Properties invalidProps3 = new Properties() {{
                 putAll(baseProps);
                 setProperty(ProducerConfig.ACKS_CONFIG, "0");
-            }};
-        assertThrows(
-            ConfigException.class,
-            () -> new ProducerConfig(invalidProps3),
-            "Must set acks to all in order to use the idempotent producer");
-
-        Properties invalidProps4 = new Properties() {{
-                putAll(baseProps);
-                setProperty(ProducerConfig.ACKS_CONFIG, "0");
                 setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
             }};
         assertThrows(
             ConfigException.class,
-            () -> new ProducerConfig(invalidProps4),
-            "Must set retries to non-zero when using the idempotent producer.");
+            () -> new ProducerConfig(invalidProps3),
+            "Must set acks to all when using the transactional producer.");
     }
 
     @Test
@@ -291,6 +308,19 @@ public class KafkaProducerTest {
             config.getInt(ProducerConfig.RETRIES_CONFIG),
             "retries should be overwritten");
 
+        Properties validProps2 = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.RETRIES_CONFIG, "0");
+            }};
+        config = new ProducerConfig(validProps2);
+        assertFalse(
+            config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
+            "idempotence should be disabled when retries set to 0 and `enable.idempotence` config is unset.");
+        assertEquals(
+            0,
+            config.getInt(ProducerConfig.RETRIES_CONFIG),
+            "retries should be set with overridden value");
+
         Properties invalidProps = new Properties() {{
                 putAll(baseProps);
                 setProperty(ProducerConfig.RETRIES_CONFIG, "0");
@@ -305,32 +335,23 @@ public class KafkaProducerTest {
         Properties invalidProps2 = new Properties() {{
                 putAll(baseProps);
                 setProperty(ProducerConfig.RETRIES_CONFIG, "0");
-            }};
-        assertThrows(
-            ConfigException.class,
-            () -> new ProducerConfig(invalidProps2),
-            "Must set retries to non-zero when using the idempotent producer.");
-
-        Properties invalidProps3 = new Properties() {{
-                putAll(baseProps);
-                setProperty(ProducerConfig.RETRIES_CONFIG, "0");
                 // explicitly enabling idempotence should still throw exception
                 setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
             }};
         assertThrows(
             ConfigException.class,
-            () -> new ProducerConfig(invalidProps3),
+            () -> new ProducerConfig(invalidProps2),
             "Must set retries to non-zero when using the idempotent producer.");
 
-        Properties invalidProps4 = new Properties() {{
+        Properties invalidProps3 = new Properties() {{
                 putAll(baseProps);
                 setProperty(ProducerConfig.RETRIES_CONFIG, "0");
                 setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
             }};
         assertThrows(
             ConfigException.class,
-            () -> new ProducerConfig(invalidProps4),
-            "Must set retries to non-zero when using the idempotent producer.");
+            () -> new ProducerConfig(invalidProps3),
+            "Must set retries to non-zero when using the transactional producer.");
     }
 
     @Test
@@ -343,7 +364,7 @@ public class KafkaProducerTest {
 
         Properties validProps = new Properties() {{
                 putAll(baseProps);
-                setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "10");
+                setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "6");
                 setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
             }};
         ProducerConfig config = new ProducerConfig(validProps);
@@ -351,10 +372,24 @@ public class KafkaProducerTest {
             config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
             "idempotence should be overwritten");
         assertEquals(
-            10,
+            6,
             config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
             "max.in.flight.requests.per.connection should be overwritten");
 
+        Properties validProps2 = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "6");
+            }};
+        config = new ProducerConfig(validProps2);
+        assertFalse(
+            config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
+            "idempotence should be disabled when `max.in.flight.requests.per.connection` is greater than 5 and " +
+                "`enable.idempotence` config is unset.");
+        assertEquals(
+            6,
+            config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
+            "`max.in.flight.requests.per.connection` should be set with overridden value");
+
         Properties invalidProps = new Properties() {{
                 putAll(baseProps);
                 setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
@@ -368,32 +403,23 @@ public class KafkaProducerTest {
 
         Properties invalidProps2 = new Properties() {{
                 putAll(baseProps);
-                setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "10");
-            }};
-        assertThrows(
-            ConfigException.class,
-            () -> new ProducerConfig(invalidProps2),
-            "Must set max.in.flight.requests.per.connection to at most 5 when using the idempotent producer.");
-
-        Properties invalidProps3 = new Properties() {{
-                putAll(baseProps);
-                setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "10");
+                setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "6");
                 // explicitly enabling idempotence should still throw exception
                 setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
             }};
         assertThrows(
             ConfigException.class,
-            () -> new ProducerConfig(invalidProps3),
+            () -> new ProducerConfig(invalidProps2),
             "Must set max.in.flight.requests.per.connection to at most 5 when using the idempotent producer.");
 
-        Properties invalidProps4 = new Properties() {{
+        Properties invalidProps3 = new Properties() {{
                 putAll(baseProps);
-                setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "10");
+                setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "6");
                 setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
             }};
         assertThrows(
             ConfigException.class,
-            () -> new ProducerConfig(invalidProps4),
+            () -> new ProducerConfig(invalidProps3),
             "Must set retries to non-zero when using the idempotent producer.");
     }
 
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 6897170..173cbe8 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -21,7 +21,8 @@
 
 <h5><a id="upgrade_301_notable" href="#upgrade_301_notable">Notable changes in 3.0.1</a></h5>
 <ul>
-    <li>A bug prevented the producer idempotence default from being applied which meant that it remained disabled unless the user had explicitly set
+    <li>Idempotence for the producer is enabled by default if no conflicting configurations are set.
+        A bug prevented the producer idempotence default from being applied which meant that it remained disabled unless the user had explicitly set
         <code>enable.idempotence</code> to true. See <a href="https://issues.apache.org/jira/browse/KAFKA-13598">KAFKA-13598</a> for more details.
         This issue was fixed and the default is properly applied.</li>
 </ul>