Posted to jira@kafka.apache.org by "Sergei Morozov (Jira)" <ji...@apache.org> on 2023/04/12 22:47:00 UTC

[jira] [Comment Edited] (KAFKA-14750) Sink connector fails if a topic matching its topics.regex gets deleted

    [ https://issues.apache.org/jira/browse/KAFKA-14750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17711589#comment-17711589 ] 

Sergei Morozov edited comment on KAFKA-14750 at 4/12/23 10:46 PM:
------------------------------------------------------------------

{quote}how big of an impact is this to you for your use case actually?
{quote}
This issue is quite impactful. For background: we use Connect in a multi-tenant environment where each topic represents a tenant. A Streams application reads a topic containing messages from about a hundred tenants and routes them to tenant-scoped topics. To reduce latency when a new tenant is provisioned, we pre-provision a few standby topics per Streams replica. Topics are managed by Strimzi and are owned by the Streams pod, so when a Streams application is restarted, all its standby topics are deleted and new ones are created for the new instance. Restarting the Streams application is common (e.g. due to a rolling upgrade, internal housekeeping in Kubernetes, etc.), so we hit sink connector failures in ordinary, everyday scenarios.
{quote}Is the test you listed in the OP a reflection of how the connect is used by you?
{quote}
Yes. The above should explain it.
{quote}If it doesn't exist, then we can skip the offset commit but what happens if it exists? Do we retry?
{quote}
I'd say we throw the original exception.
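
The behavior discussed above could be modeled roughly as follows. This is only a shell sketch of the decision, not Connect's actual code (the real handling would live in the Java sink task). The function name {{on_position_timeout}} and the convention of passing the existing topics on stdin, one per line as {{bin/kafka-topics.sh --list}} prints them, are assumptions made for illustration.

```shell
# Hypothetical helper: decide what to do after a position lookup timed out.
#   $1    - topic whose position lookup timed out
#   stdin - names of the topics that currently exist, one per line
# Prints "skip" if the topic is gone, "rethrow" if it still exists.
on_position_timeout() {
  # -x matches the whole line, -F treats the topic name as a literal string
  if grep -qxF -- "$1"; then
    echo rethrow   # topic still exists: surface the original exception
  else
    echo skip      # topic was deleted: there is nothing to commit for it
  fi
}
```

For example, {{bin/kafka-topics.sh --bootstrap-server localhost:9092 --list | on_position_timeout connect-test-211}} would print {{skip}} once that topic has been deleted.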


was (Author: morozov):
{quote}how big of an impact is this to you for your use case actually?
{quote}
This issue is quite impactful. For background: we use Connect in a multi-tenant environment where each topic represents a tenant. A Streams application reads a topic containing messages from about a hundred tenants and routes them to tenant-scoped topics. To reduce latency when a new tenant is provisioned, we pre-provision a few standby topics per Streams replica. Topics are managed by Strimzi and are owned by the Streams pod, so when a Streams application is restarted, all its standby topics are deleted and new ones are created for the new instance. Restarting the Streams application is common (e.g. due to a rolling upgrade), so we hit sink connector failures in ordinary, everyday scenarios.
{quote}Is the test you listed in the OP a reflection of how the connect is used by you?
{quote}
Yes. The above should explain it.
{quote}If it doesn't exist, then we can skip the offset commit but what happens if it exists? Do we retry?
{quote}
I'd say we throw the original exception.

> Sink connector fails if a topic matching its topics.regex gets deleted
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-14750
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14750
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 3.3.1
>            Reporter: Sergei Morozov
>            Assignee: Sagar Rao
>            Priority: Major
>
> Steps to reproduce:
> # In {{config/connect-standalone.properties}}, set:
> {code:bash}
> plugin.path=libs/connect-file-3.3.1.jar
> {code}
> # In {{config/connect-file-sink.properties}}, remove the {{topics=}} line and add this one:
> {code:bash}
> topics.regex=connect-test-.*
> {code}
> # Start ZooKeeper:
> {code:bash}
> bin/zookeeper-server-start.sh config/zookeeper.properties
> {code}
> # Start the broker:
> {code:bash}
> bin/kafka-server-start.sh config/server.properties
> {code}
> # Start the file sink connector:
> {code:bash}
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink.properties
> {code}
> # Create topics for the sink connector to subscribe to:
> {code:bash}
> for i in {0..2}; do
>   for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do
>     bin/kafka-topics.sh \
>         --bootstrap-server localhost:9092 \
>         --create \
>         --topic connect-test-$j
>   done &
> done
> wait
> {code}
> # Wait until all the created topics are assigned to the connector. Check that the number of partitions is > 0 in the output of:
> {code:bash}
> bin/kafka-consumer-groups.sh \
>     --bootstrap-server localhost:9092 \
>     --group connect-local-file-sink \
>     --describe --members
> {code}
> # Delete the created topics:
> {code:bash}
> for i in {0..2}; do
>   for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do
>     bin/kafka-topics.sh \
>         --bootstrap-server localhost:9092 \
>         --delete \
>         --topic connect-test-$j
>     echo Deleted topic connect-test-$j.
>   done &
> done
> wait
> {code}
> # Observe the connector fail with the following error:
> {quote}org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition connect-test-211-0 could be determined
> {quote}
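
The waiting step in the reproduction above (checking the consumer group before deleting the topics) could be automated by summing the partition counts reported for the group. This is a minimal sketch under two assumptions: that the last column of the {{--describe --members}} output is the per-member partition count, and that each topic has a single partition, so 300 is the target. Both helper names are hypothetical.

```shell
# Hypothetical helper: sum the last column (assumed to be the partition count)
# over all rows whose last field is a plain number; header and blank lines are skipped.
count_assigned_partitions() {
  awk '$NF ~ /^[0-9]+$/ { total += $NF } END { print total + 0 }'
}

# Hypothetical helper: poll until the expected number of partitions is assigned.
# The default of 300 assumes one partition per created topic.
wait_for_assignment() {
  expected=${1:-300}
  until [ "$(bin/kafka-consumer-groups.sh \
      --bootstrap-server localhost:9092 \
      --group connect-local-file-sink \
      --describe --members | count_assigned_partitions)" -ge "$expected" ]; do
    sleep 5
  done
}
```

Calling {{wait_for_assignment}} between the create and delete loops would block until the connector has picked up every topic, which should make the failure easier to reproduce consistently.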


