You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/06/07 22:50:44 UTC

[kafka] branch trunk updated: KAFKA-7002: Add a config property for DLQ topic's replication factor (KIP-298)

This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 22612be  KAFKA-7002: Add a config property for DLQ topic's replication factor (KIP-298)
22612be is described below

commit 22612be46dd68255642ca50a1f78fa1790eb4759
Author: Arjun Satish <ar...@confluent.io>
AuthorDate: Thu Jun 7 15:49:57 2018 -0700

    KAFKA-7002: Add a config property for DLQ topic's replication factor (KIP-298)
    
    Currently, the replication factor is hardcoded to a value of 3. This means that we cannot use a DLQ in any cluster setup with fewer than three brokers. It is better to let the user specify this value if the default value does not meet the requirements.
    
    Testing: A unit test is added.
    
    Signed-off-by: Arjun Satish <arjun@confluent.io>
    
    Author: Arjun Satish <ar...@confluent.io>
    
    Reviewers: Randall Hauch <rh...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
    
    Closes #5145 from wicknicks/KAFKA-7002
---
 .../apache/kafka/connect/runtime/SinkConnectorConfig.java    | 12 +++++++++++-
 .../connect/runtime/errors/DeadLetterQueueReporter.java      |  9 ++++-----
 .../kafka/connect/runtime/errors/ErrorReporterTest.java      |  9 +++++++++
 3 files changed, 24 insertions(+), 6 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
index 9629f8f..6e9bd6b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
@@ -52,10 +52,16 @@ public class SinkConnectorConfig extends ConnectorConfig {
     public static final String DLQ_TOPIC_DEFAULT = "";
     private static final String DLQ_TOPIC_DISPLAY = "Dead Letter Queue Topic Name";
 
+    public static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG = DLQ_PREFIX + "topic.replication.factor";
+    private static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC = "Replication factor used to create the dead letter queue topic when it doesn't already exist.";
+    public static final short DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT = 3;
+    private static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY = "Dead Letter Queue Topic Replication Factor";
+
     static ConfigDef config = ConnectorConfig.configDef()
         .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY)
         .define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY)
-        .define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY);
+        .define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY)
+        .define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY);
 
     public static ConfigDef configDef() {
         return config;
@@ -97,4 +103,8 @@ public class SinkConnectorConfig extends ConnectorConfig {
     public String dlqTopicName() {
         return getString(DLQ_TOPIC_NAME_CONFIG);
     }
+
+    public short dlqTopicReplicationFactor() {
+        return getShort(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG);
+    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
index 9a8a9af..459eeae 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -44,7 +44,6 @@ public class DeadLetterQueueReporter implements ErrorReporter {
 
     private static final Logger log = LoggerFactory.getLogger(DeadLetterQueueReporter.class);
 
-    private static final short DLQ_MAX_DESIRED_REPLICATION_FACTOR = 3;
     private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
 
     private final SinkConnectorConfig connConfig;
@@ -53,13 +52,13 @@ public class DeadLetterQueueReporter implements ErrorReporter {
     private ErrorHandlingMetrics errorHandlingMetrics;
 
     public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig,
-                                                         SinkConnectorConfig connConfig, Map<String, Object> producerProps) {
-        String topic = connConfig.dlqTopicName();
+                                                         SinkConnectorConfig sinkConfig, Map<String, Object> producerProps) {
+        String topic = sinkConfig.dlqTopicName();
 
         try (AdminClient admin = AdminClient.create(workerConfig.originals())) {
             if (!admin.listTopics().names().get().contains(topic)) {
                 log.error("Topic {} doesn't exist. Will attempt to create topic.", topic);
-                NewTopic schemaTopicRequest = new NewTopic(topic, DLQ_NUM_DESIRED_PARTITIONS, DLQ_MAX_DESIRED_REPLICATION_FACTOR);
+                NewTopic schemaTopicRequest = new NewTopic(topic, DLQ_NUM_DESIRED_PARTITIONS, sinkConfig.dlqTopicReplicationFactor());
                 admin.createTopics(singleton(schemaTopicRequest)).all().get();
             }
         } catch (InterruptedException e) {
@@ -71,7 +70,7 @@ public class DeadLetterQueueReporter implements ErrorReporter {
         }
 
         KafkaProducer<byte[], byte[]> dlqProducer = new KafkaProducer<>(producerProps);
-        return new DeadLetterQueueReporter(dlqProducer, connConfig);
+        return new DeadLetterQueueReporter(dlqProducer, sinkConfig);
     }
 
     /**
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
index b5410d0..f35c514 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
@@ -180,6 +180,15 @@ public class ErrorReporterTest {
                 "partition=5, offset=100}.", msg);
     }
 
+    @Test
+    public void testSetDLQConfigs() {
+        SinkConnectorConfig configuration = config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC));
+        assertEquals(configuration.dlqTopicName(), DLQ_TOPIC);
+
+        configuration = config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, "7"));
+        assertEquals(configuration.dlqTopicReplicationFactor(), 7);
+    }
+
     private ProcessingContext processingContext() {
         ProcessingContext context = new ProcessingContext();
         context.consumerRecord(new ConsumerRecord<>(TOPIC, 5, 100, new byte[]{'a', 'b'}, new byte[]{'x'}));

-- 
To stop receiving notification emails like this one, please contact
ewencp@apache.org.