You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/01/01 18:50:00 UTC

[kafka] branch trunk updated: KAFKA-8928: Logged producer config does not always match actual configured values (#7466)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b56b848  KAFKA-8928: Logged producer config does not always match actual configured values (#7466)
b56b848 is described below

commit b56b848682079052e98ecde9a15a8f6b8860c599
Author: huxi <hu...@hotmail.com>
AuthorDate: Thu Jan 2 02:49:21 2020 +0800

    KAFKA-8928: Logged producer config does not always match actual configured values (#7466)
    
    Some logged producer configs(clientId, acks, retries) might not be reflected the actual values.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../kafka/clients/producer/KafkaProducer.java      | 93 ++++++----------------
 .../kafka/clients/producer/ProducerConfig.java     | 70 +++++++++++++++-
 .../apache/kafka/common/config/AbstractConfig.java |  2 +-
 .../kafka/clients/producer/KafkaProducerTest.java  | 16 ++++
 4 files changed, 110 insertions(+), 71 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index a5d749a..66a1103 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -81,7 +81,6 @@ import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 
@@ -230,7 +229,6 @@ import java.util.concurrent.atomic.AtomicReference;
 public class KafkaProducer<K, V> implements Producer<K, V> {
 
     private final Logger log;
-    private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
     private static final String JMX_PREFIX = "kafka.producer";
     public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
     public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";
@@ -333,7 +331,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?
                     (String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;
 
-            this.clientId = buildClientId(config.getString(ProducerConfig.CLIENT_ID_CONFIG), transactionalId);
+            this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
 
             LogContext logContext;
             if (transactionalId == null)
@@ -433,19 +431,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         }
     }
 
-    private static String buildClientId(String configuredClientId, String transactionalId) {
-        if (!configuredClientId.isEmpty())
-            return configuredClientId;
-
-        if (transactionalId != null)
-            return "producer-" + transactionalId;
-
-        return "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
-    }
-
     // visible for testing
     Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
-        int maxInflightRequests = configureInflightRequests(producerConfig, transactionManager != null);
+        int maxInflightRequests = configureInflightRequests(producerConfig);
         int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
         ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time);
         ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
@@ -467,8 +455,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 apiVersions,
                 throttleTimeSensor,
                 logContext);
-        int retries = configureRetries(producerConfig, transactionManager != null, log);
-        short acks = configureAcks(producerConfig, transactionManager != null, log);
+        int retries = configureRetries(producerConfig, log);
+        short acks = configureAcks(producerConfig, log);
         return new Sender(logContext,
                 client,
                 metadata,
@@ -516,23 +504,13 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
 
         TransactionManager transactionManager = null;
 
-        boolean userConfiguredIdempotence = false;
-        if (config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG))
-            userConfiguredIdempotence = true;
-
-        boolean userConfiguredTransactions = false;
-        if (config.originals().containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG))
-            userConfiguredTransactions = true;
+        boolean userConfiguredIdempotence = config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
+        boolean userConfiguredTransactions = config.originals().containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+        if (userConfiguredTransactions && !userConfiguredIdempotence)
+            log.info("Overriding the default {} to true since {} is specified.", ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
+                    ProducerConfig.TRANSACTIONAL_ID_CONFIG);
 
-        boolean idempotenceEnabled = config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
-
-        if (!idempotenceEnabled && userConfiguredIdempotence && userConfiguredTransactions)
-            throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
-
-        if (userConfiguredTransactions)
-            idempotenceEnabled = true;
-
-        if (idempotenceEnabled) {
+        if (config.idempotenceEnabled()) {
             String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
             int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
             long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
@@ -542,63 +520,40 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             else
                 log.info("Instantiated an idempotent producer.");
         }
-
         return transactionManager;
     }
 
-    private static int configureRetries(ProducerConfig config, boolean idempotenceEnabled, Logger log) {
-        boolean userConfiguredRetries = false;
-        if (config.originals().containsKey(ProducerConfig.RETRIES_CONFIG)) {
-            userConfiguredRetries = true;
-        }
-        if (idempotenceEnabled && !userConfiguredRetries) {
-            // We recommend setting infinite retries when the idempotent producer is enabled, so it makes sense to make
-            // this the default.
+    private static int configureRetries(ProducerConfig config, Logger log) {
+        boolean userConfiguredRetries = config.originals().containsKey(ProducerConfig.RETRIES_CONFIG);
+        if (config.idempotenceEnabled() && !userConfiguredRetries) {
             log.info("Overriding the default retries config to the recommended value of {} since the idempotent " +
                     "producer is enabled.", Integer.MAX_VALUE);
-            return Integer.MAX_VALUE;
-        }
-        if (idempotenceEnabled && config.getInt(ProducerConfig.RETRIES_CONFIG) == 0) {
-            throw new ConfigException("Must set " + ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent producer.");
         }
         return config.getInt(ProducerConfig.RETRIES_CONFIG);
     }
 
-    private static int configureInflightRequests(ProducerConfig config, boolean idempotenceEnabled) {
-        if (idempotenceEnabled && 5 < config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
+    private static int configureInflightRequests(ProducerConfig config) {
+        if (config.idempotenceEnabled() && 5 < config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
             throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
                     " to use the idempotent producer.");
         }
         return config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
     }
 
-    private static short configureAcks(ProducerConfig config, boolean idempotenceEnabled, Logger log) {
-        boolean userConfiguredAcks = false;
-        short acks = (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG));
-        if (config.originals().containsKey(ProducerConfig.ACKS_CONFIG)) {
-            userConfiguredAcks = true;
-        }
-
-        if (idempotenceEnabled && !userConfiguredAcks) {
-            log.info("Overriding the default {} to all since idempotence is enabled.", ProducerConfig.ACKS_CONFIG);
-            return -1;
-        }
+    private static short configureAcks(ProducerConfig config, Logger log) {
+        boolean userConfiguredAcks = config.originals().containsKey(ProducerConfig.ACKS_CONFIG);
+        short acks = Short.valueOf(config.getString(ProducerConfig.ACKS_CONFIG));
 
-        if (idempotenceEnabled && acks != -1) {
-            throw new ConfigException("Must set " + ProducerConfig.ACKS_CONFIG + " to all in order to use the idempotent " +
-                    "producer. Otherwise we cannot guarantee idempotence.");
+        if (config.idempotenceEnabled()) {
+            if (!userConfiguredAcks)
+                log.info("Overriding the default {} to all since idempotence is enabled.", ProducerConfig.ACKS_CONFIG);
+            else if (acks != -1)
+                throw new ConfigException("Must set " + ProducerConfig.ACKS_CONFIG + " to all in order to use the idempotent " +
+                        "producer. Otherwise we cannot guarantee idempotence.");
         }
         return acks;
     }
 
-    private static int parseAcks(String acksString) {
-        try {
-            return acksString.trim().equalsIgnoreCase("all") ? -1 : Integer.parseInt(acksString.trim());
-        } catch (NumberFormatException e) {
-            throw new ConfigException("Invalid configuration value for 'acks': " + acksString);
-        }
-    }
-
     /**
      * Needs to be called before any other methods when the transactional.id is set in the configuration.
      *
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index fc1e6a7..75af5d6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.SecurityConfig;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serializer;
@@ -32,6 +33,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.Range.between;
@@ -249,6 +251,8 @@ public class ProducerConfig extends AbstractConfig {
     public static final String SECURITY_PROVIDERS_CONFIG = SecurityConfig.SECURITY_PROVIDERS_CONFIG;
     private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC;
 
+    private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
+
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Collections.emptyList(), new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
                                 .define(CLIENT_DNS_LOOKUP_CONFIG,
@@ -377,7 +381,61 @@ public class ProducerConfig extends AbstractConfig {
 
     @Override
     protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
-        return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
+        Map<String, Object> refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
+        maybeOverrideEnableIdempotence(refinedConfigs);
+        maybeOverrideClientId(refinedConfigs);
+        maybeOverrideAcksAndRetries(refinedConfigs);
+        return refinedConfigs;
+    }
+
+    private void maybeOverrideClientId(final Map<String, Object> configs) {
+        String refinedClientId;
+        boolean userConfiguredClientId = this.originals().containsKey(CLIENT_ID_CONFIG);
+        if (userConfiguredClientId) {
+            refinedClientId = this.getString(CLIENT_ID_CONFIG);
+        } else {
+            String transactionalId = this.getString(TRANSACTIONAL_ID_CONFIG);
+            refinedClientId = "producer-" + (transactionalId != null ? transactionalId : PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement());
+        }
+        configs.put(CLIENT_ID_CONFIG, refinedClientId);
+    }
+
+    private void maybeOverrideEnableIdempotence(final Map<String, Object> configs) {
+        boolean userConfiguredIdempotence = this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG);
+        boolean userConfiguredTransactions = this.originals().containsKey(TRANSACTIONAL_ID_CONFIG);
+
+        if (userConfiguredTransactions && !userConfiguredIdempotence) {
+            configs.put(ENABLE_IDEMPOTENCE_CONFIG, true);
+        }
+    }
+
+    private void maybeOverrideAcksAndRetries(final Map<String, Object> configs) {
+        final String acksStr = parseAcks(this.getString(ACKS_CONFIG));
+        configs.put(ACKS_CONFIG, acksStr);
+        // For idempotence producers, values for `RETRIES_CONFIG` and `ACKS_CONFIG` might need to be overridden.
+        if (idempotenceEnabled()) {
+            boolean userConfiguredRetries = this.originals().containsKey(RETRIES_CONFIG);
+            if (this.getInt(RETRIES_CONFIG) == 0) {
+                throw new ConfigException("Must set " + ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent producer.");
+            }
+            configs.put(RETRIES_CONFIG, userConfiguredRetries ? this.getInt(RETRIES_CONFIG) : Integer.MAX_VALUE);
+
+            boolean userConfiguredAcks = this.originals().containsKey(ACKS_CONFIG);
+            final short acks = Short.valueOf(acksStr);
+            if (userConfiguredAcks && acks != (short) -1) {
+                throw new ConfigException("Must set " + ACKS_CONFIG + " to all in order to use the idempotent " +
+                        "producer. Otherwise we cannot guarantee idempotence.");
+            }
+            configs.put(ACKS_CONFIG, "-1");
+        }
+    }
+
+    private static String parseAcks(String acksString) {
+        try {
+            return acksString.trim().equalsIgnoreCase("all") ? "-1" : Short.parseShort(acksString.trim()) + "";
+        } catch (NumberFormatException e) {
+            throw new ConfigException("Invalid configuration value for 'acks': " + acksString);
+        }
     }
 
     public static Map<String, Object> addSerializerToConfig(Map<String, Object> configs,
@@ -410,6 +468,16 @@ public class ProducerConfig extends AbstractConfig {
         super(CONFIG, props);
     }
 
+    boolean idempotenceEnabled() {
+        boolean userConfiguredIdempotence = this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG);
+        boolean userConfiguredTransactions = this.originals().containsKey(TRANSACTIONAL_ID_CONFIG);
+        boolean idempotenceEnabled = userConfiguredIdempotence && this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
+
+        if (!idempotenceEnabled && userConfiguredIdempotence && userConfiguredTransactions)
+            throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
+        return userConfiguredTransactions || idempotenceEnabled;
+    }
+
     ProducerConfig(Map<?, ?> props, boolean doLog) {
         super(CONFIG, props, doLog);
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 3992a41..819770b 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -106,12 +106,12 @@ public class AbstractConfig {
 
         this.originals = resolveConfigVariables(configProviderProps, (Map<String, Object>) originals);
         this.values = definition.parse(this.originals);
+        this.used = Collections.synchronizedSet(new HashSet<>());
         Map<String, Object> configUpdates = postProcessParsedConfig(Collections.unmodifiableMap(this.values));
         for (Map.Entry<String, Object> update : configUpdates.entrySet()) {
             this.values.put(update.getKey(), update.getValue());
         }
         definition.parse(this.values);
-        this.used = Collections.synchronizedSet(new HashSet<>());
         this.definition = definition;
         if (doLog)
             logAll();
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index eee067f..92ae90b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -114,6 +114,22 @@ public class KafkaProducerTest {
             Collections.emptySet());
 
     @Test
+    public void testOverwriteAcksAndRetriesForIdempotentProducers() {
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
+        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, new StringSerializer().getClass().getName());
+        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, new StringSerializer().getClass().getName());
+
+        ProducerConfig config = new ProducerConfig(props);
+        assertTrue(config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
+        assertTrue(Arrays.asList("-1", "all").stream().anyMatch(each -> each.equalsIgnoreCase(config.getString(ProducerConfig.ACKS_CONFIG))));
+        assertTrue(config.getInt(ProducerConfig.RETRIES_CONFIG) == Integer.MAX_VALUE);
+        assertTrue(config.getString(ProducerConfig.CLIENT_ID_CONFIG).equalsIgnoreCase("producer-" +
+                config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG)));
+    }
+
+    @Test
     public void testMetricsReporterAutoGeneratedClientId() {
         Properties props = new Properties();
         props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");