You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/02/05 19:57:08 UTC

[kafka] branch 3.1 updated: KAFKA-13598: enable idempotence producer by default and validate the configs (#11691)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new 5227478  KAFKA-13598: enable idempotence producer by default and validate the configs (#11691)
5227478 is described below

commit 5227478fb18e4589d1ef8044bf4a26780400cedb
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Sun Feb 6 02:53:27 2022 +0800

    KAFKA-13598: enable idempotence producer by default and validate the configs (#11691)
    
    In v3.0, we changed the default value for `enable.idempotence` to true, but we didn't adjust the validator and the `idempotence` enabled check method. So if a user didn't explicitly enable idempotence, this feature won't be turned on. This patch addresses the problem, cleans up associated logic, and fixes tests that broke as a result of properly applying the new default. Specifically it does the following:
    
    1. fix the `ProducerConfig#idempotenceEnabled` method, to make it correctly detect if `idempotence` is enabled or not
    2. remove some unnecessary config overridden and checks due to we already default `acks`, `retries` and `enable.idempotence` configs.
    3. move the config validator for the idempotent producer from `KafkaProducer` into `ProducerConfig`. The config validation should be the responsibility of `ProducerConfig` class.
    4. add an `AbstractConfig#hasKeyInOriginals` method, to avoid `originals` configs get copied and only want to check the existence of the key.
    5. fix many broken tests. As mentioned, we didn't actually enable idempotence in v3.0. After this PR, there are some tests broken due to some different behavior between idempotent and non-idempotent producer.
    6. add additional tests to validate configuration behavior
    
    Reviewers: Kirk True <ki...@mustardgrain.com>, Ismael Juma <is...@juma.me.uk>, Mickael Maison <mi...@users.noreply.github.com>, Jason Gustafson <ja...@confluent.io>
---
 .../apache/kafka/clients/CommonClientConfigs.java  |   5 +-
 .../kafka/clients/producer/KafkaProducer.java      |  34 +--
 .../kafka/clients/producer/ProducerConfig.java     |  47 ++---
 .../kafka/clients/producer/KafkaProducerTest.java  | 233 +++++++++++++++++----
 .../connect/storage/KafkaStatusBackingStore.java   |   1 +
 .../main/scala/kafka/tools/ConsoleProducer.scala   |   2 +-
 .../kafka/api/AuthorizerIntegrationTest.scala      |   2 +
 ...merWithLegacyMessageFormatIntegrationTest.scala |   6 +-
 .../kafka/api/EndToEndAuthorizationTest.scala      |  52 +++--
 .../scala/integration/kafka/api/MetricsTest.scala  |   5 +-
 .../SaslClientsWithInvalidCredentialsTest.scala    |  23 +-
 .../server/DynamicBrokerReconfigurationTest.scala  |   2 +
 .../unit/kafka/server/LogDirFailureTest.scala      |   2 +
 docs/upgrade.html                                  |   8 +
 .../log4jappender/KafkaLog4jAppenderTest.java      |   4 +-
 15 files changed, 309 insertions(+), 117 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 58075d6..5371a73 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -194,8 +194,9 @@ public class CommonClientConfigs {
     public static Map<String, Object> postProcessReconnectBackoffConfigs(AbstractConfig config,
                                                     Map<String, Object> parsedValues) {
         HashMap<String, Object> rval = new HashMap<>();
-        if ((!config.originals().containsKey(RECONNECT_BACKOFF_MAX_MS_CONFIG)) &&
-                config.originals().containsKey(RECONNECT_BACKOFF_MS_CONFIG)) {
+        Map<String, Object> originalConfig = config.originals();
+        if ((!originalConfig.containsKey(RECONNECT_BACKOFF_MAX_MS_CONFIG)) &&
+            originalConfig.containsKey(RECONNECT_BACKOFF_MS_CONFIG)) {
             log.debug("Disabling exponential reconnect backoff because {} is set, but {} is not.",
                     RECONNECT_BACKOFF_MS_CONFIG, RECONNECT_BACKOFF_MAX_MS_CONFIG);
             rval.put(RECONNECT_BACKOFF_MAX_MS_CONFIG, parsedValues.get(RECONNECT_BACKOFF_MS_CONFIG));
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 2a68d0f..ac3343e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -445,7 +445,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
 
     // visible for testing
     Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
-        int maxInflightRequests = configureInflightRequests(producerConfig);
+        int maxInflightRequests = producerConfig.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
         int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
         ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time, logContext);
         ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
@@ -468,7 +468,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 apiVersions,
                 throttleTimeSensor,
                 logContext);
-        short acks = configureAcks(producerConfig, log);
+
+        short acks = Short.parseShort(producerConfig.getString(ProducerConfig.ACKS_CONFIG));
         return new Sender(logContext,
                 client,
                 metadata,
@@ -514,15 +515,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
 
     private TransactionManager configureTransactionState(ProducerConfig config,
                                                          LogContext logContext) {
-
         TransactionManager transactionManager = null;
 
-        final boolean userConfiguredIdempotence = config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
-        final boolean userConfiguredTransactions = config.originals().containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
-        if (userConfiguredTransactions && !userConfiguredIdempotence)
-            log.info("Overriding the default {} to true since {} is specified.", ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
-                    ProducerConfig.TRANSACTIONAL_ID_CONFIG);
-
         if (config.idempotenceEnabled()) {
             final String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
             final int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
@@ -543,28 +537,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         return transactionManager;
     }
 
-    private static int configureInflightRequests(ProducerConfig config) {
-        if (config.idempotenceEnabled() && 5 < config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
-            throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
-                    " to use the idempotent producer.");
-        }
-        return config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
-    }
-
-    private static short configureAcks(ProducerConfig config, Logger log) {
-        boolean userConfiguredAcks = config.originals().containsKey(ProducerConfig.ACKS_CONFIG);
-        short acks = Short.parseShort(config.getString(ProducerConfig.ACKS_CONFIG));
-
-        if (config.idempotenceEnabled()) {
-            if (!userConfiguredAcks)
-                log.info("Overriding the default {} to all since idempotence is enabled.", ProducerConfig.ACKS_CONFIG);
-            else if (acks != -1)
-                throw new ConfigException("Must set " + ProducerConfig.ACKS_CONFIG + " to all in order to use the idempotent " +
-                        "producer. Otherwise we cannot guarantee idempotence.");
-        }
-        return acks;
-    }
-
     /**
      * Needs to be called before any other methods when the transactional.id is set in the configuration.
      *
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index fbd3449..9a698b1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -207,6 +207,8 @@ public class ProducerConfig extends AbstractConfig {
     private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."
                                                                             + " Note that if this config is set to be greater than 1 and <code>enable.idempotence</code> is set to false, there is a risk of"
                                                                             + " message re-ordering after a failed send due to retries (i.e., if retries are enabled).";
+    // max.in.flight.requests.per.connection should be less than or equal to 5 when idempotence producer enabled to ensure message ordering
+    private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE = 5;
 
     /** <code>retries</code> */
     public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
@@ -269,8 +271,8 @@ public class ProducerConfig extends AbstractConfig {
     public static final String ENABLE_IDEMPOTENCE_CONFIG = "enable.idempotence";
     public static final String ENABLE_IDEMPOTENCE_DOC = "When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer "
                                                         + "retries due to broker failures, etc., may write duplicates of the retried message in the stream. "
-                                                        + "Note that enabling idempotence requires <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to be less than or equal to 5 "
-                                                        + "(with message ordering preserved for any allowable value), <code>" + RETRIES_CONFIG + "</code> to be greater than 0, and <code>"
+                                                        + "Note that enabling idempotence requires <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE
+                                                        + " (with message ordering preserved for any allowable value), <code>" + RETRIES_CONFIG + "</code> to be greater than 0, and <code>"
                                                         + ACKS_CONFIG + "</code> must be 'all'. If these values are not explicitly set by the user, suitable values will be chosen. If incompatible "
                                                         + "values are set, a <code>ConfigException</code> will be thrown.";
 
@@ -438,9 +440,8 @@ public class ProducerConfig extends AbstractConfig {
     @Override
     protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
         Map<String, Object> refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
-        maybeOverrideEnableIdempotence(refinedConfigs);
+        postProcessAndValidateIdempotenceConfigs(refinedConfigs);
         maybeOverrideClientId(refinedConfigs);
-        maybeOverrideAcksAndRetries(refinedConfigs);
         return refinedConfigs;
     }
 
@@ -456,33 +457,30 @@ public class ProducerConfig extends AbstractConfig {
         configs.put(CLIENT_ID_CONFIG, refinedClientId);
     }
 
-    private void maybeOverrideEnableIdempotence(final Map<String, Object> configs) {
-        boolean userConfiguredIdempotence = this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG);
-        boolean userConfiguredTransactions = this.originals().containsKey(TRANSACTIONAL_ID_CONFIG);
-
-        if (userConfiguredTransactions && !userConfiguredIdempotence) {
-            configs.put(ENABLE_IDEMPOTENCE_CONFIG, true);
-        }
-    }
-
-    private void maybeOverrideAcksAndRetries(final Map<String, Object> configs) {
+    private void postProcessAndValidateIdempotenceConfigs(final Map<String, Object> configs) {
+        final Map<String, Object> originalConfigs = this.originals();
         final String acksStr = parseAcks(this.getString(ACKS_CONFIG));
         configs.put(ACKS_CONFIG, acksStr);
-        // For idempotence producers, values for `RETRIES_CONFIG` and `ACKS_CONFIG` might need to be overridden.
+
+        // For idempotence producers, values for `RETRIES_CONFIG` and `ACKS_CONFIG` need validation
         if (idempotenceEnabled()) {
-            boolean userConfiguredRetries = this.originals().containsKey(RETRIES_CONFIG);
-            if (this.getInt(RETRIES_CONFIG) == 0) {
+            boolean userConfiguredRetries = originalConfigs.containsKey(RETRIES_CONFIG);
+            if (userConfiguredRetries && this.getInt(RETRIES_CONFIG) == 0) {
                 throw new ConfigException("Must set " + ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent producer.");
             }
-            configs.put(RETRIES_CONFIG, userConfiguredRetries ? this.getInt(RETRIES_CONFIG) : Integer.MAX_VALUE);
 
-            boolean userConfiguredAcks = this.originals().containsKey(ACKS_CONFIG);
+            boolean userConfiguredAcks = originalConfigs.containsKey(ACKS_CONFIG);
             final short acks = Short.valueOf(acksStr);
             if (userConfiguredAcks && acks != (short) -1) {
                 throw new ConfigException("Must set " + ACKS_CONFIG + " to all in order to use the idempotent " +
                         "producer. Otherwise we cannot guarantee idempotence.");
             }
-            configs.put(ACKS_CONFIG, "-1");
+
+            boolean userConfiguredInflightRequests = originalConfigs.containsKey(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
+            if (userConfiguredInflightRequests && MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE < this.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
+                throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
+                        " to use the idempotent producer.");
+            }
         }
     }
 
@@ -514,13 +512,12 @@ public class ProducerConfig extends AbstractConfig {
     }
 
     boolean idempotenceEnabled() {
-        boolean userConfiguredIdempotence = this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG);
         boolean userConfiguredTransactions = this.originals().containsKey(TRANSACTIONAL_ID_CONFIG);
-        boolean idempotenceEnabled = userConfiguredIdempotence && this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
-
-        if (!idempotenceEnabled && userConfiguredIdempotence && userConfiguredTransactions)
+        boolean idempotenceEnabled = this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
+        if (!idempotenceEnabled && userConfiguredTransactions)
             throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
-        return userConfiguredTransactions || idempotenceEnabled;
+
+        return idempotenceEnabled;
     }
 
     ProducerConfig(Map<?, ?> props, boolean doLog) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 7c521fd..b2ec2e4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -74,6 +74,8 @@ import org.apache.kafka.test.MockProducerInterceptor;
 import org.apache.kafka.test.MockSerializer;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -124,8 +126,7 @@ import static org.mockito.Mockito.when;
 
 public class KafkaProducerTest {
     private final String topic = "topic";
-    private final Node host1 = new Node(0, "host1", 1000);
-    private final Collection<Node> nodes = Collections.singletonList(host1);
+    private final Collection<Node> nodes = Collections.singletonList(NODE);
     private final Cluster emptyCluster = new Cluster(
             null,
             nodes,
@@ -148,6 +149,7 @@ public class KafkaProducerTest {
             Collections.emptySet(),
             Collections.emptySet());
     private static final int DEFAULT_METADATA_IDLE_MS = 5 * 60 * 1000;
+    private static final Node NODE = new Node(0, "host1", 1000);
 
 
     private static <K, V> KafkaProducer<K, V> kafkaProducer(Map<String, Object> configs,
@@ -242,6 +244,7 @@ public class KafkaProducerTest {
         Properties invalidProps2 = new Properties() {{
                 putAll(baseProps);
                 setProperty(ProducerConfig.ACKS_CONFIG, "1");
+                // explicitly enabling idempotence should still throw exception
                 setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
             }};
         assertThrows(
@@ -252,12 +255,149 @@ public class KafkaProducerTest {
         Properties invalidProps3 = new Properties() {{
                 putAll(baseProps);
                 setProperty(ProducerConfig.ACKS_CONFIG, "0");
-                setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
             }};
         assertThrows(
             ConfigException.class,
             () -> new ProducerConfig(invalidProps3),
             "Must set acks to all in order to use the idempotent producer");
+
+        Properties invalidProps4 = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.ACKS_CONFIG, "0");
+                setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
+            }};
+        assertThrows(
+            ConfigException.class,
+            () -> new ProducerConfig(invalidProps4),
+            "Must set retries to non-zero when using the idempotent producer.");
+    }
+
+    @Test
+    public void testRetriesAndIdempotenceForIdempotentProducers() {
+        Properties baseProps = new Properties() {{
+                setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+                setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+                setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+            }};
+
+        Properties validProps = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.RETRIES_CONFIG, "0");
+                setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
+            }};
+        ProducerConfig config = new ProducerConfig(validProps);
+        assertFalse(
+            config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
+            "idempotence should be overwritten");
+        assertEquals(
+            0,
+            config.getInt(ProducerConfig.RETRIES_CONFIG),
+            "retries should be overwritten");
+
+        Properties invalidProps = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.RETRIES_CONFIG, "0");
+                setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
+                setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
+            }};
+        assertThrows(
+            ConfigException.class,
+            () -> new ProducerConfig(invalidProps),
+            "Cannot set a transactional.id without also enabling idempotence");
+
+        Properties invalidProps2 = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.RETRIES_CONFIG, "0");
+            }};
+        assertThrows(
+            ConfigException.class,
+            () -> new ProducerConfig(invalidProps2),
+            "Must set retries to non-zero when using the idempotent producer.");
+
+        Properties invalidProps3 = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.RETRIES_CONFIG, "0");
+                // explicitly enabling idempotence should still throw exception
+                setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
+            }};
+        assertThrows(
+            ConfigException.class,
+            () -> new ProducerConfig(invalidProps3),
+            "Must set retries to non-zero when using the idempotent producer.");
+
+        Properties invalidProps4 = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.RETRIES_CONFIG, "0");
+                setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
+            }};
+        assertThrows(
+            ConfigException.class,
+            () -> new ProducerConfig(invalidProps4),
+            "Must set retries to non-zero when using the idempotent producer.");
+    }
+
+    @Test
+    public void testInflightRequestsAndIdempotenceForIdempotentProducers() {
+        Properties baseProps = new Properties() {{
+                setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+                setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+                setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+            }};
+
+        Properties validProps = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "10");
+                setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
+            }};
+        ProducerConfig config = new ProducerConfig(validProps);
+        assertFalse(
+            config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
+            "idempotence should be overwritten");
+        assertEquals(
+            10,
+            config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
+            "max.in.flight.requests.per.connection should be overwritten");
+
+        Properties invalidProps = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
+                setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
+                setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
+            }};
+        assertThrows(
+            ConfigException.class,
+            () -> new ProducerConfig(invalidProps),
+            "Cannot set a transactional.id without also enabling idempotence");
+
+        Properties invalidProps2 = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "10");
+            }};
+        assertThrows(
+            ConfigException.class,
+            () -> new ProducerConfig(invalidProps2),
+            "Must set max.in.flight.requests.per.connection to at most 5 when using the idempotent producer.");
+
+        Properties invalidProps3 = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "10");
+                // explicitly enabling idempotence should still throw exception
+                setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
+            }};
+        assertThrows(
+            ConfigException.class,
+            () -> new ProducerConfig(invalidProps3),
+            "Must set max.in.flight.requests.per.connection to at most 5 when using the idempotent producer.");
+
+        Properties invalidProps4 = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "10");
+                setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
+            }};
+        assertThrows(
+            ConfigException.class,
+            () -> new ProducerConfig(invalidProps4),
+            "Must set retries to non-zero when using the idempotent producer.");
     }
 
     @Test
@@ -469,9 +609,17 @@ public class KafkaProducerTest {
     private static KafkaProducer<String, String> producerWithOverrideNewSender(Map<String, Object> configs,
                                                                                ProducerMetadata metadata,
                                                                                Time timer) {
+        // let mockClient#leastLoadedNode return the node directly so that we can isolate Metadata calls from KafkaProducer for idempotent producer
+        MockClient mockClient = new MockClient(Time.SYSTEM, metadata) {
+            @Override
+            public Node leastLoadedNode(long now) {
+                return NODE;
+            }
+        };
+
         return new KafkaProducer<String, String>(
                 new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, new StringSerializer(), new StringSerializer())),
-                new StringSerializer(), new StringSerializer(), metadata, new MockClient(Time.SYSTEM, metadata), null, timer) {
+                new StringSerializer(), new StringSerializer(), metadata, mockClient, null, timer) {
             @Override
             Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
                 // give Sender its own Metadata instance so that we can isolate Metadata calls from KafkaProducer
@@ -480,10 +628,13 @@ public class KafkaProducerTest {
         };
     }
 
-    @Test
-    public void testMetadataFetch() throws InterruptedException {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testMetadataFetch(boolean isIdempotenceEnabled) throws InterruptedException {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, isIdempotenceEnabled);
+
         ProducerMetadata metadata = mock(ProducerMetadata.class);
 
         // Return empty cluster 4 times and cluster from then on
@@ -513,18 +664,14 @@ public class KafkaProducerTest {
         producer.close(Duration.ofMillis(0));
     }
 
-    @Test
-    public void testMetadataExpiry() throws InterruptedException {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testMetadataExpiry(boolean isIdempotenceEnabled) throws InterruptedException {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, isIdempotenceEnabled);
         ProducerMetadata metadata = mock(ProducerMetadata.class);
 
-        Cluster emptyCluster = new Cluster(
-            "dummy",
-            Collections.singletonList(host1),
-            Collections.emptySet(),
-            Collections.emptySet(),
-            Collections.emptySet());
         when(metadata.fetch()).thenReturn(onePartitionCluster, emptyCluster, onePartitionCluster);
 
         KafkaProducer<String, String> producer = producerWithOverrideNewSender(configs, metadata);
@@ -545,11 +692,13 @@ public class KafkaProducerTest {
         producer.close(Duration.ofMillis(0));
     }
 
-    @Test
-    public void testMetadataTimeoutWithMissingTopic() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testMetadataTimeoutWithMissingTopic(boolean isIdempotenceEnabled) throws Exception {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, isIdempotenceEnabled);
 
         // Create a record with a partition higher than the initial (outdated) partition range
         ProducerRecord<String, String> record = new ProducerRecord<>(topic, 2, null, "value");
@@ -570,6 +719,7 @@ public class KafkaProducerTest {
 
         // Four request updates where the topic isn't present, at which point the timeout expires and a
         // TimeoutException is thrown
+        // For idempotence enabled case, the first metadata.fetch will be called in Sender#maybeSendAndPollTransactionalRequest
         Future<RecordMetadata> future = producer.send(record);
         verify(metadata, times(4)).requestUpdateForTopic(topic);
         verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong());
@@ -583,11 +733,13 @@ public class KafkaProducerTest {
         }
     }
 
-    @Test
-    public void testMetadataWithPartitionOutOfRange() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testMetadataWithPartitionOutOfRange(boolean isIdempotenceEnabled) throws Exception {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, isIdempotenceEnabled);
 
         // Create a record with a partition higher than the initial (outdated) partition range
         ProducerRecord<String, String> record = new ProducerRecord<>(topic, 2, null, "value");
@@ -607,11 +759,13 @@ public class KafkaProducerTest {
         producer.close(Duration.ofMillis(0));
     }
 
-    @Test
-    public void testMetadataTimeoutWithPartitionOutOfRange() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testMetadataTimeoutWithPartitionOutOfRange(boolean isIdempotenceEnabled) throws Exception {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, isIdempotenceEnabled);
 
         // Create a record with a partition higher than the initial (outdated) partition range
         ProducerRecord<String, String> record = new ProducerRecord<>(topic, 2, null, "value");
@@ -632,7 +786,10 @@ public class KafkaProducerTest {
 
         // Four request updates where the requested partition is out of range, at which point the timeout expires
         // and a TimeoutException is thrown
+        // For idempotence enabled case, the first and last metadata.fetch will be called in Sender#maybeSendAndPollTransactionalRequest,
+        // before the producer#send and after it finished
         Future<RecordMetadata> future = producer.send(record);
+
         verify(metadata, times(4)).requestUpdateForTopic(topic);
         verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong());
         verify(metadata, times(5)).fetch();
@@ -650,6 +807,8 @@ public class KafkaProducerTest {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "600000");
+        // test under normal producer for simplicity
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
         long refreshBackoffMs = 500L;
         long metadataExpireMs = 60000L;
         long metadataIdleMs = 60000L;
@@ -797,6 +956,9 @@ public class KafkaProducerTest {
     public void testFlushCompleteSendOfInflightBatches() {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        // only test in idempotence disabled producer for simplicity
+        // flush operation acts the same for idempotence enabled and disabled cases
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
 
         Time time = new MockTime(1);
         MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
@@ -812,6 +974,7 @@ public class KafkaProducerTest {
                 Future<RecordMetadata> response = producer.send(new ProducerRecord<>("topic", "value" + i));
                 futureResponses.add(response);
             }
+
             futureResponses.forEach(res -> assertFalse(res.isDone()));
             producer.flush();
             futureResponses.forEach(res -> assertTrue(res.isDone()));
@@ -929,7 +1092,7 @@ public class KafkaProducerTest {
             client.prepareResponse(
                 request -> request instanceof FindCoordinatorRequest &&
                     ((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
-                FindCoordinatorResponse.prepareResponse(Errors.NONE, "bad-transaction", host1));
+                FindCoordinatorResponse.prepareResponse(Errors.NONE, "bad-transaction", NODE));
 
             Future<?> future = executor.submit(producer::initTransactions);
             TestUtils.waitForCondition(client::hasInFlightRequests,
@@ -966,14 +1129,14 @@ public class KafkaProducerTest {
             client.prepareResponse(
                 request -> request instanceof FindCoordinatorRequest &&
                     ((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
-                FindCoordinatorResponse.prepareResponse(Errors.NONE, "bad-transaction", host1));
+                FindCoordinatorResponse.prepareResponse(Errors.NONE, "bad-transaction", NODE));
 
             assertThrows(TimeoutException.class, producer::initTransactions);
 
             client.prepareResponse(
                 request -> request instanceof FindCoordinatorRequest &&
                                ((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
-                FindCoordinatorResponse.prepareResponse(Errors.NONE, "bad-transaction", host1));
+                FindCoordinatorResponse.prepareResponse(Errors.NONE, "bad-transaction", NODE));
 
             client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
 
@@ -999,7 +1162,7 @@ public class KafkaProducerTest {
         Node node = metadata.fetch().nodes().get(0);
         client.throttle(node, 5000);
 
-        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
         client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
 
         try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
@@ -1021,7 +1184,7 @@ public class KafkaProducerTest {
         MockClient client = new MockClient(time, metadata);
         client.updateMetadata(initialUpdateResponse);
 
-        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
         client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
         client.prepareResponse(endTxnResponse(Errors.NONE));
 
@@ -1043,7 +1206,7 @@ public class KafkaProducerTest {
         ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
         MockClient client = new MockClient(time, metadata);
         client.updateMetadata(initialUpdateResponse);
-        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
         client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
 
         try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
@@ -1080,10 +1243,10 @@ public class KafkaProducerTest {
         Node node = metadata.fetch().nodes().get(0);
         client.throttle(node, 5000);
 
-        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
         client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
         client.prepareResponse(addOffsetsToTxnResponse(Errors.NONE));
-        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
         String groupId = "group";
         client.prepareResponse(request ->
             ((TxnOffsetCommitRequest) request).data().groupId().equals(groupId),
@@ -1123,7 +1286,7 @@ public class KafkaProducerTest {
 
         MockClient client = new MockClient(time, metadata);
         client.updateMetadata(initialUpdateResponse);
-        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
         client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
 
         try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
@@ -1132,7 +1295,7 @@ public class KafkaProducerTest {
             assertDurationAtLeast(producer, "txn-init-time-ns-total", tick.toNanos());
 
             client.prepareResponse(addOffsetsToTxnResponse(Errors.NONE));
-            client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+            client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
             client.prepareResponse(txnOffsetsCommitResponse(Collections.singletonMap(
                 new TopicPartition("topic", 0), Errors.NONE)));
             client.prepareResponse(endTxnResponse(Errors.NONE));
@@ -1181,10 +1344,10 @@ public class KafkaProducerTest {
         Node node = metadata.fetch().nodes().get(0);
         client.throttle(node, 5000);
 
-        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
         client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
         client.prepareResponse(addOffsetsToTxnResponse(Errors.NONE));
-        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
         String groupId = "group";
         String memberId = "member";
         int generationId = 5;
@@ -1237,7 +1400,7 @@ public class KafkaProducerTest {
         Node node = metadata.fetch().nodes().get(0);
         client.throttle(node, 5000);
 
-        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
         client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
 
         try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
@@ -1448,7 +1611,7 @@ public class KafkaProducerTest {
 
         ExecutorService executorService = Executors.newSingleThreadExecutor();
         CountDownLatch assertionDoneLatch = new CountDownLatch(1);
-        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "this-is-a-transactional-id", host1));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "this-is-a-transactional-id", NODE));
         executorService.submit(() -> {
             assertThrows(KafkaException.class, producer::initTransactions);
             assertionDoneLatch.countDown();
@@ -1477,7 +1640,7 @@ public class KafkaProducerTest {
 
         ExecutorService executorService = Executors.newSingleThreadExecutor();
         CountDownLatch assertionDoneLatch = new CountDownLatch(1);
-        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "this-is-a-transactional-id", host1));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "this-is-a-transactional-id", NODE));
         executorService.submit(() -> {
             assertThrows(KafkaException.class, producer::initTransactions);
             assertionDoneLatch.countDown();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index 44902c0..eb28102 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -170,6 +170,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class
+        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false); // disable idempotence since retries is force to 0
         ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId);
 
         Map<String, Object> consumerProps = new HashMap<>(originals);
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 7c221ba..4a6f731 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -164,7 +164,7 @@ object ConsoleProducer {
       .withRequiredArg
       .describedAs("request required acks")
       .ofType(classOf[java.lang.String])
-      .defaultsTo("1")
+      .defaultsTo("-1")
     val requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. Value must be non-negative and non-zero")
       .withRequiredArg
       .describedAs("request timeout ms")
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 1da4fcd..0b215ff 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -143,6 +143,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val adminClients = Buffer[Admin]()
 
   producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "1")
+  producerConfig.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false")
   producerConfig.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "50000")
   consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group)
 
@@ -2383,6 +2384,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def buildTransactionalProducer(): KafkaProducer[Array[Byte], Array[Byte]] = {
     producerConfig.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
+    producerConfig.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
     producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
     createProducer()
   }
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerWithLegacyMessageFormatIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerWithLegacyMessageFormatIntegrationTest.scala
index e8c451e..0ce4004 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerWithLegacyMessageFormatIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerWithLegacyMessageFormatIntegrationTest.scala
@@ -18,6 +18,7 @@ package kafka.api
 
 import kafka.log.LogConfig
 import kafka.server.KafkaConfig
+import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.TopicPartition
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows}
 import org.junit.jupiter.api.Test
@@ -101,7 +102,10 @@ class ConsumerWithLegacyMessageFormatIntegrationTest extends AbstractConsumerTes
   def testEarliestOrLatestOffsets(): Unit = {
     val topic0 = "topicWithNewMessageFormat"
     val topic1 = "topicWithOldMessageFormat"
-    val producer = createProducer()
+    val prop = new Properties()
+    // idempotence producer doesn't support old version of messages
+    prop.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false")
+    val producer = createProducer(configOverrides = prop)
     createTopicAndSendRecords(producer, topicName = topic0, numPartitions = 2, recordsPerPartition = 100)
     val props = new Properties()
     props.setProperty(LogConfig.MessageFormatVersionProp, "0.9.0")
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index cbb536b..fb84312 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -20,7 +20,7 @@ package kafka.api
 import com.yammer.metrics.core.Gauge
 
 import java.io.File
-import java.util.Collections
+import java.util.{Collections, Properties}
 import java.util.concurrent.ExecutionException
 import kafka.admin.AclCommand
 import kafka.metrics.KafkaYammerMetrics
@@ -30,7 +30,7 @@ import kafka.server._
 import kafka.utils._
 import org.apache.kafka.clients.admin.Admin
 import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecords}
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.acl._
 import org.apache.kafka.common.acl.AclOperation._
 import org.apache.kafka.common.acl.AclPermissionType._
@@ -42,6 +42,8 @@ import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.jdk.CollectionConverters._
 
@@ -334,13 +336,18 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     * messages and describe topics respectively when the describe ACL isn't set.
     * Also verifies that subsequent publish, consume and describe to authorized topic succeeds.
     */
-  @Test
-  def testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(): Unit = {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(isIdempotenceEnabled: Boolean): Unit = {
     // Set consumer group acls since we are testing topic authorization
     setConsumerGroupAcls()
 
     // Verify produce/consume/describe throw TopicAuthorizationException
-    val producer = createProducer()
+
+    val prop = new Properties()
+    prop.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, isIdempotenceEnabled.toString)
+    val producer = createProducer(configOverrides = prop)
+
     assertThrows(classOf[TopicAuthorizationException], () => sendRecords(producer, numRecords, tp))
     val consumer = createConsumer()
     consumer.assign(List(tp).asJava)
@@ -352,12 +359,21 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     // Verify successful produce/consume/describe on another topic using the same producer, consumer and adminClient
     val topic2 = "topic2"
     val tp2 = new TopicPartition(topic2, 0)
+
     setReadAndWriteAcls(tp2)
-    sendRecords(producer, numRecords, tp2)
+    // in idempotence producer, we need to create another producer because the previous one is in FATAL_ERROR state (due to authorization error)
+    // If the transaction state in FATAL_ERROR, it'll never transit to other state. check TransactionManager#isTransitionValid for detail
+    val producer2 = if (isIdempotenceEnabled)
+      createProducer(configOverrides = prop)
+    else
+      producer
+
+    sendRecords(producer2, numRecords, tp2)
     consumer.assign(List(tp2).asJava)
     consumeRecords(consumer, numRecords, topic = topic2)
     val describeResults = adminClient.describeTopics(Set(topic, topic2).asJava).topicNameValues()
     assertEquals(1, describeResults.get(topic2).get().partitions().size())
+
     val e2 = assertThrows(classOf[ExecutionException], () => adminClient.describeTopics(Set(topic).asJava).allTopicNames().get())
     assertTrue(e2.getCause.isInstanceOf[TopicAuthorizationException], "Unexpected exception " + e2.getCause)
 
@@ -365,7 +381,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     // from the unauthorized topic and throw; since we can now return data during the time we are updating
     // metadata / fetching positions, it is possible that the authorized topic record is returned during this time.
     consumer.assign(List(tp, tp2).asJava)
-    sendRecords(producer, numRecords, tp2)
+    sendRecords(producer2, numRecords, tp2)
     var topic2RecordConsumed = false
     def verifyNoRecords(records: ConsumerRecords[Array[Byte], Array[Byte]]): Boolean = {
       assertEquals(Collections.singleton(tp2), records.partitions(), "Consumed records with unexpected partitions: " + records)
@@ -380,22 +396,32 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     if (!topic2RecordConsumed) {
       consumeRecordsIgnoreOneAuthorizationException(consumer, numRecords, startingOffset = 1, topic2)
     }
-    sendRecords(producer, numRecords, tp)
+    sendRecords(producer2, numRecords, tp)
     consumeRecordsIgnoreOneAuthorizationException(consumer, numRecords, startingOffset = 0, topic)
     val describeResults2 = adminClient.describeTopics(Set(topic, topic2).asJava).topicNameValues
     assertEquals(1, describeResults2.get(topic).get().partitions().size())
     assertEquals(1, describeResults2.get(topic2).get().partitions().size())
   }
 
-  @Test
-  def testNoProduceWithDescribeAcl(): Unit = {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testNoProduceWithDescribeAcl(isIdempotenceEnabled: Boolean): Unit = {
     AclCommand.main(describeAclArgs)
     servers.foreach { s =>
       TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource)
     }
-    val producer = createProducer()
-    val e = assertThrows(classOf[TopicAuthorizationException], () => sendRecords(producer, numRecords, tp))
-    assertEquals(Set(topic).asJava, e.unauthorizedTopics())
+
+    val prop = new Properties()
+    prop.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, isIdempotenceEnabled.toString)
+    val producer = createProducer(configOverrides = prop)
+
+    if (isIdempotenceEnabled) {
+      // in idempotent producer, it'll fail at InitProducerId request
+      assertThrows(classOf[KafkaException], () => sendRecords(producer, numRecords, tp))
+    } else {
+      val e = assertThrows(classOf[TopicAuthorizationException], () => sendRecords(producer, numRecords, tp))
+      assertEquals(Set(topic).asJava, e.unauthorizedTopics())
+    }
     confirmReauthenticationMetrics()
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index 850ac89..151bd0d 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -82,7 +82,10 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
     // Produce and consume some records
     val numRecords = 10
     val recordSize = 100000
-    val producer = createProducer()
+    val prop = new Properties()
+    // idempotence producer doesn't support old version of messages
+    prop.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false")
+    val producer = createProducer(configOverrides = prop)
     sendRecords(producer, numRecords, recordSize, tp)
 
     val consumer = createConsumer()
diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index a9f2c6c..c9be59e 100644
--- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -14,9 +14,8 @@ package kafka.api
 
 import java.nio.file.Files
 import java.time.Duration
-import java.util.Collections
+import java.util.{Collections, Properties}
 import java.util.concurrent.{ExecutionException, TimeUnit}
-
 import scala.jdk.CollectionConverters._
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
@@ -30,6 +29,8 @@ import kafka.server.KafkaConfig
 import kafka.utils.{JaasTestUtils, TestUtils}
 import kafka.zk.ConfigEntityChangeNotificationZNode
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with SaslSetup {
   private val kafkaClientSaslMechanism = "SCRAM-SHA-256"
@@ -76,14 +77,24 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
     closeSasl()
   }
 
-  @Test
-  def testProducerWithAuthenticationFailure(): Unit = {
-    val producer = createProducer()
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testProducerWithAuthenticationFailure(isIdempotenceEnabled: Boolean): Unit = {
+    val prop = new Properties()
+    prop.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, isIdempotenceEnabled.toString)
+    val producer = createProducer(configOverrides = prop)
+
     verifyAuthenticationException(sendOneRecord(producer, maxWaitMs = 10000))
     verifyAuthenticationException(producer.partitionsFor(topic))
 
     createClientCredential()
-    verifyWithRetry(sendOneRecord(producer))
+    // in idempotence producer, we need to create another producer because the previous one is in FATEL_ERROR state (due to authentication error)
+    // If the transaction state in FATAL_ERROR, it'll never transit to other state. check TransactionManager#isTransitionValid for detail
+    val producer2 = if (isIdempotenceEnabled)
+      createProducer(configOverrides = prop)
+    else
+      producer
+    verifyWithRetry(sendOneRecord(producer2))
   }
 
   @Test
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 687cd9e..9502d2e 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -1670,6 +1670,8 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       producerProps.put(ProducerConfig.RETRIES_CONFIG, _retries.toString)
       producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, _deliveryTimeoutMs.toString)
       producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, _requestTimeoutMs.toString)
+      // disable the idempotence since some tests want to test the cases when retries=0, and these tests are not testing producers
+      producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false")
 
       val producer = new KafkaProducer[String, String](producerProps, new StringSerializer, new StringSerializer)
       producers += producer
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index 1025d7a..bfbb14e 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -109,6 +109,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
   @Test
   def testReplicaFetcherThreadAfterLogDirFailureOnFollower(): Unit = {
     this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0")
+    this.producerConfig.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false")
     val producer = createProducer()
     val partition = new TopicPartition(topic, 0)
 
@@ -140,6 +141,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
   def testProduceErrorsFromLogDirFailureOnLeader(failureType: LogDirFailureType): Unit = {
     // Disable retries to allow exception to bubble up for validation
     this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0")
+    this.producerConfig.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false")
     val producer = createProducer()
 
     val partition = new TopicPartition(topic, 0)
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 73ebe29..71ded0c 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -19,6 +19,14 @@
 
 <script id="upgrade-template" type="text/x-handlebars-template">
 
+<h5><a id="upgrade_320_notable" href="#upgrade_320_notable">Notable changes in 3.2.0</a></h5>
+    <ul>
+        <li>Idempotence for the producer is enabled by default. In 3.0.0 and 3.1.0, a bug prevented this default from being applied,
+            which meant that idempotence remained disabled unless the user had explicitly set <code>enable.idempotence</code> to true
+            (See <a href="https://issues.apache.org/jira/browse/KAFKA-13598">KAFKA-13598</a>for more details).
+            This issue was fixed and the default is properly applied in 3.0.1, 3.1.1, and 3.2.0.</li>
+    </ul>
+
 <h4><a id="upgrade_3_1_0" href="#upgrade_3_1_0">Upgrading to 3.1.0 from any version 0.8.x through 3.0.x</a></h4>
 
 <p><b>If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets.
diff --git a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java
index 7ec56335..90a791f 100644
--- a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java
+++ b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java
@@ -190,7 +190,7 @@ public class KafkaLog4jAppenderTest {
         props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n");
         props.put("log4j.appender.KAFKA.BrokerList", "127.0.0.2:9093");
         props.put("log4j.appender.KAFKA.Topic", "test-topic");
-        props.put("log4j.appender.KAFKA.RequiredNumAcks", "1");
+        props.put("log4j.appender.KAFKA.RequiredNumAcks", "-1");
         props.put("log4j.appender.KAFKA.SyncSend", "true");
         // setting producer timeout (max.block.ms) to be low
         props.put("log4j.appender.KAFKA.maxBlockMs", "10");
@@ -208,7 +208,7 @@ public class KafkaLog4jAppenderTest {
         props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n");
         props.put("log4j.appender.KAFKA.BrokerList", "127.0.0.1:9093");
         props.put("log4j.appender.KAFKA.Topic", "test-topic");
-        props.put("log4j.appender.KAFKA.RequiredNumAcks", "1");
+        props.put("log4j.appender.KAFKA.RequiredNumAcks", "-1");
         props.put("log4j.appender.KAFKA.SyncSend", Boolean.toString(syncSend));
         props.put("log4j.logger.kafka.log4j", "INFO, KAFKA");
         return props;