Posted to dev@kafka.apache.org by Michael Vodep <mi...@bearingpoint.com> on 2023/03/02 16:13:52 UTC

Problems with Kafka Connector Base classes

Hi

I'm using org.apache.kafka:connect-api:3.4.0.

I have a simple connector:

======
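import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.sink.SinkConnector;

public class SimpleSinkConnector extends SinkConnector {

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>();

        // One (empty) config per task, since there are no config params yet
        for (int i = 0; i < maxTasks; i++) {
            configs.add(new HashMap<>());
        }

        return configs;
    }

    // start(), stop(), taskClass(), config() and version() omitted
}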
======

I don't have any config params (it's a POC). So, for testing purposes, I initially returned just the empty ArrayList, without the for loop. It took me 2 hours to figure out why no task was created. In the end, I added the for loop, and then it worked. I reproduced the behavior several times.

Can anybody confirm this strange behavior? If something is expected here, I would expect an exception ...

Thanks
Michael
________________________________
BearingPoint GmbH
Sitz: Wien
Firmenbuchgericht: Handelsgericht Wien
Firmenbuchnummer: FN 175524z

The information in this email is confidential and may be legally privileged. If you are not the intended recipient of this message, any review, disclosure, copying, distribution, retention, or any action taken or omitted to be taken in reliance on it is prohibited and may be unlawful. If you are not the intended recipient, please reply to or forward a copy of this message to the sender and delete the message, any attachments, and any copies thereof from your system.

Re: Problems with Kafka Connector Base classes

Posted by Chris Egerton <ch...@aiven.io.INVALID>.
Hi Michael,

Please feel free to file a PR to improve the docs; we welcome contributions!

As far as a new status in the REST API goes, this would be a bit more
involved and require a KIP (
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals),
but if you believe it would be useful, you're welcome to pursue that option
as well.

Cheers,

Chris

On Fri, Mar 3, 2023 at 1:20 AM Michael Vodep <mi...@bearingpoint.com>
wrote:

Re: Problems with Kafka Connector Base classes

Posted by Michael Vodep <mi...@bearingpoint.com>.
Hi

Thanks a lot, it's clear now.

Maybe the behavior when

* no task config is returned
* more task configs than maxTasks are returned

could be added to the API docs 😉
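Until those two cases are documented, a connector can make both of them explicit on its own side. The sketch below assumes a hypothetical helper, `TaskConfigChecks.validateTaskConfigs` (an illustrative name, not part of the Connect API), that a connector calls on its own list just before returning it from `taskConfigs(int maxTasks)`. It does not describe what the runtime itself does in either case, and a real connector would more likely throw `org.apache.kafka.connect.errors.ConnectException` than the plain `IllegalStateException` used here to keep the sketch dependency-free.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the Kafka Connect API: a connector could
// pass its own task config list through it before returning from taskConfigs.
final class TaskConfigChecks {

    private TaskConfigChecks() {
    }

    // Returns the list unchanged, or throws if it hits one of the two cases.
    static List<Map<String, String>> validateTaskConfigs(
            List<Map<String, String>> configs, int maxTasks, boolean allowEmpty) {
        // Case 1: no task configs at all, which means no tasks are brought up.
        if (configs.isEmpty() && !allowEmpty) {
            throw new IllegalStateException(
                    "taskConfigs returned an empty list, so no tasks would be started");
        }
        // Case 2: more task configs than the maxTasks value passed in.
        if (configs.size() > maxTasks) {
            throw new IllegalStateException("taskConfigs returned " + configs.size()
                    + " task configs, but maxTasks is " + maxTasks);
        }
        return configs;
    }
}
```

In `SimpleSinkConnector.taskConfigs`, the last line would become `return TaskConfigChecks.validateTaskConfigs(configs, maxTasks, false);`, so an accidentally empty list raises an exception inside `taskConfigs` instead of leaving a connector that silently runs no tasks. Connectors that may legitimately have nothing to do at the moment would pass `allowEmpty = true`.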

http://localhost:8083/connectors?expand=info&expand=status

It would be awesome if the task information returned here also reflected some status like "WAITING_FOR_TASK_CONFIG" 😉

Thanks
Michael

-----Original Message-----
From: Chris Egerton <ch...@aiven.io.INVALID>
Sent: Thursday, March 2, 2023 17:33
To: dev@kafka.apache.org
Subject: Re: Problems with Kafka Connector Base classes

Re: Problems with Kafka Connector Base classes

Posted by Chris Egerton <ch...@aiven.io.INVALID>.
Hi Michael,

The purpose of the Connector::taskConfigs method is to specify (implicitly)
the number of tasks that need to be run for the connector, and (explicitly)
how each task should be configured. The former is provided implicitly by
the size of the list returned from that method; for each element in that
list, one task will be brought up for the connector, regardless of whether
the map for that element is actually just empty.

The reason your for-loop was necessary is that, although we do distinguish
between an empty list of task configs and a non-empty list, we do not
distinguish on a per-element basis between an empty task config (i.e., the
Map<String, String> inside the task configs list) and a non-empty task
config. So, by adding the for-loop, you signaled to the runtime that you
wanted the number of tasks brought up for your connector to be `maxTasks`.
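The contract above can be illustrated without a Connect worker. In this sketch, `TaskCountDemo.bringUpTasks` is a hypothetical stand-in for the runtime, not the real Worker or Herder code: it creates one task per list element, whatever the map contains. `emptyConfigs(n)` mirrors the for-loop version of `taskConfigs` from the original post, so `emptyConfigs(0)` corresponds to returning the bare, empty `ArrayList`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the Connect runtime, for illustration only.
final class TaskCountDemo {

    private TaskCountDemo() {
    }

    // Mirrors the for-loop version of taskConfigs: n empty maps.
    static List<Map<String, String>> emptyConfigs(int n) {
        List<Map<String, String>> configs = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            configs.add(new HashMap<>());
        }
        return configs;
    }

    // One task per list element; the map's contents are passed along as the
    // task's configuration but do not decide whether the task is created.
    static List<String> bringUpTasks(String connector, List<Map<String, String>> taskConfigs) {
        List<String> started = new ArrayList<>();
        for (int i = 0; i < taskConfigs.size(); i++) {
            started.add(connector + " task " + i + " with config " + taskConfigs.get(i));
        }
        return started;
    }
}
```

With `emptyConfigs(0)` the stand-in brings up no tasks and reports no error; with `emptyConfigs(2)` it brings up two, even though neither map holds a single setting.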

We choose to allow connectors to return an empty list of task configs to
facilitate the use case of Connector implementations that may decide not to
generate any tasks, which could be useful if, for example, there are no
tables that match a given allow/block list to be read from in the database
at the moment. This is also why we do not fail connectors when they
generate an empty set of task configs (or throw an exception in some API
call that the connector may make of the runtime).
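A hedged sketch of the allow-list case mentioned above, for a hypothetical source connector that reads database tables. The table names, the regex allow-list, and the `tables` config key are illustrative assumptions. When nothing matches, it returns an empty list, so no tasks are brought up; otherwise it spreads the matching tables round-robin over at most `maxTasks` task configs. connect-api also ships `org.apache.kafka.connect.util.ConnectorUtils.groupPartitions`, which groups elements into contiguous runs rather than round-robin; the grouping is written out by hand here only to keep the sketch dependency-free.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical source-connector logic; the "tables" key and the allow-list
// regex are illustrative assumptions, not a real connector's configuration.
final class TableTaskConfigs {

    private TableTaskConfigs() {
    }

    static List<Map<String, String>> taskConfigs(
            List<String> tablesInDatabase, Pattern allowList, int maxTasks) {
        if (maxTasks < 1) {
            throw new IllegalArgumentException("maxTasks must be at least 1");
        }
        List<String> matching = new ArrayList<>();
        for (String table : tablesInDatabase) {
            if (allowList.matcher(table).matches()) {
                matching.add(table);
            }
        }

        // Nothing to read right now: no task configs, hence no tasks.
        List<Map<String, String>> configs = new ArrayList<>();
        if (matching.isEmpty()) {
            return configs;
        }

        // Otherwise deal the tables round-robin into at most maxTasks groups.
        int numTasks = Math.min(maxTasks, matching.size());
        List<List<String>> groups = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            groups.add(new ArrayList<>());
        }
        for (int i = 0; i < matching.size(); i++) {
            groups.get(i % numTasks).add(matching.get(i));
        }

        // One task config (and therefore one task) per group.
        for (List<String> group : groups) {
            Map<String, String> config = new HashMap<>();
            config.put("tables", String.join(",", group));
            configs.add(config);
        }
        return configs;
    }
}
```

If matching tables appear later, the connector would have to ask the runtime to reconfigure its tasks, for example via `ConnectorContext.requestTaskReconfiguration()`, for the new list to take effect.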

I hope this helps clear things up for you.

Cheers,

Chris

On Thu, Mar 2, 2023 at 11:14 AM Michael Vodep <
michael.vodep@bearingpoint.com> wrote:
