You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by pravin kumar <pk...@gmail.com> on 2017/10/24 09:14:42 UTC

regarding number of Stream Tasks

I have created a stream with topic contains 5 partitions and expected to
create 5 stream tasks ,i got 10 tasks as
0_0  0_1  0_2  0_3  0_4  1_0  1_1  1_2  1_3  1_4


my doubt is:im expected to have 5 tasks how it produced 10 tasks

here are some logs:
            [2017-10-24 10:27:35,284] INFO Kafka commitId :
cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser)
[2017-10-24 10:27:35,284] DEBUG Kafka consumer created
(org.apache.kafka.clients.consumer.KafkaConsumer)
[2017-10-24 10:27:35,304] INFO stream-thread
[SingleConsumerMultiConsumerUsingStreamx4-4dc5b303-62b4-4898-9d8f-a1a9a8adfb7d-StreamThread-1]
State transition from CREATED to RUNNING.
(org.apache.kafka.streams.processor.internals.StreamThread)
[2017-10-24 10:27:35,306] DEBUG stream-client
[SingleConsumerMultiConsumerUsingStreamx4-4dc5b303-62b4-4898-9d8f-a1a9a8adfb7d]
Removing local Kafka Streams application data in
/home/admin/Documents/kafka_2.11-0.10.2.1/kafka-streams/SingleConsumerMultiConsumerUsingStreamx4
for application SingleConsumerMultiConsumerUsingStreamx4.
(org.apache.kafka.streams.KafkaStreams)
[2017-10-24 10:27:35,311] DEBUG stream-thread [cleanup] Acquired state dir
lock for task 0_0
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,311] INFO stream-thread [cleanup] Deleting obsolete
state directory 0_0 for task 0_0 as cleanup delay of 0 ms has passed
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,322] DEBUG stream-thread [cleanup] Released state dir
lock for task 0_0
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,322] DEBUG stream-thread [cleanup] Acquired state dir
lock for task 1_0
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,322] INFO stream-thread [cleanup] Deleting obsolete
state directory 1_0 for task 1_0 as cleanup delay of 0 ms has passed
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Released state dir
lock for task 1_0
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Acquired state dir
lock for task 0_1
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,395] INFO stream-thread [cleanup] Deleting obsolete
state directory 0_1 for task 0_1 as cleanup delay of 0 ms has passed
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Released state dir
lock for task 0_1
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Acquired state dir
lock for task 1_1
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,395] INFO stream-thread [cleanup] Deleting obsolete
state directory 1_1 for task 1_1 as cleanup delay of 0 ms has passed
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,396] DEBUG stream-thread [cleanup] Released state dir
lock for task 1_1
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,396] DEBUG stream-thread [cleanup] Acquired state dir
lock for task 0_2
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,396] INFO stream-thread [cleanup] Deleting obsolete
state directory 0_2 for task 0_2 as cleanup delay of 0 ms has passed
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,397] DEBUG stream-thread [cleanup] Released state dir
lock for task 0_2
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,397] DEBUG stream-thread [cleanup] Acquired state dir
lock for task 1_2
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,397] INFO stream-thread [cleanup] Deleting obsolete
state directory 1_2 for task 1_2 as cleanup delay of 0 ms has passed
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,398] DEBUG stream-thread [cleanup] Released state dir
lock for task 1_2
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,398] DEBUG stream-thread [cleanup] Acquired state dir
lock for task 0_3
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,398] INFO stream-thread [cleanup] Deleting obsolete
state directory 0_3 for task 0_3 as cleanup delay of 0 ms has passed
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,398] DEBUG stream-thread [cleanup] Released state dir
lock for task 0_3
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,398] DEBUG stream-thread [cleanup] Acquired state dir
lock for task 1_3
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,399] INFO stream-thread [cleanup] Deleting obsolete
state directory 1_3 for task 1_3 as cleanup delay of 0 ms has passed
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,399] DEBUG stream-thread [cleanup] Released state dir
lock for task 1_3
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,400] DEBUG stream-thread [cleanup] Acquired state dir
lock for task 0_4
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,400] INFO stream-thread [cleanup] Deleting obsolete
state directory 0_4 for task 0_4 as cleanup delay of 0 ms has passed
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,400] DEBUG stream-thread [cleanup] Released state dir
lock for task 0_4
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,400] DEBUG stream-thread [cleanup] Acquired state dir
lock for task 1_4
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,400] INFO stream-thread [cleanup] Deleting obsolete
state directory 1_4 for task 1_4 as cleanup delay of 0 ms has passed
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,401] DEBUG stream-thread [cleanup] Released state dir
lock for task 1_4
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,401] DEBUG stream-client
[SingleConsumerMultiConsumerUsingStreamx4-4dc5b303-62b4-4898-9d8f-a1a9a8adfb7d]
Starting Kafka Stream process. (org.apache.kafka.streams.KafkaStreams)
[2017-10-24 10:27:35,413] DEBUG Updated cluster metadata version 1 to
Cluster(id = null, nodes = [localhost:9092 (id: -1 rack: null)], partitions
= []) (org.apache.kafka.clients.Metadata)

Re: regarding number of Stream Tasks

Posted by pravin kumar <pk...@gmail.com>.
ohhh...thank you. Its cleared now

On Tue, Oct 31, 2017 at 4:36 PM, Damian Guy <da...@gmail.com> wrote:

> Hi, the `map` when it is followed by `groupByKey` will cause a
> repartitioning of the data, so you will have your 5 tasks processing the
> input partitions and 5 tasks processing the partitions from the
> repartitioning.
>
> On Tue, 31 Oct 2017 at 10:56 pravin kumar <pk...@gmail.com> wrote:
>
> >  I have created a stream with topic contains 5 partitions and expected to
> > create 5 stream tasks ,i got 10 tasks as
> > 0_0  0_1  0_2  0_3  0_4  1_0  1_1  1_2  1_3  1_4
> >
> >
> > im doing wordcount in this example,
> >
> > here is my topology in this link: 1.
> > https://gist.github.com/Pk007790/72b0718f26e6963246e83da992b3e725
> > 2.https://gist.github.com/Pk007790/a05226007ca90cdd36c362d09d19bda6.
> >
> > On Tue, Oct 24, 2017 at 3:29 PM, Damian Guy <da...@gmail.com>
> wrote:
> >
> > > It would depend on what your topology looks like, which you haven't
> show
> > > here. But if there may be internal topics generated due to
> repartitioning
> > > which would cause the extra tasks.
> > > If you provide the topology we would be able to tell you.
> > > Thanks,
> > > Damian
> > >
> > > On Tue, 24 Oct 2017 at 10:14 pravin kumar <pk...@gmail.com> wrote:
> > >
> > > > I have created a stream with topic contains 5 partitions and expected
> > to
> > > > create 5 stream tasks ,i got 10 tasks as
> > > > 0_0  0_1  0_2  0_3  0_4  1_0  1_1  1_2  1_3  1_4
> > > >
> > > >
> > > > my doubt is:im expected to have 5 tasks how it produced 10 tasks
> > > >
> > > > here are some logs:
> > > >             [2017-10-24 10:27:35,284] INFO Kafka commitId :
> > > > cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser)
> > > > [2017-10-24 10:27:35,284] DEBUG Kafka consumer created
> > > > (org.apache.kafka.clients.consumer.KafkaConsumer)
> > > > [2017-10-24 10:27:35,304] INFO stream-thread
> > > >
> > > > [SingleConsumerMultiConsumerUsingStreamx4-4dc5b303-62b4-4898-
> > > 9d8f-a1a9a8adfb7d-StreamThread-1]
> > > > State transition from CREATED to RUNNING.
> > > > (org.apache.kafka.streams.processor.internals.StreamThread)
> > > > [2017-10-24 10:27:35,306] DEBUG stream-client
> > > >
> > > > [SingleConsumerMultiConsumerUsingStreamx4-4dc5b303-62b4-4898-
> > > 9d8f-a1a9a8adfb7d]
> > > > Removing local Kafka Streams application data in
> > > >
> > > > /home/admin/Documents/kafka_2.11-0.10.2.1/kafka-streams/
> > > SingleConsumerMultiConsumerUsingStreamx4
> > > > for application SingleConsumerMultiConsumerUsingStreamx4.
> > > > (org.apache.kafka.streams.KafkaStreams)
> > > > [2017-10-24 10:27:35,311] DEBUG stream-thread [cleanup] Acquired
> state
> > > dir
> > > > lock for task 0_0
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,311] INFO stream-thread [cleanup] Deleting
> > obsolete
> > > > state directory 0_0 for task 0_0 as cleanup delay of 0 ms has passed
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,322] DEBUG stream-thread [cleanup] Released
> state
> > > dir
> > > > lock for task 0_0
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,322] DEBUG stream-thread [cleanup] Acquired
> state
> > > dir
> > > > lock for task 1_0
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,322] INFO stream-thread [cleanup] Deleting
> > obsolete
> > > > state directory 1_0 for task 1_0 as cleanup delay of 0 ms has passed
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Released
> state
> > > dir
> > > > lock for task 1_0
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Acquired
> state
> > > dir
> > > > lock for task 0_1
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,395] INFO stream-thread [cleanup] Deleting
> > obsolete
> > > > state directory 0_1 for task 0_1 as cleanup delay of 0 ms has passed
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Released
> state
> > > dir
> > > > lock for task 0_1
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Acquired
> state
> > > dir
> > > > lock for task 1_1
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,395] INFO stream-thread [cleanup] Deleting
> > obsolete
> > > > state directory 1_1 for task 1_1 as cleanup delay of 0 ms has passed
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,396] DEBUG stream-thread [cleanup] Released
> state
> > > dir
> > > > lock for task 1_1
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,396] DEBUG stream-thread [cleanup] Acquired
> state
> > > dir
> > > > lock for task 0_2
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,396] INFO stream-thread [cleanup] Deleting
> > obsolete
> > > > state directory 0_2 for task 0_2 as cleanup delay of 0 ms has passed
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,397] DEBUG stream-thread [cleanup] Released
> state
> > > dir
> > > > lock for task 0_2
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,397] DEBUG stream-thread [cleanup] Acquired
> state
> > > dir
> > > > lock for task 1_2
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,397] INFO stream-thread [cleanup] Deleting
> > obsolete
> > > > state directory 1_2 for task 1_2 as cleanup delay of 0 ms has passed
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,398] DEBUG stream-thread [cleanup] Released
> state
> > > dir
> > > > lock for task 1_2
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,398] DEBUG stream-thread [cleanup] Acquired
> state
> > > dir
> > > > lock for task 0_3
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,398] INFO stream-thread [cleanup] Deleting
> > obsolete
> > > > state directory 0_3 for task 0_3 as cleanup delay of 0 ms has passed
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,398] DEBUG stream-thread [cleanup] Released
> state
> > > dir
> > > > lock for task 0_3
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,398] DEBUG stream-thread [cleanup] Acquired
> state
> > > dir
> > > > lock for task 1_3
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,399] INFO stream-thread [cleanup] Deleting
> > obsolete
> > > > state directory 1_3 for task 1_3 as cleanup delay of 0 ms has passed
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,399] DEBUG stream-thread [cleanup] Released
> state
> > > dir
> > > > lock for task 1_3
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,400] DEBUG stream-thread [cleanup] Acquired
> state
> > > dir
> > > > lock for task 0_4
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,400] INFO stream-thread [cleanup] Deleting
> > obsolete
> > > > state directory 0_4 for task 0_4 as cleanup delay of 0 ms has passed
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,400] DEBUG stream-thread [cleanup] Released
> state
> > > dir
> > > > lock for task 0_4
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,400] DEBUG stream-thread [cleanup] Acquired
> state
> > > dir
> > > > lock for task 1_4
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,400] INFO stream-thread [cleanup] Deleting
> > obsolete
> > > > state directory 1_4 for task 1_4 as cleanup delay of 0 ms has passed
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,401] DEBUG stream-thread [cleanup] Released
> state
> > > dir
> > > > lock for task 1_4
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,401] DEBUG stream-client
> > > >
> > > > [SingleConsumerMultiConsumerUsingStreamx4-4dc5b303-62b4-4898-
> > > 9d8f-a1a9a8adfb7d]
> > > > Starting Kafka Stream process. (org.apache.kafka.streams.
> KafkaStreams)
> > > > [2017-10-24 10:27:35,413] DEBUG Updated cluster metadata version 1 to
> > > > Cluster(id = null, nodes = [localhost:9092 (id: -1 rack: null)],
> > > partitions
> > > > = []) (org.apache.kafka.clients.Metadata)
> > > >
> > >
> >
>

Re: regarding number of Stream Tasks

Posted by Damian Guy <da...@gmail.com>.
Hi, the `map` when it is followed by `groupByKey` will cause a
repartitioning of the data, so you will have your 5 tasks processing the
input partitions and 5 tasks processing the partitions from the
repartitioning.

On Tue, 31 Oct 2017 at 10:56 pravin kumar <pk...@gmail.com> wrote:

>  I have created a stream with topic contains 5 partitions and expected to
> create 5 stream tasks ,i got 10 tasks as
> 0_0  0_1  0_2  0_3  0_4  1_0  1_1  1_2  1_3  1_4
>
>
> im doing wordcount in this example,
>
> here is my topology in this link: 1.
> https://gist.github.com/Pk007790/72b0718f26e6963246e83da992b3e725
> 2.https://gist.github.com/Pk007790/a05226007ca90cdd36c362d09d19bda6.
>
> On Tue, Oct 24, 2017 at 3:29 PM, Damian Guy <da...@gmail.com> wrote:
>
> > It would depend on what your topology looks like, which you haven't show
> > here. But if there may be internal topics generated due to repartitioning
> > which would cause the extra tasks.
> > If you provide the topology we would be able to tell you.
> > Thanks,
> > Damian
> >
> > On Tue, 24 Oct 2017 at 10:14 pravin kumar <pk...@gmail.com> wrote:
> >
> > > I have created a stream with topic contains 5 partitions and expected
> to
> > > create 5 stream tasks ,i got 10 tasks as
> > > 0_0  0_1  0_2  0_3  0_4  1_0  1_1  1_2  1_3  1_4
> > >
> > >
> > > my doubt is:im expected to have 5 tasks how it produced 10 tasks
> > >
> > > here are some logs:
> > >             [2017-10-24 10:27:35,284] INFO Kafka commitId :
> > > cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser)
> > > [2017-10-24 10:27:35,284] DEBUG Kafka consumer created
> > > (org.apache.kafka.clients.consumer.KafkaConsumer)
> > > [2017-10-24 10:27:35,304] INFO stream-thread
> > >
> > > [SingleConsumerMultiConsumerUsingStreamx4-4dc5b303-62b4-4898-
> > 9d8f-a1a9a8adfb7d-StreamThread-1]
> > > State transition from CREATED to RUNNING.
> > > (org.apache.kafka.streams.processor.internals.StreamThread)
> > > [2017-10-24 10:27:35,306] DEBUG stream-client
> > >
> > > [SingleConsumerMultiConsumerUsingStreamx4-4dc5b303-62b4-4898-
> > 9d8f-a1a9a8adfb7d]
> > > Removing local Kafka Streams application data in
> > >
> > > /home/admin/Documents/kafka_2.11-0.10.2.1/kafka-streams/
> > SingleConsumerMultiConsumerUsingStreamx4
> > > for application SingleConsumerMultiConsumerUsingStreamx4.
> > > (org.apache.kafka.streams.KafkaStreams)
> > > [2017-10-24 10:27:35,311] DEBUG stream-thread [cleanup] Acquired state
> > dir
> > > lock for task 0_0
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,311] INFO stream-thread [cleanup] Deleting
> obsolete
> > > state directory 0_0 for task 0_0 as cleanup delay of 0 ms has passed
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,322] DEBUG stream-thread [cleanup] Released state
> > dir
> > > lock for task 0_0
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,322] DEBUG stream-thread [cleanup] Acquired state
> > dir
> > > lock for task 1_0
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,322] INFO stream-thread [cleanup] Deleting
> obsolete
> > > state directory 1_0 for task 1_0 as cleanup delay of 0 ms has passed
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Released state
> > dir
> > > lock for task 1_0
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Acquired state
> > dir
> > > lock for task 0_1
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,395] INFO stream-thread [cleanup] Deleting
> obsolete
> > > state directory 0_1 for task 0_1 as cleanup delay of 0 ms has passed
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Released state
> > dir
> > > lock for task 0_1
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Acquired state
> > dir
> > > lock for task 1_1
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,395] INFO stream-thread [cleanup] Deleting
> obsolete
> > > state directory 1_1 for task 1_1 as cleanup delay of 0 ms has passed
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,396] DEBUG stream-thread [cleanup] Released state
> > dir
> > > lock for task 1_1
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,396] DEBUG stream-thread [cleanup] Acquired state
> > dir
> > > lock for task 0_2
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,396] INFO stream-thread [cleanup] Deleting
> obsolete
> > > state directory 0_2 for task 0_2 as cleanup delay of 0 ms has passed
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,397] DEBUG stream-thread [cleanup] Released state
> > dir
> > > lock for task 0_2
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,397] DEBUG stream-thread [cleanup] Acquired state
> > dir
> > > lock for task 1_2
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,397] INFO stream-thread [cleanup] Deleting
> obsolete
> > > state directory 1_2 for task 1_2 as cleanup delay of 0 ms has passed
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,398] DEBUG stream-thread [cleanup] Released state
> > dir
> > > lock for task 1_2
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,398] DEBUG stream-thread [cleanup] Acquired state
> > dir
> > > lock for task 0_3
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,398] INFO stream-thread [cleanup] Deleting
> obsolete
> > > state directory 0_3 for task 0_3 as cleanup delay of 0 ms has passed
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,398] DEBUG stream-thread [cleanup] Released state
> > dir
> > > lock for task 0_3
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,398] DEBUG stream-thread [cleanup] Acquired state
> > dir
> > > lock for task 1_3
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,399] INFO stream-thread [cleanup] Deleting
> obsolete
> > > state directory 1_3 for task 1_3 as cleanup delay of 0 ms has passed
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,399] DEBUG stream-thread [cleanup] Released state
> > dir
> > > lock for task 1_3
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,400] DEBUG stream-thread [cleanup] Acquired state
> > dir
> > > lock for task 0_4
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,400] INFO stream-thread [cleanup] Deleting
> obsolete
> > > state directory 0_4 for task 0_4 as cleanup delay of 0 ms has passed
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,400] DEBUG stream-thread [cleanup] Released state
> > dir
> > > lock for task 0_4
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,400] DEBUG stream-thread [cleanup] Acquired state
> > dir
> > > lock for task 1_4
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,400] INFO stream-thread [cleanup] Deleting
> obsolete
> > > state directory 1_4 for task 1_4 as cleanup delay of 0 ms has passed
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,401] DEBUG stream-thread [cleanup] Released state
> > dir
> > > lock for task 1_4
> > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > [2017-10-24 10:27:35,401] DEBUG stream-client
> > >
> > > [SingleConsumerMultiConsumerUsingStreamx4-4dc5b303-62b4-4898-
> > 9d8f-a1a9a8adfb7d]
> > > Starting Kafka Stream process. (org.apache.kafka.streams.KafkaStreams)
> > > [2017-10-24 10:27:35,413] DEBUG Updated cluster metadata version 1 to
> > > Cluster(id = null, nodes = [localhost:9092 (id: -1 rack: null)],
> > partitions
> > > = []) (org.apache.kafka.clients.Metadata)
> > >
> >
>

Re: regarding number of Stream Tasks

Posted by pravin kumar <pk...@gmail.com>.
 I have created a stream with topic contains 5 partitions and expected to
create 5 stream tasks ,i got 10 tasks as
0_0  0_1  0_2  0_3  0_4  1_0  1_1  1_2  1_3  1_4


im doing wordcount in this example,

here is my topology in this link: 1.
https://gist.github.com/Pk007790/72b0718f26e6963246e83da992b3e725
2.https://gist.github.com/Pk007790/a05226007ca90cdd36c362d09d19bda6.

On Tue, Oct 24, 2017 at 3:29 PM, Damian Guy <da...@gmail.com> wrote:

> It would depend on what your topology looks like, which you haven't show
> here. But if there may be internal topics generated due to repartitioning
> which would cause the extra tasks.
> If you provide the topology we would be able to tell you.
> Thanks,
> Damian
>
> On Tue, 24 Oct 2017 at 10:14 pravin kumar <pk...@gmail.com> wrote:
>
> > I have created a stream with topic contains 5 partitions and expected to
> > create 5 stream tasks ,i got 10 tasks as
> > 0_0  0_1  0_2  0_3  0_4  1_0  1_1  1_2  1_3  1_4
> >
> >
> > my doubt is:im expected to have 5 tasks how it produced 10 tasks
> >
> > here are some logs:
> >             [2017-10-24 10:27:35,284] INFO Kafka commitId :
> > cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser)
> > [2017-10-24 10:27:35,284] DEBUG Kafka consumer created
> > (org.apache.kafka.clients.consumer.KafkaConsumer)
> > [2017-10-24 10:27:35,304] INFO stream-thread
> >
> > [SingleConsumerMultiConsumerUsingStreamx4-4dc5b303-62b4-4898-
> 9d8f-a1a9a8adfb7d-StreamThread-1]
> > State transition from CREATED to RUNNING.
> > (org.apache.kafka.streams.processor.internals.StreamThread)
> > [2017-10-24 10:27:35,306] DEBUG stream-client
> >
> > [SingleConsumerMultiConsumerUsingStreamx4-4dc5b303-62b4-4898-
> 9d8f-a1a9a8adfb7d]
> > Removing local Kafka Streams application data in
> >
> > /home/admin/Documents/kafka_2.11-0.10.2.1/kafka-streams/
> SingleConsumerMultiConsumerUsingStreamx4
> > for application SingleConsumerMultiConsumerUsingStreamx4.
> > (org.apache.kafka.streams.KafkaStreams)
> > [2017-10-24 10:27:35,311] DEBUG stream-thread [cleanup] Acquired state
> dir
> > lock for task 0_0
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,311] INFO stream-thread [cleanup] Deleting obsolete
> > state directory 0_0 for task 0_0 as cleanup delay of 0 ms has passed
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,322] DEBUG stream-thread [cleanup] Released state
> dir
> > lock for task 0_0
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,322] DEBUG stream-thread [cleanup] Acquired state
> dir
> > lock for task 1_0
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,322] INFO stream-thread [cleanup] Deleting obsolete
> > state directory 1_0 for task 1_0 as cleanup delay of 0 ms has passed
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Released state
> dir
> > lock for task 1_0
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Acquired state
> dir
> > lock for task 0_1
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,395] INFO stream-thread [cleanup] Deleting obsolete
> > state directory 0_1 for task 0_1 as cleanup delay of 0 ms has passed
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Released state
> dir
> > lock for task 0_1
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Acquired state
> dir
> > lock for task 1_1
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,395] INFO stream-thread [cleanup] Deleting obsolete
> > state directory 1_1 for task 1_1 as cleanup delay of 0 ms has passed
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,396] DEBUG stream-thread [cleanup] Released state
> dir
> > lock for task 1_1
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,396] DEBUG stream-thread [cleanup] Acquired state
> dir
> > lock for task 0_2
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,396] INFO stream-thread [cleanup] Deleting obsolete
> > state directory 0_2 for task 0_2 as cleanup delay of 0 ms has passed
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,397] DEBUG stream-thread [cleanup] Released state
> dir
> > lock for task 0_2
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,397] DEBUG stream-thread [cleanup] Acquired state
> dir
> > lock for task 1_2
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,397] INFO stream-thread [cleanup] Deleting obsolete
> > state directory 1_2 for task 1_2 as cleanup delay of 0 ms has passed
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,398] DEBUG stream-thread [cleanup] Released state
> dir
> > lock for task 1_2
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,398] DEBUG stream-thread [cleanup] Acquired state
> dir
> > lock for task 0_3
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,398] INFO stream-thread [cleanup] Deleting obsolete
> > state directory 0_3 for task 0_3 as cleanup delay of 0 ms has passed
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,398] DEBUG stream-thread [cleanup] Released state
> dir
> > lock for task 0_3
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,398] DEBUG stream-thread [cleanup] Acquired state
> dir
> > lock for task 1_3
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,399] INFO stream-thread [cleanup] Deleting obsolete
> > state directory 1_3 for task 1_3 as cleanup delay of 0 ms has passed
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,399] DEBUG stream-thread [cleanup] Released state
> dir
> > lock for task 1_3
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,400] DEBUG stream-thread [cleanup] Acquired state
> dir
> > lock for task 0_4
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,400] INFO stream-thread [cleanup] Deleting obsolete
> > state directory 0_4 for task 0_4 as cleanup delay of 0 ms has passed
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,400] DEBUG stream-thread [cleanup] Released state
> dir
> > lock for task 0_4
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,400] DEBUG stream-thread [cleanup] Acquired state
> dir
> > lock for task 1_4
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,400] INFO stream-thread [cleanup] Deleting obsolete
> > state directory 1_4 for task 1_4 as cleanup delay of 0 ms has passed
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,401] DEBUG stream-thread [cleanup] Released state
> dir
> > lock for task 1_4
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,401] DEBUG stream-client
> >
> > [SingleConsumerMultiConsumerUsingStreamx4-4dc5b303-62b4-4898-
> 9d8f-a1a9a8adfb7d]
> > Starting Kafka Stream process. (org.apache.kafka.streams.KafkaStreams)
> > [2017-10-24 10:27:35,413] DEBUG Updated cluster metadata version 1 to
> > Cluster(id = null, nodes = [localhost:9092 (id: -1 rack: null)],
> partitions
> > = []) (org.apache.kafka.clients.Metadata)
> >
>

Re: regarding number of Stream Tasks

Posted by Damian Guy <da...@gmail.com>.
It would depend on what your topology looks like, which you haven't show
here. But if there may be internal topics generated due to repartitioning
which would cause the extra tasks.
If you provide the topology we would be able to tell you.
Thanks,
Damian

On Tue, 24 Oct 2017 at 10:14 pravin kumar <pk...@gmail.com> wrote:

> I have created a stream with topic contains 5 partitions and expected to
> create 5 stream tasks ,i got 10 tasks as
> 0_0  0_1  0_2  0_3  0_4  1_0  1_1  1_2  1_3  1_4
>
>
> my doubt is:im expected to have 5 tasks how it produced 10 tasks
>
> here are some logs:
>             [2017-10-24 10:27:35,284] INFO Kafka commitId :
> cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser)
> [2017-10-24 10:27:35,284] DEBUG Kafka consumer created
> (org.apache.kafka.clients.consumer.KafkaConsumer)
> [2017-10-24 10:27:35,304] INFO stream-thread
>
> [SingleConsumerMultiConsumerUsingStreamx4-4dc5b303-62b4-4898-9d8f-a1a9a8adfb7d-StreamThread-1]
> State transition from CREATED to RUNNING.
> (org.apache.kafka.streams.processor.internals.StreamThread)
> [2017-10-24 10:27:35,306] DEBUG stream-client
>
> [SingleConsumerMultiConsumerUsingStreamx4-4dc5b303-62b4-4898-9d8f-a1a9a8adfb7d]
> Removing local Kafka Streams application data in
>
> /home/admin/Documents/kafka_2.11-0.10.2.1/kafka-streams/SingleConsumerMultiConsumerUsingStreamx4
> for application SingleConsumerMultiConsumerUsingStreamx4.
> (org.apache.kafka.streams.KafkaStreams)
> [2017-10-24 10:27:35,311] DEBUG stream-thread [cleanup] Acquired state dir
> lock for task 0_0
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,311] INFO stream-thread [cleanup] Deleting obsolete
> state directory 0_0 for task 0_0 as cleanup delay of 0 ms has passed
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,322] DEBUG stream-thread [cleanup] Released state dir
> lock for task 0_0
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,322] DEBUG stream-thread [cleanup] Acquired state dir
> lock for task 1_0
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,322] INFO stream-thread [cleanup] Deleting obsolete
> state directory 1_0 for task 1_0 as cleanup delay of 0 ms has passed
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Released state dir
> lock for task 1_0
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Acquired state dir
> lock for task 0_1
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,395] INFO stream-thread [cleanup] Deleting obsolete
> state directory 0_1 for task 0_1 as cleanup delay of 0 ms has passed
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Released state dir
> lock for task 0_1
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Acquired state dir
> lock for task 1_1
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,395] INFO stream-thread [cleanup] Deleting obsolete
> state directory 1_1 for task 1_1 as cleanup delay of 0 ms has passed
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,396] DEBUG stream-thread [cleanup] Released state dir
> lock for task 1_1
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,396] DEBUG stream-thread [cleanup] Acquired state dir
> lock for task 0_2
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,396] INFO stream-thread [cleanup] Deleting obsolete
> state directory 0_2 for task 0_2 as cleanup delay of 0 ms has passed
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,397] DEBUG stream-thread [cleanup] Released state dir
> lock for task 0_2
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,397] DEBUG stream-thread [cleanup] Acquired state dir
> lock for task 1_2
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,397] INFO stream-thread [cleanup] Deleting obsolete
> state directory 1_2 for task 1_2 as cleanup delay of 0 ms has passed
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,398] DEBUG stream-thread [cleanup] Released state dir
> lock for task 1_2
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,398] DEBUG stream-thread [cleanup] Acquired state dir
> lock for task 0_3
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,398] INFO stream-thread [cleanup] Deleting obsolete
> state directory 0_3 for task 0_3 as cleanup delay of 0 ms has passed
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,398] DEBUG stream-thread [cleanup] Released state dir
> lock for task 0_3
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,398] DEBUG stream-thread [cleanup] Acquired state dir
> lock for task 1_3
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,399] INFO stream-thread [cleanup] Deleting obsolete
> state directory 1_3 for task 1_3 as cleanup delay of 0 ms has passed
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,399] DEBUG stream-thread [cleanup] Released state dir
> lock for task 1_3
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,400] DEBUG stream-thread [cleanup] Acquired state dir
> lock for task 0_4
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,400] INFO stream-thread [cleanup] Deleting obsolete
> state directory 0_4 for task 0_4 as cleanup delay of 0 ms has passed
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,400] DEBUG stream-thread [cleanup] Released state dir
> lock for task 0_4
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,400] DEBUG stream-thread [cleanup] Acquired state dir
> lock for task 1_4
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,400] INFO stream-thread [cleanup] Deleting obsolete
> state directory 1_4 for task 1_4 as cleanup delay of 0 ms has passed
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,401] DEBUG stream-thread [cleanup] Released state dir
> lock for task 1_4
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2017-10-24 10:27:35,401] DEBUG stream-client
>
> [SingleConsumerMultiConsumerUsingStreamx4-4dc5b303-62b4-4898-9d8f-a1a9a8adfb7d]
> Starting Kafka Stream process. (org.apache.kafka.streams.KafkaStreams)
> [2017-10-24 10:27:35,413] DEBUG Updated cluster metadata version 1 to
> Cluster(id = null, nodes = [localhost:9092 (id: -1 rack: null)], partitions
> = []) (org.apache.kafka.clients.Metadata)
>