You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/02/07 17:57:52 UTC

[kafka] branch 3.0 updated: KAFKA-13598: enable idempotence producer by default and validate the configs (#11691)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 8093331  KAFKA-13598: enable idempotence producer by default and validate the configs (#11691)
8093331 is described below

commit 8093331d81c87dbc55dbe8eed0503800cd9e71d6
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Sun Feb 6 02:53:27 2022 +0800

    KAFKA-13598: enable idempotence producer by default and validate the configs (#11691)
    
    In v3.0, we changed the default value for `enable.idempotence` to true, but we didn't adjust the validator and the `idempotence` enabled check method. So if a user didn't explicitly enable idempotence, this feature won't be turned on. This patch addresses the problem, cleans up associated logic, and fixes tests that broke as a result of properly applying the new default. Specifically it does the following:
    
    1. fix the `ProducerConfig#idempotenceEnabled` method, to make it correctly detect if `idempotence` is enabled or not
    2. remove some unnecessary config overridden and checks due to we already default `acks`, `retries` and `enable.idempotence` configs.
    3. move the config validator for the idempotent producer from `KafkaProducer` into `ProducerConfig`. The config validation should be the responsibility of `ProducerConfig` class.
    4. add an `AbstractConfig#hasKeyInOriginals` method, to avoid `originals` configs get copied and only want to check the existence of the key.
    5. fix many broken tests. As mentioned, we didn't actually enable idempotence in v3.0. After this PR, there are some tests broken due to some different behavior between idempotent and non-idempotent producer.
    6. add additional tests to validate configuration behavior
    
    Reviewers: Kirk True <ki...@mustardgrain.com>, Ismael Juma <is...@juma.me.uk>, Mickael Maison <mi...@users.noreply.github.com>, Jason Gustafson <ja...@confluent.io>
---
 .../apache/kafka/clients/CommonClientConfigs.java  |   5 +-
 .../kafka/clients/producer/KafkaProducer.java      |  34 +---
 .../kafka/clients/producer/ProducerConfig.java     |  47 ++---
 .../kafka/clients/producer/KafkaProducerTest.java  | 225 ++++++++++++++++++---
 .../connect/storage/KafkaStatusBackingStore.java   |   1 +
 .../main/scala/kafka/tools/ConsoleProducer.scala   |   2 +-
 .../kafka/api/AuthorizerIntegrationTest.scala      |   2 +
 ...merWithLegacyMessageFormatIntegrationTest.scala |   6 +-
 .../kafka/api/EndToEndAuthorizationTest.scala      |  51 +++--
 .../scala/integration/kafka/api/MetricsTest.scala  |   5 +-
 .../SaslClientsWithInvalidCredentialsTest.scala    |  23 ++-
 .../server/DynamicBrokerReconfigurationTest.scala  |   2 +
 .../unit/kafka/server/LogDirFailureTest.scala      |   2 +
 docs/upgrade.html                                  |   5 +
 .../log4jappender/KafkaLog4jAppenderTest.java      |   4 +-
 15 files changed, 301 insertions(+), 113 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 58075d6..5371a73 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -194,8 +194,9 @@ public class CommonClientConfigs {
     public static Map<String, Object> postProcessReconnectBackoffConfigs(AbstractConfig config,
                                                     Map<String, Object> parsedValues) {
         HashMap<String, Object> rval = new HashMap<>();
-        if ((!config.originals().containsKey(RECONNECT_BACKOFF_MAX_MS_CONFIG)) &&
-                config.originals().containsKey(RECONNECT_BACKOFF_MS_CONFIG)) {
+        Map<String, Object> originalConfig = config.originals();
+        if ((!originalConfig.containsKey(RECONNECT_BACKOFF_MAX_MS_CONFIG)) &&
+            originalConfig.containsKey(RECONNECT_BACKOFF_MS_CONFIG)) {
             log.debug("Disabling exponential reconnect backoff because {} is set, but {} is not.",
                     RECONNECT_BACKOFF_MS_CONFIG, RECONNECT_BACKOFF_MAX_MS_CONFIG);
             rval.put(RECONNECT_BACKOFF_MAX_MS_CONFIG, parsedValues.get(RECONNECT_BACKOFF_MS_CONFIG));
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 1d18524..2d53b18 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -442,7 +442,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
 
     // visible for testing
     Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
-        int maxInflightRequests = configureInflightRequests(producerConfig);
+        int maxInflightRequests = producerConfig.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
         int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
         ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time, logContext);
         ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
@@ -465,7 +465,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 apiVersions,
                 throttleTimeSensor,
                 logContext);
-        short acks = configureAcks(producerConfig, log);
+
+        short acks = Short.parseShort(producerConfig.getString(ProducerConfig.ACKS_CONFIG));
         return new Sender(logContext,
                 client,
                 metadata,
@@ -511,15 +512,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
 
     private TransactionManager configureTransactionState(ProducerConfig config,
                                                          LogContext logContext) {
-
         TransactionManager transactionManager = null;
 
-        final boolean userConfiguredIdempotence = config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
-        final boolean userConfiguredTransactions = config.originals().containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
-        if (userConfiguredTransactions && !userConfiguredIdempotence)
-            log.info("Overriding the default {} to true since {} is specified.", ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
-                    ProducerConfig.TRANSACTIONAL_ID_CONFIG);
-
         if (config.idempotenceEnabled()) {
             final String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
             final int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
@@ -540,28 +534,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         return transactionManager;
     }
 
-    private static int configureInflightRequests(ProducerConfig config) {
-        if (config.idempotenceEnabled() && 5 < config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
-            throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
-                    " to use the idempotent producer.");
-        }
-        return config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
-    }
-
-    private static short configureAcks(ProducerConfig config, Logger log) {
-        boolean userConfiguredAcks = config.originals().containsKey(ProducerConfig.ACKS_CONFIG);
-        short acks = Short.parseShort(config.getString(ProducerConfig.ACKS_CONFIG));
-
-        if (config.idempotenceEnabled()) {
-            if (!userConfiguredAcks)
-                log.info("Overriding the default {} to all since idempotence is enabled.", ProducerConfig.ACKS_CONFIG);
-            else if (acks != -1)
-                throw new ConfigException("Must set " + ProducerConfig.ACKS_CONFIG + " to all in order to use the idempotent " +
-                        "producer. Otherwise we cannot guarantee idempotence.");
-        }
-        return acks;
-    }
-
     /**
      * Needs to be called before any other methods when the transactional.id is set in the configuration.
      *
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 0492fbf..57d3a8b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -202,6 +202,8 @@ public class ProducerConfig extends AbstractConfig {
     private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."
                                                                             + " Note that if this config is set to be greater than 1 and <code>enable.idempotence</code> is set to false, there is a risk of"
                                                                             + " message re-ordering after a failed send due to retries (i.e., if retries are enabled).";
+    // max.in.flight.requests.per.connection should be less than or equal to 5 when idempotence producer enabled to ensure message ordering
+    private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE = 5;
 
     /** <code>retries</code> */
     public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
@@ -246,8 +248,8 @@ public class ProducerConfig extends AbstractConfig {
     public static final String ENABLE_IDEMPOTENCE_CONFIG = "enable.idempotence";
     public static final String ENABLE_IDEMPOTENCE_DOC = "When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer "
                                                         + "retries due to broker failures, etc., may write duplicates of the retried message in the stream. "
-                                                        + "Note that enabling idempotence requires <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to be less than or equal to 5 "
-                                                        + "(with message ordering preserved for any allowable value), <code>" + RETRIES_CONFIG + "</code> to be greater than 0, and <code>"
+                                                        + "Note that enabling idempotence requires <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE
+                                                        + " (with message ordering preserved for any allowable value), <code>" + RETRIES_CONFIG + "</code> to be greater than 0, and <code>"
                                                         + ACKS_CONFIG + "</code> must be 'all'. If these values are not explicitly set by the user, suitable values will be chosen. If incompatible "
                                                         + "values are set, a <code>ConfigException</code> will be thrown.";
 
@@ -415,9 +417,8 @@ public class ProducerConfig extends AbstractConfig {
     @Override
     protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
         Map<String, Object> refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
-        maybeOverrideEnableIdempotence(refinedConfigs);
+        postProcessAndValidateIdempotenceConfigs(refinedConfigs);
         maybeOverrideClientId(refinedConfigs);
-        maybeOverrideAcksAndRetries(refinedConfigs);
         return refinedConfigs;
     }
 
@@ -433,33 +434,30 @@ public class ProducerConfig extends AbstractConfig {
         configs.put(CLIENT_ID_CONFIG, refinedClientId);
     }
 
-    private void maybeOverrideEnableIdempotence(final Map<String, Object> configs) {
-        boolean userConfiguredIdempotence = this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG);
-        boolean userConfiguredTransactions = this.originals().containsKey(TRANSACTIONAL_ID_CONFIG);
-
-        if (userConfiguredTransactions && !userConfiguredIdempotence) {
-            configs.put(ENABLE_IDEMPOTENCE_CONFIG, true);
-        }
-    }
-
-    private void maybeOverrideAcksAndRetries(final Map<String, Object> configs) {
+    private void postProcessAndValidateIdempotenceConfigs(final Map<String, Object> configs) {
+        final Map<String, Object> originalConfigs = this.originals();
         final String acksStr = parseAcks(this.getString(ACKS_CONFIG));
         configs.put(ACKS_CONFIG, acksStr);
-        // For idempotence producers, values for `RETRIES_CONFIG` and `ACKS_CONFIG` might need to be overridden.
+
+        // For idempotence producers, values for `RETRIES_CONFIG` and `ACKS_CONFIG` need validation
         if (idempotenceEnabled()) {
-            boolean userConfiguredRetries = this.originals().containsKey(RETRIES_CONFIG);
-            if (this.getInt(RETRIES_CONFIG) == 0) {
+            boolean userConfiguredRetries = originalConfigs.containsKey(RETRIES_CONFIG);
+            if (userConfiguredRetries && this.getInt(RETRIES_CONFIG) == 0) {
                 throw new ConfigException("Must set " + ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent producer.");
             }
-            configs.put(RETRIES_CONFIG, userConfiguredRetries ? this.getInt(RETRIES_CONFIG) : Integer.MAX_VALUE);
 
-            boolean userConfiguredAcks = this.originals().containsKey(ACKS_CONFIG);
+            boolean userConfiguredAcks = originalConfigs.containsKey(ACKS_CONFIG);
             final short acks = Short.valueOf(acksStr);
             if (userConfiguredAcks && acks != (short) -1) {
                 throw new ConfigException("Must set " + ACKS_CONFIG + " to all in order to use the idempotent " +
                         "producer. Otherwise we cannot guarantee idempotence.");
             }
-            configs.put(ACKS_CONFIG, "-1");
+
+            boolean userConfiguredInflightRequests = originalConfigs.containsKey(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
+            if (userConfiguredInflightRequests && MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE < this.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
+                throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
+                        " to use the idempotent producer.");
+            }
         }
     }
 
@@ -491,13 +489,12 @@ public class ProducerConfig extends AbstractConfig {
     }
 
     boolean idempotenceEnabled() {
-        boolean userConfiguredIdempotence = this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG);
         boolean userConfiguredTransactions = this.originals().containsKey(TRANSACTIONAL_ID_CONFIG);
-        boolean idempotenceEnabled = userConfiguredIdempotence && this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
-
-        if (!idempotenceEnabled && userConfiguredIdempotence && userConfiguredTransactions)
+        boolean idempotenceEnabled = this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
+        if (!idempotenceEnabled && userConfiguredTransactions)
             throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
-        return userConfiguredTransactions || idempotenceEnabled;
+
+        return idempotenceEnabled;
     }
 
     ProducerConfig(Map<?, ?> props, boolean doLog) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 1067db9..dbc966d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -71,6 +71,8 @@ import org.apache.kafka.test.MockProducerInterceptor;
 import org.apache.kafka.test.MockSerializer;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -121,8 +123,7 @@ import static org.mockito.Mockito.when;
 
 public class KafkaProducerTest {
     private final String topic = "topic";
-    private final Node host1 = new Node(0, "host1", 1000);
-    private final Collection<Node> nodes = Collections.singletonList(host1);
+    private final Collection<Node> nodes = Collections.singletonList(NODE);
     private final Cluster emptyCluster = new Cluster(
             null,
             nodes,
@@ -145,6 +146,7 @@ public class KafkaProducerTest {
             Collections.emptySet(),
             Collections.emptySet());
     private static final int DEFAULT_METADATA_IDLE_MS = 5 * 60 * 1000;
+    private static final Node NODE = new Node(0, "host1", 1000);
 
 
     private static <K, V> KafkaProducer<K, V> kafkaProducer(Map<String, Object> configs,
@@ -239,6 +241,7 @@ public class KafkaProducerTest {
         Properties invalidProps2 = new Properties() {{
                 putAll(baseProps);
                 setProperty(ProducerConfig.ACKS_CONFIG, "1");
+                // explicitly enabling idempotence should still throw exception
                 setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
             }};
         assertThrows(
@@ -249,12 +252,149 @@ public class KafkaProducerTest {
         Properties invalidProps3 = new Properties() {{
                 putAll(baseProps);
                 setProperty(ProducerConfig.ACKS_CONFIG, "0");
-                setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
             }};
         assertThrows(
             ConfigException.class,
             () -> new ProducerConfig(invalidProps3),
             "Must set acks to all in order to use the idempotent producer");
+
+        Properties invalidProps4 = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.ACKS_CONFIG, "0");
+                setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
+            }};
+        assertThrows(
+            ConfigException.class,
+            () -> new ProducerConfig(invalidProps4),
+            "Must set retries to non-zero when using the idempotent producer.");
+    }
+
+    @Test
+    public void testRetriesAndIdempotenceForIdempotentProducers() {
+        Properties baseProps = new Properties() {{
+                setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+                setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+                setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+            }};
+
+        Properties validProps = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.RETRIES_CONFIG, "0");
+                setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
+            }};
+        ProducerConfig config = new ProducerConfig(validProps);
+        assertFalse(
+            config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
+            "idempotence should be overwritten");
+        assertEquals(
+            0,
+            config.getInt(ProducerConfig.RETRIES_CONFIG),
+            "retries should be overwritten");
+
+        Properties invalidProps = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.RETRIES_CONFIG, "0");
+                setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
+                setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
+            }};
+        assertThrows(
+            ConfigException.class,
+            () -> new ProducerConfig(invalidProps),
+            "Cannot set a transactional.id without also enabling idempotence");
+
+        Properties invalidProps2 = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.RETRIES_CONFIG, "0");
+            }};
+        assertThrows(
+            ConfigException.class,
+            () -> new ProducerConfig(invalidProps2),
+            "Must set retries to non-zero when using the idempotent producer.");
+
+        Properties invalidProps3 = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.RETRIES_CONFIG, "0");
+                // explicitly enabling idempotence should still throw exception
+                setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
+            }};
+        assertThrows(
+            ConfigException.class,
+            () -> new ProducerConfig(invalidProps3),
+            "Must set retries to non-zero when using the idempotent producer.");
+
+        Properties invalidProps4 = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.RETRIES_CONFIG, "0");
+                setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
+            }};
+        assertThrows(
+            ConfigException.class,
+            () -> new ProducerConfig(invalidProps4),
+            "Must set retries to non-zero when using the idempotent producer.");
+    }
+
+    @Test
+    public void testInflightRequestsAndIdempotenceForIdempotentProducers() {
+        Properties baseProps = new Properties() {{
+                setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+                setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+                setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+            }};
+
+        Properties validProps = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "10");
+                setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
+            }};
+        ProducerConfig config = new ProducerConfig(validProps);
+        assertFalse(
+            config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
+            "idempotence should be overwritten");
+        assertEquals(
+            10,
+            config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
+            "max.in.flight.requests.per.connection should be overwritten");
+
+        Properties invalidProps = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
+                setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
+                setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
+            }};
+        assertThrows(
+            ConfigException.class,
+            () -> new ProducerConfig(invalidProps),
+            "Cannot set a transactional.id without also enabling idempotence");
+
+        Properties invalidProps2 = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "10");
+            }};
+        assertThrows(
+            ConfigException.class,
+            () -> new ProducerConfig(invalidProps2),
+            "Must set max.in.flight.requests.per.connection to at most 5 when using the idempotent producer.");
+
+        Properties invalidProps3 = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "10");
+                // explicitly enabling idempotence should still throw exception
+                setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
+            }};
+        assertThrows(
+            ConfigException.class,
+            () -> new ProducerConfig(invalidProps3),
+            "Must set max.in.flight.requests.per.connection to at most 5 when using the idempotent producer.");
+
+        Properties invalidProps4 = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "10");
+                setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
+            }};
+        assertThrows(
+            ConfigException.class,
+            () -> new ProducerConfig(invalidProps4),
+            "Must set retries to non-zero when using the idempotent producer.");
     }
 
     @Test
@@ -466,9 +606,17 @@ public class KafkaProducerTest {
     private static KafkaProducer<String, String> producerWithOverrideNewSender(Map<String, Object> configs,
                                                                                ProducerMetadata metadata,
                                                                                Time timer) {
+        // let mockClient#leastLoadedNode return the node directly so that we can isolate Metadata calls from KafkaProducer for idempotent producer
+        MockClient mockClient = new MockClient(Time.SYSTEM, metadata) {
+            @Override
+            public Node leastLoadedNode(long now) {
+                return NODE;
+            }
+        };
+
         return new KafkaProducer<String, String>(
                 new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, new StringSerializer(), new StringSerializer())),
-                new StringSerializer(), new StringSerializer(), metadata, new MockClient(Time.SYSTEM, metadata), null, timer) {
+                new StringSerializer(), new StringSerializer(), metadata, mockClient, null, timer) {
             @Override
             Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
                 // give Sender its own Metadata instance so that we can isolate Metadata calls from KafkaProducer
@@ -477,10 +625,13 @@ public class KafkaProducerTest {
         };
     }
 
-    @Test
-    public void testMetadataFetch() throws InterruptedException {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testMetadataFetch(boolean isIdempotenceEnabled) throws InterruptedException {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, isIdempotenceEnabled);
+
         ProducerMetadata metadata = mock(ProducerMetadata.class);
 
         // Return empty cluster 4 times and cluster from then on
@@ -510,18 +661,14 @@ public class KafkaProducerTest {
         producer.close(Duration.ofMillis(0));
     }
 
-    @Test
-    public void testMetadataExpiry() throws InterruptedException {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testMetadataExpiry(boolean isIdempotenceEnabled) throws InterruptedException {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, isIdempotenceEnabled);
         ProducerMetadata metadata = mock(ProducerMetadata.class);
 
-        Cluster emptyCluster = new Cluster(
-            "dummy",
-            Collections.singletonList(host1),
-            Collections.emptySet(),
-            Collections.emptySet(),
-            Collections.emptySet());
         when(metadata.fetch()).thenReturn(onePartitionCluster, emptyCluster, onePartitionCluster);
 
         KafkaProducer<String, String> producer = producerWithOverrideNewSender(configs, metadata);
@@ -542,11 +689,13 @@ public class KafkaProducerTest {
         producer.close(Duration.ofMillis(0));
     }
 
-    @Test
-    public void testMetadataTimeoutWithMissingTopic() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testMetadataTimeoutWithMissingTopic(boolean isIdempotenceEnabled) throws Exception {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, isIdempotenceEnabled);
 
         // Create a record with a partition higher than the initial (outdated) partition range
         ProducerRecord<String, String> record = new ProducerRecord<>(topic, 2, null, "value");
@@ -567,6 +716,7 @@ public class KafkaProducerTest {
 
         // Four request updates where the topic isn't present, at which point the timeout expires and a
         // TimeoutException is thrown
+        // For idempotence enabled case, the first metadata.fetch will be called in Sender#maybeSendAndPollTransactionalRequest
         Future<RecordMetadata> future = producer.send(record);
         verify(metadata, times(4)).requestUpdateForTopic(topic);
         verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong());
@@ -580,11 +730,13 @@ public class KafkaProducerTest {
         }
     }
 
-    @Test
-    public void testMetadataWithPartitionOutOfRange() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testMetadataWithPartitionOutOfRange(boolean isIdempotenceEnabled) throws Exception {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, isIdempotenceEnabled);
 
         // Create a record with a partition higher than the initial (outdated) partition range
         ProducerRecord<String, String> record = new ProducerRecord<>(topic, 2, null, "value");
@@ -604,11 +756,13 @@ public class KafkaProducerTest {
         producer.close(Duration.ofMillis(0));
     }
 
-    @Test
-    public void testMetadataTimeoutWithPartitionOutOfRange() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testMetadataTimeoutWithPartitionOutOfRange(boolean isIdempotenceEnabled) throws Exception {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, isIdempotenceEnabled);
 
         // Create a record with a partition higher than the initial (outdated) partition range
         ProducerRecord<String, String> record = new ProducerRecord<>(topic, 2, null, "value");
@@ -629,7 +783,10 @@ public class KafkaProducerTest {
 
         // Four request updates where the requested partition is out of range, at which point the timeout expires
         // and a TimeoutException is thrown
+        // For idempotence enabled case, the first and last metadata.fetch will be called in Sender#maybeSendAndPollTransactionalRequest,
+        // before the producer#send and after it finished
         Future<RecordMetadata> future = producer.send(record);
+
         verify(metadata, times(4)).requestUpdateForTopic(topic);
         verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong());
         verify(metadata, times(5)).fetch();
@@ -647,6 +804,8 @@ public class KafkaProducerTest {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "600000");
+        // test under normal producer for simplicity
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
         long refreshBackoffMs = 500L;
         long metadataExpireMs = 60000L;
         long metadataIdleMs = 60000L;
@@ -794,6 +953,9 @@ public class KafkaProducerTest {
     public void testFlushCompleteSendOfInflightBatches() {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        // only test in idempotence disabled producer for simplicity
+        // flush operation acts the same for idempotence enabled and disabled cases
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
 
         Time time = new MockTime(1);
         MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
@@ -809,6 +971,7 @@ public class KafkaProducerTest {
                 Future<RecordMetadata> response = producer.send(new ProducerRecord<>("topic", "value" + i));
                 futureResponses.add(response);
             }
+
             futureResponses.forEach(res -> assertFalse(res.isDone()));
             producer.flush();
             futureResponses.forEach(res -> assertTrue(res.isDone()));
@@ -886,14 +1049,14 @@ public class KafkaProducerTest {
             client.prepareResponse(
                 request -> request instanceof FindCoordinatorRequest &&
                     ((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
-                FindCoordinatorResponse.prepareResponse(Errors.NONE, "bad-transaction", host1));
+                FindCoordinatorResponse.prepareResponse(Errors.NONE, "bad-transaction", NODE));
 
             assertThrows(TimeoutException.class, producer::initTransactions);
 
             client.prepareResponse(
                 request -> request instanceof FindCoordinatorRequest &&
                                ((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
-                FindCoordinatorResponse.prepareResponse(Errors.NONE, "bad-transaction", host1));
+                FindCoordinatorResponse.prepareResponse(Errors.NONE, "bad-transaction", NODE));
 
             client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
 
@@ -919,7 +1082,7 @@ public class KafkaProducerTest {
         Node node = metadata.fetch().nodes().get(0);
         client.throttle(node, 5000);
 
-        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
         client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
 
         try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
@@ -941,7 +1104,7 @@ public class KafkaProducerTest {
         MockClient client = new MockClient(time, metadata);
         client.updateMetadata(initialUpdateResponse);
 
-        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
         client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
         client.prepareResponse(endTxnResponse(Errors.NONE));
 
@@ -970,10 +1133,10 @@ public class KafkaProducerTest {
         Node node = metadata.fetch().nodes().get(0);
         client.throttle(node, 5000);
 
-        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
         client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
         client.prepareResponse(addOffsetsToTxnResponse(Errors.NONE));
-        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
         String groupId = "group";
         client.prepareResponse(request ->
             ((TxnOffsetCommitRequest) request).data().groupId().equals(groupId),
@@ -1009,10 +1172,10 @@ public class KafkaProducerTest {
         Node node = metadata.fetch().nodes().get(0);
         client.throttle(node, 5000);
 
-        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
         client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
         client.prepareResponse(addOffsetsToTxnResponse(Errors.NONE));
-        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
         String groupId = "group";
         String memberId = "member";
         int generationId = 5;
@@ -1065,7 +1228,7 @@ public class KafkaProducerTest {
         Node node = metadata.fetch().nodes().get(0);
         client.throttle(node, 5000);
 
-        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", NODE));
         client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
 
         try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
@@ -1276,7 +1439,7 @@ public class KafkaProducerTest {
 
         ExecutorService executorService = Executors.newSingleThreadExecutor();
         CountDownLatch assertionDoneLatch = new CountDownLatch(1);
-        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "this-is-a-transactional-id", host1));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "this-is-a-transactional-id", NODE));
         executorService.submit(() -> {
             assertThrows(KafkaException.class, producer::initTransactions);
             assertionDoneLatch.countDown();
@@ -1305,7 +1468,7 @@ public class KafkaProducerTest {
 
         ExecutorService executorService = Executors.newSingleThreadExecutor();
         CountDownLatch assertionDoneLatch = new CountDownLatch(1);
-        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "this-is-a-transactional-id", host1));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "this-is-a-transactional-id", NODE));
         executorService.submit(() -> {
             assertThrows(KafkaException.class, producer::initTransactions);
             assertionDoneLatch.countDown();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index 44902c0..eb28102 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -170,6 +170,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class
+        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false); // disable idempotence since retries is force to 0
         ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId);
 
         Map<String, Object> consumerProps = new HashMap<>(originals);
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 7c221ba..4a6f731 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -164,7 +164,7 @@ object ConsoleProducer {
       .withRequiredArg
       .describedAs("request required acks")
       .ofType(classOf[java.lang.String])
-      .defaultsTo("1")
+      .defaultsTo("-1")
     val requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. Value must be non-negative and non-zero")
       .withRequiredArg
       .describedAs("request timeout ms")
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 1e41fcf..acaef28 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -142,6 +142,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val adminClients = Buffer[Admin]()
 
   producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "1")
+  producerConfig.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false")
   producerConfig.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "50000")
   consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group)
 
@@ -2341,6 +2342,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def buildTransactionalProducer(): KafkaProducer[Array[Byte], Array[Byte]] = {
     producerConfig.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
+    producerConfig.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
     producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
     createProducer()
   }
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerWithLegacyMessageFormatIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerWithLegacyMessageFormatIntegrationTest.scala
index e8c451e..0ce4004 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerWithLegacyMessageFormatIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerWithLegacyMessageFormatIntegrationTest.scala
@@ -18,6 +18,7 @@ package kafka.api
 
 import kafka.log.LogConfig
 import kafka.server.KafkaConfig
+import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.TopicPartition
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows}
 import org.junit.jupiter.api.Test
@@ -101,7 +102,10 @@ class ConsumerWithLegacyMessageFormatIntegrationTest extends AbstractConsumerTes
   def testEarliestOrLatestOffsets(): Unit = {
     val topic0 = "topicWithNewMessageFormat"
     val topic1 = "topicWithOldMessageFormat"
-    val producer = createProducer()
+    val prop = new Properties()
+    // idempotence producer doesn't support old version of messages
+    prop.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false")
+    val producer = createProducer(configOverrides = prop)
     createTopicAndSendRecords(producer, topicName = topic0, numPartitions = 2, recordsPerPartition = 100)
     val props = new Properties()
     props.setProperty(LogConfig.MessageFormatVersionProp, "0.9.0")
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index f361641..2d0f2ee 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -20,7 +20,7 @@ package kafka.api
 import com.yammer.metrics.core.Gauge
 
 import java.io.File
-import java.util.Collections
+import java.util.{Collections, Properties}
 import java.util.concurrent.ExecutionException
 import kafka.admin.AclCommand
 import kafka.metrics.KafkaYammerMetrics
@@ -30,7 +30,7 @@ import kafka.server._
 import kafka.utils._
 import org.apache.kafka.clients.admin.Admin
 import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecords}
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.acl._
 import org.apache.kafka.common.acl.AclOperation._
 import org.apache.kafka.common.acl.AclPermissionType._
@@ -42,6 +42,8 @@ import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.jdk.CollectionConverters._
 
@@ -334,13 +336,18 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     * messages and describe topics respectively when the describe ACL isn't set.
     * Also verifies that subsequent publish, consume and describe to authorized topic succeeds.
     */
-  @Test
-  def testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(): Unit = {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(isIdempotenceEnabled: Boolean): Unit = {
     // Set consumer group acls since we are testing topic authorization
     setConsumerGroupAcls()
 
     // Verify produce/consume/describe throw TopicAuthorizationException
-    val producer = createProducer()
+
+    val prop = new Properties()
+    prop.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, isIdempotenceEnabled.toString)
+    val producer = createProducer(configOverrides = prop)
+
     assertThrows(classOf[TopicAuthorizationException], () => sendRecords(producer, numRecords, tp))
     val consumer = createConsumer()
     consumer.assign(List(tp).asJava)
@@ -352,8 +359,16 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     // Verify successful produce/consume/describe on another topic using the same producer, consumer and adminClient
     val topic2 = "topic2"
     val tp2 = new TopicPartition(topic2, 0)
+
     setReadAndWriteAcls(tp2)
-    sendRecords(producer, numRecords, tp2)
+    // in idempotence producer, we need to create another producer because the previous one is in FATAL_ERROR state (due to authorization error)
+    // If the transaction state in FATAL_ERROR, it'll never transit to other state. check TransactionManager#isTransitionValid for detail
+    val producer2 = if (isIdempotenceEnabled)
+      createProducer(configOverrides = prop)
+    else
+      producer
+
+    sendRecords(producer2, numRecords, tp2)
     consumer.assign(List(tp2).asJava)
     consumeRecords(consumer, numRecords, topic = topic2)
     val describeResults = adminClient.describeTopics(Set(topic, topic2).asJava).values
@@ -365,7 +380,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     // from the unauthorized topic and throw; since we can now return data during the time we are updating
     // metadata / fetching positions, it is possible that the authorized topic record is returned during this time.
     consumer.assign(List(tp, tp2).asJava)
-    sendRecords(producer, numRecords, tp2)
+    sendRecords(producer2, numRecords, tp2)
     var topic2RecordConsumed = false
     def verifyNoRecords(records: ConsumerRecords[Array[Byte], Array[Byte]]): Boolean = {
       assertEquals(Collections.singleton(tp2), records.partitions(), "Consumed records with unexpected partitions: " + records)
@@ -380,22 +395,32 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     if (!topic2RecordConsumed) {
       consumeRecordsIgnoreOneAuthorizationException(consumer, numRecords, startingOffset = 1, topic2)
     }
-    sendRecords(producer, numRecords, tp)
+    sendRecords(producer2, numRecords, tp)
     consumeRecordsIgnoreOneAuthorizationException(consumer, numRecords, startingOffset = 0, topic)
     val describeResults2 = adminClient.describeTopics(Set(topic, topic2).asJava).values
     assertEquals(1, describeResults2.get(topic).get().partitions().size())
     assertEquals(1, describeResults2.get(topic2).get().partitions().size())
   }
 
-  @Test
-  def testNoProduceWithDescribeAcl(): Unit = {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testNoProduceWithDescribeAcl(isIdempotenceEnabled: Boolean): Unit = {
     AclCommand.main(describeAclArgs)
     servers.foreach { s =>
       TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource)
     }
-    val producer = createProducer()
-    val e = assertThrows(classOf[TopicAuthorizationException], () => sendRecords(producer, numRecords, tp))
-    assertEquals(Set(topic).asJava, e.unauthorizedTopics())
+
+    val prop = new Properties()
+    prop.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, isIdempotenceEnabled.toString)
+    val producer = createProducer(configOverrides = prop)
+
+    if (isIdempotenceEnabled) {
+      // in idempotent producer, it'll fail at InitProducerId request
+      assertThrows(classOf[KafkaException], () => sendRecords(producer, numRecords, tp))
+    } else {
+      val e = assertThrows(classOf[TopicAuthorizationException], () => sendRecords(producer, numRecords, tp))
+      assertEquals(Set(topic).asJava, e.unauthorizedTopics())
+    }
     confirmReauthenticationMetrics()
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index c5c17c2..bef59f0 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -82,7 +82,10 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
     // Produce and consume some records
     val numRecords = 10
     val recordSize = 100000
-    val producer = createProducer()
+    val prop = new Properties()
+    // idempotence producer doesn't support old version of messages
+    prop.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false")
+    val producer = createProducer(configOverrides = prop)
     sendRecords(producer, numRecords, recordSize, tp)
 
     val consumer = createConsumer()
diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index 2d4abf4..40c4e65 100644
--- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -14,9 +14,8 @@ package kafka.api
 
 import java.nio.file.Files
 import java.time.Duration
-import java.util.Collections
+import java.util.{Collections, Properties}
 import java.util.concurrent.{ExecutionException, TimeUnit}
-
 import scala.jdk.CollectionConverters._
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
@@ -30,6 +29,8 @@ import kafka.server.KafkaConfig
 import kafka.utils.{JaasTestUtils, TestUtils}
 import kafka.zk.ConfigEntityChangeNotificationZNode
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with SaslSetup {
   private val kafkaClientSaslMechanism = "SCRAM-SHA-256"
@@ -76,14 +77,24 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
     closeSasl()
   }
 
-  @Test
-  def testProducerWithAuthenticationFailure(): Unit = {
-    val producer = createProducer()
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testProducerWithAuthenticationFailure(isIdempotenceEnabled: Boolean): Unit = {
+    val prop = new Properties()
+    prop.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, isIdempotenceEnabled.toString)
+    val producer = createProducer(configOverrides = prop)
+
     verifyAuthenticationException(sendOneRecord(producer, maxWaitMs = 10000))
     verifyAuthenticationException(producer.partitionsFor(topic))
 
     createClientCredential()
-    verifyWithRetry(sendOneRecord(producer))
+    // in idempotence producer, we need to create another producer because the previous one is in FATEL_ERROR state (due to authentication error)
+    // If the transaction state in FATAL_ERROR, it'll never transit to other state. check TransactionManager#isTransitionValid for detail
+    val producer2 = if (isIdempotenceEnabled)
+      createProducer(configOverrides = prop)
+    else
+      producer
+    verifyWithRetry(sendOneRecord(producer2))
   }
 
   @Test
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 08a55ae..88b0f6d 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -1669,6 +1669,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
       producerProps.put(ProducerConfig.RETRIES_CONFIG, _retries.toString)
       producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, _deliveryTimeoutMs.toString)
       producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, _requestTimeoutMs.toString)
+      // disable the idempotence since some tests want to test the cases when retries=0, and these tests are not testing producers
+      producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false")
 
       val producer = new KafkaProducer[String, String](producerProps, new StringSerializer, new StringSerializer)
       producers += producer
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index 6d6d881..252d4bd 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -109,6 +109,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
   @Test
   def testReplicaFetcherThreadAfterLogDirFailureOnFollower(): Unit = {
     this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0")
+    this.producerConfig.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false")
     val producer = createProducer()
     val partition = new TopicPartition(topic, 0)
 
@@ -140,6 +141,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
   def testProduceErrorsFromLogDirFailureOnLeader(failureType: LogDirFailureType): Unit = {
     // Disable retries to allow exception to bubble up for validation
     this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0")
+    this.producerConfig.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false")
     val producer = createProducer()
 
     val partition = new TopicPartition(topic, 0)
diff --git a/docs/upgrade.html b/docs/upgrade.html
index da1bc49..6e35533 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -98,6 +98,11 @@
         understood by brokers or version 2.5 or higher, so you must upgrade your kafka cluster to get the stronger semantics. Otherwise, you can just pass
         in <code>new ConsumerGroupMetadata(consumerGroupId)</code> to work with older brokers. See <a href="https://cwiki.apache.org/confluence/x/zJONCg">KIP-732</a> for more details.
     </li>
+    <li>Idempotence for the producer is enabled by default. In 3.0.0, a bug prevented this default from being applied,
+        which meant that idempotence remained disabled unless the user had explicitly set <code>enable.idempotence</code> to true
+        (See <a href="https://issues.apache.org/jira/browse/KAFKA-13598">KAFKA-13598</a>for more details).
+        This issue was fixed and the default is properly applied in 3.0.1.
+    </li>
     <li>
         The Connect <code>internal.key.converter</code> and <code>internal.value.converter</code> properties have been completely <a href="https://cwiki.apache.org/confluence/x/2YDOCg">removed</a>.
         The use of these Connect worker properties has been deprecated since version 2.0.0. 
diff --git a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java
index 7ec56335..90a791f 100644
--- a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java
+++ b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java
@@ -190,7 +190,7 @@ public class KafkaLog4jAppenderTest {
         props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n");
         props.put("log4j.appender.KAFKA.BrokerList", "127.0.0.2:9093");
         props.put("log4j.appender.KAFKA.Topic", "test-topic");
-        props.put("log4j.appender.KAFKA.RequiredNumAcks", "1");
+        props.put("log4j.appender.KAFKA.RequiredNumAcks", "-1");
         props.put("log4j.appender.KAFKA.SyncSend", "true");
         // setting producer timeout (max.block.ms) to be low
         props.put("log4j.appender.KAFKA.maxBlockMs", "10");
@@ -208,7 +208,7 @@ public class KafkaLog4jAppenderTest {
         props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n");
         props.put("log4j.appender.KAFKA.BrokerList", "127.0.0.1:9093");
         props.put("log4j.appender.KAFKA.Topic", "test-topic");
-        props.put("log4j.appender.KAFKA.RequiredNumAcks", "1");
+        props.put("log4j.appender.KAFKA.RequiredNumAcks", "-1");
         props.put("log4j.appender.KAFKA.SyncSend", Boolean.toString(syncSend));
         props.put("log4j.logger.kafka.log4j", "INFO, KAFKA");
         return props;