You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/06/07 22:50:44 UTC
[kafka] branch trunk updated: KAFKA-7002: Add a config property for
DLQ topic's replication factor (KIP-298)
This is an automated email from the ASF dual-hosted git repository.
ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 22612be KAFKA-7002: Add a config property for DLQ topic's replication factor (KIP-298)
22612be is described below
commit 22612be46dd68255642ca50a1f78fa1790eb4759
Author: Arjun Satish <ar...@confluent.io>
AuthorDate: Thu Jun 7 15:49:57 2018 -0700
KAFKA-7002: Add a config property for DLQ topic's replication factor (KIP-298)
Currently, the replication factor is hardcoded to a value of 3. This means that we cannot use a DLQ in any cluster setup with less than three brokers. It is better to have the user specify this value if the default value does meet the requirements.
Testing: A unit test is added.
Signed-off-by: Arjun Satish <arjunconfluent.io>
Author: Arjun Satish <ar...@confluent.io>
Reviewers: Randall Hauch <rh...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #5145 from wicknicks/KAFKA-7002
---
.../apache/kafka/connect/runtime/SinkConnectorConfig.java | 12 +++++++++++-
.../connect/runtime/errors/DeadLetterQueueReporter.java | 9 ++++-----
.../kafka/connect/runtime/errors/ErrorReporterTest.java | 9 +++++++++
3 files changed, 24 insertions(+), 6 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
index 9629f8f..6e9bd6b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
@@ -52,10 +52,16 @@ public class SinkConnectorConfig extends ConnectorConfig {
public static final String DLQ_TOPIC_DEFAULT = "";
private static final String DLQ_TOPIC_DISPLAY = "Dead Letter Queue Topic Name";
+ public static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG = DLQ_PREFIX + "topic.replication.factor";
+ private static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC = "Replication factor used to create the dead letter queue topic when it doesn't already exist.";
+ public static final short DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT = 3;
+ private static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY = "Dead Letter Queue Topic Replication Factor";
+
static ConfigDef config = ConnectorConfig.configDef()
.define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY)
.define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY)
- .define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY);
+ .define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY)
+ .define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY);
public static ConfigDef configDef() {
return config;
@@ -97,4 +103,8 @@ public class SinkConnectorConfig extends ConnectorConfig {
public String dlqTopicName() {
return getString(DLQ_TOPIC_NAME_CONFIG);
}
+
+ public short dlqTopicReplicationFactor() {
+ return getShort(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG);
+ }
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
index 9a8a9af..459eeae 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -44,7 +44,6 @@ public class DeadLetterQueueReporter implements ErrorReporter {
private static final Logger log = LoggerFactory.getLogger(DeadLetterQueueReporter.class);
- private static final short DLQ_MAX_DESIRED_REPLICATION_FACTOR = 3;
private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
private final SinkConnectorConfig connConfig;
@@ -53,13 +52,13 @@ public class DeadLetterQueueReporter implements ErrorReporter {
private ErrorHandlingMetrics errorHandlingMetrics;
public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig,
- SinkConnectorConfig connConfig, Map<String, Object> producerProps) {
- String topic = connConfig.dlqTopicName();
+ SinkConnectorConfig sinkConfig, Map<String, Object> producerProps) {
+ String topic = sinkConfig.dlqTopicName();
try (AdminClient admin = AdminClient.create(workerConfig.originals())) {
if (!admin.listTopics().names().get().contains(topic)) {
log.error("Topic {} doesn't exist. Will attempt to create topic.", topic);
- NewTopic schemaTopicRequest = new NewTopic(topic, DLQ_NUM_DESIRED_PARTITIONS, DLQ_MAX_DESIRED_REPLICATION_FACTOR);
+ NewTopic schemaTopicRequest = new NewTopic(topic, DLQ_NUM_DESIRED_PARTITIONS, sinkConfig.dlqTopicReplicationFactor());
admin.createTopics(singleton(schemaTopicRequest)).all().get();
}
} catch (InterruptedException e) {
@@ -71,7 +70,7 @@ public class DeadLetterQueueReporter implements ErrorReporter {
}
KafkaProducer<byte[], byte[]> dlqProducer = new KafkaProducer<>(producerProps);
- return new DeadLetterQueueReporter(dlqProducer, connConfig);
+ return new DeadLetterQueueReporter(dlqProducer, sinkConfig);
}
/**
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
index b5410d0..f35c514 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
@@ -180,6 +180,15 @@ public class ErrorReporterTest {
"partition=5, offset=100}.", msg);
}
+ @Test
+ public void testSetDLQConfigs() {
+ SinkConnectorConfig configuration = config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC));
+ assertEquals(configuration.dlqTopicName(), DLQ_TOPIC);
+
+ configuration = config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, "7"));
+ assertEquals(configuration.dlqTopicReplicationFactor(), 7);
+ }
+
private ProcessingContext processingContext() {
ProcessingContext context = new ProcessingContext();
context.consumerRecord(new ConsumerRecord<>(TOPIC, 5, 100, new byte[]{'a', 'b'}, new byte[]{'x'}));
--
To stop receiving notification emails like this one, please contact
ewencp@apache.org.