You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2022/03/23 00:27:59 UTC

[kafka] branch 3.2 updated: Revert "KAFKA-7077: Use default producer settings in Connect Worker (#11475)" (#11932)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 802e285  Revert "KAFKA-7077: Use default producer settings in Connect Worker (#11475)" (#11932)
802e285 is described below

commit 802e285e0737bca0a0c6823480ef3681b3da0edd
Author: Konstantine Karantasis <ko...@confluent.io>
AuthorDate: Tue Mar 22 17:19:29 2022 -0700

    Revert "KAFKA-7077: Use default producer settings in Connect Worker (#11475)" (#11932)
    
    This reverts commit 76cf7a5793702b55e2cfd98a375f8f1708ff32c3.
    
    Connect already allows users to enable idempotent producers for connectors and the Connect workers. Although Kafka producers enabled idempotency by default in 3.0, due to compatibility requirements and the fact that [KIP-318](https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent) hasn't been explicitly approved, the changes here are reverted. A separate commit will explicitly disable idempotency in producers instantiated by Connect by defaul [...]
---
 .../runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java  | 2 ++
 .../src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java      | 2 ++
 2 files changed, 4 insertions(+)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 45c7d9f..582271a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -648,6 +648,8 @@ public class Worker {
         // These settings will execute infinite retries on retriable exceptions. They *may* be overridden via configs passed to the worker,
         // but this may compromise the delivery guarantees of Kafka Connect.
         producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));
+        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
         producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
         producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, defaultClientId);
         // User-specified overrides
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index e57e5e3..2b21079 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -213,6 +213,8 @@ public class WorkerTest extends ThreadedTest {
         defaultProducerConfigs.put(
             ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
         defaultProducerConfigs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));
+        defaultProducerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
+        defaultProducerConfigs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
         defaultProducerConfigs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
 
         defaultConsumerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");