You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/01/01 18:50:00 UTC
[kafka] branch trunk updated: KAFKA-8928: Logged producer config
does not always match actual configured values (#7466)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b56b848 KAFKA-8928: Logged producer config does not always match actual configured values (#7466)
b56b848 is described below
commit b56b848682079052e98ecde9a15a8f6b8860c599
Author: huxi <hu...@hotmail.com>
AuthorDate: Thu Jan 2 02:49:21 2020 +0800
KAFKA-8928: Logged producer config does not always match actual configured values (#7466)
Some logged producer configs (clientId, acks, retries) might not reflect the actual values.
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../kafka/clients/producer/KafkaProducer.java | 93 ++++++----------------
.../kafka/clients/producer/ProducerConfig.java | 70 +++++++++++++++-
.../apache/kafka/common/config/AbstractConfig.java | 2 +-
.../kafka/clients/producer/KafkaProducerTest.java | 16 ++++
4 files changed, 110 insertions(+), 71 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index a5d749a..66a1103 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -81,7 +81,6 @@ import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -230,7 +229,6 @@ import java.util.concurrent.atomic.AtomicReference;
public class KafkaProducer<K, V> implements Producer<K, V> {
private final Logger log;
- private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
private static final String JMX_PREFIX = "kafka.producer";
public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";
@@ -333,7 +331,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?
(String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;
- this.clientId = buildClientId(config.getString(ProducerConfig.CLIENT_ID_CONFIG), transactionalId);
+ this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
LogContext logContext;
if (transactionalId == null)
@@ -433,19 +431,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
}
- private static String buildClientId(String configuredClientId, String transactionalId) {
- if (!configuredClientId.isEmpty())
- return configuredClientId;
-
- if (transactionalId != null)
- return "producer-" + transactionalId;
-
- return "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
- }
-
// visible for testing
Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
- int maxInflightRequests = configureInflightRequests(producerConfig, transactionManager != null);
+ int maxInflightRequests = configureInflightRequests(producerConfig);
int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time);
ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
@@ -467,8 +455,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
apiVersions,
throttleTimeSensor,
logContext);
- int retries = configureRetries(producerConfig, transactionManager != null, log);
- short acks = configureAcks(producerConfig, transactionManager != null, log);
+ int retries = configureRetries(producerConfig, log);
+ short acks = configureAcks(producerConfig, log);
return new Sender(logContext,
client,
metadata,
@@ -516,23 +504,13 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
TransactionManager transactionManager = null;
- boolean userConfiguredIdempotence = false;
- if (config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG))
- userConfiguredIdempotence = true;
-
- boolean userConfiguredTransactions = false;
- if (config.originals().containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG))
- userConfiguredTransactions = true;
+ boolean userConfiguredIdempotence = config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
+ boolean userConfiguredTransactions = config.originals().containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+ if (userConfiguredTransactions && !userConfiguredIdempotence)
+ log.info("Overriding the default {} to true since {} is specified.", ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
+ ProducerConfig.TRANSACTIONAL_ID_CONFIG);
- boolean idempotenceEnabled = config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
-
- if (!idempotenceEnabled && userConfiguredIdempotence && userConfiguredTransactions)
- throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
-
- if (userConfiguredTransactions)
- idempotenceEnabled = true;
-
- if (idempotenceEnabled) {
+ if (config.idempotenceEnabled()) {
String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
@@ -542,63 +520,40 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
else
log.info("Instantiated an idempotent producer.");
}
-
return transactionManager;
}
- private static int configureRetries(ProducerConfig config, boolean idempotenceEnabled, Logger log) {
- boolean userConfiguredRetries = false;
- if (config.originals().containsKey(ProducerConfig.RETRIES_CONFIG)) {
- userConfiguredRetries = true;
- }
- if (idempotenceEnabled && !userConfiguredRetries) {
- // We recommend setting infinite retries when the idempotent producer is enabled, so it makes sense to make
- // this the default.
+ private static int configureRetries(ProducerConfig config, Logger log) {
+ boolean userConfiguredRetries = config.originals().containsKey(ProducerConfig.RETRIES_CONFIG);
+ if (config.idempotenceEnabled() && !userConfiguredRetries) {
log.info("Overriding the default retries config to the recommended value of {} since the idempotent " +
"producer is enabled.", Integer.MAX_VALUE);
- return Integer.MAX_VALUE;
- }
- if (idempotenceEnabled && config.getInt(ProducerConfig.RETRIES_CONFIG) == 0) {
- throw new ConfigException("Must set " + ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent producer.");
}
return config.getInt(ProducerConfig.RETRIES_CONFIG);
}
- private static int configureInflightRequests(ProducerConfig config, boolean idempotenceEnabled) {
- if (idempotenceEnabled && 5 < config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
+ private static int configureInflightRequests(ProducerConfig config) {
+ if (config.idempotenceEnabled() && 5 < config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
" to use the idempotent producer.");
}
return config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
}
- private static short configureAcks(ProducerConfig config, boolean idempotenceEnabled, Logger log) {
- boolean userConfiguredAcks = false;
- short acks = (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG));
- if (config.originals().containsKey(ProducerConfig.ACKS_CONFIG)) {
- userConfiguredAcks = true;
- }
-
- if (idempotenceEnabled && !userConfiguredAcks) {
- log.info("Overriding the default {} to all since idempotence is enabled.", ProducerConfig.ACKS_CONFIG);
- return -1;
- }
+ private static short configureAcks(ProducerConfig config, Logger log) {
+ boolean userConfiguredAcks = config.originals().containsKey(ProducerConfig.ACKS_CONFIG);
+ short acks = Short.valueOf(config.getString(ProducerConfig.ACKS_CONFIG));
- if (idempotenceEnabled && acks != -1) {
- throw new ConfigException("Must set " + ProducerConfig.ACKS_CONFIG + " to all in order to use the idempotent " +
- "producer. Otherwise we cannot guarantee idempotence.");
+ if (config.idempotenceEnabled()) {
+ if (!userConfiguredAcks)
+ log.info("Overriding the default {} to all since idempotence is enabled.", ProducerConfig.ACKS_CONFIG);
+ else if (acks != -1)
+ throw new ConfigException("Must set " + ProducerConfig.ACKS_CONFIG + " to all in order to use the idempotent " +
+ "producer. Otherwise we cannot guarantee idempotence.");
}
return acks;
}
- private static int parseAcks(String acksString) {
- try {
- return acksString.trim().equalsIgnoreCase("all") ? -1 : Integer.parseInt(acksString.trim());
- } catch (NumberFormatException e) {
- throw new ConfigException("Invalid configuration value for 'acks': " + acksString);
- }
- }
-
/**
* Needs to be called before any other methods when the transactional.id is set in the configuration.
*
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index fc1e6a7..75af5d6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
@@ -32,6 +33,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
@@ -249,6 +251,8 @@ public class ProducerConfig extends AbstractConfig {
public static final String SECURITY_PROVIDERS_CONFIG = SecurityConfig.SECURITY_PROVIDERS_CONFIG;
private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC;
+ private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
+
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Collections.emptyList(), new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
.define(CLIENT_DNS_LOOKUP_CONFIG,
@@ -377,7 +381,61 @@ public class ProducerConfig extends AbstractConfig {
@Override
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
- return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
+ Map<String, Object> refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
+ maybeOverrideEnableIdempotence(refinedConfigs);
+ maybeOverrideClientId(refinedConfigs);
+ maybeOverrideAcksAndRetries(refinedConfigs);
+ return refinedConfigs;
+ }
+
+ private void maybeOverrideClientId(final Map<String, Object> configs) {
+ String refinedClientId;
+ boolean userConfiguredClientId = this.originals().containsKey(CLIENT_ID_CONFIG);
+ if (userConfiguredClientId) {
+ refinedClientId = this.getString(CLIENT_ID_CONFIG);
+ } else {
+ String transactionalId = this.getString(TRANSACTIONAL_ID_CONFIG);
+ refinedClientId = "producer-" + (transactionalId != null ? transactionalId : PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement());
+ }
+ configs.put(CLIENT_ID_CONFIG, refinedClientId);
+ }
+
+ private void maybeOverrideEnableIdempotence(final Map<String, Object> configs) {
+ boolean userConfiguredIdempotence = this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG);
+ boolean userConfiguredTransactions = this.originals().containsKey(TRANSACTIONAL_ID_CONFIG);
+
+ if (userConfiguredTransactions && !userConfiguredIdempotence) {
+ configs.put(ENABLE_IDEMPOTENCE_CONFIG, true);
+ }
+ }
+
+ private void maybeOverrideAcksAndRetries(final Map<String, Object> configs) {
+ final String acksStr = parseAcks(this.getString(ACKS_CONFIG));
+ configs.put(ACKS_CONFIG, acksStr);
+ // For idempotence producers, values for `RETRIES_CONFIG` and `ACKS_CONFIG` might need to be overridden.
+ if (idempotenceEnabled()) {
+ boolean userConfiguredRetries = this.originals().containsKey(RETRIES_CONFIG);
+ if (this.getInt(RETRIES_CONFIG) == 0) {
+ throw new ConfigException("Must set " + ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent producer.");
+ }
+ configs.put(RETRIES_CONFIG, userConfiguredRetries ? this.getInt(RETRIES_CONFIG) : Integer.MAX_VALUE);
+
+ boolean userConfiguredAcks = this.originals().containsKey(ACKS_CONFIG);
+ final short acks = Short.valueOf(acksStr);
+ if (userConfiguredAcks && acks != (short) -1) {
+ throw new ConfigException("Must set " + ACKS_CONFIG + " to all in order to use the idempotent " +
+ "producer. Otherwise we cannot guarantee idempotence.");
+ }
+ configs.put(ACKS_CONFIG, "-1");
+ }
+ }
+
+ private static String parseAcks(String acksString) {
+ try {
+ return acksString.trim().equalsIgnoreCase("all") ? "-1" : Short.parseShort(acksString.trim()) + "";
+ } catch (NumberFormatException e) {
+ throw new ConfigException("Invalid configuration value for 'acks': " + acksString);
+ }
}
public static Map<String, Object> addSerializerToConfig(Map<String, Object> configs,
@@ -410,6 +468,16 @@ public class ProducerConfig extends AbstractConfig {
super(CONFIG, props);
}
+ boolean idempotenceEnabled() {
+ boolean userConfiguredIdempotence = this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG);
+ boolean userConfiguredTransactions = this.originals().containsKey(TRANSACTIONAL_ID_CONFIG);
+ boolean idempotenceEnabled = userConfiguredIdempotence && this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
+
+ if (!idempotenceEnabled && userConfiguredIdempotence && userConfiguredTransactions)
+ throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
+ return userConfiguredTransactions || idempotenceEnabled;
+ }
+
ProducerConfig(Map<?, ?> props, boolean doLog) {
super(CONFIG, props, doLog);
}
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 3992a41..819770b 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -106,12 +106,12 @@ public class AbstractConfig {
this.originals = resolveConfigVariables(configProviderProps, (Map<String, Object>) originals);
this.values = definition.parse(this.originals);
+ this.used = Collections.synchronizedSet(new HashSet<>());
Map<String, Object> configUpdates = postProcessParsedConfig(Collections.unmodifiableMap(this.values));
for (Map.Entry<String, Object> update : configUpdates.entrySet()) {
this.values.put(update.getKey(), update.getValue());
}
definition.parse(this.values);
- this.used = Collections.synchronizedSet(new HashSet<>());
this.definition = definition;
if (doLog)
logAll();
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index eee067f..92ae90b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -114,6 +114,22 @@ public class KafkaProducerTest {
Collections.emptySet());
@Test
+ public void testOverwriteAcksAndRetriesForIdempotentProducers() {
+ Properties props = new Properties();
+ props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
+ props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, new StringSerializer().getClass().getName());
+ props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, new StringSerializer().getClass().getName());
+
+ ProducerConfig config = new ProducerConfig(props);
+ assertTrue(config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
+ assertTrue(Arrays.asList("-1", "all").stream().anyMatch(each -> each.equalsIgnoreCase(config.getString(ProducerConfig.ACKS_CONFIG))));
+ assertTrue(config.getInt(ProducerConfig.RETRIES_CONFIG) == Integer.MAX_VALUE);
+ assertTrue(config.getString(ProducerConfig.CLIENT_ID_CONFIG).equalsIgnoreCase("producer-" +
+ config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG)));
+ }
+
+ @Test
public void testMetricsReporterAutoGeneratedClientId() {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");