Posted to dev@storm.apache.org by chandan singh <ck...@gmail.com> on 2017/07/10 13:39:54 UTC

Few observations related to KafkaSpout implementation (1.1.0)

Hi

I hope I am using the right mailing list. Please advise if I am wrong.

I have a few observations about the KafkaSpout and feel that some of these
lead to inefficiencies. It would be of great help if someone could throw
some light on the rationale behind the implementation.

1) Kafka polling and committing offsets are done in the spout thread, which
is somewhat against the spout best practices. Is simplicity the reason
behind this design? Am I missing something?

2) The poll-iterate-commit-seek loop seems inefficient in recurrent failure
scenarios. Let's say the first message fails. We will keep polling the same
set of messages at least as many times as that message is retried, and
probably more if we are using exponential back-off. Did I misunderstand the
implementation?
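
To illustrate, here is the cycle as I understand it (simplified pseudocode;
the helper names are mine, not the actual KafkaSpout code):

    // Sketch of my reading of the spout's poll-iterate-commit-seek cycle.
    while (true) {
        // After a seek, poll re-fetches from the failed offset, so a single
        // failing message means re-reading its whole neighbourhood.
        ConsumerRecords<K, V> records = consumer.poll(pollTimeoutMs);
        for (ConsumerRecord<K, V> record : records) {
            emitUnlessAlreadyAcked(record);      // hypothetical helper
        }
        commitOffsetsIfReady();                  // cannot advance past the failed offset
        seekToFailedOffsetsReadyForRetry();      // next poll re-reads the same records
    }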

Regards
Chandan

Re: Few observations related to KafkaSpout implementation (1.1.0)

Posted by Hugo Da Cruz Louro <hl...@hortonworks.com>.
@chandan 

#1 was designed to be single-threaded. There are several reasons for that. 
  1.1 KafkaConsumer is single-threaded. If you have multiple threads attempting to poll from it, you will get a ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access"); 
  1.2 Multiple threads are typically only advantageous when one of the threads is expected to have idle time, which typically comes in the form of I/O operations. That is not the case for the Spout. As @Stig mentioned, Storm already decouples consumption and processing: a Kafka ConsumerRecord is fetched by the spout and sent downstream in the call to nextTuple. A Spout is Storm's abstraction for ingesting data into the topology; the Bolt is Storm's abstraction that lets the user plug in arbitrary computation. 
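
To make 1.1 concrete, a minimal sketch (topic name and consumer properties
are illustrative):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    Properties props = new Properties();    // bootstrap.servers, deserializers, etc.
    final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("storm-topic"));

    // KafkaConsumer checks the calling thread on every operation; if a second
    // thread enters while another is inside the consumer, it fails fast with
    // ConcurrentModificationException rather than corrupting internal state.
    Runnable pollLoop = () -> { while (true) consumer.poll(200); };
    new Thread(pollLoop).start();
    new Thread(pollLoop).start();           // one of the two will throw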

#2 was also designed that way. 
  2.1 @Bobby mentioned that #2 was designed to optimize for the case where there are no failures. Failures are expected to be rare. If there are lots of failures, something is wrong, e.g. problems in a downstream component/service, or networking problems. When optimizing, one weighs trade-offs. The implementation favors the case where there are no failures, which is the most common case. Put another way, you want to maximize throughput during normal message processing, not when you anticipate that there may be some errors/failures.
  2.2 The implementation was designed not to keep any records in memory. A KafkaRecord can have arbitrary size, and if multiple records keep failing, the memory footprint can grow increasingly large. Furthermore, if you keep the records in memory and the JVM running the spout crashes, you will have to poll from Kafka again anyway. The Spout already does some optimizations to avoid re-emitting records that are subsequent to a failed record but that didn't themselves fail. Once again, we are trying to strike a good balance between throughput, guarantee of delivery, and minimizing duplicates. 
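
A rough sketch of the bookkeeping described in 2.2 (field and method names
are illustrative, not the actual implementation):

    // Track only offsets, never the records themselves, so memory stays bounded.
    // The real spout keeps this state per partition; a flat set keeps the sketch short.
    // (imports: java.util.HashSet/Set, org.apache.storm.tuple.Values,
    //  org.apache.kafka.clients.consumer.ConsumerRecord)
    private final Set<Long> emittedOrAcked = new HashSet<>();

    private void maybeEmit(ConsumerRecord<String, String> record) {
        if (emittedOrAcked.contains(record.offset())) {
            return;  // re-polled after a retry seek: already handled, skip it
        }
        // collector is the spout's SpoutOutputCollector; offset stands in for the message id
        collector.emit(new Values(record.value()), record.offset());
        emittedOrAcked.add(record.offset());
    }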

Best,
Hugo

> On Jul 10, 2017, at 11:40 AM, Stig Rohde Døssing <st...@gmail.com> wrote:
> 
> 1) It is true that nextTuple implementations should try to avoid blocking
> as much as possible, but the time nextTuple may block in poll is capped by
> a configuration parameter
> https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L304.
> 
> 
> The guideline on the concepts page is (I think) intended to convey that
> implementations should avoid long or indefinite duration blocking in
> nextTuple. The concern here is that Storm can't call ack or fail or any
> other spout method while the spout thread is executing nextTuple. This can
> break tuple timeouts, because an acked tuple sitting in the spout's queue
> can still end up timing out if the spout thread is blocked in nextTuple. It
> is also preferable if we can avoid the spout's input queue getting full. As
> long as the call to nextTuple returns within a reasonable timeframe,
> blocking won't cause these problems.
> 
> For example, it would be a bad idea to implement a spout that blocks in
> nextTuple until more messages are available, because it would mean that
> acked tuples can end up sitting there waiting for nextTuple to return
> before they are properly acked, which could cause them to time out
> erroneously. If the spout instead only blocks for a very short period, this
> problem is avoided.
> 
> Very short duration blocking is also what Storm itself will do by default
> if nextTuple returns no new tuples. This avoids excessive calls to
> nextTuple wasting a bunch of power and heat if there are no new tuples to
> emit
> https://github.com/apache/storm/blob/64e29f365c9b5d3e15b33f33ab64e200345333e4/storm-client/src/jvm/org/apache/storm/spout/SleepSpoutWaitStrategy.java#L36.
> 
> 
> If you're interested in the details of how this works, take a look at
> https://github.com/apache/storm/blob/d7c781891fd0cf409335ed1b70397e3c6747475e/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java#L139
> to see the loop Storm runs for spout components.
> 
> The only potentially long block we have in nextTuple is the commit
> operation, which seems reasonably innocent to me. I'm not aware of any
> reason that call would block for a long time, unless there's a problem with
> the connection to Kafka. I'd be happy to be corrected, but I can't think of
> any situation where commit blocks for a long time, but we would benefit
> from not blocking the spout thread.
> 
> Regarding decoupling consumption and processing, this is already the case.
> Consumption is the spout's responsibility, while processing is handled by
> the rest of the topology (the bolts). The spout polls Kafka and puts the
> messages in the shared topology queue by emitting them. The bolts then
> process the messages. Since the bolts run in different threads than the
> spouts, consumption and processing is already decoupled.
> 
> About how prefetching works with seeking, I haven't looked into how
> prefetching works when we seek to a new offset, but I'd just like to
> correct you a bit: We don't seek before every poll. We only seek if there
> are failed tuples ready for retry, and then only on the partitions
> containing those tuples. Most calls to poll will happen with no preceding
> seek.
> 
> 2017-07-10 19:13 GMT+02:00 chandan singh <ck...@gmail.com>:
> 
>> Hi Stig & Bobby
>> 
>> Thanks for confirming my understanding.
>> 
>> 1) Ensuring that calls to nextTuple(), ack() and fail() are non-blocking
>> has been a guideline on http://storm.apache.org/
>> releases/1.1.0/Concepts.html
>> for long. Copying verbatim here : "The main method on spouts is nextTuple.
>> nextTuple either emits a new tuple into the topology or simply returns if
>> there are no new tuples to emit. It is imperative that nextTuple does not
>> block for any spout implementation, because Storm calls all the spout
>> methods on the same thread." I admit that there is some chance my
>> interpretation is partially incorrect but I have been following it in a
>> custom spout till now. Even though the objective is different, there is a
>> similar hint on Kafka official documentation. Please see under heading "2.
>> Decouple Consumption and Processing" on
>> https://kafka.apache.org/0110/javadoc/index.html?org/apache/
>> kafka/clients/consumer/KafkaConsumer.html.
>> Essentially, a thread polls Kafka and spout thread gets the messages
>> through a shared queue. If pre-fetching is present in Kafka (I will read
>> about it further), I assume we do not have to fetch in another thread but I
>> am not sure how does the pre-fetching behave with re-seeking before every
>> poll.
>> 
>> 2) @Bobby, you are correct in pointing what needs to be optimized but the
>> facts, sometimes, prevent us from making assumptions. We do optimize our
>> retry loop such that we don't poll the messages again. I especially see
>> problems when combined with exponential back off.  I am not sure how
>> difficult or clean will it be to expose some sort of configuration to allow
>> such optimization?  Do you think it will be worth trying out something?
>> 
>> Thanks
>> Chandan
>> 


Re: Few observations related to KafkaSpout implementation (1.1.0)

Posted by Stig Rohde Døssing <st...@gmail.com>.
1) It is true that nextTuple implementations should try to avoid blocking
as much as possible, but the time nextTuple may block in poll is capped by
a configuration parameter
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L304.


The guideline on the concepts page is (I think) intended to convey that
implementations should avoid long or indefinite duration blocking in
nextTuple. The concern here is that Storm can't call ack or fail or any
other spout method while the spout thread is executing nextTuple. This can
break tuple timeouts, because an acked tuple sitting in the spout's queue
can still end up timing out if the spout thread is blocked in nextTuple. It
is also preferable if we can avoid the spout's input queue getting full. As
long as the call to nextTuple returns within a reasonable timeframe,
blocking won't cause these problems.
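
As a sketch, this is the shape of a nextTuple whose blocking is bounded
(simplified, not the actual KafkaSpout code; consumer and collector are
fields of the spout):

    @Override
    public void nextTuple() {
        // poll returns within the timeout even if Kafka has nothing new, so
        // Storm gets back to servicing ack/fail on this thread promptly.
        ConsumerRecords<String, String> records = consumer.poll(200);
        for (ConsumerRecord<String, String> record : records) {
            collector.emit(new Values(record.value()), record.offset());
        }
    }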

For example, it would be a bad idea to implement a spout that blocks in
nextTuple until more messages are available, because it would mean that
acked tuples can end up sitting there waiting for nextTuple to return
before they are properly acked, which could cause them to time out
erroneously. If the spout instead only blocks for a very short period, this
problem is avoided.

Very short duration blocking is also what Storm itself will do by default
if nextTuple returns no new tuples. This avoids excessive calls to
nextTuple wasting a bunch of power and heat if there are no new tuples to
emit
https://github.com/apache/storm/blob/64e29f365c9b5d3e15b33f33ab64e200345333e4/storm-client/src/jvm/org/apache/storm/spout/SleepSpoutWaitStrategy.java#L36.


If you're interested in the details of how this works, take a look at
https://github.com/apache/storm/blob/d7c781891fd0cf409335ed1b70397e3c6747475e/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java#L139
to see the loop Storm runs for spout components.

The only potentially long block we have in nextTuple is the commit
operation, which seems reasonably innocent to me. I'm not aware of any
reason that call would block for a long time, unless there's a problem with
the connection to Kafka. I'd be happy to be corrected, but I can't think of
any situation where commit blocks for a long time, but we would benefit
from not blocking the spout thread.

Regarding decoupling consumption and processing, this is already the case.
Consumption is the spout's responsibility, while processing is handled by
the rest of the topology (the bolts). The spout polls Kafka and puts the
messages in the shared topology queue by emitting them. The bolts then
process the messages. Since the bolts run in different threads than the
spouts, consumption and processing is already decoupled.

About how prefetching works with seeking, I haven't looked into how
prefetching works when we seek to a new offset, but I'd just like to
correct you a bit: We don't seek before every poll. We only seek if there
are failed tuples ready for retry, and then only on the partitions
containing those tuples. Most calls to poll will happen with no preceding
seek.
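
In sketch form (variable names are illustrative), the retry path looks
roughly like this:

    // Seek only on partitions that have a failed tuple ready for retry; all
    // other partitions keep their current position, so most polls are
    // preceded by no seek at all.
    for (Map.Entry<TopicPartition, Long> retry : offsetsReadyForRetry.entrySet()) {
        consumer.seek(retry.getKey(), retry.getValue());
    }
    ConsumerRecords<String, String> records = consumer.poll(pollTimeoutMs);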

2017-07-10 19:13 GMT+02:00 chandan singh <ck...@gmail.com>:

> Hi Stig & Bobby
>
> Thanks for confirming my understanding.
>
> 1) Ensuring that calls to nextTuple(), ack() and fail() are non-blocking
> has been a guideline on http://storm.apache.org/
> releases/1.1.0/Concepts.html
> for long. Copying verbatim here : "The main method on spouts is nextTuple.
> nextTuple either emits a new tuple into the topology or simply returns if
> there are no new tuples to emit. It is imperative that nextTuple does not
> block for any spout implementation, because Storm calls all the spout
> methods on the same thread." I admit that there is some chance my
> interpretation is partially incorrect but I have been following it in a
> custom spout till now. Even though the objective is different, there is a
> similar hint on Kafka official documentation. Please see under heading "2.
> Decouple Consumption and Processing" on
> https://kafka.apache.org/0110/javadoc/index.html?org/apache/
> kafka/clients/consumer/KafkaConsumer.html.
> Essentially, a thread polls Kafka and spout thread gets the messages
> through a shared queue. If pre-fetching is present in Kafka (I will read
> about it further), I assume we do not have to fetch in another thread but I
> am not sure how does the pre-fetching behave with re-seeking before every
> poll.
>
> 2) @Bobby, you are correct in pointing what needs to be optimized but the
> facts, sometimes, prevent us from making assumptions. We do optimize our
> retry loop such that we don't poll the messages again. I especially see
> problems when combined with exponential back off.  I am not sure how
> difficult or clean will it be to expose some sort of configuration to allow
> such optimization?  Do you think it will be worth trying out something?
>
> Thanks
> Chandan
>

Re: Few observations related to KafkaSpout implementation (1.1.0)

Posted by Bobby Evans <ev...@yahoo-inc.com.INVALID>.
I believe that "better" is relative most of the time.  Optimizing a failure case costs something, unless of course the code is doing something horrible and there truly is a better way to do it that uses less memory, CPU, etc.  If so, please put up a pull request and let's fix the thing.  However, most of the time getting better performance costs something.  If we want to avoid going back to Kafka for a failed tuple then we need to store it somewhere.  The simplest way to do that would be to cache the tuple in memory, but that will cost as much memory as all of the outstanding tuples for the spout.  It is not a clear win if failures are not common.
How frequently should failures happen?  I don't think we have anything like that documented, because it depends on a number of things, but most of the time failures indicate that there is something that needs to be fixed in the topology.  For me, if I see any failures in a topology and we are not in the middle of a rolling upgrade, I am concerned.  A lot of teams ignore errors because they are running a lambda architecture and will have the batch layer stomp on top of it later, but I still get very nervous when I see it.

- Bobby


On Monday, July 10, 2017, 2:33:52 PM CDT, chandan singh <ck...@gmail.com> wrote:

Sorry about being cryptic there. What I meant is that it will be much
better if we don't make assumptions about the frequency of failures in
topologies. I know it is more of a common-sense matter, but out of
curiosity, can you point me to any Storm documentation which comments on
preferable failure rates? I was suggesting that if we can offer the user an
optimization through a clean API, the user will be free to decide on the
rationale of using it.

On Tue, Jul 11, 2017 at 12:06 AM, Bobby Evans <ev...@yahoo-inc.com.invalid>
wrote:

> I'm not sure what assumptions you want to make that this is preventing, or
> why they would be helpful.
>
> - Bobby
>
>
> On Monday, July 10, 2017, 12:14:53 PM CDT, chandan singh <
> cks071g2@gmail.com> wrote:
>
> Hi Stig & Bobby
>
> Thanks for confirming my understanding.
>
> 1) Ensuring that calls to nextTuple(), ack() and fail() are non-blocking
> has been a guideline on http://storm.apache.org/
> releases/1.1.0/Concepts.html
> for long. Copying verbatim here : "The main method on spouts is nextTuple.
> nextTuple either emits a new tuple into the topology or simply returns if
> there are no new tuples to emit. It is imperative that nextTuple does not
> block for any spout implementation, because Storm calls all the spout
> methods on the same thread." I admit that there is some chance my
> interpretation is partially incorrect but I have been following it in a
> custom spout till now. Even though the objective is different, there is a
> similar hint on Kafka official documentation. Please see under heading "2.
> Decouple Consumption and Processing" on
> https://kafka.apache.org/0110/javadoc/index.html?org/apache/
> kafka/clients/consumer/KafkaConsumer.html.
> Essentially, a thread polls Kafka and spout thread gets the messages
> through a shared queue. If pre-fetching is present in Kafka (I will read
> about it further), I assume we do not have to fetch in another thread but I
> am not sure how does the pre-fetching behave with re-seeking before every
> poll.
>
> 2) @Bobby, you are correct in pointing what needs to be optimized but the
> facts, sometimes, prevent us from making assumptions. We do optimize our
> retry loop such that we don't poll the messages again. I especially see
> problems when combined with exponential back off.  I am not sure how
> difficult or clean will it be to expose some sort of configuration to allow
> such optimization?  Do you think it will be worth trying out something?
>
> Thanks
> Chandan
>

Re: Few observations related to KafkaSpout implementation (1.1.0)

Posted by Roshan Naik <ro...@hortonworks.com>.
There is a set of simple topologies in storm-perf for such benchmarking… we have one there that measures the perf of the old KafkaSpout:
https://github.com/apache/storm/blob/1.x-branch/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java

We don't have one yet for the new Kafka spout… I had written one but never got around to contributing it back…
https://github.com/roshannaik/storm/blob/perftopos1.x/examples/storm-starter/src/jvm/org/apache/storm/starter/perf/NewKafkaSpoutNullBoltTopo.java

You can copy it into storm-perf and use it as your starting point… the main method that submits the topo there needs to be made more like the ones we already have in official Storm.

-roshan

On 7/10/17, 10:49 PM, "chandan singh" <ck...@gmail.com> wrote:

    Thanks @Stig for the detailed explanation. I have not yet used the
    KafkaSpout; was just going through the code to understand it. I will try
    the fixes too. In the topologies I run, I hardly create a scenario where
    the Spout is limiting the throughput. Still, I will try to do some
    benchmarking using a dummy topology and update you on my experience.
    
    Thanks a lot again.
    Chandan
    
    On Tue, Jul 11, 2017 at 3:46 AM, Stig Døssing <ge...@gmail.com>
    wrote:
    
    > > It is the spout thread which
    > > is polling (consuming) and iterating (processing) over the polled
    > messages.
    > > If we separate the consumption in another thread and push messages in a
    > > queue, iterating (processing) is now concurrent and decoupled
    >
    > Sure, but the bookkeeping done by the spout before emitting the polled
    > messages should be very lightweight, and pushing the messages (and
    > returning acks/fails) through another queue system isn't free. I'm just not
    > convinced that avoiding short duration blocks by running the consumer in a
    > separate thread has any benefit, but I'd be happy to see benchmarks.
    >
    > The auto spout block/wait when no messages are emitted makes absolute sense
    > > but a different scenario, less likely in a mostly loaded topology.
    >
    > Keep in mind that the consumer blocking on poll is also less likely in a
    > loaded topology, because the block only happens if either Kafka has no more
    > new messages, or the consumer prefetching fails to fetch messages quickly
    > enough.
    >
    > 1.2 Agreed again. I had taken the spout recommendation about non-blocking
    > > calls seriously, taking into account its critical role
    >
    > I think it's a good recommendation, but like I said I understand it to be
    > warning about blocking for long periods. Besides, the user can configure
    > how long of a block in poll they will accept for the KafkaSpout, so if the
    > current default timeout (200ms) is causing someone issues, it can always be
    > lowered.
    >
    > I am especially thinking of the failure case
    > > which, for some reason, is recurrent;  the same message which does not
    > get
    > > acked.
    >
    > This case is something the user needs to deal with regardless of how we
    > implement the spout. If a message is repeatedly failing, the choice will be
    > between repeatedly retrying that message until it finally succeeds, or
    > dropping that message and moving on. Either option is something the user
    > can configure via the RetryService.
    >
    > We cannot commit unless
    > > that failed message is finally acked after few retries. Due to which, we
    > > don't even process any new set of records.
    > >
    > This behavior is caused by a bug in 1.1.0, which has since been fixed. Once
    > the failed message is fetched and retried, the spout should proceed past
    > the failed message and emit new tuples. The only reasons the spout should
    > stop emitting new tuples are if it hits the cap on uncommitted offsets
    > (configurable, see
    > https://github.com/apache/storm/blob/master/external/
    > storm-kafka-client/src/main/java/org/apache/storm/kafka/
    > spout/KafkaSpoutConfig.java#L47),
    > or if the failed message is so far back in the message stream that the
    > spout seeks back to the failed message, and then spends a long time
    > fetching and discarding already emitted tuples while catching up to where
    > it left off.
    >
    > There are bugs in the max uncommitted offsets mechanism right now that can
    > cause a spout to stop retrying tuples (some fixed here
    > https://issues.apache.org/jira/browse/STORM-2343, some still pending
    > https://issues.apache.org/jira/browse/STORM-2549, we're working to fix
    > this). I think the behavior you're seeing is caused by part of STORM-2343,
    > which is not fixed in 1.1.0. Basically in 1.1.0 when there was a failed
    > tuple, the spout would always seek to the committed offset for that
    > partition in order to get the failed tuple. This caused some issues
    > relating to being unable to retry messages that were too far from the
    > committed offset, and unnecessarily polling the messages next to the
    > committed offset when we didn't need to. We've changed it so we seek to the
    > offset we want to retry instead, which avoids these issues. See
    > https://github.com/apache/storm/pull/1924/files#diff-
    > 7d7cbc8f5444fa7ada7962033fc31c5eR302
    > for details. You may want to try checking out 1.x-branch and building the
    > spout yourself to see if this solves your issue.
    >
    > 2017-07-10 23:08 GMT+02:00 chandan singh <ck...@gmail.com>:
    >
    > > Hi Hugo
    > >
    > > Hope I do not come across as arguing for its own sake.
    > >
    > > 1.1 Agreed but I was not suggesting to use KafkaConsumer from multiple
    > > threads. We can use a single thread which is different from spout thread.
    > > That thread and spout will share a queue of ConsumerRecords.
    > >
    > > 1.2 Agreed again. I had taken the spout recommendation about non-blocking
    > > calls seriously, taking into account its critical role. I still believe
    > it
    > > is a sound recommendation but may be there are other factors like
    > topology
    > > throughput which might be more limiting than the spout making a Kafka
    > > consumer poll.
    > >
    > > 2.1 There is no room for an optimization which affects the normal
    > (non-failure)
    > > case negatively. I was just suggesting if we can optimize the failure
    > case
    > > without a trade-off. Failure processing is quite independent of
    > > successfully acked messages. I am especially thinking of the failure case
    > > which, for some reason, is recurrent;  the same message which does not
    > get
    > > acked. I agree that it is much better to address the issue in the
    > > application.
    > >
    > > 2.2 As you have correctly mentioned, subsequently polled messages
    > > (previously acked/emitted) are no longer emitted. We cannot commit unless
    > > that failed message is finally acked after few retries. Due to which, we
    > > don't even process any new set of records.  We anyway keep the current
    > set
    > > of messages (polled repeatedly) in memory.  All we are doing is polling
    > the
    > > same set of messages and iterating till that one failure is rescheduled
    > for
    > > N times (worse with exponential back off).
    > >
    > > Anyway, we are dealing with rare scenarios which should get the least
    > > priority especially if it introduces complexity.
    > >
    > > On a different note, sending arbitrarily (large) sized messages to Kafka
    > is
    > > a problem in itself.
    > >
    > > @Stig I will think about it further and evaluate the need for
    > optimization.
    > >
    > > Thanks
    > > Chandan
    > >
    > > On Tue, Jul 11, 2017 at 1:37 AM, chandan singh <ck...@gmail.com>
    > wrote:
    > >
    > > > 1) Agreed that short duration blocks are less likely to cause an issue
    > > but
    > > > it will be much better to avoid them. Anyway, a conclusion is not easy
    > > > without some benchmark. I will let You know if I am able to do some
    > > volume
    > > > testing on both options and observe significant difference.
    > > >
    > > > The auto spout block/wait when no messages are emitted makes absolute
    > > > sense but a different scenario, less likely in a mostly loaded
    > topology.
    > > >
    > > > You are absolutely correct about decoupling the consumption from
    > > > processing when you see it from the perspective of Storm. I think there
    > > is
    > > > a very subtle difference when we keep Kafka in mind. It is the spout
    > > thread
    > > > which is polling (consuming) and iterating (processing) over the polled
    > > > messages. If we separate the consumption in another thread and push
    > > > messages in a queue, iterating (processing) is now concurrent and
    > > > decoupled. Anyway, I too feel there should not be any significant
    > > > difference in throughput but it will be interesting to measure the
    > > > difference.
    > > >
    > > > I went overboard while mentioning seek before every poll when I had the
    > > > failure scenario in mind.
    > > >
    > > > Thanks a lot. I will keep you guys updated on the switch from custom
    > > spout
    > > > to KafkaSpout.
    > > >
    > > > Thanks for the amazing work.
    > > >
    > > > Chandan
    > > >
    > > > On Tue, Jul 11, 2017 at 12:56 AM, chandan singh <ck...@gmail.com>
    > > > wrote:
    > > >
    > > >> Sorry about being cryptic there. What I meant is that it will be much
    > > >> better if we don't make assumptions about frequency of failure rates
    > in
    > > >> topologies. I know it is more of a commonsense but out of curiosity,
    > can
    > > >> you point me to any Storm documentation which makes a comment on
    > > preferable
    > > >> failure rates. I was suggesting if we can offer the user an
    > optimization
    > > >> through clean API, the user will be free to decide on the rationale of
    > > >> using it.
    > > >>
    > > >> On Tue, Jul 11, 2017 at 12:06 AM, Bobby Evans <
    > > >> evans@yahoo-inc.com.invalid> wrote:
    > > >>
    > > >>> I'm not sure what assumptions you want to make that this is
    > preventing,
    > > >>> or why they would be helpful.
    > > >>>
    > > >>> - Bobby
    > > >>>
    > > >>>
    > > >>> On Monday, July 10, 2017, 12:14:53 PM CDT, chandan singh <
    > > >>> cks071g2@gmail.com> wrote:
    > > >>>
    > > >>> Hi Stig & Bobby
    > > >>>
    > > >>> Thanks for confirming my understanding.
    > > >>>
    > > >>> 1) Ensuring that calls to nextTuple(), ack() and fail() are
    > > non-blocking
    > > >>> has been a guideline on http://storm.apache.org/releas
    > > >>> es/1.1.0/Concepts.html
    > > >>> for long. Copying verbatim here : "The main method on spouts is
    > > >>> nextTuple.
    > > >>> nextTuple either emits a new tuple into the topology or simply
    > returns
    > > if
    > > >>> there are no new tuples to emit. It is imperative that nextTuple does
    > > not
    > > >>> block for any spout implementation, because Storm calls all the spout
    > > >>> methods on the same thread." I admit that there is some chance my
    > > >>> interpretation is partially incorrect but I have been following it
    > in a
    > > >>> custom spout till now. Even though the objective is different, there
    > > is a
    > > >>> similar hint on Kafka official documentation. Please see under
    > heading
    > > >>> "2.
    > > >>> Decouple Consumption and Processing" on
    > > >>> https://kafka.apache.org/0110/javadoc/index.html?org/apache/
    > > >>> kafka/clients/consumer/KafkaConsumer.html.
    > > >>> Essentially, a thread polls Kafka and spout thread gets the messages
    > > >>> through a shared queue. If pre-fetching is present in Kafka (I will
    > > read
    > > >>> about it further), I assume we do not have to fetch in another thread
    > > >>> but I
    > > >>> am not sure how does the pre-fetching behave with re-seeking before
    > > every
    > > >>> poll.
    > > >>>
    > > >>> 2) @Bobby, you are correct in pointing what needs to be optimized but
    > > the
    > > >>> facts, sometimes, prevent us from making assumptions. We do optimize
    > > our
    > > >>> retry loop such that we don't poll the messages again. I especially
    > see
    > > >>> problems when combined with exponential back off.  I am not sure how
    > > >>> difficult or clean will it be to expose some sort of configuration to
    > > >>> allow
    > > >>> such optimization?  Do you think it will be worth trying out
    > something?
    > > >>>
    > > >>> Thanks
    > > >>> Chandan
    > > >>>
    > > >>
    > > >>
    > > >
    > >
    >
    


Re: Few observations related to KafkaSpout implementation (1.1.0)

Posted by chandan singh <ck...@gmail.com>.
Thanks @Stig for the detailed explanation. I have not yet used the
KafkaSpout; was just going through the code to understand it. I will try
the fixes too. In the topologies I run, I hardly create a scenario where
the Spout is limiting the throughput. Still, I will try to do some
benchmarking using a dummy topology and update you on my experience.

Thanks a lot again.
Chandan

On Tue, Jul 11, 2017 at 3:46 AM, Stig Døssing <ge...@gmail.com>
wrote:

> > It is the spout thread which
> > is polling (consuming) and iterating (processing) over the polled
> messages.
> > If we separate the consumption in another thread and push messages in a
> > queue, iterating (processing) is now concurrent and decoupled
>
> Sure, but the bookkeeping done by the spout before emitting the polled
> messages should be very lightweight, and pushing the messages (and
> returning acks/fails) through another queue system isn't free. I'm just not
> convinced that avoiding short duration blocks by running the consumer in a
> separate thread has any benefit, but I'd be happy to see benchmarks.
>
> The auto spout block/wait when no messages are emitted makes absolute sense
> > but a different scenario, less likely in a mostly loaded topology.
>
> Keep in mind that the consumer blocking on poll is also less likely in a
> loaded topology, because the block only happens if either Kafka has no more
> new messages, or the consumer prefetching fails to fetch messages quickly
> enough.
>
> 1.2 Agreed again. I had taken the spout recommendation about non-blocking
> > calls seriously, taking into account its critical role
>
> I think it's a good recommendation, but like I said I understand it to be
> warning about blocking for long periods. Besides, the user can configure
> how long of a block in poll they will accept for the KafkaSpout, so if the
> current default timeout (200ms) is causing someone issues, it can always be
> lowered.
>
> I am especially thinking of the failure case
> > which, for some reason, is recurrent;  the same message which does not
> get
> > acked.
>
> This case is something the user needs to deal with regardless of how we
> implement the spout. If a message is repeatedly failing, the choice will be
> between repeatedly retrying that message until it finally succeeds, or
> dropping that message and moving on. Either option is something the user
> can configure via the RetryService.
>
> We cannot commit unless
> > that failed message is finally acked after few retries. Due to which, we
> > don't even process any new set of records.
> >
> This behavior is caused by a bug in 1.1.0, which has since been fixed. Once
> the failed message is fetched and retried, the spout should proceed past
> the failed message and emit new tuples. The only reasons the spout should
> stop emitting new tuples are if it hits the cap on uncommitted offsets
> (configurable, see
> https://github.com/apache/storm/blob/master/external/
> storm-kafka-client/src/main/java/org/apache/storm/kafka/
> spout/KafkaSpoutConfig.java#L47),
> or if the failed message is so far back in the message stream that the
> spout seeks back to the failed message, and then spends a long time
> fetching and discarding already emitted tuples while catching up to where
> it left off.
>
> There are bugs in the max uncommitted offsets mechanism right now that can
> cause a spout to stop retrying tuples (some fixed here
> https://issues.apache.org/jira/browse/STORM-2343, some still pending
> https://issues.apache.org/jira/browse/STORM-2549, we're working to fix
> this). I think the behavior you're seeing is caused by part of STORM-2343,
> which is not fixed in 1.1.0. Basically in 1.1.0 when there was a failed
> tuple, the spout would always seek to the committed offset for that
> partition in order to get the failed tuple. This caused some issues
> relating to being unable to retry messages that were too far from the
> committed offset, and unnecessarily polling the messages next to the
> committed offset when we didn't need to. We've changed it so we seek to the
> offset we want to retry instead, which avoids these issues. See
> https://github.com/apache/storm/pull/1924/files#diff-
> 7d7cbc8f5444fa7ada7962033fc31c5eR302
> for details. You may want to try checking out 1.x-branch and building the
> spout yourself to see if this solves your issue.
>
> 2017-07-10 23:08 GMT+02:00 chandan singh <ck...@gmail.com>:
>
> > Hi Hugo
> >
> > Hope I do not come across as arguing for its own sake.
> >
> > 1.1 Agreed but I was not suggesting to use KafkaConsumer from multiple
> > threads. We can use a single thread which is different from spout thread.
> > That thread and spout will share a queue of ConsumerRecords.
> >
> > 1.2 Agreed again. I had taken the spout recommendation about non-blocking
> > calls seriously, taking into account its critical role. I still believe
> it
> > is a sound recommendation but may be there are other factors like
> topology
> > throughput which might be more limiting than the spout making a Kafka
> > consumer poll.
> >
> > 2.1 There is no room for an optimization which affects the normal
> (non-failure)
> > case negatively. I was just suggesting if we can optimize the failure
> case
> > without a trade-off. Failure processing is quite independent of
> > successfully acked messages. I am especially thinking of the failure case
> > which, for some reason, is recurrent;  the same message which does not
> get
> > acked. I agree that it is much better to address the issue in the
> > application.
> >
> > 2.2 As you have correctly mentioned, subsequently polled messages
> > (previously acked/emitted) are no longer emitted. We cannot commit unless
> > that failed message is finally acked after few retries. Due to which, we
> > don't even process any new set of records.  We anyway keep the current
> set
> > of messages (polled repeatedly) in memory.  All we are doing is polling
> the
> > same set of messages and iterating till that one failure is rescheduled
> for
> > N times (worse with exponential back off).
> >
> > Anyway, we are dealing with rare scenarios which should get the least
> > priority especially if it introduces complexity.
> >
> > On a different note, sending arbitrarily (large) sized messages to Kafka
> is
> > a problem in itself.
> >
> > @Stig I will think about it further and evaluate the need for
> optimization.
> >
> > Thanks
> > Chandan
> >
> > On Tue, Jul 11, 2017 at 1:37 AM, chandan singh <ck...@gmail.com>
> wrote:
> >
> > > 1) Agreed that short duration blocks are less likely to cause an issue
> > but
> > > it will be much better to avoid them. Anyway, a conclusion is not easy
> > > without some benchmark. I will let You know if I am able to do some
> > volume
> > > testing on both options and observe significant difference.
> > >
> > > The auto spout block/wait when no messages are emitted makes absolute
> > > sense but a different scenario, less likely in a mostly loaded
> topology.
> > >
> > > You are absolutely correct about decoupling the consumption from
> > > processing when you see it from the perspective of Storm. I think there
> > is
> > > a very subtle difference when we keep Kafka in mind. It is the spout
> > thread
> > > which is polling (consuming) and iterating (processing) over the polled
> > > messages. If we separate the consumption in another thread and push
> > > messages in a queue, iterating (processing) is now concurrent and
> > > decoupled. Anyway, I too feel there should not be any significant
> > > difference in throughput but it will be interesting to measure the
> > > difference.
> > >
> > > I went overboard while mentioning seek before every poll when I had the
> > > failure scenario in mind.
> > >
> > > Thanks a lot. I will keep you guys updated on the switch from custom
> > spout
> > > to KafkaSpout.
> > >
> > > Thanks for the amazing work.
> > >
> > > Chandan
> > >
> > > On Tue, Jul 11, 2017 at 12:56 AM, chandan singh <ck...@gmail.com>
> > > wrote:
> > >
> > >> Sorry about being cryptic there. What I meant is that it will be much
> > >> better if we don't make assumptions about frequency of failure rates
> in
> > >> topologies. I know it is more of a commonsense but out of curiosity,
> can
> > >> you point me to any Storm documentation which makes a comment on
> > preferable
> > >> failure rates. I was suggesting if we can offer the user an
> optimization
> > >> through clean API, the user will be free to decide on the rationale of
> > >> using it.
> > >>
> > >> On Tue, Jul 11, 2017 at 12:06 AM, Bobby Evans <
> > >> evans@yahoo-inc.com.invalid> wrote:
> > >>
> > >>> I'm not sure what assumptions you want to make that this is
> preventing,
> > >>> or why they would be helpful.
> > >>>
> > >>> - Bobby
> > >>>
> > >>>
> > >>> On Monday, July 10, 2017, 12:14:53 PM CDT, chandan singh <
> > >>> cks071g2@gmail.com> wrote:
> > >>>
> > >>> Hi Stig & Bobby
> > >>>
> > >>> Thanks for confirming my understanding.
> > >>>
> > >>> 1) Ensuring that calls to nextTuple(), ack() and fail() are
> > non-blocking
> > >>> has been a guideline on http://storm.apache.org/releas
> > >>> es/1.1.0/Concepts.html
> > >>> for long. Copying verbatim here : "The main method on spouts is
> > >>> nextTuple.
> > >>> nextTuple either emits a new tuple into the topology or simply
> returns
> > if
> > >>> there are no new tuples to emit. It is imperative that nextTuple does
> > not
> > >>> block for any spout implementation, because Storm calls all the spout
> > >>> methods on the same thread." I admit that there is some chance my
> > >>> interpretation is partially incorrect but I have been following it
> in a
> > >>> custom spout till now. Even though the objective is different, there
> > is a
> > >>> similar hint on Kafka official documentation. Please see under
> heading
> > >>> "2.
> > >>> Decouple Consumption and Processing" on
> > >>> https://kafka.apache.org/0110/javadoc/index.html?org/apache/
> > >>> kafka/clients/consumer/KafkaConsumer.html.
> > >>> Essentially, a thread polls Kafka and spout thread gets the messages
> > >>> through a shared queue. If pre-fetching is present in Kafka (I will
> > read
> > >>> about it further), I assume we do not have to fetch in another thread
> > >>> but I
> > >>> am not sure how does the pre-fetching behave with re-seeking before
> > every
> > >>> poll.
> > >>>
> > >>> 2) @Bobby, you are correct in pointing what needs to be optimized but
> > the
> > >>> facts, sometimes, prevent us from making assumptions. We do optimize
> > our
> > >>> retry loop such that we don't poll the messages again. I especially
> see
> > >>> problems when combined with exponential back off.  I am not sure how
> > >>> difficult or clean will it be to expose some sort of configuration to
> > >>> allow
> > >>> such optimization?  Do you think it will be worth trying out
> something?
> > >>>
> > >>> Thanks
> > >>> Chandan
> > >>>
> > >>
> > >>
> > >
> >
>

Re: Few observations related to KafkaSpout implementation (1.1.0)

Posted by Stig Døssing <ge...@gmail.com>.
> It is the spout thread which
> is polling (consuming) and iterating (processing) over the polled messages.
> If we separate the consumption in another thread and push messages in a
> queue, iterating (processing) is now concurrent and decoupled

Sure, but the bookkeeping done by the spout before emitting the polled
messages should be very lightweight, and pushing the messages (and
returning acks/fails) through another queue system isn't free. I'm just not
convinced that avoiding short duration blocks by running the consumer in a
separate thread has any benefit, but I'd be happy to see benchmarks.

The auto spout block/wait when no messages are emitted makes absolute sense
> but a different scenario, less likely in a mostly loaded topology.

Keep in mind that the consumer blocking on poll is also less likely in a
loaded topology, because the block only happens if either Kafka has no more
new messages, or the consumer prefetching fails to fetch messages quickly
enough.

1.2 Agreed again. I had taken the spout recommendation about non-blocking
> calls seriously, taking into account its critical role

I think it's a good recommendation, but like I said I understand it to be
warning about blocking for long periods. Besides, the user can configure
how long of a block in poll they will accept for the KafkaSpout, so if the
current default timeout (200ms) is causing someone issues, it can always be
lowered.

I am especially thinking of the failure case
> which, for some reason, is recurrent;  the same message which does not get
> acked.

This case is something the user needs to deal with regardless of how we
implement the spout. If a message is repeatedly failing, the choice will be
between repeatedly retrying that message until it finally succeeds, or
dropping that message and moving on. Either option is something the user
can configure via the RetryService.
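
For example, a policy that retries with exponential backoff but drops a
message after a fixed number of attempts can be built roughly like this
(the intervals and retry count are illustrative):

    // Start at 500 ms, back off exponentially, cap the delay at 10 s, and
    // give up on a message after 5 failed deliveries instead of retrying forever.
    KafkaSpoutRetryService retryService = new KafkaSpoutRetryExponentialBackoff(
        KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(500),
        KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(500),
        5,
        KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));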

We cannot commit unless
> that failed message is finally acked after few retries. Due to which, we
> don't even process any new set of records.
>
This behavior is caused by a bug in 1.1.0, which has since been fixed. Once
the failed message is fetched and retried, the spout should proceed past
the failed message and emit new tuples. The only reasons the spout should
stop emitting new tuples are if it hits the cap on uncommitted offsets
(configurable, see
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L47),
or if the failed message is so far back in the message stream that the
spout seeks back to the failed message, and then spends a long time
fetching and discarding already emitted tuples while catching up to where
it left off.
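
The cap itself is a builder setting; roughly (builder API as on current
branches, which differs a bit from 1.1.0; broker address and topic name are
illustrative):

    KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig
        .builder("localhost:9092", "my-topic")
        .setMaxUncommittedOffsets(10000)  // stop emitting past this many uncommitted offsets
        .setRetry(retryService)           // e.g. the retry policy sketched above
        .build();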

There are bugs in the max uncommitted offsets mechanism right now that can
cause a spout to stop retrying tuples (some fixed here
https://issues.apache.org/jira/browse/STORM-2343, some still pending
https://issues.apache.org/jira/browse/STORM-2549, we're working to fix
this). I think the behavior you're seeing is caused by part of STORM-2343,
which is not fixed in 1.1.0. Basically in 1.1.0 when there was a failed
tuple, the spout would always seek to the committed offset for that
partition in order to get the failed tuple. This caused some issues
relating to being unable to retry messages that were too far from the
committed offset, and unnecessarily polling the messages next to the
committed offset when we didn't need to. We've changed it so we seek to the
offset we want to retry instead, which avoids these issues. See
https://github.com/apache/storm/pull/1924/files#diff-7d7cbc8f5444fa7ada7962033fc31c5eR302
for details. You may want to try checking out 1.x-branch and building the
spout yourself to see if this solves your issue.

2017-07-10 23:08 GMT+02:00 chandan singh <ck...@gmail.com>:

> Hi Hugo
>
> Hope I do not come across as arguing for its own sake.
>
> 1.1 Agreed but I was not suggesting to use KafkaConsumer from multiple
> threads. We can use a single thread which is different from spout thread.
> That thread and spout will share a queue of ConsumerRecords.
>
> 1.2 Agreed again. I had taken the spout recommendation about non-blocking
> calls seriously, taking into account its critical role. I still believe it
> is a sound recommendation but may be there are other factors like topology
> throughput which might be more limiting than the spout making a Kafka
> consumer poll.
>
> 2.1 There is no room for an optimization which affects the normal (non-failure)
> case negatively. I was just suggesting if we can optimize the failure case
> without a trade-off. Failure processing is quite independent of
> successfully acked messages. I am especially thinking of the failure case
> which, for some reason, is recurrent;  the same message which does not get
> acked. I agree that it is much better to address the issue in the
> application.
>
> 2.2 As you have correctly mentioned, subsequently polled messages
> (previously acked/emitted) are no longer emitted. We cannot commit unless
> that failed message is finally acked after few retries. Due to which, we
> don't even process any new set of records.  We anyway keep the current set
> of messages (polled repeatedly) in memory.  All we are doing is polling the
> same set of messages and iterating till that one failure is rescheduled for
> N times (worse with exponential back off).
>
> Anyway, we are dealing with rare scenarios which should get the least
> priority especially if it introduces complexity.
>
> On a different note, sending arbitrarily (large) sized messages to Kafka is
> a problem in itself.
>
> @Stig I will think about it further and evaluate the need for optimization.
>
> Thanks
> Chandan
>
> On Tue, Jul 11, 2017 at 1:37 AM, chandan singh <ck...@gmail.com> wrote:
>
> > 1) Agreed that short duration blocks are less likely to cause an issue
> but
> > it will be much better to avoid them. Anyway, a conclusion is not easy
> > without some benchmark. I will let You know if I am able to do some
> volume
> > testing on both options and observe significant difference.
> >
> > The auto spout block/wait when no messages are emitted makes absolute
> > sense but a different scenario, less likely in a mostly loaded topology.
> >
> > You are absolutely correct about decoupling the consumption from
> > processing when you see it from the perspective of Storm. I think there
> is
> > a very subtle difference when we keep Kafka in mind. It is the spout
> thread
> > which is polling (consuming) and iterating (processing) over the polled
> > messages. If we separate the consumption in another thread and push
> > messages in a queue, iterating (processing) is now concurrent and
> > decoupled. Anyway, I too feel there should not be any significant
> > difference in throughput but it will be interesting to measure the
> > difference.
> >
> > I went overboard while mentioning seek before every poll when I had the
> > failure scenario in mind.
> >
> > Thanks a lot. I will keep you guys updated on the switch from custom
> spout
> > to KafkaSpout.
> >
> > Thanks for the amazing work.
> >
> > Chandan
> >
> > On Tue, Jul 11, 2017 at 12:56 AM, chandan singh <ck...@gmail.com>
> > wrote:
> >
> >> Sorry about being cryptic there. What I meant is that it will be much
> >> better if we don't make assumptions about frequency of failure rates in
> >> topologies. I know it is more of a commonsense but out of curiosity, can
> >> you point me to any Storm documentation which makes a comment on
> preferable
> >> failure rates. I was suggesting if we can offer the user an optimization
> >> through clean API, the user will be free to decide on the rationale of
> >> using it.
> >>
> >> On Tue, Jul 11, 2017 at 12:06 AM, Bobby Evans <
> >> evans@yahoo-inc.com.invalid> wrote:
> >>
> >>> I'm not sure what assumptions you want to make that this is preventing,
> >>> or why they would be helpful.
> >>>
> >>> - Bobby
> >>>
> >>>
> >>> On Monday, July 10, 2017, 12:14:53 PM CDT, chandan singh <
> >>> cks071g2@gmail.com> wrote:
> >>>
> >>> Hi Stig & Bobby
> >>>
> >>> Thanks for confirming my understanding.
> >>>
> >>> 1) Ensuring that calls to nextTuple(), ack() and fail() are
> non-blocking
> >>> has been a guideline on http://storm.apache.org/releas
> >>> es/1.1.0/Concepts.html
> >>> for long. Copying verbatim here : "The main method on spouts is
> >>> nextTuple.
> >>> nextTuple either emits a new tuple into the topology or simply returns
> if
> >>> there are no new tuples to emit. It is imperative that nextTuple does
> not
> >>> block for any spout implementation, because Storm calls all the spout
> >>> methods on the same thread." I admit that there is some chance my
> >>> interpretation is partially incorrect but I have been following it in a
> >>> custom spout till now. Even though the objective is different, there
> is a
> >>> similar hint on Kafka official documentation. Please see under heading
> >>> "2.
> >>> Decouple Consumption and Processing" on
> >>> https://kafka.apache.org/0110/javadoc/index.html?org/apache/
> >>> kafka/clients/consumer/KafkaConsumer.html.
> >>> Essentially, a thread polls Kafka and spout thread gets the messages
> >>> through a shared queue. If pre-fetching is present in Kafka (I will
> read
> >>> about it further), I assume we do not have to fetch in another thread
> >>> but I
> >>> am not sure how does the pre-fetching behave with re-seeking before
> every
> >>> poll.
> >>>
> >>> 2) @Bobby, you are correct in pointing what needs to be optimized but
> the
> >>> facts, sometimes, prevent us from making assumptions. We do optimize
> our
> >>> retry loop such that we don't poll the messages again. I especially see
> >>> problems when combined with exponential back off.  I am not sure how
> >>> difficult or clean will it be to expose some sort of configuration to
> >>> allow
> >>> such optimization?  Do you think it will be worth trying out something?
> >>>
> >>> Thanks
> >>> Chandan
> >>>
> >>
> >>
> >
>

Re: Few observations related to KafkaSpout implementation (1.1.0)

Posted by chandan singh <ck...@gmail.com>.
Hi Hugo

Hope I do not come across as arguing for its own sake.

1.1 Agreed but I was not suggesting to use KafkaConsumer from multiple
threads. We can use a single thread which is different from spout thread.
That thread and spout will share a queue of ConsumerRecords.
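
Roughly what I have in mind (a sketch only; error handling and the
commit/seek coordination are omitted, and the names are mine):

    // One dedicated fetcher thread owns the KafkaConsumer; the spout thread
    // only drains a bounded queue, so nextTuple() never touches the consumer.
    BlockingQueue<ConsumerRecord<String, String>> queue = new LinkedBlockingQueue<>(10000);

    Thread fetcher = new Thread(() -> {
        try {
            while (true) {
                for (ConsumerRecord<String, String> rec : consumer.poll(200)) {
                    queue.put(rec);  // back-pressure blocks the fetcher, never the spout
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
    fetcher.start();

    // In nextTuple():
    ConsumerRecord<String, String> rec = queue.poll();  // returns immediately
    if (rec != null) {
        collector.emit(new Values(rec.value()), rec.offset());
    }

The catch, of course, is that acks and fails (commits and seeks) would have
to be relayed back to the fetcher thread, since the consumer cannot be
touched from the spout thread.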

1.2 Agreed again. I had taken the spout recommendation about non-blocking
calls seriously, taking into account its critical role. I still believe it
is a sound recommendation, but maybe there are other factors like topology
throughput which might be more limiting than the spout making a Kafka
consumer poll.

2.1 There is no room for an optimization which affects the normal (non-failure)
case negatively. I was just suggesting if we can optimize the failure case
without a trade-off. Failure processing is quite independent of
successfully acked messages. I am especially thinking of the failure case
which, for some reason, is recurrent;  the same message which does not get
acked. I agree that it is much better to address the issue in the
application.

2.2 As you have correctly mentioned, subsequently polled messages
(previously acked/emitted) are no longer emitted. We cannot commit unless
that failed message is finally acked after a few retries. As a result, we
don't even process any new set of records. We anyway keep the current set
of messages (polled repeatedly) in memory.  All we are doing is polling the
same set of messages and iterating till that one failure is rescheduled for
N times (worse with exponential back off).

Anyway, we are dealing with rare scenarios which should get the least
priority, especially if it introduces complexity.

On a different note, sending messages of arbitrarily large size to Kafka is
a problem in itself.

@Stig I will think about it further and evaluate the need for optimization.

Thanks
Chandan

On Tue, Jul 11, 2017 at 1:37 AM, chandan singh <ck...@gmail.com> wrote:

> 1) Agreed that short duration blocks are less likely to cause an issue but
> it will be much better to avoid them. Anyway, a conclusion is not easy
> without some benchmark. I will let You know if I am able to do some volume
> testing on both options and observe significant difference.
>
> The auto spout block/wait when no messages are emitted makes absolute
> sense but a different scenario, less likely in a mostly loaded topology.
>
> You are absolutely correct about decoupling the consumption from
> processing when you see it from the perspective of Storm. I think there is
> a very subtle difference when we keep Kafka in mind. It is the spout thread
> which is polling (consuming) and iterating (processing) over the polled
> messages. If we separate the consumption in another thread and push
> messages in a queue, iterating (processing) is now concurrent and
> decoupled. Anyway, I too feel there should not be any significant
> difference in throughput but it will be interesting to measure the
> difference.
>
> I went overboard while mentioning seek before every poll when I had the
> failure scenario in mind.
>
> Thanks a lot. I will keep you guys updated on the switch from custom spout
> to KafkaSpout.
>
> Thanks for the amazing work.
>
> Chandan
>
> On Tue, Jul 11, 2017 at 12:56 AM, chandan singh <ck...@gmail.com>
> wrote:
>
>> Sorry about being cryptic there. What I meant is that it will be much
>> better if we don't make assumptions about frequency of failure rates in
>> topologies. I know it is more of a commonsense but out of curiosity, can
>> you point me to any Storm documentation which makes a comment on preferable
>> failure rates. I was suggesting if we can offer the user an optimization
>> through clean API, the user will be free to decide on the rationale of
>> using it.
>>
>> On Tue, Jul 11, 2017 at 12:06 AM, Bobby Evans <
>> evans@yahoo-inc.com.invalid> wrote:
>>
>>> I'm not sure what assumptions you want to make that this is preventing,
>>> or why they would be helpful.
>>>
>>> - Bobby
>>>
>>>
>>> On Monday, July 10, 2017, 12:14:53 PM CDT, chandan singh <
>>> cks071g2@gmail.com> wrote:
>>>
>>> Hi Stig & Bobby
>>>
>>> Thanks for confirming my understanding.
>>>
>>> 1) Ensuring that calls to nextTuple(), ack() and fail() are non-blocking
>>> has been a guideline on http://storm.apache.org/releas
>>> es/1.1.0/Concepts.html
>>> for long. Copying verbatim here : "The main method on spouts is
>>> nextTuple.
>>> nextTuple either emits a new tuple into the topology or simply returns if
>>> there are no new tuples to emit. It is imperative that nextTuple does not
>>> block for any spout implementation, because Storm calls all the spout
>>> methods on the same thread." I admit that there is some chance my
>>> interpretation is partially incorrect but I have been following it in a
>>> custom spout till now. Even though the objective is different, there is a
>>> similar hint on Kafka official documentation. Please see under heading
>>> "2.
>>> Decouple Consumption and Processing" on
>>> https://kafka.apache.org/0110/javadoc/index.html?org/apache/
>>> kafka/clients/consumer/KafkaConsumer.html.
>>> Essentially, a thread polls Kafka and spout thread gets the messages
>>> through a shared queue. If pre-fetching is present in Kafka (I will read
>>> about it further), I assume we do not have to fetch in another thread
>>> but I
>>> am not sure how does the pre-fetching behave with re-seeking before every
>>> poll.
>>>
>>> 2) @Bobby, you are correct in pointing out what needs to be optimized,
>>> but the facts sometimes prevent us from making assumptions. We do
>>> optimize our retry loop so that we don't poll the messages again. I
>>> especially see problems when this is combined with exponential back-off.
>>> I am not sure how difficult or clean it would be to expose some sort of
>>> configuration to allow such an optimization. Do you think it would be
>>> worth trying something out?
>>>
>>> Thanks
>>> Chandan
>>>
>>
>>
>

Re: Few observations related to KafkaSpout implementation (1.1.0)

Posted by chandan singh <ck...@gmail.com>.
1) Agreed that short-duration blocks are less likely to cause an issue, but
it will be much better to avoid them. Anyway, a conclusion is not easy
without some benchmark. I will let you know if I am able to do some volume
testing on both options and observe a significant difference.

The automatic spout block/wait when no messages are emitted makes absolute
sense, but that is a different scenario, one less likely in a mostly loaded
topology.

You are absolutely correct about decoupling the consumption from processing
when you see it from the perspective of Storm. I think there is a very
subtle difference when we keep Kafka in mind. It is the spout thread which
is polling (consuming) and iterating (processing) over the polled messages.
If we separate the consumption into another thread and push messages into a
queue, iterating (processing) is now concurrent and decoupled. Anyway, I
too feel there should not be any significant difference in throughput, but
it will be interesting to measure the difference.

I went overboard when I mentioned a seek before every poll; I had the
failure scenario in mind.

Thanks a lot. I will keep you guys updated on the switch from custom spout
to KafkaSpout.

Thanks for the amazing work.

Chandan

On Tue, Jul 11, 2017 at 12:56 AM, chandan singh <ck...@gmail.com> wrote:

> Sorry about being cryptic there. What I meant is that it will be much
> better if we don't make assumptions about failure rates in topologies. I
> know it is more a matter of common sense, but out of curiosity, can you
> point me to any Storm documentation that comments on preferable failure
> rates? I was suggesting that if we offer the user an optimization through
> a clean API, the user will be free to decide on the rationale for using
> it.
>
> On Tue, Jul 11, 2017 at 12:06 AM, Bobby Evans <evans@yahoo-inc.com.invalid
> > wrote:
>
>> I'm not sure what assumptions you want to make that this is preventing,
>> or why they would be helpful.
>>
>> - Bobby
>>
>>
>> On Monday, July 10, 2017, 12:14:53 PM CDT, chandan singh <
>> cks071g2@gmail.com> wrote:
>>
>> Hi Stig & Bobby
>>
>> Thanks for confirming my understanding.
>>
>> 1) Ensuring that calls to nextTuple(), ack() and fail() are non-blocking
>> has long been a guideline on
>> http://storm.apache.org/releases/1.1.0/Concepts.html.
>> Copying verbatim here: "The main method on spouts is nextTuple.
>> nextTuple either emits a new tuple into the topology or simply returns if
>> there are no new tuples to emit. It is imperative that nextTuple does not
>> block for any spout implementation, because Storm calls all the spout
>> methods on the same thread." I admit there is some chance my
>> interpretation is partially incorrect, but I have been following it in a
>> custom spout until now. Even though the objective is different, there is
>> a similar hint in the official Kafka documentation. Please see the
>> heading "2. Decouple Consumption and Processing" at
>> https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html.
>> Essentially, one thread polls Kafka and the spout thread gets the
>> messages through a shared queue. If pre-fetching is present in Kafka (I
>> will read about it further), I assume we do not have to fetch in another
>> thread, but I am not sure how the pre-fetching behaves with re-seeking
>> before every poll.
>>
>> 2) @Bobby, you are correct in pointing out what needs to be optimized,
>> but the facts sometimes prevent us from making assumptions. We do
>> optimize our retry loop so that we don't poll the messages again. I
>> especially see problems when this is combined with exponential back-off.
>> I am not sure how difficult or clean it would be to expose some sort of
>> configuration to allow such an optimization. Do you think it would be
>> worth trying something out?
>>
>> Thanks
>> Chandan
>>
>
>

Re: Few observations related to KafkaSpout implementation (1.1.0)

Posted by chandan singh <ck...@gmail.com>.
Sorry about being cryptic there. What I meant is that it will be much
better if we don't make assumptions about failure rates in topologies. I
know it is more a matter of common sense, but out of curiosity, can you
point me to any Storm documentation that comments on preferable failure
rates? I was suggesting that if we offer the user an optimization through a
clean API, the user will be free to decide on the rationale for using it.

On Tue, Jul 11, 2017 at 12:06 AM, Bobby Evans <ev...@yahoo-inc.com.invalid>
wrote:

> I'm not sure what assumptions you want to make that this is preventing, or
> why they would be helpful.
>
> - Bobby
>
>
> On Monday, July 10, 2017, 12:14:53 PM CDT, chandan singh <
> cks071g2@gmail.com> wrote:
>
> Hi Stig & Bobby
>
> Thanks for confirming my understanding.
>
> 1) Ensuring that calls to nextTuple(), ack() and fail() are non-blocking
> has long been a guideline on
> http://storm.apache.org/releases/1.1.0/Concepts.html.
> Copying verbatim here: "The main method on spouts is nextTuple.
> nextTuple either emits a new tuple into the topology or simply returns if
> there are no new tuples to emit. It is imperative that nextTuple does not
> block for any spout implementation, because Storm calls all the spout
> methods on the same thread." I admit there is some chance my
> interpretation is partially incorrect, but I have been following it in a
> custom spout until now. Even though the objective is different, there is
> a similar hint in the official Kafka documentation. Please see the
> heading "2. Decouple Consumption and Processing" at
> https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html.
> Essentially, one thread polls Kafka and the spout thread gets the
> messages through a shared queue. If pre-fetching is present in Kafka (I
> will read about it further), I assume we do not have to fetch in another
> thread, but I am not sure how the pre-fetching behaves with re-seeking
> before every poll.
>
> 2) @Bobby, you are correct in pointing out what needs to be optimized,
> but the facts sometimes prevent us from making assumptions. We do
> optimize our retry loop so that we don't poll the messages again. I
> especially see problems when this is combined with exponential back-off.
> I am not sure how difficult or clean it would be to expose some sort of
> configuration to allow such an optimization. Do you think it would be
> worth trying something out?
>
> Thanks
> Chandan
>

Re: Few observations related to KafkaSpout implementation (1.1.0)

Posted by Bobby Evans <ev...@yahoo-inc.com.INVALID>.
I'm not sure what assumptions you want to make that this is preventing, or why they would be helpful.

- Bobby


On Monday, July 10, 2017, 12:14:53 PM CDT, chandan singh <ck...@gmail.com> wrote:

Hi Stig & Bobby

Thanks for confirming my understanding.

1) Ensuring that calls to nextTuple(), ack() and fail() are non-blocking
has long been a guideline on
http://storm.apache.org/releases/1.1.0/Concepts.html.
Copying verbatim here: "The main method on spouts is nextTuple.
nextTuple either emits a new tuple into the topology or simply returns if
there are no new tuples to emit. It is imperative that nextTuple does not
block for any spout implementation, because Storm calls all the spout
methods on the same thread." I admit there is some chance my
interpretation is partially incorrect, but I have been following it in a
custom spout until now. Even though the objective is different, there is a
similar hint in the official Kafka documentation. Please see the heading
"2. Decouple Consumption and Processing" at
https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html.
Essentially, one thread polls Kafka and the spout thread gets the messages
through a shared queue. If pre-fetching is present in Kafka (I will read
about it further), I assume we do not have to fetch in another thread, but
I am not sure how the pre-fetching behaves with re-seeking before every
poll.

2) @Bobby, you are correct in pointing out what needs to be optimized, but
the facts sometimes prevent us from making assumptions. We do optimize our
retry loop so that we don't poll the messages again. I especially see
problems when this is combined with exponential back-off. I am not sure
how difficult or clean it would be to expose some sort of configuration to
allow such an optimization. Do you think it would be worth trying
something out?

Thanks
Chandan

Re: Few observations related to KafkaSpout implementation (1.1.0)

Posted by chandan singh <ck...@gmail.com>.
Hi Stig & Bobby

Thanks for confirming my understanding.

1) Ensuring that calls to nextTuple(), ack() and fail() are non-blocking
has long been a guideline on
http://storm.apache.org/releases/1.1.0/Concepts.html.
Copying verbatim here: "The main method on spouts is nextTuple.
nextTuple either emits a new tuple into the topology or simply returns if
there are no new tuples to emit. It is imperative that nextTuple does not
block for any spout implementation, because Storm calls all the spout
methods on the same thread." I admit there is some chance my
interpretation is partially incorrect, but I have been following it in a
custom spout until now. Even though the objective is different, there is a
similar hint in the official Kafka documentation. Please see the heading
"2. Decouple Consumption and Processing" at
https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html.
Essentially, one thread polls Kafka and the spout thread gets the messages
through a shared queue. If pre-fetching is present in Kafka (I will read
about it further), I assume we do not have to fetch in another thread, but
I am not sure how the pre-fetching behaves with re-seeking before every
poll.

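For illustration, here is a minimal sketch of the pattern I have in mind
(all names are made up and this is not the KafkaSpout code; I am assuming
a plain 0.11 KafkaConsumer):

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DecoupledFetcher implements Runnable {

    // Bounded hand-off queue; a slow spout applies backpressure by
    // blocking the fetcher thread on put().
    private final BlockingQueue<ConsumerRecord<String, String>> queue =
            new ArrayBlockingQueue<>(10000);
    private final KafkaConsumer<String, String> consumer;
    private volatile boolean running = true;

    public DecoupledFetcher(Properties props, String topic) {
        // Only the fetcher thread ever touches this consumer instance,
        // since KafkaConsumer is not thread safe.
        this.consumer = new KafkaConsumer<>(props);
        this.consumer.subscribe(Collections.singletonList(topic));
    }

    @Override
    public void run() {
        try {
            while (running) {
                for (ConsumerRecord<String, String> r : consumer.poll(100)) {
                    queue.put(r);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            consumer.close();
        }
    }

    // Called from the spout thread inside nextTuple(); never blocks.
    public ConsumerRecord<String, String> nextRecord() {
        return queue.poll();
    }

    public void shutdown() {
        running = false;
    }
}

The spout's nextTuple() would then just call nextRecord() and emit the
record if one is available, so the spout thread itself never waits on the
network.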
2) @Bobby, you are correct in pointing out what needs to be optimized, but
the facts sometimes prevent us from making assumptions. We do optimize our
retry loop so that we don't poll the messages again. I especially see
problems when this is combined with exponential back-off. I am not sure
how difficult or clean it would be to expose some sort of configuration to
allow such an optimization. Do you think it would be worth trying
something out?

Thanks
Chandan

Re: Few observations related to KafkaSpout implementation (1.1.0)

Posted by Bobby Evans <ev...@yahoo-inc.com.INVALID>.
For #2 it is a question of what you optimize for. Storm typically assumes that failures should be rare, so we optimize for that. We keep the minimal information around to be able to replay the message, but not a lot more. If you are getting lots of failures, you really should be more concerned about why you are getting failures and not so much with how the failures are impacting the performance of the spout. Or, to put it another way, the best performance optimization is to stop doing the thing that is slow. If failures are the slowest part, the best thing for performance is to stop failing.


- Bobby


On Monday, July 10, 2017, 9:10:38 AM CDT, Stig Rohde Døssing <st...@gmail.com> wrote:

Hi Chandan,

I'm going to assume we're talking about the storm-kafka-client spout, and
not the storm-kafka spout.

1) Yes, we do polling and committing in the spout thread. I'm not aware of
why that would be against spout best practices. Simplicity is definitely a
reason, but I don't think anyone has actually checked how the throughput
of a threaded implementation would compare to the current implementation.
Keep in mind that the KafkaConsumer is not thread safe, so a threaded
implementation would still need to do all consumer interaction via a
single thread. Also, as far as I know, the KafkaConsumer does prefetching
of messages (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records#KIP-41:KafkaConsumerMaxRecords-Prefetching),
so most calls to poll should not take very long.

What kind of threading did you have in mind?

2) No, that seems correct. When a message fails, the spout remembers the
offset of the failed message, but does not keep the message in memory.
When the message becomes ready for retry, the spout seeks the consumer
back to the failed message's offset and fetches it again. The consumer
will fetch that message plus any later messages before it catches back up
to where it was before the message failed.

We choose to poll for the failed message again because keeping it around in
memory could be a problem if there are many failed messages, since they'd
potentially need to hang around for a while depending on the configured
backoff.

There may be a potential optimization here, where we could try to seek
directly to the latest emitted offset instead of fetching everything from
the failed message forward, but it's not something we've looked at
implementing. I haven't checked whether this would work, or what kinds of
edge cases there may be.

Do you have suggestions for improving/replacing this loop?

2017-07-10 15:39 GMT+02:00 chandan singh <ck...@gmail.com>:

> Hi
>
> I hope I am using the right mailing list. Please advise if I am wrong.
>
> I have a few observations about the KafkaSpout and feel that some of these
> lead to inefficiencies. It will be of great help if someone can throw some
> light on the rationale behind the implementation.
>
> 1) Kafka polling and committing offsets is done in the spout thread, which
> is somewhat against the spout best practices. Is simplicity the reason
> behind this design? Am I missing something?
>
> 2) The poll-iterate-commit-seek loop seems inefficient in recurrent failure
> scenarios. Let's say the first message fails. We will keep polling the same
> set of messages at least as many times as that message is retried, and
> probably more if we are using exponential back-off. Did I misunderstand the
> implementation?
>
> Regards
> Chandan
>

Re: Few observations related to KafkaSpout implementation (1.1.0)

Posted by Stig Rohde Døssing <st...@gmail.com>.
Hi Chandan,

I'm going to assume we're talking about the storm-kafka-client spout, and
not the storm-kafka spout.

1) Yes, we do polling and committing in the spout thread. I'm not aware of
why that would be against spout best practices. Simplicity is definitely a
reason, but I don't think anyone has actually checked how the throughput
of a threaded implementation would compare to the current implementation.
Keep in mind that the KafkaConsumer is not thread safe, so a threaded
implementation would still need to do all consumer interaction via a
single thread. Also, as far as I know, the KafkaConsumer does prefetching
of messages (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records#KIP-41:KafkaConsumerMaxRecords-Prefetching),
so most calls to poll should not take very long.

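For reference, a rough sketch of the knob KIP-41 added (assuming the 0.10+
consumer; the connection settings are placeholders):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder
props.put("group.id", "my-spout-group");          // placeholder
props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
// Caps how many records a single poll() hands back; the consumer keeps
// prefetching in the background, so poll() usually returns already
// buffered records without a network round trip.
props.put("max.poll.records", "500");
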
What kind of threading did you have in mind?

2) No, that seems correct. When a message fails, the spout remembers the
offset of the failed message, but does not keep the message in memory.
When the message becomes ready for retry, the spout seeks the consumer
back to the failed message's offset and fetches it again. The consumer
will fetch that message plus any later messages before it catches back up
to where it was before the message failed.

We choose to poll for the failed message again because keeping it around in
memory could be a problem if there are many failed messages, since they'd
potentially need to hang around for a while depending on the configured
backoff.

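To make those mechanics concrete, the retry path is roughly the following
(a heavily simplified sketch, not the actual spout code; isScheduledForRetry,
alreadyAcked and emit are hypothetical stand-ins for the spout's internal
bookkeeping):

import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

void retry(KafkaConsumer<String, String> consumer,
           TopicPartition partition, long earliestFailedOffset) {
    // Rewind; the next poll() re-fetches the failed record plus every
    // later record until the consumer catches back up to where it was.
    consumer.seek(partition, earliestFailedOffset);
    List<ConsumerRecord<String, String>> records =
            consumer.poll(200).records(partition);
    for (ConsumerRecord<String, String> r : records) {
        if (isScheduledForRetry(r.offset()) || !alreadyAcked(r.offset())) {
            emit(r); // re-emit failed tuples and not-yet-acked successors
        }
        // already-acked records in the re-fetched range are simply skipped
    }
}
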
There may be a potential optimization here, where we could try to seek
directly to the latest emitted offset instead of fetching everything from
the failed message forward, but it's not something we've looked at
implementing. I haven't checked whether this would work, or what kinds of
edge cases there may be.

Do you have suggestions for improving/replacing this loop?

2017-07-10 15:39 GMT+02:00 chandan singh <ck...@gmail.com>:

> Hi
>
> I hope I am using the right mailing list. Please advise if I am wrong.
>
> I have a few observations about the KafkaSpout and feel that some of these
> lead to inefficiencies. It will be of great help if someone can throw some
> light on the rationale behind the implementation.
>
> 1) Kafka polling and committing offsets is done in the spout thread, which
> is somewhat against the spout best practices. Is simplicity the reason
> behind this design? Am I missing something?
>
> 2) The poll-iterate-commit-seek loop seems inefficient in recurrent failure
> scenarios. Let's say the first message fails. We will keep polling the same
> set of messages at least as many times as that message is retried, and
> probably more if we are using exponential back-off. Did I misunderstand the
> implementation?
>
> Regards
> Chandan
>