You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/10/15 19:20:30 UTC
[kafka] 01/02: KAFKA-10340: Improve trace logging under connector
based topic creation (#9149)
This is an automated email from the ASF dual-hosted git repository.
kkarantasis pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 3bd478cc29dafcfdb3d30390e603db412f9726e0
Author: Luke Chen <43...@users.noreply.github.com>
AuthorDate: Fri Oct 16 02:14:54 2020 +0800
KAFKA-10340: Improve trace logging under connector based topic creation (#9149)
Reviewers: Konstantine Karantasis <k....@gmail.com>
---
.../java/org/apache/kafka/connect/runtime/WorkerSourceTask.java | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 1febd7f..180a6bb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -344,7 +344,7 @@ class WorkerSourceTask extends WorkerTask {
continue;
}
- log.trace("{} Appending record with key {}, value {}", this, record.key(), record.value());
+ log.trace("{} Appending record to the topic {} with key {}, value {}", this, record.topic(), record.key(), record.value());
// We need this queued first since the callback could happen immediately (even synchronously in some cases).
// Because of this we need to be careful about handling retries -- we always save the previously attempted
// record as part of toSend and need to use a flag to track whether we should actually add it to the outstanding
@@ -409,6 +409,9 @@ class WorkerSourceTask extends WorkerTask {
// RegexRouter) topic creation can not be batched for multiple topics
private void maybeCreateTopic(String topic) {
if (!topicCreation.isTopicCreationRequired(topic)) {
+ log.trace("Topic creation by the connector is disabled or the topic {} was previously created." +
+ "If auto.create.topics.enable is enabled on the broker, " +
+ "the topic will be created with default settings", topic);
return;
}
log.info("The task will send records to topic '{}' for the first time. Checking "
@@ -430,7 +433,7 @@ class WorkerSourceTask extends WorkerTask {
log.info("Created topic '{}' using creation group {}", newTopic, topicGroup);
} else {
log.warn("Request to create new topic '{}' failed", topic);
- throw new ConnectException("Task failed to create new topic " + topic + ". Ensure "
+ throw new ConnectException("Task failed to create new topic " + newTopic + ". Ensure "
+ "that the task is authorized to create topics or that the topic exists and "
+ "restart the task");
}