Posted to users@kafka.apache.org by James Cheng <jc...@tivo.com> on 2015/03/11 01:15:41 UTC

Re: createMessageStreams vs createMessageStreamsByFilter

Hi,

Sorry to bring up this old thread, but my question is about this exact thing:

Guozhang, you said:
> A more concrete example: say you have topic AC: 3 partitions, topic BC: 6
> partitions.
> 
> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads will
> be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6 respectively;
> 
> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads will be
> created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6
> respectively.


You said that in the createMessageStreamsByFilter case, if topic AC had no messages in it and consumer.timeout.ms = -1, then the 3 threads might all be blocked waiting for data to arrive from topic AC, and so messages from BC would not be processed.

createMessageStreamsByFilter("*C" => 1) (single stream) would have the same problem, just worse. Behind the scenes, is there a single thread that is consuming (round-robin?) messages from the different partitions and inserting them all into a single queue for the application code to process? And is that why a single partition with no messages would block the other messages from getting through?

What about createMessageStreams("AC" => 1)? That creates a single stream that contains messages from multiple partitions, which might be on different brokers. Does that also suffer from the same problem, where if one partition has no messages, the application would not receive messages from the other partitions?

Thanks,
-James
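To make Guozhang's AC/BC example concrete, here is a small Java sketch that reproduces both of his assignments. It assumes the partitions of each topic are split into contiguous ranges across the available streams, and that createMessageStreamsByFilter reuses one shared set of streams for every matched topic. Partitions are numbered from 1 to match the example (Kafka itself numbers them from 0). The class and method names are hypothetical; this is an illustration, not Kafka's rebalance code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StreamAssignmentSketch {

    /** Splits partitions 1..numPartitions of one topic into contiguous ranges, one per stream. */
    static List<List<String>> rangeSplit(String topic, int numPartitions, int numStreams) {
        List<List<String>> result = new ArrayList<>();
        int perStream = numPartitions / numStreams;
        int extra = numPartitions % numStreams; // the first `extra` streams get one more
        int next = 1;
        for (int s = 0; s < numStreams; s++) {
            int count = perStream + (s < extra ? 1 : 0);
            List<String> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(topic + "-" + next++);
            }
            result.add(owned);
        }
        return result;
    }

    /** createMessageStreams-style: each topic gets its own, unshared streams. Value = {partitions, streams}. */
    static List<List<String>> perTopic(Map<String, int[]> topics) {
        List<List<String>> streams = new ArrayList<>();
        for (Map.Entry<String, int[]> e : topics.entrySet()) {
            streams.addAll(rangeSplit(e.getKey(), e.getValue()[0], e.getValue()[1]));
        }
        return streams;
    }

    /** createMessageStreamsByFilter-style: numStreams streams shared by every matched topic. */
    static List<List<String>> shared(Map<String, Integer> partitionsByTopic, int numStreams) {
        List<List<String>> streams = new ArrayList<>();
        for (int s = 0; s < numStreams; s++) {
            streams.add(new ArrayList<>());
        }
        for (Map.Entry<String, Integer> e : partitionsByTopic.entrySet()) {
            List<List<String>> split = rangeSplit(e.getKey(), e.getValue(), numStreams);
            for (int s = 0; s < numStreams; s++) {
                streams.get(s).addAll(split.get(s));
            }
        }
        return streams;
    }

    public static void main(String[] args) {
        Map<String, int[]> dedicated = new LinkedHashMap<>();
        dedicated.put("AC", new int[] {3, 3});
        dedicated.put("BC", new int[] {6, 2});
        // [[AC-1], [AC-2], [AC-3], [BC-1, BC-2, BC-3], [BC-4, BC-5, BC-6]]
        System.out.println(perTopic(dedicated));

        Map<String, Integer> matched = new LinkedHashMap<>();
        matched.put("AC", 3);
        matched.put("BC", 6);
        // [[AC-1, BC-1, BC-2], [AC-2, BC-3, BC-4], [AC-3, BC-5, BC-6]]
        System.out.println(shared(matched, 3));
    }
}
```

With more streams than partitions, the range split simply leaves some streams empty (rangeSplit("kafkatopic-1", 1, 2) gives [[kafkatopic-1-1], []]), which is consistent with the "No broker partitions consumed by consumer thread" warning in tao's log quoted below.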


On Feb 11, 2015, at 8:13 AM, Guozhang Wang <wa...@gmail.com> wrote:

> The new consumer will be released in 0.9, which is targeted for end of this
> quarter.
> 
> On Tue, Feb 10, 2015 at 7:11 PM, tao xiao <xi...@gmail.com> wrote:
> 
>> Do you know when the new consumer API will be publicly available?
>> 
>> On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang <wa...@gmail.com>
>> wrote:
>> 
>>> Yes, it can get stuck. For example, if AC and BC are processed by two
>>> different processes and the AC processor gets stuck, AC messages will
>>> fill up the consumer's buffer and eventually prevent the fetcher thread
>>> from putting more data into it; the fetcher thread will be blocked on that
>>> and not be able to fetch BC.
>>> 
>>> This issue has been addressed in the new consumer client, which is
>>> single-threaded with non-blocking APIs.
>>> 
>>> Guozhang
>>> 
>>> On Tue, Feb 10, 2015 at 6:24 PM, tao xiao <xi...@gmail.com> wrote:
>>> 
>>>> Thank you Guozhang for your detailed explanation. In your example
>>>> createMessageStreamsByFilter("*C" => 3), since threads are shared among
>>>> topics, there may be a situation where all 3 threads get stuck on topic
>>>> AC, e.g. because the topic is empty, which will hold the consuming threads
>>>> (with consumer.timeout.ms=-1), so there is no thread left to serve topic
>>>> BC. Do you think this situation will happen?
>>>> 
>>>> On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang <wa...@gmail.com>
>>>> wrote:
>>>>
>>>>> I was not clear before. For createMessageStreamsByFilter, each matched
>>>>> topic will have num-threads, but shared: i.e. there will be num-threads
>>>>> created in total, and each thread will be responsible for fetching all
>>>>> matched topics.
>>>>>
>>>>> A more concrete example: say you have topic AC: 3 partitions, topic BC: 6
>>>>> partitions.
>>>>>
>>>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads will
>>>>> be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6 respectively;
>>>>>
>>>>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads will be
>>>>> created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6
>>>>> respectively.
>>>>> 
>>>>> Guozhang
>>>>> 
>>>>> On Tue, Feb 10, 2015 at 8:37 AM, tao xiao <xi...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Guozhang,
>>>>>>
>>>>>> Do you mean that each regex-matched topic owns the number of threads that
>>>>>> get passed in to createMessageStreamsByFilter? For example, in the code
>>>>>> below, if I have 3 matched topics each of which has 2 partitions, then I
>>>>>> should have 3 * 2 = 6 threads in total, with each topic owning 2 threads.
>>>>>> 
>>>>>> TopicFilter filter = new Whitelist(".*");
>>>>>> 
>>>>>> int threadTotal = 2;
>>>>>> 
>>>>>> List<KafkaStream<byte[], byte[]>> streams = connector
>>>>>> .createMessageStreamsByFilter(filter, threadTotal);
>>>>>> 
>>>>>> 
>>>>>> But what I observed from the log is different:
>>>>>>
>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
>>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with consumers:
>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>>
>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
>>>>>> partition 1
>>>>>>
>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
>>>>>> partition 0
>>>>>>
>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
>>>>>> partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers:
>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>>
>>>>>> 2015-02-11 00:24:04 WARN  kafka.utils.Logging$class:83 - No broker
>>>>>> partitions consumed by consumer thread
>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 for topic kafkatopic-1
>>>>>>
>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
>>>>>> partition 0
>>>>>>
>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
>>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with consumers:
>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>>
>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
>>>>>> partition 1
>>>>>>
>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
>>>>>> partition 0
>>>>>>
>>>>>>
>>>>>> As you can see from the log, only 2 threads are created, and they are
>>>>>> shared among 3 topics. With this setting I think parallelism is degraded,
>>>>>> and a slow topic may impact the other topics' consumption performance.
>>>>>> Any thoughts?
>>>>>> 
>>>>>> On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <wangguoz@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> createMessageStreams is used for consuming from specific topic(s), where
>>>>>>> you can pass a map of [topic-name, num-threads] as its input parameter;
>>>>>>>
>>>>>>> createMessageStreamsByFilter is used for consuming from wildcard topics,
>>>>>>> where you can pass a (regex, num-threads) pair as its input parameters,
>>>>>>> and for each regex-matched topic num-threads will be created.
>>>>>>>
>>>>>>> The difference between the two is not really about throughput /
>>>>>>> latency, but rather about consumption semantics.
>>>>>>> 
>>>>>>> Guozhang
>>>>>>> 
>>>>>>> 
>>>>>>> On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <xi...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi team,
>>>>>>>>
>>>>>>>> I am comparing the differences between
>>>>>>>> ConsumerConnector.createMessageStreams and
>>>>>>>> ConsumerConnector.createMessageStreamsByFilter. My understanding is
>>>>>>>> that createMessageStreams creates x threads (x is the number of
>>>>>>>> threads passed in to the method) dedicated to the specified topic,
>>>>>>>> while createMessageStreamsByFilter creates x threads shared by the
>>>>>>>> topics specified by the TopicFilter. Is that correct?
>>>>>>>>
>>>>>>>> If this is the case, I assume createMessageStreams is the preferred
>>>>>>>> way to create streams for each topic if I have high-throughput and
>>>>>>>> low-latency demands. Is my assumption correct?
>>>>>>>> 
>>>>>>>> --
>>>>>>>> Regards,
>>>>>>>> Tao
>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> --
>>>>>>> -- Guozhang
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> Regards,
>>>>>> Tao
>>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> -- Guozhang
>>>>> 
>>>> 
>>>> 
>>>> 
>>>> --
>>>> Regards,
>>>> Tao
>>>> 
>>> 
>>> 
>>> 
>>> --
>>> -- Guozhang
>>> 
>> 
>> 
>> 
>> --
>> Regards,
>> Tao
>> 
> 
> 
> 
> --
> -- Guozhang
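The stall Guozhang describes above (one fetcher thread, bounded per-stream queues, one stuck consumer) can be reproduced with plain java.util.concurrent. In this sketch a single hypothetical fetcher thread alternately puts a chunk for AC and a chunk for BC into two ArrayBlockingQueues. Nobody drains the AC queue, standing in for a stuck AC processor, so once it is full the fetcher parks in put() and the BC queue stops receiving data even though BC has plenty to deliver. The queue capacity, chunk names and class name are assumptions chosen for illustration, not Kafka internals.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class FetcherStallSketch {

    /**
     * Runs one fetcher thread feeding two bounded queues; the AC queue is never
     * drained. Returns how many BC chunks were delivered before the fetcher blocked
     * (or all of them, if the fetcher never filled the AC queue).
     */
    static int bcChunksDelivered(int queueCapacity, int rounds) {
        BlockingQueue<String> acQueue = new ArrayBlockingQueue<>(queueCapacity);
        BlockingQueue<String> bcQueue = new ArrayBlockingQueue<>(rounds); // never fills up

        Thread fetcher = new Thread(() -> {
            try {
                for (int i = 0; i < rounds; i++) {
                    acQueue.put("AC-chunk-" + i); // parks here once acQueue is full...
                    bcQueue.put("BC-chunk-" + i); // ...so BC gets nothing from then on
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // shutdown requested
            }
        });
        fetcher.setDaemon(true);
        fetcher.start();

        try {
            // Wait up to 500 ms; if acQueue fills, the fetcher parks and never finishes.
            fetcher.join(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int delivered = bcQueue.size();
        fetcher.interrupt(); // unpark the blocked fetcher so it can exit
        return delivered;
    }

    public static void main(String[] args) {
        // With room for only 2 AC chunks, only 2 of the 10 BC chunks reach bcQueue.
        System.out.println(bcChunksDelivered(2, 10));
    }
}
```

This is the head-of-line blocking that a single-threaded, non-blocking client avoids, since there is no intermediate bounded queue for one slow consumer thread to fill.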


Re: createMessageStreams vs createMessageStreamsByFilter

Posted by tao xiao <xi...@gmail.com>.
consumer.timeout.ms only affects how the stream reads data from the
internal chunk queue that is used to buffer received data. The actual data
fetching is done by a separate fetcher thread,
kafka.consumer.ConsumerFetcherThread. The fetcher thread keeps reading data
from the broker and putting it into the queue, and the stream keeps polling
the queue and passes data back to the consumer if there is any.

So for a case like createMessageStreams("AC" => 1), the same stream (which
means the same chunk queue) is shared by multiple partitions of topic AC. If
one of the partitions has no data, the consumer is still able to read data
from the other partitions, as the fetcher thread keeps feeding data from them
into the queue.

The only situation where the consumer will get stuck is when the fetcher
thread is blocked on the network, e.g. by high network latency between the
consumer and the broker, or by no data coming from the broker. This is
because the fetcher thread is implemented using blocking I/O.
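The two sides of the chunk queue described above can be sketched in Java as follows. The sketch assumes one queue shared by partitions AC-0 to AC-2: the fetcher side enqueues whatever a fetch response contains, and the stream side reads with either an unbounded take() (consumer.timeout.ms=-1) or a timed poll(). StreamTimeoutException and the other names are hypothetical; in the old Scala consumer the analogous exception is kafka.consumer.ConsumerTimeoutException, but this is not that code, and the queue here is unbounded only for simplicity.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ChunkQueueSketch {

    /** Hypothetical stand-in for the consumer's unchecked timeout exception. */
    static class StreamTimeoutException extends RuntimeException {
        StreamTimeoutException(String message) {
            super(message);
        }
    }

    /**
     * Fetcher side: one fetch response covering several partitions. Each non-empty
     * partition contributes its messages; an empty partition adds nothing and does
     * not hold back the others.
     */
    static void enqueueFetchResponse(Map<String, List<String>> messagesByPartition,
                                     BlockingQueue<String> chunkQueue) {
        for (Map.Entry<String, List<String>> entry : messagesByPartition.entrySet()) {
            for (String message : entry.getValue()) {
                chunkQueue.offer(entry.getKey() + ":" + message); // always succeeds: queue is unbounded
            }
        }
    }

    /** Stream side: a negative timeout waits forever (like consumer.timeout.ms=-1). */
    static String nextMessage(BlockingQueue<String> chunkQueue, long timeoutMs) {
        try {
            if (timeoutMs < 0) {
                return chunkQueue.take();
            }
            String item = chunkQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (item == null) {
                throw new StreamTimeoutException("no message within " + timeoutMs + " ms");
            }
            return item;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> chunkQueue = new LinkedBlockingQueue<>();
        Map<String, List<String>> response = new LinkedHashMap<>();
        response.put("AC-0", Collections.<String>emptyList()); // empty partition
        response.put("AC-1", Arrays.asList("m1"));
        response.put("AC-2", Arrays.asList("m2"));
        enqueueFetchResponse(response, chunkQueue);

        System.out.println(nextMessage(chunkQueue, 100)); // AC-1:m1
        System.out.println(nextMessage(chunkQueue, 100)); // AC-2:m2
        try {
            nextMessage(chunkQueue, 50);
        } catch (StreamTimeoutException e) {
            System.out.println("timed out: " + e.getMessage());
        }
    }
}
```

The empty partition AC-0 never blocks the reader; the timeout only matters once the queue as a whole runs dry.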




-- 
Regards,
Tao

Re: createMessageStreams vs createMessageStreamsByFilter

Posted by tao xiao <xi...@gmail.com>.
The fetchers I described fetch data from a leader to the consumer. The
replication fetcher (within a follower broker) is configured by another
property.
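The per-broker fetcher layout discussed in the messages quoted below (at least one fetcher thread per broker, capped by a configured count) can be modelled in Java: each partition is mapped to a fetcher id by hashing (topic, partition) modulo the configured fetcher count, and one thread exists per distinct (leader broker, fetcher id) pair. This is a sketch under assumptions; the exact hash, the class names and the numFetchers parameter are illustrative, not taken from Kafka. It shows why a broker never gets more fetcher threads than the configured count, and why a broker leading few partitions may get fewer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class FetcherCountSketch {

    /** A partition together with the broker currently leading it (hypothetical type). */
    static final class LeaderAssignment {
        final String topic;
        final int partition;
        final int leaderBroker;

        LeaderAssignment(String topic, int partition, int leaderBroker) {
            this.topic = topic;
            this.partition = partition;
            this.leaderBroker = leaderBroker;
        }
    }

    /** Assumed fetcher-id rule: a hash of (topic, partition) modulo the fetcher count. */
    static int fetcherId(String topic, int partition, int numFetchers) {
        return Math.floorMod(31 * topic.hashCode() + partition, numFetchers);
    }

    /** One fetcher thread per distinct (leader broker, fetcher id); returns threads per broker. */
    static Map<Integer, Integer> threadsPerBroker(List<LeaderAssignment> assignments, int numFetchers) {
        Map<Integer, Set<Integer>> idsByBroker = new TreeMap<>();
        for (LeaderAssignment a : assignments) {
            idsByBroker.computeIfAbsent(a.leaderBroker, b -> new TreeSet<>())
                       .add(fetcherId(a.topic, a.partition, numFetchers));
        }
        Map<Integer, Integer> counts = new TreeMap<>();
        idsByBroker.forEach((broker, ids) -> counts.put(broker, ids.size()));
        return counts;
    }

    public static void main(String[] args) {
        // Partitions A, B, C modelled as partitions 0..2 of one topic, all led by broker 1.
        List<LeaderAssignment> sameBroker = new ArrayList<>();
        for (int p = 0; p < 3; p++) {
            sameBroker.add(new LeaderAssignment("t", p, 1));
        }
        System.out.println(threadsPerBroker(sameBroker, 1)); // {1=1}: one thread fetches all three

        // 20 partitions spread over 5 brokers, capped at 8 fetchers per broker.
        List<LeaderAssignment> spread = new ArrayList<>();
        for (int p = 0; p < 20; p++) {
            spread.add(new LeaderAssignment("t", p, p % 5));
        }
        System.out.println(threadsPerBroker(spread, 8)); // {0=4, 1=4, 2=4, 3=4, 4=4}
    }
}
```

In the second case each broker leads only 4 partitions, so it gets 4 fetcher threads rather than the cap of 8.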

On Saturday, March 14, 2015, Zakee <kz...@netzero.net> wrote:

> Sorry, but I'm still confused. Is it the maximum number of threads (fetchers)
> fetching from a leader, or the maximum number of threads within a follower
> broker?
>
> Thanks for clarifying,
> -Zakee
>
>
>
> > On Mar 12, 2015, at 11:11 PM, tao xiao <xiaotao183@gmail.com> wrote:
> >
> > The number of fetchers is configurable via num.replica.fetchers. The
> > description of num.replica.fetchers in the Kafka documentation is not quite
> > accurate: num.replica.fetchers actually controls the max number of fetchers
> > per broker. In your case, with num.replica.fetchers=8 and 5 brokers, that
> > means no more than 8 fetchers are created for each broker.
> >
> > On Fri, Mar 13, 2015 at 1:21 PM, Zakee <kzakee1@netzero.net> wrote:
> >
> >> Is it always the case that there is only one fetcher per broker? Won’t
> >> setting num.replica.fetchers greater than the number of brokers cause more
> >> fetchers per broker?
> >> Let’s say I have 5 brokers, and the number of replica fetchers is 8: will
> >> there be 2 fetcher threads pulling from each broker?
> >>
> >> Thanks
> >> Zakee
> >>
> >>
> >>
> >>> On Mar 12, 2015, at 11:15 AM, James Cheng <jcheng@tivo.com> wrote:
> >>>
> >>> Ah, I understand now. I didn't realize that there was one fetcher thread
> >>> per broker.
> >>>
> >>> Thanks Tao & Guozhang!
> >>> -James
> >>>
> >>>
> >>> On Mar 11, 2015, at 5:00 PM, tao xiao <xiaotao183@gmail.com> wrote:
> >>>
> >>>> The fetcher thread works on a per-broker basis: there is at least one
> >>>> fetcher thread per broker. The fetcher thread sends the broker a fetch
> >>>> request asking for all of its partitions. So if A, B, and C are on the
> >>>> same broker, the fetcher thread is still able to fetch data from A, B,
> >>>> and C even though A returns no data. The same logic applies to
> >>>> different brokers.
> >>>>
> >>>> On Thu, Mar 12, 2015 at 6:25 AM, James Cheng <jcheng@tivo.com> wrote:
> >>>>
> >>>>>
> >>>>> On Mar 11, 2015, at 9:12 AM, Guozhang Wang <wangguoz@gmail.com> wrote:
> >>>>>
> >>>>>> Hi James,
> >>>>>>
> >>>>>> What I meant before is that a single fetcher may be responsible for
> >>>>>> putting fetched data into multiple queues, according to how the streams
> >>>>>> are set up, where each queue may be consumed by a different thread. And
> >>>>>> the queues are actually bounded. Now say there are two queues that are
> >>>>>> getting data from the same fetcher F, and they are consumed by two
> >>>>>> different user threads A and B. If thread A for some reason gets slow or
> >>>>>> hangs while consuming data from queue 1, then queue 1 will eventually
> >>>>>> fill up, and F, trying to put more data into it, will be blocked. Since
> >>>>>> F is parked trying to put data into queue 1, queue 2 will not get more
> >>>>>> data from it, and thread B may hence get starved. Does that make sense
> >>>>>> now?
> >>>>>>
> >>>>>
> >>>>> Yes, that makes sense. That is the scenario where one thread of a
> >>>>> consumer can cause a backup in the queue, which would cause other
> >>>>> threads to not receive data.
> >>>>>
> >>>>> What about the situation I described, where a thread consumes a queue
> >>>>> that is supposed to be filled with messages from multiple partitions? If
> >>>>> partition A has no messages and partitions B and C do, how will the
> >>>>> fetcher behave? Will the processing thread receive messages from
> >>>>> partitions B and C?
> >>>>>
> >>>>> Thanks,
> >>>>> -James
> >>>>>
> >>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>> On Tue, Mar 10, 2015 at 5:15 PM, James Cheng <jcheng@tivo.com
> <javascript:;>> wrote:
> >>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> Sorry to bring up this old thread, but my question is about this
> >> exact
> >>>>>>> thing:
> >>>>>>>
> >>>>>>> Guozhang, you said:
> >>>>>>>> A more concrete example: say you have topic AC: 3 partitions,
> topic
> >>>>> BC: 6
> >>>>>>>> partitions.
> >>>>>>>>
> >>>>>>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5
> threads
> >>>>> will
> >>>>>>>> be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6
> >>>>> respectively;
> >>>>>>>>
> >>>>>>>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads
> >> will
> >>>>> be
> >>>>>>>> created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4,
> >> AC-3/BC-5/BC-6
> >>>>>>>> respectively.
> >>>>>>>
> >>>>>>>
> >>>>>>> You said that in the createMessageStreamsByFilter case, if topic AC
> >> had
> >>>>> no
> >>>>>>> messages in it and consumer.timeout.ms = -1, then the 3 threads
> >> might
> >>>>> all
> >>>>>>> be blocked waiting for data to arrive from topic AC, and so
> messages
> >>>>> from
> >>>>>>> BC would not be processed.
> >>>>>>>
> >>>>>>> createMessageStreamsByFilter("*C" => 1) (single stream) would have
> >> the
> >>>>>>> same problem but just worse. Behind the scenes, is there a single
> >> thread
> >>>>>>> that is consuming (round-robin?) messages from the different
> >> partitions
> >>>>> and
> >>>>>>> inserting them all into a single queue for the application code to
> >>>>> process?
> >>>>>>> And that is why a single partition with no messages with block the
> >> other
> >>>>>>> messages from getting through?
> >>>>>>>
> >>>>>>> What about createMessageStreams("AC" => 1)? That creates a single
> >> stream
> >>>>>>> that contains messages from multiple partitions, which might be on
> >>>>>>> different brokers. Does that also suffer the same problem, where if
> >> one
> >>>>>>> partition has no messages, that the application would not receive
> >>>>> messages
> >>>>>>> from the other paritions?
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> -James
> >>>>>>>
> >>>>>>>
> >>>>>>> On Feb 11, 2015, at 8:13 AM, Guozhang Wang <wangguoz@gmail.com
> <javascript:;>>
> >> wrote:
> >>>>>>>
> >>>>>>>> The new consumer will be released in 0.9, which is targeted for
> end
> >> of
> >>>>>>> this
> >>>>>>>> quarter.
> >>>>>>>>
> >>>>>>>> On Tue, Feb 10, 2015 at 7:11 PM, tao xiao <xiaotao183@gmail.com
> <javascript:;>>
> >>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Do you know when the new consumer API will be publicly available?
> >>>>>>>>>
> >>>>>>>>> On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang <
> >> wangguoz@gmail.com <javascript:;>>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Yes, it can get stuck. For example, AC and BC are processed by
> two
> >>>>>>>>>> different processes and AC processors gets stuck, hence AC
> >> messages
> >>>>>>> will
> >>>>>>>>>> fill up in the consumer's buffer and eventually prevents the
> >> fetcher
> >>>>>>>>> thread
> >>>>>>>>>> to put more data into it; the fetcher thread will be blocked on
> >> that
> >>>>>>> and
> >>>>>>>>>> not be able to fetch BC.
> >>>>>>>>>>
> >>>>>>>>>> This issue has been addressed in the new consumer client, which
> is
> >>>>>>>>>> single-threaded with non-blocking APIs.
> >>>>>>>>>>
> >>>>>>>>>> Guozhang
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Feb 10, 2015 at 6:24 PM, tao xiao <xiaotao183@gmail.com
> <javascript:;>>
> >>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Thank you Guozhang for your detailed explanation. In your
> example
> >>>>>>>>>>> createMessageStreamsByFilter("*C" => 3)  since threads are
> shared
> >>>>>>> among
> >>>>>>>>>>> topics there may be situation where all 3 threads threads get
> >> stuck
> >>>>>>>>> with
> >>>>>>>>>>> topic AC e.g. topic is empty which will be holding the
> connecting
> >>>>>>>>> threads
> >>>>>>>>>>> (setting consumer.timeout.ms=-1) hence there is no thread to
> >> serve
> >>>>>>>>> topic
> >>>>>>>>>>> BC. do you think this situation will happen?
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang <
> >> wangguoz@gmail.com <javascript:;>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> I was not clear before .. for createMessageStreamsByFilter
> each
> >>>>>>>>> matched
> >>>>>>>>>>>> topic will have num-threads, but shared: i.e. there will be
> >> totally
> >>>>>>>>>>>> num-threads created, but each thread will be responsible for
> >>>>> fetching
> >>>>>>>>>> all
> >>>>>>>>>>>> matched topics.
> >>>>>>>>>>>>
> >>>>>>>>>>>> A more concrete example: say you have topic AC: 3 partitions,
> >> topic
> >>>>>>>>>> BC: 6
> >>>>>>>>>>>> partitions.
> >>>>>>>>>>>>
> >>>>>>>>>>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5
> >>>>> threads
> >>>>>>>>>> will
> >>>>>>>>>>>> be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6
> >>>>>>>>>> respectively;
> >>>>>>>>>>>>
> >>>>>>>>>>>> With createMessageStreamsByFilter("*C" => 3) a total of 3
> >> threads
> >>>>>>>>> will
> >>>>>>>>>> be
> >>>>>>>>>>>> created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4,
> >>>>> AC-3/BC-5/BC-6
> >>>>>>>>>>>> respectively.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Tue, Feb 10, 2015 at 8:37 AM, tao xiao <
> xiaotao183@gmail.com <javascript:;>
> >>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Guozhang,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Do you mean that each regex matched topic owns number of
> >> threads
> >>>>>>>>> that
> >>>>>>>>>>> get
> >>>>>>>>>>>>> passed in to createMessageStreamsByFilter ? For example in
> >> below
> >>>>>>>>> code
> >>>>>>>>>>> If
> >>>>>>>>>>>> I
> >>>>>>>>>>>>> have 3 matched topics each of which has 2 partitions then I
> >> should
> >>>>>>>>>> have
> >>>>>>>>>>>> 3 *
> >>>>>>>>>>>>> 2 = 6 threads in total with each topic owning 2 threads.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> TopicFilter filter = new Whitelist(".*");
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> int threadTotal = 2;
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> List<KafkaStream<byte[], byte[]>> streams = connector
> >>>>>>>>>>>>> .createMessageStreamsByFilter(filter, threadTotal);
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> But what I observed from the log is different
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >> Consumer
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the
> >>>>>>>>>> following
> >>>>>>>>>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with
> >>>>>>>>> consumers:
> >>>>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim partition 1
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim partition 0
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> >>>>>>>>>>>>> partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers:
> >>>>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-02-11 00:24:04 WARN  kafka.utils.Logging$class:83 - No broker
> >>>>>>>>>>>>> partitions consumed by consumer thread
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 for topic kafkatopic-1
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim partition 0
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> >>>>>>>>>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with consumers:
> >>>>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim partition 1
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim partition 0
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> As you can see from the log there are only 2 threads created and shared
> >>>>>>>>>>>>> among 3 topics. With this setting I think the parallelism is degraded and a
> >>>>>>>>>>>>> slow topic may impact other topics' consumption performance. Any thoughts?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <wangguoz@gmail.com>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> createMessageStreams is used for consuming from specific topic(s), where
> >>>>>>>>>>>>>> you can put a map of [topic-name, num-threads] as its input parameters;
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> createMessageStreamsByFilter is used for consuming from wildcard topics,
> >>>>>>>>>>>>>> where you can put a (regex, num-threads) as its input parameters, and for
> >>>>>>>>>>>>>> each regex-matched topic num-threads will be created.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The difference between these two is not really about throughput /
> >>>>>>>>>>>>>> latency, but rather consumption semantics.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <xiaotao183@gmail.com>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi team,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I am comparing the differences between
> >>>>>>>>>>>>>>> ConsumerConnector.createMessageStreams and
> >>>>>>>>>>>>>>> ConsumerConnector.createMessageStreamsByFilter. My understanding is
> >>>>>>>>>>>>>>> that createMessageStreams creates x threads (x is the number of
> >>>>>>>>>>>>>>> threads passed in to the method) dedicated to the specified topic,
> >>>>>>>>>>>>>>> while createMessageStreamsByFilter creates x threads shared by the
> >>>>>>>>>>>>>>> topics specified by the TopicFilter. Is that correct?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> If this is the case, I assume createMessageStreams is the preferred way
> >>>>>>>>>>>>>>> to create streams for each topic if I have high-throughput and
> >>>>>>>>>>>>>>> low-latency demands. Is my assumption correct?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>> Tao
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> --
> >>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> --
> >>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>> Tao
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> --
> >>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> --
> >>>>>>>>>>> Regards,
> >>>>>>>>>>> Tao
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> --
> >>>>>>>>>> -- Guozhang
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Regards,
> >>>>>>>>> Tao
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> -- Guozhang
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>
> >>>>>
> >>>>
> >>>>
> >>>> --
> >>>> Regards,
> >>>> Tao
> >>>
> >>
> >
> >
> >
> > --
> > Regards,
> > Tao
>
>

-- 
Regards,
Tao
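
The thread-to-partition mapping discussed in this thread can be checked with a minimal Java model. It assumes range-style assignment: each topic's partitions are split into contiguous ranges over the sorted ids of the consumer threads subscribed to that topic. This is a standalone sketch, not the ConsumerConnector API, and the thread ids (ac-0, s-0, ...) are hypothetical. Partitions are numbered from 0, whereas Guozhang's AC/BC example counts from 1.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class RangeAssignmentSketch {

    /**
     * Range-style assignment (assumed, not taken from Kafka source): for each topic,
     * partitions are split into contiguous ranges over the sorted ids of the
     * consumer threads subscribed to that topic.
     *
     * @param partitionsPerTopic topic -> number of partitions
     * @param threadsPerTopic    topic -> ids of the consumer threads consuming it
     * @return thread id -> "topic-partition" strings it owns (empty list if none)
     */
    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            Map<String, List<String>> threadsPerTopic) {
        Map<String, List<String>> owned = new TreeMap<>();
        for (String topic : new TreeMap<>(partitionsPerTopic).keySet()) {
            int partitions = partitionsPerTopic.get(topic);
            List<String> threads = new ArrayList<>(threadsPerTopic.get(topic));
            Collections.sort(threads);
            int perThread = partitions / threads.size();
            int extra = partitions % threads.size(); // the first `extra` threads get one more
            for (int i = 0; i < threads.size(); i++) {
                int start = perThread * i + Math.min(i, extra);
                int count = perThread + (i < extra ? 1 : 0);
                // A thread that gets nothing for this topic still appears, with no entries for it.
                List<String> mine = owned.computeIfAbsent(threads.get(i), k -> new ArrayList<>());
                for (int p = start; p < start + count; p++) {
                    mine.add(topic + "-" + p);
                }
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitions = Map.of("AC", 3, "BC", 6);

        // createMessageStreams("AC" => 3, "BC" => 2): each thread is dedicated to one topic.
        Map<String, List<String>> dedicated = Map.of(
                "AC", List.of("ac-0", "ac-1", "ac-2"),
                "BC", List.of("bc-0", "bc-1"));
        // {ac-0=[AC-0], ac-1=[AC-1], ac-2=[AC-2], bc-0=[BC-0, BC-1, BC-2], bc-1=[BC-3, BC-4, BC-5]}
        System.out.println(assign(partitions, dedicated));

        // createMessageStreamsByFilter("*C" => 3): the same three threads serve every matched topic.
        List<String> shared = List.of("s-0", "s-1", "s-2");
        // {s-0=[AC-0, BC-0, BC-1], s-1=[AC-1, BC-2, BC-3], s-2=[AC-2, BC-4, BC-5]}
        System.out.println(assign(partitions, Map.of("AC", shared, "BC", shared)));
    }
}
```

Under the same assumption, consider two shared threads over kafkatopic-1 (one partition) and kafkatopic-4/kafkatopic-5 (two partitions each). Thread 1 gets nothing from kafkatopic-1, which is consistent with the WARN line in the log above.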

Re: createMessageStreams vs createMessageStreamsByFilter

Posted by Zakee <kz...@netzero.net>.
Sorry, but I'm still confused. Is it the maximum number of threads (fetchers) fetching from a leader, or the maximum number of threads within a follower broker?

Thanks for clarifying,
-Zakee
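
Tao's explanation quoted below can be illustrated with a minimal Java model. In that explanation, a fetcher thread sends each broker one fetch request covering all the partitions it consumes from that broker. This is a hypothetical simulation, not Kafka's fetcher code: partitions are in-memory deques, each call to fetchRound stands for one request/response, and the class and method names are made up for illustration. It shows that an empty partition A simply contributes nothing to a response, so messages from B and C on the same broker still get through.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class FetchRoundSketch {

    /**
     * One simulated fetch round: a single request covers every partition the fetcher
     * owns on one broker, and each partition answers independently with up to
     * maxPerPartition messages, possibly none.
     */
    static List<String> fetchRound(Map<String, Deque<String>> partitionsOnBroker, int maxPerPartition) {
        List<String> fetched = new ArrayList<>();
        for (Map.Entry<String, Deque<String>> e : partitionsOnBroker.entrySet()) {
            Deque<String> log = e.getValue();
            // An empty partition adds nothing to the response; the others are unaffected.
            for (int i = 0; i < maxPerPartition && !log.isEmpty(); i++) {
                fetched.add(e.getKey() + ":" + log.poll());
            }
        }
        return fetched;
    }

    /** Repeats fetch rounds until a round returns nothing; returns all messages in fetch order. */
    static List<String> fetchAll(Map<String, Deque<String>> partitionsOnBroker, int maxPerPartition) {
        List<String> all = new ArrayList<>();
        List<String> batch;
        while (!(batch = fetchRound(partitionsOnBroker, maxPerPartition)).isEmpty()) {
            all.addAll(batch);
        }
        return all;
    }

    public static void main(String[] args) {
        Map<String, Deque<String>> broker = new LinkedHashMap<>();
        broker.put("A", new ArrayDeque<>());                          // partition A has no messages
        broker.put("B", new ArrayDeque<>(List.of("b0", "b1", "b2")));
        broker.put("C", new ArrayDeque<>(List.of("c0", "c1")));
        // [B:b0, B:b1, C:c0, C:c1, B:b2]
        System.out.println(fetchAll(broker, 2));
    }
}
```

Under this model, an empty partition cannot hold back B and C. What can stop them is a full downstream queue, which is the scenario Guozhang describes further down.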



> On Mar 12, 2015, at 11:11 PM, tao xiao <xi...@gmail.com> wrote:
> 
> The number of fetchers is configurable via num.replica.fetchers. The
> description of num.replica.fetchers in the Kafka documentation is not quite
> accurate: num.replica.fetchers actually controls the maximum number of
> fetchers per broker. In your case, with num.replica.fetchers=8 and 5 brokers,
> that means no more than 8 fetchers are created for each broker.
> 
> On Fri, Mar 13, 2015 at 1:21 PM, Zakee <kz...@netzero.net> wrote:
> 
>> Is it always the case that there is only one fetcher per broker? Won't
>> setting num.replica.fetchers greater than the number of brokers cause more
>> fetchers per broker?
>> Let's say I have 5 brokers and num.replica.fetchers is 8: will there be 2
>> fetcher threads pulling from each broker?
>> 
>> Thanks
>> Zakee
>> 
>> 
>> 
>>> On Mar 12, 2015, at 11:15 AM, James Cheng <jc...@tivo.com> wrote:
>>> 
>>> Ah, I understand now. I didn't realize that there was one fetcher thread per broker.
>>> 
>>> Thanks Tao & Guozhang!
>>> -James
>>> 
>>> 
>>> On Mar 11, 2015, at 5:00 PM, tao xiao <xiaotao183@gmail.com> wrote:
>>> 
>>>> Fetcher threads work on a per-broker basis: there is at least one fetcher
>>>> thread per broker. The fetcher thread sends the broker a fetch request that
>>>> asks for all of its partitions, so if A, B and C are on the same broker, the
>>>> fetcher thread is still able to fetch data from B and C even though A
>>>> returns no data. The same logic applies to different brokers.
>>>> 
>>>> On Thu, Mar 12, 2015 at 6:25 AM, James Cheng <jc...@tivo.com> wrote:
>>>> 
>>>>> 
>>>>> On Mar 11, 2015, at 9:12 AM, Guozhang Wang <wa...@gmail.com> wrote:
>>>>> 
>>>>>> Hi James,
>>>>>> 
>>>>>> What I meant before is that a single fetcher may be responsible for
>>>>>> putting fetched data into multiple queues, according to how the streams
>>>>>> are set up, where each queue may be consumed by a different thread. And
>>>>>> the queues are actually bounded. Now say there are two queues that are
>>>>>> getting data from the same fetcher F, and are consumed by two different
>>>>>> user threads A and B. If thread A for some reason gets slowed down or hung
>>>>>> consuming data from queue 1, then queue 1 will eventually get full, and F,
>>>>>> trying to put more data into it, will be blocked. Since F is parked trying
>>>>>> to put data into queue 1, queue 2 will not get more data from it, and
>>>>>> thread B may hence get starved. Does that make sense now?
>>>>>> 
>>>>> 
>>>>> Yes, that makes sense. That is the scenario where one thread of a consumer
>>>>> can cause a backup in the queue, which would cause other threads to not
>>>>> receive data.
>>>>>
>>>>> What about the situation I described, where a thread consumes a queue that
>>>>> is supposed to be filled with messages from multiple partitions? If
>>>>> partition A has no messages and partitions B and C do, how will the fetcher
>>>>> behave? Will the processing thread receive messages from partitions B and C?
>>>>> 
>>>>> Thanks,
>>>>> -James
>>>>> 
>>>>> 
>>>>>> Guozhang
>>>>>> 
>> 
> 
> 
> 
> -- 
> Regards,
> Tao
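
Guozhang's scenario quoted above can be reproduced with plain java.util.concurrent primitives. In that scenario, one fetcher F feeds two bounded queues while the consumer of queue 1 is hung. This is a sketch under stated assumptions: ArrayBlockingQueue stands in for the consumer's bounded queues, F simply alternates between the two queues, and the calling thread plays thread B. It is not the actual Kafka consumer implementation, and the class and method names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class SharedFetcherStarvationSketch {

    /**
     * One simulated fetcher F alternately puts messages into two bounded queues.
     * Thread A (the consumer of queue 1) is hung and never drains it. The calling
     * thread plays thread B and counts what it receives from queue 2 within the window.
     */
    static int messagesReceivedByB(int queueCapacity, int messagesPerQueue, long windowMillis) {
        BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<>(queueCapacity);
        BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<>(queueCapacity);

        Thread fetcher = new Thread(() -> {
            try {
                for (int i = 0; i < messagesPerQueue; i++) {
                    queue1.put(i); // parks here for good once queue 1 is full
                    queue2.put(i); // so queue 2 stops receiving data as well
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit quietly when shut down
            }
        }, "fetcher-F");
        fetcher.setDaemon(true);
        fetcher.start();

        int received = 0;
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(windowMillis);
        try {
            while (System.nanoTime() < deadline) {
                // Thread B drains queue 2 as fast as data arrives.
                if (queue2.poll(10, TimeUnit.MILLISECONDS) != null) {
                    received++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            fetcher.interrupt(); // release F from its blocked put()
        }
        return received;
    }

    public static void main(String[] args) {
        // Capacity 2: F parks on its third put into queue 1, so B gets only 2 of 10 messages.
        System.out.println(messagesReceivedByB(2, 10, 300));
        // Capacity 20: queue 1 never fills, so B gets all 10 messages.
        System.out.println(messagesReceivedByB(20, 10, 300));
    }
}
```

The only difference between the two calls is the queue capacity. That is the point of the explanation: once one bounded queue fills, the shared fetcher stops feeding every queue it serves.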


Re: createMessageStreams vs createMessageStreamsByFilter

Posted by tao xiao <xi...@gmail.com>.
The number of fetchers is configurable via num.replica.fetchers. The
description of num.replica.fetchers in the Kafka documentation is not quite
accurate: num.replica.fetchers actually controls the maximum number of
fetchers per broker. In your case, with num.replica.fetchers=8 and 5 brokers,
that means no more than 8 fetchers are created for each broker.
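
Here is a minimal Java model of the behaviour described above, where num.replica.fetchers caps the fetcher threads a follower runs per source broker rather than in total. The model makes two assumptions. First, each partition is mapped to a fetcher id by hashing (topic, partition) modulo num.replica.fetchers. Second, one thread runs per distinct (source broker, fetcher id) pair. This is an assumed model loosely based on Kafka's fetcher manager; the hash function, the topic name "events" and the partition layout are illustrative only.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class ReplicaFetcherSketch {

    /** Hypothetical fetcher choice: hash (topic, partition) into [0, numFetchers). */
    static int fetcherId(String topic, int partition, int numFetchers) {
        return Math.floorMod(31 * topic.hashCode() + partition, numFetchers);
    }

    /**
     * For a follower replicating one topic, counts the fetcher threads it would run
     * against each source (leader) broker: one per distinct fetcher id per broker.
     */
    static Map<Integer, Integer> fetchersPerSourceBroker(String topic,
                                                         Map<Integer, List<Integer>> partitionsByLeader,
                                                         int numFetchers) {
        Map<Integer, Integer> threads = new TreeMap<>();
        for (Map.Entry<Integer, List<Integer>> e : partitionsByLeader.entrySet()) {
            Set<Integer> ids = new HashSet<>();
            for (int partition : e.getValue()) {
                ids.add(fetcherId(topic, partition, numFetchers));
            }
            threads.put(e.getKey(), ids.size());
        }
        return threads;
    }

    /** Partition ids from `from` (inclusive) to `to` (exclusive). */
    static List<Integer> range(int from, int to) {
        List<Integer> ids = new ArrayList<>();
        for (int p = from; p < to; p++) {
            ids.add(p);
        }
        return ids;
    }

    public static void main(String[] args) {
        int numReplicaFetchers = 8;
        // Hypothetical layout: this follower replicates partitions 0-19 of "events"
        // from leader broker 1 and partitions 20-22 from leader broker 2.
        Map<Integer, List<Integer>> byLeader = new TreeMap<>();
        byLeader.put(1, range(0, 20));
        byLeader.put(2, range(20, 23));
        // Up to 8 threads per source broker, capped by the partitions it leads: {1=8, 2=3}
        System.out.println(fetchersPerSourceBroker("events", byLeader, numReplicaFetchers));
    }
}
```

Under this model, 5 brokers with num.replica.fetchers=8 does not mean 8 threads spread over 5 brokers (or 2 per broker). A follower can run up to 8 fetchers against each leader it replicates from, and fewer when it replicates fewer partitions from that leader.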

On Fri, Mar 13, 2015 at 1:21 PM, Zakee <kz...@netzero.net> wrote:

> Is it always the case that there is only one fetcher per broker? Won't
> setting num.replica.fetchers greater than the number of brokers cause more
> fetchers per broker?
> Let's say I have 5 brokers and num.replica.fetchers is 8: will there be 2
> fetcher threads pulling from each broker?
>
> Thanks
> Zakee
>
>
>
> > On Mar 12, 2015, at 11:15 AM, James Cheng <jc...@tivo.com> wrote:
> >
> > Ah, I understand now. I didn't realize that there was one fetcher thread
> per broker.
> >
> > Thanks Tao & Guozhang!
> > -James
> >
> >
> > On Mar 11, 2015, at 5:00 PM, tao xiao <xiaotao183@gmail.com <mailto:
> xiaotao183@gmail.com>> wrote:
> >
> >> Fetcher thread is per broker basis, it ensures that at lease one fetcher
> >> thread per broker. Fetcher thread is sent to broker with a fetch
> request to
> >> ask for all partitions. So if A, B, C are in the same broker fetcher
> thread
> >> is still able to fetch data from A, B, C even though A returns no data.
> >> same logic is applied to different broker.
> >>
> >> On Thu, Mar 12, 2015 at 6:25 AM, James Cheng <jc...@tivo.com> wrote:
> >>
> >>>
> >>> On Mar 11, 2015, at 9:12 AM, Guozhang Wang <wa...@gmail.com> wrote:
> >>>
> >>>> Hi James,
> >>>>
> >>>> What I meant before is that a single fetcher may be responsible for
> >>> putting
> >>>> fetched data to multiple queues according to the construction of the
> >>>> streams setup, where each queue may be consumed by a different thread.
> >>> And
> >>>> the queues are actually bounded. Now say if there are two queues that
> are
> >>>> getting data from the same fetcher F, and are consumed by two
> different
> >>>> user threads A and B. If thread A for some reason got slowed / hung
> >>>> consuming data from queue 1, then queue 1 will eventually get full,
> and F
> >>>> trying to put more data to it will be blocked. Since F is parked on
> >>> trying
> >>>> to put data to queue 1, queue 2 will not get more data from it, and
> >>> thread
> >>>> B may hence gets starved. Does that make sense now?
> >>>>
> >>>
> >>> Yes, that makes sense. That is the scenario where one thread of a
> consumer
> >>> can cause a backup in the queue, which would cause other threads to not
> >>> receive data.
> >>>
> >>> What about the situation I described, where a thread consumes a queue
> that
> >>> is supposed to be filled with messages from multiple partitions? If
> >>> partition A has no messages and partitions B and C do, how will the
> fetcher
> >>> behave? Will the processing thread receive messages from partitions B
> and C?
> >>>
> >>> Thanks,
> >>> -James
> >>>
> >>>
> >>>> Guozhang
> >>>>
> >>>> On Tue, Mar 10, 2015 at 5:15 PM, James Cheng <jc...@tivo.com> wrote:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> Sorry to bring up this old thread, but my question is about this
> exact
> >>>>> thing:
> >>>>>
> >>>>> Guozhang, you said:
> >>>>>> A more concrete example: say you have topic AC: 3 partitions, topic
> >>> BC: 6
> >>>>>> partitions.
> >>>>>>
> >>>>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads
> >>> will
> >>>>>> be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6
> >>> respectively;
> >>>>>>
> >>>>>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads
> will
> >>> be
> >>>>>> created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4,
> AC-3/BC-5/BC-6
> >>>>>> respectively.
> >>>>>
> >>>>>
> >>>>> You said that in the createMessageStreamsByFilter case, if topic AC
> had
> >>> no
> >>>>> messages in it and consumer.timeout.ms = -1, then the 3 threads
> might
> >>> all
> >>>>> be blocked waiting for data to arrive from topic AC, and so messages
> >>> from
> >>>>> BC would not be processed.
> >>>>>
> >>>>> createMessageStreamsByFilter("*C" => 1) (single stream) would have
> the
> >>>>> same problem but just worse. Behind the scenes, is there a single
> thread
> >>>>> that is consuming (round-robin?) messages from the different
> partitions
> >>> and
> >>>>> inserting them all into a single queue for the application code to
> >>> process?
> >>>>> And that is why a single partition with no messages with block the
> other
> >>>>> messages from getting through?
> >>>>>
> >>>>> What about createMessageStreams("AC" => 1)? That creates a single
> stream
> >>>>> that contains messages from multiple partitions, which might be on
> >>>>> different brokers. Does that also suffer the same problem, where if
> one
> >>>>> partition has no messages, that the application would not receive
> >>> messages
> >>>>> from the other paritions?
> >>>>>
> >>>>> Thanks,
> >>>>> -James
> >>>>>
> >>>>>
> >>>>> On Feb 11, 2015, at 8:13 AM, Guozhang Wang <wa...@gmail.com>
> wrote:
> >>>>>
> >>>>>> The new consumer will be released in 0.9, which is targeted for end
> of
> >>>>> this
> >>>>>> quarter.
> >>>>>>
> >>>>>> On Tue, Feb 10, 2015 at 7:11 PM, tao xiao <xi...@gmail.com>
> >>> wrote:
> >>>>>>
> >>>>>>> Do you know when the new consumer API will be publicly available?
> >>>>>>>
> >>>>>>> On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang <
> wangguoz@gmail.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Yes, it can get stuck. For example, AC and BC are processed by two
> >>>>>>>> different processes and AC processors gets stuck, hence AC
> messages
> >>>>> will
> >>>>>>>> fill up in the consumer's buffer and eventually prevents the
> fetcher
> >>>>>>> thread
> >>>>>>>> to put more data into it; the fetcher thread will be blocked on
> that
> >>>>> and
> >>>>>>>> not be able to fetch BC.
> >>>>>>>>
> >>>>>>>> This issue has been addressed in the new consumer client, which is
> >>>>>>>> single-threaded with non-blocking APIs.
> >>>>>>>>
> >>>>>>>> Guozhang
> >>>>>>>>
> >>>>>>>> On Tue, Feb 10, 2015 at 6:24 PM, tao xiao <xi...@gmail.com>
> >>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Thank you Guozhang for your detailed explanation. In your example
> >>>>>>>>> createMessageStreamsByFilter("*C" => 3)  since threads are shared
> >>>>> among
> >>>>>>>>> topics there may be situation where all 3 threads threads get
> stuck
> >>>>>>> with
> >>>>>>>>> topic AC e.g. topic is empty which will be holding the connecting
> >>>>>>> threads
> >>>>>>>>> (setting consumer.timeout.ms=-1) hence there is no thread to
> serve
> >>>>>>> topic
> >>>>>>>>> BC. do you think this situation will happen?
> >>>>>>>>>
> >>>>>>>>> On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang <
> wangguoz@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> I was not clear before .. for createMessageStreamsByFilter each
> >>>>>>> matched
> >>>>>>>>>> topic will have num-threads, but shared: i.e. there will be
> totally
> >>>>>>>>>> num-threads created, but each thread will be responsible for
> >>> fetching
> >>>>>>>> all
> >>>>>>>>>> matched topics.
> >>>>>>>>>>
> >>>>>>>>>> A more concrete example: say you have topic AC: 3 partitions,
> topic
> >>>>>>>> BC: 6
> >>>>>>>>>> partitions.
> >>>>>>>>>>
> >>>>>>>>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5
> >>> threads
> >>>>>>>> will
> >>>>>>>>>> be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6
> >>>>>>>> respectively;
> >>>>>>>>>>
> >>>>>>>>>> With createMessageStreamsByFilter("*C" => 3) a total of 3
> threads
> >>>>>>> will
> >>>>>>>> be
> >>>>>>>>>> created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4,
> >>> AC-3/BC-5/BC-6
> >>>>>>>>>> respectively.
> >>>>>>>>>>
> >>>>>>>>>> Guozhang
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Feb 10, 2015 at 8:37 AM, tao xiao <xiaotao183@gmail.com
> >
> >>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Guozhang,
> >>>>>>>>>>>
> >>>>>>>>>>> Do you mean that each regex matched topic owns number of
> threads
> >>>>>>> that
> >>>>>>>>> get
> >>>>>>>>>>> passed in to createMessageStreamsByFilter ? For example in
> below
> >>>>>>> code
> >>>>>>>>> If
> >>>>>>>>>> I
> >>>>>>>>>>> have 3 matched topics each of which has 2 partitions then I
> should
> >>>>>>>> have
> >>>>>>>>>> 3 *
> >>>>>>>>>>> 2 = 6 threads in total with each topic owning 2 threads.
> >>>>>>>>>>>
> >>>>>>>>>>> TopicFilter filter = new Whitelist(".*");
> >>>>>>>>>>>
> >>>>>>>>>>> int threadTotal = 2;
> >>>>>>>>>>>
> >>>>>>>>>>> List<KafkaStream<byte[], byte[]>> streams = connector
> >>>>>>>>>>> .createMessageStreamsByFilter(filter, threadTotal);
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> But what I observed from the log is different
> >>>>>>>>>>>
> >>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> Consumer
> >>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the
> >>>>>>>> following
> >>>>>>>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with
> >>>>>>> consumers:
> >>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
> >>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>>>>>>
> >>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to
> claim
> >>>>>>>>>>> partition 1
> >>>>>>>>>>>
> >>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to
> claim
> >>>>>>>>>>> partition 0
> >>>>>>>>>>>
> >>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> Consumer
> >>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the
> >>>>>>>> following
> >>>>>>>>>>> partitions: ArrayBuffer(0) for topic kafkatopic-1 with
> consumers:
> >>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
> >>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>>>>>>
> >>>>>>>>>>> 2015-02-11 00:24:04 WARN  kafka.utils.Logging$class:83 - No
> broker
> >>>>>>>>>>> partitions consumed by consumer thread
> >>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 for topic
> >>>>>>> kafkatopic-1
> >>>>>>>>>>>
> >>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to
> claim
> >>>>>>>>>>> partition 0
> >>>>>>>>>>>
> >>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> >>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
> >>>>>>>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with consumers:
> >>>>>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
> >>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>>>>>>
> >>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
> >>>>>>>>>>> partition 1
> >>>>>>>>>>>
> >>>>>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> >>>>>>>>>>> partition 0
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> As you can see from the log there are only 2 threads created and
> >>>>>>>>>>> shared among 3 topics. With this setting I think the parallelism is
> >>>>>>>>>>> degraded and a slow topic may impact other topics' consumption
> >>>>>>>>>>> performance. Any thoughts?
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <wangguoz@gmail.com>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> createMessageStreams is used for consuming from specific topic(s),
> >>>>>>>>>>>> where you can put a map of [topic-name, num-threads] as its input
> >>>>>>>>>>>> parameters;
> >>>>>>>>>>>>
> >>>>>>>>>>>> createMessageStreamsByFilter is used for consuming from wildcard
> >>>>>>>>>>>> topics, where you can put a (regex, num-threads) as its input
> >>>>>>>>>>>> parameters, and for each regex matched topic num-threads will be
> >>>>>>>>>>>> created.
> >>>>>>>>>>>>
> >>>>>>>>>>>> The difference between these two are not really for throughput /
> >>>>>>>>>>>> latency, but rather consumption semantics.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <xiaotao183@gmail.com>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi team,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I am comparing the differences between
> >>>>>>>>>>>>> ConsumerConnector.createMessageStreams
> >>>>>>>>>>>>> and ConsumerConnector.createMessageStreamsByFilter. My understanding
> >>>>>>>>>>>>> is that createMessageStreams creates x number of threads (x is the
> >>>>>>>>>>>>> number of threads passed in to the method) dedicated to the specified
> >>>>>>>>>>>>> topic while createMessageStreamsByFilter creates x number of threads
> >>>>>>>>>>>>> shared by topics specified by TopicFilter. Is it correct?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> If this is the case I assume createMessageStreams is the preferred
> >>>>>>>>>>>>> way to create streams for each topic if I have high throughput and
> >>>>>>>>>>>>> low latency demands. is my assumption correct?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> --
> >>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>> Tao
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> --
> >>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> --
> >>>>>>>>>>> Regards,
> >>>>>>>>>>> Tao
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> --
> >>>>>>>>>> -- Guozhang
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Regards,
> >>>>>>>>> Tao
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> -- Guozhang
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> Regards,
> >>>>>>> Tao
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>
> >>>>>
> >>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>
> >>>
> >>
> >>
> >> --
> >> Regards,
> >> Tao
> >
>



-- 
Regards,
Tao
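Guozhang's description quoted above has createMessageStreamsByFilter take a (regex, num-threads) pair. The Java sketch below illustrates how such a whitelist regex might pick topics. It assumes the whole topic name must match, as with java.util.regex full matching; that is an assumption about Kafka's Whitelist, not a confirmed detail. It also shows that the "*C" shorthand used in this thread is not a valid java.util.regex pattern on its own, while ".*C" is. Apart from the kafkatopic-N names from Tao's rebalance log, the topic names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

public class TopicWhitelistSketch {
    // Returns the topics whose entire name matches the regex (full match, not a substring find).
    static List<String> matchingTopics(String regex, List<String> topics) {
        Pattern pattern = Pattern.compile(regex);
        List<String> matched = new ArrayList<>();
        for (String topic : topics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    // Reports whether a string compiles as a java.util.regex pattern at all.
    static boolean isValidRegex(String regex) {
        try {
            Pattern.compile(regex);
            return true;
        } catch (PatternSyntaxException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> topics = List.of("AC", "BC", "CD", "kafkatopic-1", "kafkatopic-4", "kafkatopic-5");
        System.out.println(matchingTopics(".*C", topics));       // [AC, BC]
        System.out.println(matchingTopics(".*", topics).size()); // 6: everything, like new Whitelist(".*")
        System.out.println(isValidRegex("*C"));                  // false: '*' has nothing to repeat
    }
}
```

However many topics match, the thread count passed to createMessageStreamsByFilter remains the total number of streams, shared by all matched topics, as Guozhang clarifies in this thread.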

Re: createMessageStreams vs createMessageStreamsByFilter

Posted by Zakee <kz...@netzero.net>.
Is it always the case that there is only one fetcher per broker? Won't setting num.replica.fetchers greater than the number of brokers cause more fetchers per broker?
Let's say I have 5 brokers and num.replica.fetchers is 8: will there be 2 fetcher threads pulling from each broker?

Thanks
Zakee
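Zakee's question is about how partitions end up on fetcher threads. The Java sketch below models one plausible scheme, assuming each (topic, partition) is hashed into one of numFetchers slots and a fetcher thread is identified by the pair (leader broker, slot). This is loosely modeled on how the 0.8-era fetcher managers appear to group partitions, not a verified description of them; the hash, the round-robin leader placement, the topic name and the partition count are all assumptions.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class FetcherSlotSketch {
    // Assumed slot function: hash (topic, partition) into one of numFetchers slots.
    static int fetcherSlot(String topic, int partition, int numFetchers) {
        return Math.floorMod(31 * topic.hashCode() + partition, numFetchers);
    }

    // Counts distinct fetcher threads per broker. Leaders are placed round-robin
    // (partition p is led by broker p % numBrokers), which is also an assumption.
    static Map<Integer, Integer> threadsPerBroker(String topic, int numPartitions,
                                                  int numBrokers, int numFetchers) {
        Map<Integer, Set<Integer>> slotsByBroker = new HashMap<>();
        for (int p = 0; p < numPartitions; p++) {
            int leader = p % numBrokers;
            slotsByBroker.computeIfAbsent(leader, b -> new HashSet<>())
                         .add(fetcherSlot(topic, p, numFetchers));
        }
        Map<Integer, Integer> counts = new TreeMap<>();
        slotsByBroker.forEach((broker, slots) -> counts.put(broker, slots.size()));
        return counts;
    }

    public static void main(String[] args) {
        // Zakee's numbers: 5 brokers, 8 fetchers; 40 partitions is an arbitrary choice.
        System.out.println(threadsPerBroker("mytopic", 40, 5, 8)); // {0=8, 1=8, 2=8, 3=8, 4=8}
        // A single-fetcher setting collapses to one thread per broker.
        System.out.println(threadsPerBroker("mytopic", 40, 5, 1)); // {0=1, 1=1, 2=1, 3=1, 4=1}
    }
}
```

Under these assumptions the setting bounds the number of fetcher threads per broker (up to numFetchers each, fewer if a broker leads fewer partitions) instead of being divided across brokers, so 5 brokers with 8 fetchers would not mean 2 threads each.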



> On Mar 12, 2015, at 11:15 AM, James Cheng <jc...@tivo.com> wrote:
> 
> Ah, I understand now. I didn't realize that there was one fetcher thread per broker. 
> 
> Thanks Tao & Guozhang!
> -James
> 

Re: createMessageStreams vs createMessageStreamsByFilter

Posted by James Cheng <jc...@tivo.com>.
Ah, I understand now. I didn't realize that there was one fetcher thread per broker. 

Thanks Tao & Guozhang!
-James


On Mar 11, 2015, at 5:00 PM, tao xiao <xi...@gmail.com> wrote:

> Fetcher threads work on a per-broker basis: there is at least one fetcher
> thread per broker. The fetcher thread sends the broker a fetch request that
> asks for all of its partitions. So if A, B and C are on the same broker, the
> fetcher thread is still able to fetch data from B and C even though A
> returns no data. The same logic applies to different brokers.


Re: createMessageStreams vs createMessageStreamsByFilter

Posted by tao xiao <xi...@gmail.com>.
Fetcher threads work on a per-broker basis: there is at least one fetcher
thread per broker. The fetcher thread sends the broker a fetch request that
asks for all of its partitions. So if A, B and C are on the same broker, the
fetcher thread is still able to fetch data from B and C even though A
returns no data. The same logic applies to different brokers.

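The behavior Tao describes can be sketched in Java with an in-memory stand-in for the brokers; the partition names, leader assignment and messages below are hypothetical. Partitions are grouped by leader broker, each broker gets one request covering all of its partitions, and an empty partition simply contributes nothing to the response rather than holding up the others.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PerBrokerFetchSketch {
    // Groups partitions by leader broker: one fetcher (and one request) per broker.
    static Map<Integer, List<String>> groupByBroker(Map<String, Integer> leaderOf) {
        Map<Integer, List<String>> byBroker = new TreeMap<>();
        for (Map.Entry<String, Integer> e : leaderOf.entrySet()) {
            byBroker.computeIfAbsent(e.getValue(), b -> new ArrayList<>()).add(e.getKey());
        }
        return byBroker;
    }

    // One fetch round: each broker's request covers all of its partitions, and the
    // response carries whatever each partition has. Everything lands in one stream queue.
    static List<String> fetchRound(Map<String, Integer> leaderOf, Map<String, List<String>> logs) {
        List<String> streamQueue = new ArrayList<>();
        for (List<String> partitions : groupByBroker(leaderOf).values()) {
            for (String partition : partitions) {
                for (String message : logs.getOrDefault(partition, List.of())) {
                    streamQueue.add(partition + ":" + message);
                }
            }
        }
        return streamQueue;
    }

    // A and B are led by broker 1, C by broker 2; partition A is empty.
    static List<String> demo() {
        Map<String, Integer> leaderOf = new TreeMap<>(Map.of("A", 1, "B", 1, "C", 2));
        Map<String, List<String>> logs = Map.of(
                "A", List.of(),
                "B", List.of("b1", "b2"),
                "C", List.of("c1"));
        return fetchRound(leaderOf, logs);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [B:b1, B:b2, C:c1]
    }
}
```

For determinism the sketch visits the brokers one after another; in the consumer discussed in this thread each broker's fetcher would run on its own thread.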
On Thu, Mar 12, 2015 at 6:25 AM, James Cheng <jc...@tivo.com> wrote:

>
> On Mar 11, 2015, at 9:12 AM, Guozhang Wang <wa...@gmail.com> wrote:
>
> > Hi James,
> >
> > What I meant before is that a single fetcher may be responsible for
> > putting fetched data to multiple queues according to the construction of
> > the streams setup, where each queue may be consumed by a different thread.
> > And the queues are actually bounded. Now say there are two queues that are
> > getting data from the same fetcher F, and are consumed by two different
> > user threads A and B. If thread A for some reason gets slowed / hung
> > consuming data from queue 1, then queue 1 will eventually get full, and F
> > trying to put more data to it will be blocked. Since F is parked on trying
> > to put data to queue 1, queue 2 will not get more data from it, and thread
> > B may hence get starved. Does that make sense now?
> >
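Guozhang's scenario can be reproduced with plain java.util.concurrent types. In this sketch ArrayBlockingQueue stands in for the consumer's bounded queues, and the capacity of 2 and the even/odd routing of messages to the two queues are assumptions made for illustration. Thread A never drains queue 1, so F parks on put() and thread B only ever sees what reached queue 2 before that.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class FetcherStarvationSketch {
    // Returns every message thread B can get before fetcher F is stuck.
    static List<Integer> runScenario() {
        BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<>(2); // read by hung thread A
        BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<>(2); // read by thread B

        // Fetcher F routes even messages to queue 1 and odd ones to queue 2.
        // put() blocks while the target queue is full.
        Thread fetcher = new Thread(() -> {
            try {
                for (int i = 0; i < 100; i++) {
                    (i % 2 == 0 ? queue1 : queue2).put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // demo shutdown
            }
        });
        fetcher.setDaemon(true);
        fetcher.start();

        // Thread A is hung and never takes from queue 1, so F eventually parks on
        // queue1.put(4). Wait until F is in that WAITING state.
        try {
            while (fetcher.getState() != Thread.State.WAITING) {
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the fetcher", e);
        }

        // Thread B drains queue 2: only what F delivered before parking is there,
        // and nothing more will arrive while A stays hung.
        List<Integer> receivedByB = new ArrayList<>();
        queue2.drainTo(receivedByB);
        fetcher.interrupt(); // release F so the demo can finish
        return receivedByB;
    }

    public static void main(String[] args) {
        System.out.println("thread B received: " + runScenario()); // [1, 3]
    }
}
```

Nothing is wrong with B's own data here: B starves only because it shares F with the hung thread A.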
>
> Yes, that makes sense. That is the scenario where one thread of a consumer
> can cause a backup in the queue, which would cause other threads to not
> receive data.
>
> What about the situation I described, where a thread consumes a queue that
> is supposed to be filled with messages from multiple partitions? If
> partition A has no messages and partitions B and C do, how will the fetcher
> behave? Will the processing thread receive messages from partitions B and C?
>
> Thanks,
> -James
>
>
> > Guozhang
> >
> > On Tue, Mar 10, 2015 at 5:15 PM, James Cheng <jc...@tivo.com> wrote:
> >
> >> Hi,
> >>
> >> Sorry to bring up this old thread, but my question is about this exact
> >> thing:
> >>
> >> Guozhang, you said:
> >>> A more concrete example: say you have topic AC: 3 partitions, topic BC: 6
> >>> partitions.
> >>>
> >>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads will
> >>> be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6 respectively;
> >>>
> >>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads will be
> >>> created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6
> >>> respectively.
> >>
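The AC/BC example can be reproduced with a small Java sketch. It assumes a range-style split: each topic's partitions are divided into contiguous ranges over the threads consuming that topic, with the earlier threads taking any remainder. The actual assignment code in Kafka's old ZooKeeper-based consumer may differ in detail; partition labels are 1-based to match the example.

```java
import java.util.ArrayList;
import java.util.List;

public class RangeAssignmentSketch {
    // Splits partitions 1..numPartitions into contiguous ranges, one per thread.
    // When the split is uneven, the first (numPartitions % numThreads) threads get one extra.
    static List<List<Integer>> rangeAssign(int numPartitions, int numThreads) {
        List<List<Integer>> perThread = new ArrayList<>();
        int base = numPartitions / numThreads;
        int extra = numPartitions % numThreads;
        for (int i = 0; i < numThreads; i++) {
            int start = base * i + Math.min(i, extra);
            int count = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int p = start; p < start + count; p++) {
                owned.add(p + 1); // 1-based label: AC-1, BC-1, ...
            }
            perThread.add(owned);
        }
        return perThread;
    }

    public static void main(String[] args) {
        // createMessageStreams("AC" => 3, "BC" => 2): 3 + 2 = 5 streams, each tied to one topic.
        System.out.println("AC streams: " + rangeAssign(3, 3)); // [[1], [2], [3]]
        System.out.println("BC streams: " + rangeAssign(6, 2)); // [[1, 2, 3], [4, 5, 6]]

        // createMessageStreamsByFilter("*C" => 3): 3 streams, each matched topic split over all 3.
        List<List<Integer>> ac = rangeAssign(3, 3);
        List<List<Integer>> bc = rangeAssign(6, 3);
        for (int i = 0; i < 3; i++) {
            System.out.println("shared stream " + i + ": AC" + ac.get(i) + " BC" + bc.get(i));
        }
        // shared stream 0: AC[1] BC[1, 2] ... shared stream 2: AC[3] BC[5, 6]

        // A 1-partition topic over 2 shared threads leaves the second thread with nothing.
        System.out.println(rangeAssign(1, 2)); // [[1], []]
    }
}
```

The last call mirrors the rebalance log quoted in this thread, where the single partition of kafkatopic-1 goes to thread -0 and thread -1 gets none.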
> >>
> >> You said that in the createMessageStreamsByFilter case, if topic AC had no
> >> messages in it and consumer.timeout.ms = -1, then the 3 threads might all
> >> be blocked waiting for data to arrive from topic AC, and so messages from
> >> BC would not be processed.
> >>
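James's question hinges on consumer.timeout.ms. For the old consumer this setting is documented as -1 by default, meaning a stream's iterator blocks until a message arrives, while a positive value makes it throw a timeout exception when nothing arrives within that many milliseconds. The Java sketch below models those two behaviors over a plain BlockingQueue; Stream and StreamTimeoutException are hypothetical stand-ins, not Kafka's KafkaStream or ConsumerTimeoutException.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ConsumerTimeoutSketch {
    // Hypothetical stand-in for a consumer timeout exception.
    static class StreamTimeoutException extends RuntimeException {
        StreamTimeoutException(String message) {
            super(message);
        }
    }

    // Hypothetical stream: a queue of messages plus consumer.timeout.ms-style semantics.
    static class Stream {
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        private final long timeoutMs;

        Stream(long timeoutMs) {
            this.timeoutMs = timeoutMs;
        }

        void deliver(String message) {
            queue.add(message);
        }

        // timeoutMs < 0: wait as long as it takes (possibly forever).
        // Otherwise: give up and throw after timeoutMs with no message.
        String next() throws InterruptedException {
            if (timeoutMs < 0) {
                return queue.take();
            }
            String message = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (message == null) {
                throw new StreamTimeoutException("no message within " + timeoutMs + " ms");
            }
            return message;
        }
    }

    // Reads one message, or reports a timeout so the calling thread gets control back.
    static String readOne(Stream stream) {
        try {
            return stream.next();
        } catch (StreamTimeoutException e) {
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        Stream empty = new Stream(100);      // like consumer.timeout.ms=100 on an empty topic
        System.out.println(readOne(empty));  // timed out

        Stream busy = new Stream(-1);        // like consumer.timeout.ms=-1, but data is present
        busy.deliver("bc-message");
        System.out.println(readOne(busy));   // bc-message
        // new Stream(-1) with no data would block in next() indefinitely.
    }
}
```

With a timeout set, a thread that would otherwise sit on an empty stream regains control and can decide what to do next.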
> >> createMessageStreamsByFilter("*C" => 1) (single stream) would have the
> >> same problem but just worse. Behind the scenes, is there a single thread
> >> that is consuming (round-robin?) messages from the different partitions and
> >> inserting them all into a single queue for the application code to process?
> >> And that is why a single partition with no messages will block the other
> >> messages from getting through?
> >>
> >> What about createMessageStreams("AC" => 1)? That creates a single stream
> >> that contains messages from multiple partitions, which might be on
> >> different brokers. Does that also suffer the same problem, where if one
> >> partition has no messages, that the application would not receive messages
> >> from the other partitions?
> >>
> >> Thanks,
> >> -James
> >>
> >>
> >> On Feb 11, 2015, at 8:13 AM, Guozhang Wang <wa...@gmail.com> wrote:
> >>
> >>> The new consumer will be released in 0.9, which is targeted for end of
> >>> this quarter.
> >>>
> >>> On Tue, Feb 10, 2015 at 7:11 PM, tao xiao <xi...@gmail.com> wrote:
> >>>
> >>>> Do you know when the new consumer API will be publicly available?
> >>>>
> >>>> On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang <wa...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Yes, it can get stuck. For example, AC and BC are processed by two
> >>>>> different processes and AC processors gets stuck, hence AC messages
> >> will
> >>>>> fill up in the consumer's buffer and eventually prevents the fetcher
> >>>> thread
> >>>>> to put more data into it; the fetcher thread will be blocked on that
> >> and
> >>>>> not be able to fetch BC.
> >>>>>
> >>>>> This issue has been addressed in the new consumer client, which is
> >>>>> single-threaded with non-blocking APIs.
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>> On Tue, Feb 10, 2015 at 6:24 PM, tao xiao <xi...@gmail.com>
> >> wrote:
> >>>>>
> >>>>>> Thank you Guozhang for your detailed explanation. In your example
> >>>>>> createMessageStreamsByFilter("*C" => 3)  since threads are shared
> >> among
> >>>>>> topics there may be situation where all 3 threads threads get stuck
> >>>> with
> >>>>>> topic AC e.g. topic is empty which will be holding the connecting
> >>>> threads
> >>>>>> (setting consumer.timeout.ms=-1) hence there is no thread to serve
> >>>> topic
> >>>>>> BC. do you think this situation will happen?
> >>>>>>
> >>>>>> On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang <wa...@gmail.com>
> >>>>> wrote:
> >>>>>>
> >>>>>>> I was not clear before .. for createMessageStreamsByFilter each
> >>>> matched
> >>>>>>> topic will have num-threads, but shared: i.e. there will be totally
> >>>>>>> num-threads created, but each thread will be responsible for
> fetching
> >>>>> all
> >>>>>>> matched topics.
> >>>>>>>
> >>>>>>> A more concrete example: say you have topic AC: 3 partitions, topic
> >>>>> BC: 6
> >>>>>>> partitions.
> >>>>>>>
> >>>>>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5
> threads
> >>>>> will
> >>>>>>> be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6
> >>>>> respectively;
> >>>>>>>
> >>>>>>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads
> >>>> will
> >>>>> be
> >>>>>>> created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4,
> AC-3/BC-5/BC-6
> >>>>>>> respectively.
> >>>>>>>
> >>>>>>> Guozhang
> >>>>>>>
> >>>>>>> On Tue, Feb 10, 2015 at 8:37 AM, tao xiao <xi...@gmail.com>
> >>>>> wrote:
> >>>>>>>
> >>>>>>>> Guozhang,
> >>>>>>>>
> >>>>>>>> Do you mean that each regex matched topic owns number of threads
> >>>> that
> >>>>>> get
> >>>>>>>> passed in to createMessageStreamsByFilter ? For example in below
> >>>> code
> >>>>>> If
> >>>>>>> I
> >>>>>>>> have 3 matched topics each of which has 2 partitions then I should
> >>>>> have
> >>>>>>> 3 *
> >>>>>>>> 2 = 6 threads in total with each topic owning 2 threads.
> >>>>>>>>
> >>>>>>>> TopicFilter filter = new Whitelist(".*");
> >>>>>>>>
> >>>>>>>> int threadTotal = 2;
> >>>>>>>>
> >>>>>>>> List<KafkaStream<byte[], byte[]>> streams = connector
> >>>>>>>> .createMessageStreamsByFilter(filter, threadTotal);
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> But what I observed from the log is different
> >>>>>>>>
> >>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> >>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the
> >>>>> following
> >>>>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with
> >>>> consumers:
> >>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
> >>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>>>
> >>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
> >>>>>>>> partition 1
> >>>>>>>>
> >>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> >>>>>>>> partition 0
> >>>>>>>>
> >>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> >>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the
> >>>>> following
> >>>>>>>> partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers:
> >>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
> >>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>>>
> >>>>>>>> 2015-02-11 00:24:04 WARN  kafka.utils.Logging$class:83 - No broker
> >>>>>>>> partitions consumed by consumer thread
> >>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 for topic
> >>>> kafkatopic-1
> >>>>>>>>
> >>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> >>>>>>>> partition 0
> >>>>>>>>
> >>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> >>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the
> >>>>> following
> >>>>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with
> >>>> consumers:
> >>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
> >>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>>>
> >>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
> >>>>>>>> partition 1
> >>>>>>>>
> >>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> >>>>>>>> partition 0
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> As you can see from the log there are only 2 threads created and
> >>>>> shared
> >>>>>>>> among 3 topics. With this setting I think the parallelism is
> >>>> degraded
> >>>>>>> and a
> >>>>>>>> slow topic may impact other topics' consumption performance. Any
> >>>>>>> thoughts?
> >>>>>>>>
> >>>>>>>> On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <
> >>>> wangguoz@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> createMessageStreams is used for consuming from specific
> >>>> topic(s),
> >>>>>>> where
> >>>>>>>>> you can put a map of [topic-name, num-threads] as its input
> >>>>>> parameters;
> >>>>>>>>>
> >>>>>>>>> createMessageStreamsByFilter is used for consuming from wildcard
> >>>>>>> topics,
> >>>>>>>>> where you can put a (regex, num-threads) as its input parameters,
> >>>>> and
> >>>>>>> for
> >>>>>>>>> each regex matched topic num-threads will be created.
> >>>>>>>>>
> >>>>>>>>> The difference between these two are not really for throughput /
> >>>>>>> latency,
> >>>>>>>>> but rather consumption semantics.
> >>>>>>>>>
> >>>>>>>>> Guozhang
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <xi...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi team,
> >>>>>>>>>>
> >>>>>>>>>> I am comparing the differences between
> >>>>>>>>>> ConsumerConnector.createMessageStreams
> >>>>>>>>>> and ConsumerConnector.createMessageStreamsByFilter. My
> >>>>>> understanding
> >>>>>>> is
> >>>>>>>>>> that createMessageStreams creates x number of threads (x is the
> >>>>>>> number
> >>>>>>>> of
> >>>>>>>>>> threads passed in to the method) dedicated to the specified
> >>>> topic
> >>>>>>>>>> while createMessageStreamsByFilter creates x number of threads
> >>>>>> shared
> >>>>>>>> by
> >>>>>>>>>> topics specified by TopicFilter. Is it correct?
> >>>>>>>>>>
> >>>>>>>>>> If this is the case I assume createMessageStreams is the
> >>>>> preferred
> >>>>>>> way
> >>>>>>>> to
> >>>>>>>>>> create streams for each topic if I have high throughput and low
> >>>>>>> latency
> >>>>>>>>>> demands. is my assumption correct?
> >>>>>>>>>>
> >>>>>>>>>> --
> >>>>>>>>>> Regards,
> >>>>>>>>>> Tao
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> -- Guozhang
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> Regards,
> >>>>>>>> Tao
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> -- Guozhang
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> Regards,
> >>>>>> Tao
> >>>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> -- Guozhang
> >>>>>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> Regards,
> >>>> Tao
> >>>>
> >>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>
> >>
> >
> >
> > --
> > -- Guozhang
>
>


-- 
Regards,
Tao

Re: createMessageStreams vs createMessageStreamsByFilter

Posted by James Cheng <jc...@tivo.com>.
On Mar 11, 2015, at 9:12 AM, Guozhang Wang <wa...@gmail.com> wrote:

> Hi James,
> 
> What I meant before is that a single fetcher may be responsible for putting
> fetched data into multiple queues, depending on how the streams are set up,
> and each queue may be consumed by a different thread. The queues are also
> bounded. Now say two queues get data from the same fetcher F and are
> consumed by two different user threads, A and B. If thread A for some
> reason slows down or hangs while consuming from queue 1, then queue 1 will
> eventually fill up, and F will block when it tries to put more data into it.
> Since F is parked trying to put data into queue 1, queue 2 will not get any
> more data from it, and thread B may therefore be starved. Does that make
> sense now?
> 

Yes, that makes sense. That is the scenario where one slow consumer thread backs up its queue, which in turn keeps other threads from receiving data.

What about the situation I described, where a thread consumes a queue that is supposed to be filled with messages from multiple partitions? If partition A has no messages and partitions B and C do, how will the fetcher behave? Will the processing thread receive messages from partitions B and C?

Thanks,
-James


> Guozhang
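James's question about one stream fed by several partitions can be explored with a small, single-threaded Java toy model. It rests on an assumption this thread does not confirm: that the fetcher takes whatever each assigned partition currently has and moves on, rather than waiting on any single partition. Under that assumption an empty partition A simply contributes nothing, B and C keep flowing into the shared queue, and in this model only a full queue stops the fetcher, which is the situation Guozhang describes. `SharedStreamDemo` and `consume` are hypothetical names, not Kafka APIs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Toy model (not Kafka's implementation) of one stream queue fed by several partitions. */
public class SharedStreamDemo {

    /**
     * Alternates fetch rounds and application drains until every partition is empty.
     * Each round moves at most one message from each non-empty partition into the
     * shared bounded queue; an empty partition is skipped, never waited on.
     */
    public static List<String> consume(Map<String, List<String>> partitionData, int queueCapacity) {
        // Copy each partition's pending messages so the caller's lists stay untouched.
        Map<String, Deque<String>> pending = new LinkedHashMap<>();
        partitionData.forEach((p, msgs) -> pending.put(p, new ArrayDeque<>(msgs)));

        BlockingQueue<String> stream = new ArrayBlockingQueue<>(queueCapacity);
        List<String> consumed = new ArrayList<>();

        while (pending.values().stream().anyMatch(d -> !d.isEmpty())) {
            // Fetcher: one pass over all assigned partitions.
            for (Deque<String> log : pending.values()) {
                String next = log.peekFirst();
                if (next == null) {
                    continue; // empty partition: nothing to enqueue, move on
                }
                if (!stream.offer(next)) {
                    break; // queue full: a real fetcher would block here instead
                }
                log.pollFirst(); // only remove once it is safely in the queue
            }
            // Application thread: drain whatever the fetcher enqueued this round.
            stream.drainTo(consumed);
        }
        return consumed;
    }

    public static void main(String[] args) {
        Map<String, List<String>> data = new LinkedHashMap<>();
        data.put("A", List.of());                  // partition with no messages
        data.put("B", List.of("b0", "b1", "b2"));
        data.put("C", List.of("c0", "c1"));
        System.out.println(consume(data, 10));     // prints [b0, c0, b1, c1, b2]
    }
}
```

Shrinking the queue capacity to 1 still delivers all five messages in this model, only in a different order, because the consumer drains the queue between rounds; the stall only appears if nothing drains it.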


Re: createMessageStreams vs createMessageStreamsByFilter

Posted by Guozhang Wang <wa...@gmail.com>.
Hi James,

What I meant before is that a single fetcher may be responsible for putting
fetched data into multiple queues, depending on how the streams are set up,
and each queue may be consumed by a different thread. The queues are also
bounded. Now say two queues get data from the same fetcher F and are
consumed by two different user threads, A and B. If thread A for some
reason slows down or hangs while consuming from queue 1, then queue 1 will
eventually fill up, and F will block when it tries to put more data into it.
Since F is parked trying to put data into queue 1, queue 2 will not get any
more data from it, and thread B may therefore be starved. Does that make
sense now?

Guozhang
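The fetcher F scenario above can be reproduced with a small Java toy model. This is a sketch under stated assumptions, not Kafka's fetcher code: `ArrayBlockingQueue` stands in for the bounded queues, F is assumed to alternate between queue 1 and queue 2, and `FetcherStarvationDemo` / `chunksSeenByB` are hypothetical names. Thread A is modeled by never reading queue 1, and the calling thread plays thread B.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Toy model (not Kafka code): one fetcher thread F feeding two bounded queues. */
public class FetcherStarvationDemo {

    /**
     * Thread A never reads queue 1 (it is "hung"). The caller plays thread B and
     * counts how many chunks it takes from queue 2 before idleMillis pass with no data.
     */
    public static int chunksSeenByB(int capacity, long idleMillis) {
        BlockingQueue<String> queue1 = new ArrayBlockingQueue<>(capacity); // read by hung thread A
        BlockingQueue<String> queue2 = new ArrayBlockingQueue<>(capacity); // read by thread B

        Thread fetcher = new Thread(() -> {
            try {
                for (int i = 0; ; i++) {
                    queue1.put("q1-chunk-" + i); // parks for good once queue 1 is full
                    queue2.put("q2-chunk-" + i); // ...so queue 2 stops receiving data
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // demo is over; let F exit
            }
        }, "fetcher-F");
        fetcher.setDaemon(true);
        fetcher.start();

        int received = 0;
        try {
            while (queue2.poll(idleMillis, TimeUnit.MILLISECONDS) != null) {
                received++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            fetcher.interrupt(); // unblock F so it does not linger
        }
        return received;
    }

    public static void main(String[] args) {
        int n = chunksSeenByB(2, 500);
        System.out.println("thread B received " + n + " chunks, then starved");
    }
}
```

With a capacity of 2, F manages two puts into each queue and then parks on the third put into queue 1, so B sees exactly as many chunks as the queue capacity and nothing afterwards, even though F would have more data for queue 2.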

On Tue, Mar 10, 2015 at 5:15 PM, James Cheng <jc...@tivo.com> wrote:

> Hi,
>
> Sorry to bring up this old thread, but my question is about this exact
> thing:
>
> Guozhang, you said:
> > A more concrete example: say you have topic AC: 3 partitions, topic BC: 6
> > partitions.
> >
> > With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads will
> > be created, consuming AC-1, AC-2, AC-3, BC-1/2/3 and BC-4/5/6 respectively;
> >
> > With createMessageStreamsByFilter("*C" => 3) a total of 3 threads will be
> > created, consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4 and AC-3/BC-5/BC-6
> > respectively.
>
>
> You said that in the createMessageStreamsByFilter case, if topic AC had no
> messages in it and consumer.timeout.ms = -1, then the 3 threads might all
> be blocked waiting for data to arrive from topic AC, and so messages from
> BC would not be processed.
>
> createMessageStreamsByFilter("*C" => 1) (single stream) would have the
> same problem but just worse. Behind the scenes, is there a single thread
> that is consuming (round-robin?) messages from the different partitions and
> inserting them all into a single queue for the application code to process?
> And is that why a single partition with no messages would block the other
> messages from getting through?
>
> What about createMessageStreams("AC" => 1)? That creates a single stream
> that contains messages from multiple partitions, which might be on
> different brokers. Does that also suffer from the same problem, where if one
> partition has no messages, the application would not receive messages
> from the other partitions?
>
> Thanks,
> -James


-- 
-- Guozhang
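The thread-to-partition mapping in the quoted AC/BC example can be reproduced by splitting each topic's partitions into contiguous ranges across the threads that consume it. Treat that as an assumption: this thread does not name the assignment strategy. `RangeSplitDemo` and `split` are hypothetical names, and partitions are numbered from 1 only to mirror the "AC-1 .. AC-3" notation (Kafka itself numbers partitions from 0).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: contiguous (range) split of one topic's partitions over N threads. */
public class RangeSplitDemo {

    /** Returns, for each of numThreads threads, the partition labels it would own. */
    public static List<List<String>> split(String topic, int numPartitions, int numThreads) {
        List<List<String>> owned = new ArrayList<>();
        int perThread = numPartitions / numThreads; // base share for every thread
        int extra = numPartitions % numThreads;     // the first `extra` threads get one more
        int next = 1;                               // 1-based labels, as in the thread
        for (int t = 0; t < numThreads; t++) {
            int count = perThread + (t < extra ? 1 : 0);
            List<String> mine = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                mine.add(topic + "-" + next++);
            }
            owned.add(mine);
        }
        return owned;
    }

    public static void main(String[] args) {
        // createMessageStreams("AC" => 3, "BC" => 2): each topic gets its own threads.
        System.out.println(split("AC", 3, 3)); // prints [[AC-1], [AC-2], [AC-3]]
        System.out.println(split("BC", 6, 2)); // prints [[BC-1, BC-2, BC-3], [BC-4, BC-5, BC-6]]

        // createMessageStreamsByFilter("*C" => 3): the same 3 threads split every matched topic.
        List<List<String>> ac = split("AC", 3, 3);
        List<List<String>> bc = split("BC", 6, 3);
        for (int t = 0; t < 3; t++) {
            List<String> all = new ArrayList<>(ac.get(t));
            all.addAll(bc.get(t));
            System.out.println("thread " + t + ": " + all); // thread 0: [AC-1, BC-1, BC-2], ...
        }
    }
}
```

The filter case makes the sharing visible: each of the 3 threads owns one AC partition and two BC partitions, so a slow or stuck consumer on one thread affects both topics. With more threads than partitions (for example 1 partition and 2 threads), the extra thread ends up with an empty list.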