Posted to user@flink.apache.org by Sameer W <sa...@axiomine.com> on 2016/08/23 10:50:27 UTC

Threading Model for Kinesis

Hi,

The documentation says that there will be one thread per shard. If my
streaming job runs with a parallelism of 10 and there are 20 shards, are
more threads going to be launched within a task slot running a source
function to consume the additional shards, or will one source function
instance consume 2 shards in round robin?

Is it any different for Kafka? Based on the documentation my understanding
is that if there are 10 source function instances and 20 partitions, each
one will read 2 partitions.

Also, if partitions are added to Kafka, are they handled by the existing
streaming job, or does it need to be restarted? It appears as though Kinesis
handles it via the consumer constantly checking for more shards.

Thanks,
Sameer

Re: Threading Model for Kinesis

Posted by Sameer W <sa...@axiomine.com>.
Thanks Gordon - Appreciate the fast response.

Sameer

Re: Threading Model for Kinesis

Posted by "Tzu-Li (Gordon) Tai" <tz...@gmail.com>.
Hi Sameer,

I realized you might be a bit confused between “source instances” (which in
general are Flink tasks) and “threads” in my previous explanations. The
per-broker threads in the Kafka consumer and per-shard threads in the
Kinesis consumer I mentioned are threads created by the source instance’s
main thread. So, they have nothing to do with the assignment of
shards/partitions to the source instances. The threading models previously
explained refer to how a single source instance consumes the multiple
shards/partitions that are assigned to it.
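
As a rough illustration of that distinction, here is a minimal sketch in
plain Python (not Flink code). The function name run_source_instance and the
thread names are made up for this example; the only point is that the
consuming threads are started by the source instance itself, after it has
already been given its shards.

```python
import threading


def run_source_instance(assigned_shards):
    """Simulate one source instance whose main thread starts one thread per shard."""
    consumed = []
    lock = threading.Lock()

    def consume(shard_id):
        # Stand-in for a real fetch loop: record which thread handled the shard.
        with lock:
            consumed.append((shard_id, threading.current_thread().name))

    # The shards were assigned before this point; threads are created afterwards.
    workers = [
        threading.Thread(target=consume, args=(shard,), name=f"shard-consumer-{shard}")
        for shard in assigned_shards
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    return sorted(consumed)


# A source instance that was assigned shards 0 and 10 runs two consumer threads.
result = run_source_instance([0, 10])
print(result)
```

Changing the list passed to run_source_instance changes how many threads
that one instance starts; it does not change which instance owns which shard.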

Hope this clarifies things for you more :)

Regards,
Gordon


Re: Threading Model for Kinesis

Posted by Sameer W <sa...@axiomine.com>.
Perfect - This explains it very clearly. Thank you very much!

Sameer

Re: Threading Model for Kinesis

Posted by "Tzu-Li (Gordon) Tai" <tz...@gmail.com>.
Slight misunderstanding here. The one thread per Kafka broker happens
*after* the assignment of Kafka partitions to the source instances. So,
with a total of 10 partitions and 10 source instances, each source instance
will first be assigned 1 partition. Then, each source instance will create
1 thread for every individual broker that holds partitions that the source
instance is assigned. The per-broker threading model of the Kafka consumer
has nothing to do with the initial assignment of partitions to source
instances.

Another example to explain this more clearly:
Say you have 2 Kafka brokers, each holding 5 partitions, and a source
parallelism of 5. Each source instance will still have 2 partitions. If the
2 partitions belong to the same broker, the source instance will have only
1 consuming thread; otherwise, if the 2 partitions belong to different
brokers, the source instance will have 2 consuming threads.
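
The two steps in this example can be sketched in plain Python (not Flink
code). The partition-to-broker layout below is hypothetical, and the modulo
assignment is only an assumption made for the sketch; the connector's real
assignment logic may differ. What it shows is that the thread count per
source instance is derived from the brokers behind its assigned partitions.

```python
# Hypothetical layout: which broker holds each of the 10 partitions (5 per broker).
partition_to_broker = {
    0: "broker-0", 5: "broker-0", 1: "broker-0", 6: "broker-0", 2: "broker-0",
    7: "broker-1", 3: "broker-1", 8: "broker-1", 4: "broker-1", 9: "broker-1",
}


def threads_per_instance(partition_to_broker, parallelism):
    """Return the number of consuming threads each source instance would need."""
    # Step 1: assign partitions to source instances (modulo is an assumption here).
    assigned = {instance: [] for instance in range(parallelism)}
    for partition in sorted(partition_to_broker):
        assigned[partition % parallelism].append(partition)
    # Step 2: one thread per distinct broker among the instance's own partitions.
    return {
        instance: len({partition_to_broker[p] for p in partitions})
        for instance, partitions in assigned.items()
    }


thread_counts = threads_per_instance(partition_to_broker, parallelism=5)
print(thread_counts)

# The earlier case in this thread: 1 broker, 10 partitions, parallelism 10.
single_broker = threads_per_instance({p: "broker-0" for p in range(10)}, parallelism=10)
print(single_broker)
```

With this layout, the instance holding partitions 2 and 7 needs 2 threads
and the others need 1. In the single-broker case, all 10 instances still
receive a partition and each runs 1 thread.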

Regards,
Gordon


Re: Threading Model for Kinesis

Posted by Sameer W <sa...@axiomine.com>.
Gordon,

I tried the following with Kafka: 1 broker, but a topic with 10 partitions.
I have a parallelism of 10 defined for the job. I see all 10 of my
source->Mapper->assignTimestamps receiving and sending data. If there is
only one source instance per broker, how does that happen?

Thanks,
Sameer

Re: Threading Model for Kinesis

Posted by "Tzu-Li (Gordon) Tai" <tz...@gmail.com>.
Hi!

Kinesis shards should ideally be evenly assigned to the source instances.
So, with your example of source parallelism of 10 and 20 shards, each
source instance will have 2 shards and will have 2 threads consuming them
(therefore, not in round robin).
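
To make the arithmetic concrete, here is a minimal sketch in plain Python
(not Flink code). The helper assign_shards and its modulo rule are
assumptions made for illustration only; the connector's actual assignment
may differ, as long as it spreads shards evenly.

```python
from typing import Dict, List


def assign_shards(num_shards: int, parallelism: int) -> Dict[int, List[int]]:
    """Map each source instance index to the shard indices it would consume."""
    assignment: Dict[int, List[int]] = {i: [] for i in range(parallelism)}
    for shard in range(num_shards):
        # Modulo assignment is an assumption for this sketch, not the real logic.
        assignment[shard % parallelism].append(shard)
    return assignment


assignment = assign_shards(num_shards=20, parallelism=10)

# One consuming thread per assigned shard, so the thread count per instance
# equals the number of shards that instance holds.
threads_per_subtask = {i: len(shards) for i, shards in assignment.items()}
print(threads_per_subtask)
```

With 20 shards and 10 source instances, every instance ends up with 2 shards
and therefore 2 consuming threads.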

For the Kafka consumer, in the source instances there will be one consuming
thread per broker, instead of partition. So, if a source instance is
assigned partitions that happen to be on the same broker, the source
instance will only create 1 thread to consume all of them.

You are correct that currently the Kafka consumer does not handle
repartitioning transparently like the Kinesis connector, but we’re working
on this :)

Regards,
Gordon
