Posted to jira@kafka.apache.org by "yashmayya (via GitHub)" <gi...@apache.org> on 2023/04/18 08:14:51 UTC

[GitHub] [kafka] yashmayya commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.

yashmayya commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1169647018


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##########
@@ -145,6 +154,10 @@ public WorkerSinkTask(ConnectorTaskId id,
         this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
         this.taskStopped = false;
         this.workerErrantRecordReporter = workerErrantRecordReporter;
+        Map<String, Object> adminProps = new HashMap<>();
+        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, workerConfig.bootstrapServers());

Review Comment:
   Connect supports overriding Kafka client configs on a per-connector basis - see [KIP-458](https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy). If we're creating an admin client for a specific connector (or task), it should respect those overrides; the connector may even be targeting a different Kafka cluster than the one backing the Connect cluster, so we can't use worker configurations alone to create Kafka clients for connectors. It would probably be cleaner to instantiate this admin client in the `Worker` and pass it to the `WorkerSinkTask` during creation. Here's an example from `WorkerSourceTask` creation that might help: https://github.com/apache/kafka/blob/454b72161a76b1687a1263157d7cc30a1bdb2506/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1424-L1446
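   
   For illustration, a minimal sketch of the wiring I have in mind - this assumes a `workerConfig` and a `ConnectorConfig` (`connectorConfig`) are in scope and elides the override policy validation; the shape is illustrative, not the actual `Worker` API:
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   
   // Sketch: build per-connector admin configs in the Worker, then hand the
   // client to the task instead of constructing it inside WorkerSinkTask
   Map<String, Object> adminProps = new HashMap<>();
   // Worker-level defaults first
   adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, workerConfig.bootstrapServers());
   // Then the KIP-458 per-connector overrides, which use the "admin.override."
   // prefix in the connector config (override policy validation elided)
   adminProps.putAll(connectorConfig.originalsWithPrefix("admin.override."));
   Admin admin = Admin.create(adminProps);
   // ... pass `admin` into the WorkerSinkTask constructor
   ```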



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##########
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set<String> deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {
+                    log.debug("Not Committing offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic());
+                    continue;
+                }
+                long pos;
+                try {
+                    pos = consumer.position(tp);
+                } catch (TimeoutException e) {

Review Comment:
   This seems kinda weird to me too - why would the consumer time out trying to fetch the position for a topic partition that no longer exists? Is this expected/documented behavior, or just a poorly handled case in the consumer client?
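   
   For reference, a sketch of the behavior in question - `KafkaConsumer.position(TopicPartition, Duration)` is documented to throw a `TimeoutException` when the position can't be determined within the timeout, and a deleted topic presumably falls into that bucket because there's no offset metadata left to fetch (`consumer` and `tp` assumed in scope):
   
   ```java
   import java.time.Duration;
   import org.apache.kafka.common.errors.TimeoutException;
   
   // position() can't distinguish "topic was deleted" from "broker is slow" -
   // both surface as a TimeoutException once the timeout elapses
   try {
       long pos = consumer.position(tp, Duration.ofSeconds(30));
   } catch (TimeoutException e) {
       // No UnknownTopicOrPartitionException here; the caller has to check
       // topic existence some other way (e.g. via an admin client)
   }
   ```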



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##########
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set<String> deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {
+                    log.debug("Not Committing offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic());

Review Comment:
   We don't commit offsets in `onPartitionsAssigned`, so I'm not sure what the purpose of this log is?
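   
   For contrast, the usual rebalance-listener pattern (a generic sketch, not `WorkerSinkTask`'s actual code - `currentOffsets()` is a hypothetical helper returning the offsets to commit): commits belong in `onPartitionsRevoked`, while `onPartitionsAssigned` only re-initializes positions.
   
   ```java
   import java.util.Collection;
   import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
   import org.apache.kafka.common.TopicPartition;
   
   consumer.subscribe(topics, new ConsumerRebalanceListener() {
       @Override
       public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
           // Last chance to commit before ownership moves to another consumer
           consumer.commitSync(currentOffsets());
       }
   
       @Override
       public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
           // Only read back where consumption will resume; nothing is
           // committed here
           for (TopicPartition tp : partitions) {
               long pos = consumer.position(tp);
           }
       }
   });
   ```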



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##########
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set<String> deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {
+                    log.debug("Not Committing offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic());
+                    continue;
+                }
+                long pos;
+                try {
+                    pos = consumer.position(tp);
+                } catch (TimeoutException e) {
+                    log.error("TimeoutException occurred when fetching position for topic partition {}. " +
+                            "Checking if the topic {} exists", tp, tp.topic());
+                    Map<String, TopicDescription> topic = topicAdmin.describeTopics(tp.topic());
+                    if (topic.isEmpty()) {
+                        log.debug("Not Committing offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic());

Review Comment:
   Same thoughts as above regarding the purpose of this log line.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##########
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set<String> deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {

Review Comment:
   I'm not sure I understand why the `onPartitionsAssigned` rebalance listener hook would be called for partitions of topics that have been deleted. Is this expected behavior or a bug in the consumer client? If it's expected, is there any documentation around it? If it's a bug, I think it might be better to address it in the consumer client rather than introduce a workaround for it in Connect?
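   
   If it does turn out to be expected, an explicit existence check would at least make the intent clearer than inferring deletion from a timeout - a rough sketch using the plain `Admin` API (an `admin` client and `tp` assumed in scope):
   
   ```java
   import java.util.Collections;
   import java.util.concurrent.ExecutionException;
   import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
   
   // Confirm deletion explicitly rather than inferring it from the rebalance
   // listener being invoked for a stale assignment
   boolean topicExists;
   try {
       admin.describeTopics(Collections.singleton(tp.topic())).allTopicNames().get();
       topicExists = true;
   } catch (ExecutionException e) {
       if (e.getCause() instanceof UnknownTopicOrPartitionException) {
           topicExists = false; // the topic really is gone
       } else {
           throw new RuntimeException(e);
       }
   } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new RuntimeException(e);
   }
   ```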


