Posted to jira@kafka.apache.org by "Sagar Rao (Jira)" <ji...@apache.org> on 2023/04/10 07:33:00 UTC

[jira] [Commented] (KAFKA-14750) Sink connector fails if a topic matching its topics.regex gets deleted

    [ https://issues.apache.org/jira/browse/KAFKA-14750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17710077#comment-17710077 ] 

Sagar Rao commented on KAFKA-14750:
-----------------------------------

I ran a few more tests for this. This issue is found specifically when a mass delete of topics is issued. 

Typically, if only a few topics are deleted (I tested with anywhere from 1 to 10 topics), the `position` API doesn't fail. The log shows the following:

```

[2023-04-10 12:19:33,115] INFO [local-file-sink-300|task-0] [Consumer clientId=connector-consumer-local-file-sink-300-0, groupId=connect-local-file-sink-300] Resetting offset for partition connect-test-3010-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:399)

[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] WorkerSinkTask{id=local-file-sink-300-0} Assigned topic partition connect-test-300-0 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask:700)

[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] WorkerSinkTask{id=local-file-sink-300-0} Assigned topic partition connect-test-3002-0 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask:700)

[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] WorkerSinkTask{id=local-file-sink-300-0} Assigned topic partition connect-test-3003-0 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask:700)

[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] WorkerSinkTask{id=local-file-sink-300-0} Assigned topic partition connect-test-3004-0 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask:700)

[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] WorkerSinkTask{id=local-file-sink-300-0} Assigned topic partition connect-test-3005-0 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask:700)

[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] WorkerSinkTask{id=local-file-sink-300-0} Assigned topic partition connect-test-3006-0 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask:700)

[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] WorkerSinkTask{id=local-file-sink-300-0} Assigned topic partition connect-test-3007-0 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask:700)

[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] WorkerSinkTask{id=local-file-sink-300-0} Assigned topic partition connect-test-3008-0 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask:700)

[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] WorkerSinkTask{id=local-file-sink-300-0} Assigned topic partition connect-test-3009-0 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask:700)

[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] WorkerSinkTask{id=local-file-sink-300-0} Assigned topic partition connect-test-3010-0 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask:700)

[2023-04-10 12:19:33,660] WARN [local-file-sink-300|task-0] [Consumer clientId=connector-consumer-local-file-sink-300-0, groupId=connect-local-file-sink-300] Received unknown topic or partition error in fetch for partition connect-test-3002-0 (org.apache.kafka.clients.consumer.internals.Fetcher:1340)

[2023-04-10 12:19:33,661] INFO [local-file-sink-300|task-0] [Consumer clientId=connector-consumer-local-file-sink-300-0, groupId=connect-local-file-sink-300] Request joining group due to: cached metadata has changed from (version362: {connect-test-3007=1, connect-test-3008=1, connect-test-3009=1, connect-test-3003=1, connect-test-300=1, connect-test-3004=1, connect-test-3005=1, connect-test-3006=1, connect-test-3010=1, connect-test-3002=1}) at the beginning of the rebalance to (version363: {connect-test-3007=1, connect-test-3008=1, connect-test-3009=1, connect-test-3003=1, connect-test-300=1, connect-test-3004=1, connect-test-3005=1, connect-test-3006=1, connect-test-3010=1}) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1066)

[2023-04-10 12:19:34,165] INFO [local-file-sink-300|task-0] [Consumer clientId=connector-consumer-local-file-sink-300-0, groupId=connect-local-file-sink-300] Revoke previously assigned partitions connect-test-300-0, connect-test-3002-0, connect-test-3003-0, connect-test-3004-0, connect-test-3005-0, connect-test-3006-0, connect-test-3007-0, connect-test-3008-0, connect-test-3009-0, connect-test-3010-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:331)

```

 

Notice the `Request joining group due to: cached metadata has changed from ...` entry that appears right after the `Received unknown topic or partition error in fetch for partition` warning. In the kind of test from the original reproduction steps (the mass topic deletion), the `Received unknown topic or partition error in fetch for partition` line is not followed by a `cached metadata has changed` entry.

 

If the `OffsetFetcher` can't find the topic or partition, it gets back an `UNKNOWN_TOPIC_OR_PARTITION` error and marks the topic partition for retry:

```
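// Excerpt from the consumer's ListOffsets response handling: the partition of a deleted
// topic is only logged and queued for retry; the offset lookup does not fail right away.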

case UNKNOWN_TOPIC_OR_PARTITION:
log.warn("Received unknown topic or partition error in ListOffset request for partition {}", topicPartition);
partitionsToRetry.add(topicPartition);
break;

```

Eventually, if the topic partition can't be resolved, the ConsumerCoordinator sets the error in `onJoinComplete` and then fails everything, as we see happening here.
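
For context, the timeout quoted in the original report ("Timeout of 60000ms expired before the position for partition ... could be determined") surfaces from `KafkaConsumer#position(TopicPartition, Duration)`. A simplified sketch of the kind of call involved (the class and method names below are made up for illustration; this is not the actual `WorkerSinkTask` code):

```
import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public class PositionSketch {
    // Simplified sketch of the call that blocks in the reported failure. position() waits
    // until the consumer can determine a valid fetch position for the partition; if the
    // backing topic has been deleted and the position never resolves, it throws
    // org.apache.kafka.common.errors.TimeoutException with the message seen in this issue.
    static void logAssignedPositions(Consumer<byte[], byte[]> consumer) {
        for (TopicPartition tp : consumer.assignment()) {
            long offset = consumer.position(tp, Duration.ofSeconds(60));
            System.out.println("Assigned topic partition " + tp + " with offset " + offset);
        }
    }
}
```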

One way to get around this (which I have tested locally as well) is to pass along an Admin client that checks which topics still exist and to invoke `position` only for the ones that do (a rough sketch appears after the two points below). However, I am not too keen on proceeding with that solution, for two reasons:

1) This error doesn't always happen; it shows up only under extreme scenarios (i.e. a mass delete of topics). Passing an admin client and checking for the existence of topics every time is too much overhead for a corner case, IMO. That said, [~morozov], if you think my reasoning doesn't make sense, I would definitely like to know your thoughts.

2) Eventually, the ConsumerCoordinator machinery also prefers failing the consumer on TimeoutExceptions. I would not like to poke around it too much, as I would like to believe the behaviour we see here is the right one.
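
For reference, a rough sketch of the admin-based check I described above (class and method names are made up for illustration; this is not the exact code I tested locally):

```
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public class ExistingTopicsGuard {
    // Hypothetical sketch: look up which topics still exist via the admin client and only
    // call position() for partitions whose topics are present, so the consumer never blocks
    // on a deleted topic.
    static void logPositionsForExistingTopics(Admin admin, Consumer<byte[], byte[]> consumer)
            throws ExecutionException, InterruptedException {
        Set<String> existingTopics = admin.listTopics().names().get();
        for (TopicPartition tp : consumer.assignment()) {
            if (existingTopics.contains(tp.topic())) {
                long offset = consumer.position(tp, Duration.ofSeconds(60));
                System.out.println("Assigned topic partition " + tp + " with offset " + offset);
            }
        }
    }
}
```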

 

[~ChrisEgerton], I would like to hear your thoughts on this one.

 

> Sink connector fails if a topic matching its topics.regex gets deleted
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-14750
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14750
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 3.3.1
>            Reporter: Sergei Morozov
>            Assignee: Sagar Rao
>            Priority: Major
>
> Steps to reproduce:
> # In {{config/connect-standalone.properties}}, set:
> {code:bash}
> plugin.path=libs/connect-file-3.3.1.jar
> {code}
> # In {{config/connect-file-sink.properties}}, remove the {{topics=}} line and add this one:
> {code:bash}
> topics.regex=connect-test-.*
> {code}
> # Start ZooKeeper:
> {code:bash}
> bin/zookeeper-server-start.sh config/zookeeper.properties
> {code}
> # Start the brokers:
> {code:bash}
> bin/kafka-server-start.sh config/server.properties
> {code}
> # Start the file sink connector:
> {code:bash}
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink.properties
> {code}
> # Create topics for the sink connector to subscribe to:
> {code:bash}
> for i in {0..2}; do
>   for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do
>     bin/kafka-topics.sh \
>         --bootstrap-server localhost:9092 \
>         --create \
>         --topic connect-test-$j
>   done &
> done
> wait
> {code}
> # Wait until all the created topics are assigned to the connector. Check that the number of partitions is > 0 in the output of:
> {code:bash}
> bin/kafka-consumer-groups.sh \
>     --bootstrap-server localhost:9092 \
>     --group connect-local-file-sink \
>     --describe --members
> {code}
> # Delete the created topics:
> {code:bash}
> for i in {0..2}; do
>   for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do
>     bin/kafka-topics.sh \
>         --bootstrap-server localhost:9092 \
>         --delete \
>         --topic connect-test-$j
>     echo Deleted topic connect-test-$j.
>   done &
> done
> wait
> {code}
> # Observe the connector fail with the following error:
> {quote}org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition connect-test-211-0 could be determined
> {quote}


