Posted to users@kafka.apache.org by Sachin Mittal <sj...@gmail.com> on 2017/03/05 07:34:31 UTC

Fixing two critical bugs in kafka streams

Hi,
So far in our experiments we have encountered two critical bugs.
1. If a thread takes more than MAX_POLL_INTERVAL_MS_CONFIG to compute a
cycle, it gets evicted from the group, a rebalance takes place, and it gets
a new assignment.
However, when this thread tries to commit offsets for the revoked partitions
in onPartitionsRevoked, it will again throw the CommitFailedException.

This is already handled by ConsumerCoordinator, so there is no point in
assigning this exception to rebalanceException in StreamThread and stopping
the thread. It has already been assigned new partitions and it can continue.

So as a fix, in the case of a CommitFailedException I am not killing the
StreamThread.
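
To illustrate the idea, here is a minimal sketch. It is not the code from the
commit: CommitFailedStandIn is a hypothetical stand-in for Kafka's
CommitFailedException so that the example compiles without Kafka on the
classpath, and the rebalanceException field and the log list are simplified
assumptions about how StreamThread tracks a fatal rebalance error.

```java
import java.util.ArrayList;
import java.util.List;

final class RevokeCommitSketch {
    // Hypothetical stand-in for Kafka's CommitFailedException.
    static class CommitFailedStandIn extends RuntimeException {
        CommitFailedStandIn(String message) {
            super(message);
        }
    }

    // Assumption: a non-null value here is what makes the stream thread stop.
    static Throwable rebalanceException = null;
    static final List<String> log = new ArrayList<>();

    // Called when partitions are revoked; commitOffsets is whatever commits
    // the offsets of the partitions being given up.
    static void onPartitionsRevoked(Runnable commitOffsets) {
        try {
            commitOffsets.run();
        } catch (CommitFailedStandIn e) {
            // The partitions already belong to another group member, so this
            // commit cannot succeed. Record it and let the thread continue.
            log.add("commit failed during revoke, continuing: " + e.getMessage());
        } catch (RuntimeException e) {
            // Any other failure is still treated as fatal for the thread.
            rebalanceException = e;
        }
    }
}
```

Only the commit failure is swallowed; every other exception still ends up in
rebalanceException, so unrelated errors are not hidden.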

2. Next, we see a deadlock when processing a task takes longer than
MAX_POLL_INTERVAL_MS_CONFIG. The thread's partitions are then assigned to
some other thread, including the RocksDB lock. When the original thread
tries to process the next task, it cannot get the RocksDB lock and simply
keeps waiting for that lock forever.

In retryWithBackoff for AbstractTaskCreator we have backoffTimeMs = 50L.
If the thread does not get the lock, we simply increase the time by 10x and
keep trying inside the while (true) loop.

We need an upper bound for this backoffTimeMs. If the time is greater
than MAX_POLL_INTERVAL_MS_CONFIG and the thread still hasn't got the lock,
it means this thread's partitions have moved somewhere else and it may not
get the lock again.

So I have added an upper bound check in that while loop.
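
The shape of such a bounded loop can be sketched as follows. This is a
simplified, hypothetical version and not the actual retryWithBackoff from
Kafka Streams: the tryLock supplier, the parameter names, and the choice to
give up as soon as the next wait would exceed the bound are all assumptions
made for illustration.

```java
import java.util.function.BooleanSupplier;

final class BoundedBackoffSketch {
    /**
     * Calls tryLock until it succeeds or the next wait would exceed
     * maxBackoffMs. Returns true if the lock was acquired, false if the
     * caller should give up (for example, to rejoin the group).
     */
    static boolean retryWithBackoff(BooleanSupplier tryLock,
                                    long initialBackoffMs,
                                    long maxBackoffMs) {
        long backoffTimeMs = initialBackoffMs;
        while (true) {
            if (tryLock.getAsBoolean()) {
                return true;
            }
            // Upper bound check: stop instead of waiting forever.
            if (backoffTimeMs > maxBackoffMs) {
                return false;
            }
            try {
                Thread.sleep(backoffTimeMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop retrying.
                Thread.currentThread().interrupt();
                return false;
            }
            backoffTimeMs *= 10; // same 10x growth as described above
        }
    }
}
```

Returning false rather than throwing leaves it to the caller to decide what
giving up means; in the scenario above that would be treating the partitions
as lost.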

The commits are here:
https://github.com/sjmittal/kafka/commit/6f04327c890c58cab9b1ae108af4ce5c4e3b89a1

Please review, and if you feel the changes make sense, please merge them to
the main branch.

Thanks
Sachin

Re: Fixing two critical bugs in kafka streams

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Thanks for explaining in more detail. Now I understand it. I was on a
completely wrong track before that!

Great find and thanks for fixing it :)

Since these are two unrelated issues, we should have two JIRAs and two PRs.


-Matthias

On 3/6/17 8:48 PM, Sachin Mittal wrote:
>> As for the second issue you brought up, I agree it is indeed a bug; but
>> just to clarify it is the CREATION of the first task including restoring
>> stores that can take longer than MAX_POLL_INTERVAL_MS_CONFIG, not
>> processing it right
> 
> Yes, this is correct. I may have misused the terminology, so let's not
> confuse it with processing in terms of Kafka Streams.
> 
>> From my understanding, the deadlock is "caused" by your fix of problem
>> one. If the thread would die, the lock would get released and no deadlock
>> would occur.
> 
> Well, actually the deadlock issue is different from the
> CommitFailedException and is not caused by it. However, if we fix the
> deadlock, it may potentially cause a CommitFailedException later on; but in
> trunk that CommitFailedException has already been fixed, so the only issue
> left is the deadlock.
> I actually did a single commit for both issues, so it may have been
> confusing and looked as if the fix for one causes the second, but they are
> essentially unrelated.
> 
> The deadlock issue is simply this: if CREATION of the first task, including
> restoring stores, takes longer than MAX_POLL_INTERVAL_MS_CONFIG, then the
> second task in that pipeline may go into a deadlock state if some other
> thread has already got the handle of that partition. So in my view we may
> need some upper bound check for backoffTimeMs.
> 
> Thanks
> Sachin


Re: Fixing two critical bugs in kafka streams

Posted by Sachin Mittal <sj...@gmail.com>.
> As for the second issue you brought up, I agree it is indeed a bug; but
> just to clarify it is the CREATION of the first task including restoring
> stores that can take longer than MAX_POLL_INTERVAL_MS_CONFIG, not
> processing it right

Yes, this is correct. I may have misused the terminology, so let's not
confuse it with processing in terms of Kafka Streams.

> From my understanding, the deadlock is "caused" by your fix of problem
> one. If the thread would die, the lock would get released and no deadlock
> would occur.

Well, actually the deadlock issue is different from the CommitFailedException
and is not caused by it. However, if we fix the deadlock, it may potentially
cause a CommitFailedException later on; but in trunk that
CommitFailedException has already been fixed, so the only issue left is the
deadlock.
I actually did a single commit for both issues, so it may have been
confusing and looked as if the fix for one causes the second, but they are
essentially unrelated.

The deadlock issue is simply this: if CREATION of the first task, including
restoring stores, takes longer than MAX_POLL_INTERVAL_MS_CONFIG, then the
second task in that pipeline may go into a deadlock state if some other
thread has already got the handle of that partition. So in my view we may
need some upper bound check for backoffTimeMs.
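
As a rough worked example of what such a bound means in practice, the
following sketch lists the waits a thread would go through before giving up.
It assumes the 50 ms start and 10x growth mentioned earlier in the thread;
the 300000 ms bound is only an illustrative value for
MAX_POLL_INTERVAL_MS_CONFIG, and the rule "stop once the next wait exceeds
the bound" is an assumption, not necessarily what the PR does.

```java
import java.util.ArrayList;
import java.util.List;

final class BackoffScheduleSketch {
    // Returns every wait (in ms) that is still within the bound.
    // Assumes initialBackoffMs > 0 and factor > 1, otherwise the loop
    // would never terminate.
    static List<Long> waitsBeforeGivingUp(long initialBackoffMs,
                                          long factor,
                                          long maxBackoffMs) {
        List<Long> waits = new ArrayList<>();
        for (long backoff = initialBackoffMs; backoff <= maxBackoffMs; backoff *= factor) {
            waits.add(backoff);
        }
        return waits;
    }

    public static void main(String[] args) {
        List<Long> waits = waitsBeforeGivingUp(50L, 10L, 300_000L);
        long total = 0;
        for (long w : waits) {
            total += w;
        }
        // prints "[50, 500, 5000, 50000] total=55550 ms"
        System.out.println(waits + " total=" + total + " ms");
    }
}
```

Under these assumptions the thread waits four times, about 55 seconds in
total, and gives up before starting a fifth wait of 500000 ms.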

Thanks
Sachin



On Tue, Mar 7, 2017 at 3:24 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Thanks for your input. I now understand the first issue (and the fix).
> I am still not sure about the second issue.
>
> From my understanding, the deadlock is "caused" by your fix of problem
> one. If the thread would die, the lock would get released and no deadlock
> would occur.
>
> However, because the thread does not die, it also does not release its
> locks. Thus, from my understanding, in case of a CommitFailedException,
> the thread must release its locks, too. This would resolve the deadlock
> -- we don't need any timeout for this.
>
> Or do I still misunderstand the issue?
>
>
> -Matthias
>
>
> On 3/6/17 11:41 AM, Guozhang Wang wrote:
> > Hello Sachin,
> >
> > Thanks for your finds!! Just to add to what Damian said regarding 1), in
> > KIP-129 where we are introducing exactly-once processing semantics to
> > Streams we have also described different categories of error handling for
> > exactly-once. Commit exceptions due to rebalance will be handled as
> > "producer fenced" which will not be thrown to the users as we already did
> > in trunk.
> >
> > As for the second issue you brought up, I agree it is indeed a bug; but
> > just to clarify, it is the CREATION of the first task including restoring
> > stores that can take longer than MAX_POLL_INTERVAL_MS_CONFIG, not
> > processing it, right (we only start processing once all tasks have been
> > created and initialized)?
> >
> >
> >
> > Guozhang
> >
> >
> > On Mon, Mar 6, 2017 at 7:16 AM, Sachin Mittal <sj...@gmail.com> wrote:
> >
> >> Please find the JIRA https://issues.apache.org/jira/browse/KAFKA-4848
> >>
> >>
> >> On Mon, Mar 6, 2017 at 5:20 PM, Damian Guy <da...@gmail.com> wrote:
> >>
> >>> Hi Sachin,
> >>>
> >>> If it is a bug then please file a JIRA for it, too.
> >>>
> >>> Thanks,
> >>> Damian
> >>>
> >>> On Mon, 6 Mar 2017 at 11:23 Sachin Mittal <sj...@gmail.com> wrote:
> >>>
> >>>> Ok that's great.
> >>>> So you have already fixed that issue.
> >>>>
> >>>> I have modified my PR to remove that change (which was done keeping
> >>>> 0.10.2.0 in mind).
> >>>>
> >>>> However the other issue is still valid.
> >>>>
> >>>> Please review that change. https://github.com/apache/kafka/pull/2642
> >>>>
> >>>>
> >>>> Thanks
> >>>> Sachin
> >>>>
> >>>>
> >>>> On Mon, Mar 6, 2017 at 3:56 PM, Damian Guy <da...@gmail.com> wrote:
> >>>>
> >>>>> On trunk the CommitFailedException isn't thrown anymore. The
> >>>>> commitOffsets method doesn't throw an exception. It returns one if it
> >>>>> was thrown. We used to throw this exception during
> >>>>> suspendTasksAndState, but we don't anymore.
> >>>>>
> >>>>> On Mon, 6 Mar 2017 at 05:04 Sachin Mittal <sj...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi
> >>>>>> If a CommitFailedException is thrown in onPartitionsRevoked, it gets
> >>>>>> assigned to rebalanceException.
> >>>>>> This causes the stream thread to shut down. I am not sure how we can
> >>>>>> resume the thread.
> >>>>>>
> >>>>>> Note that the thread is not in an invalid state, because it has
> >>>>>> already been assigned new partitions and this exception happens when
> >>>>>> trying to revoke old partitions which have been moved to some other
> >>>>>> thread. So we need to swallow this exception on the StreamThread side
> >>>>>> too, just like we swallow it in ConsumerCoordinator.java.
> >>>>>>
> >>>>>> Also, I fixed this against code base 0.10.2.0, and the difference
> >>>>>> between that and the trunk code is these lines:
> >>>>>> 0.10.2.0
> >>>>>>         if (firstException.get() == null) {
> >>>>>>             firstException.set(commitOffsets());
> >>>>>>         }
> >>>>>> vs trunk
> >>>>>>         if (firstException.get() == null) {
> >>>>>>             // TODO: currently commit failures will not be thrown to users
> >>>>>>             // while suspending tasks; this need to be re-visit after KIP-98
> >>>>>>             commitOffsets();
> >>>>>>         }
> >>>>>> I am again not sure, since this part is still a TODO, but looking at
> >>>>>> the code I see that commitOffsets can still throw the
> >>>>>> CommitFailedException, which needs to be handled in
> >>>>>> onPartitionsRevoked.
> >>>>>>
> >>>>>> Hope this makes sense.
> >>>>>>
> >>>>>> On the second issue, the deadlock is not caused by the
> >>>>>> CommitFailedException, but after fixing the deadlock we need to make
> >>>>>> sure the thread does not die due to an unhandled
> >>>>>> CommitFailedException in onPartitionsRevoked.
> >>>>>> The deadlock issue is like this.
> >>>>>> If a thread has two partitions and processing partition one takes
> >>>>>> more than MAX_POLL_INTERVAL_MS_CONFIG, then this thread is evicted
> >>>>>> from the group and both partitions are migrated to some other thread.
> >>>>>> Now when it tries to process partition two, it tries to get the lock
> >>>>>> on RocksDB. It won't get the lock, since that partition has now moved
> >>>>>> to some other thread. So it keeps increasing backoffTimeMs and keeps
> >>>>>> trying to get the lock forever, thus reaching a deadlock.
> >>>>>> To fix this we need some upper bound on how long it keeps trying to
> >>>>>> get that lock. And that upper bound has to be
> >>>>>> MAX_POLL_INTERVAL_MS_CONFIG, because if by that time it has not got
> >>>>>> the lock, we can see that this thread was evicted from the group and
> >>>>>> needs to rejoin to get new partitions.
> >>>>>>
> >>>>>> On the JIRA issue, I can create one and attach the part of the logs
> >>>>>> where it keeps trying to get the lock with increasing backoffTimeMs.
> >>>>>>
> >>>>>> Let me know if this makes sense. Right now this is the best way we
> >>>>>> could come up with to handle stream thread failures.
> >>>>>>
> >>>>>> Also, on a side note, I feel we need more resilient streams. If we
> >>>>>> have, say, configured our streams application with 4 threads and for
> >>>>>> whatever reason a thread dies, then the application should itself (or
> >>>>>> via some exposed hooks) allow a new thread to be started (because in
> >>>>>> Java I guess the same thread cannot be restarted), so that the number
> >>>>>> of threads always stays at what one has configured.
> >>>>>> I think exposed hooks will be the better option to do this.
> >>>>>>
> >>>>>> Thanks
> >>>>>> Sachin
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Mon, Mar 6, 2017 at 5:40 AM, Matthias J. Sax <matthias@confluent.io> wrote:
> >>>>>>
> >>>>>>> Sachin,
> >>>>>>>
> >>>>>>> thanks a lot for contributing!
> >>>>>>>
> >>>>>>> Right now, I am not sure if I understand the change. On
> >>>>>>> CommitFailedException, why can we just resume the thread? To me, it
> >>>>>>> seems that the thread will be in an invalid state and thus it's not
> >>>>>>> safe to just swallow the exception and keep going. Can you shed some
> >>>>>>> light?
> >>>>>>>
> >>>>>>> And from my understanding, the deadlock is "caused" by the change
> >>>>>>> from above, right? So if it is safe to swallow the exception, we
> >>>>>>> should do some "clean up" to avoid the deadlock in the first place,
> >>>>>>> instead of applying an additional timeout.
> >>>>>>>
> >>>>>>> Also, if this is a bug, we should have a JIRA.
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>>
> >>>>>>> On 3/5/17 4:11 AM, Sachin Mittal wrote:
> >>>>>>>> Hi,
> >>>>>>>> Please find the new PR
> >>>>>>>> https://github.com/apache/kafka/pull/2642/
> >>>>>>>>
> >>>>>>>> I see that in trunk there has been a change which is different from
> >>>>>>>> 0.10.2.0:
> >>>>>>>>
> >>>>>>>> 0.10.2.0
> >>>>>>>>         if (firstException.get() == null) {
> >>>>>>>>             firstException.set(commitOffsets());
> >>>>>>>>         }
> >>>>>>>> vs trunk
> >>>>>>>>         if (firstException.get() == null) {
> >>>>>>>>             // TODO: currently commit failures will not be thrown to users
> >>>>>>>>             // while suspending tasks; this need to be re-visit after KIP-98
> >>>>>>>>             commitOffsets();
> >>>>>>>>         }
> >>>>>>>> I am not sure, in view of this, whether my part of the fix is still
> >>>>>>>> valid. Looks like it is still valid.
> >>>>>>>>
> >>>>>>>> Also, on a side note, what is the policy for closing a branch that
> >>>>>>>> has just been released?
> >>>>>>>>
> >>>>>>>> Since you have released 0.10.2.0, we are using that, and that is why
> >>>>>>>> we have made changes in that branch, so that our changes modify only
> >>>>>>>> the needed code and we don't mess up the other released code.
> >>>>>>>>
> >>>>>>>> Is the new release made off the branch 0.10.2.0? If yes, then you
> >>>>>>>> should not close it, as there can be patch fixes on it.
> >>>>>>>>
> >>>>>>>> Or is the release always made off the trunk branch? In that case,
> >>>>>>>> how can we pick up the code from which the release binaries were
> >>>>>>>> created, so that when we build the binary we have exactly the same
> >>>>>>>> code as the released one, plus any changes we (or someone else) make
> >>>>>>>> on it?
> >>>>>>>>
> >>>>>>>> Also, if a branch is closed, then perhaps we should delete it or
> >>>>>>>> mark it closed or something.
> >>>>>>>>
> >>>>>>>> Please let us know how releases get created (off what codebase), so
> >>>>>>>> we are more exact in applying our changes.
> >>>>>>>>
> >>>>>>>> Thanks
> >>>>>>>> Sachin
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Sun, Mar 5, 2017 at 3:40 PM, Eno Thereska <eno.thereska@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Thanks Sachin, one thing before the review: 0.10.2 is closed now,
> >>>>>>>>> so this needs to target trunk.
> >>>>>>>>>
> >>>>>>>>> Thanks
> >>>>>>>>> Eno
> >>>>>>>>>> On 5 Mar 2017, at 09:10, Sachin Mittal <sj...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Please review the PR and let me know if this makes sense.
> >>>>>>>>>>
> >>>>>>>>>> https://github.com/apache/kafka/pull/2640
> >>>>>>>>>>
> >>>>>>>>>> Thanks
> >>>>>>>>>> Sachin
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Sun, Mar 5, 2017 at 1:49 PM, Eno Thereska <eno.thereska@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Thanks Sachin for your contribution. Could you create a pull
> >>>>>>>>>>> request out of the commit (so we can add comments, and also so you
> >>>>>>>>>>> are acknowledged properly for your contribution)?
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks
> >>>>>>>>>>> Eno
> >>>>>>>>>>>> On 5 Mar 2017, at 07:34, Sachin Mittal <sjmittal@gmail.com
> >>>
> >>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Hi,
> >>>>>>>>>>>> So far in our experiment we have encountered 2 critical
> >> bugs.
> >>>>>>>>>>>> 1. If a thread takes more that MAX_POLL_INTERVAL_MS_CONFIG
> >> to
> >>>>>>> compute a
> >>>>>>>>>>>> cycle it gets evicted from group and rebalance takes place
> >>> and
> >>>> it
> >>>>>>> gets
> >>>>>>>>>>> new
> >>>>>>>>>>>> assignment.
> >>>>>>>>>>>> However, when this thread tries to commit offsets for the revoked
> >>>>>>>>>>>> partitions in onPartitionsRevoked, it will again throw the
> >>>>>>>>>>>> CommitFailedException.
> >>>>>>>>>>>>
> >>>>>>>>>>>> This gets handled by ConsumerCoordinator, so there is no point in
> >>>>>>>>>>>> assigning this exception to rebalanceException in StreamThread and
> >>>>>>>>>>>> stopping it. It has already been assigned new partitions and it can
> >>>>>>>>>>>> continue.
> >>>>>>>>>>>>
> >>>>>>>>>>>> So as a fix, in case of CommitFailedException I am not killing the
> >>>>>>>>>>>> StreamThread.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 2. Next, we see a deadlock when processing a task takes longer than
> >>>>>>>>>>>> MAX_POLL_INTERVAL_MS_CONFIG. This thread's partitions are then
> >>>>>>>>>>>> assigned to some other thread, including the RocksDB lock. When it
> >>>>>>>>>>>> tries to process the next task it cannot get the RocksDB lock and
> >>>>>>>>>>>> simply keeps waiting for that lock forever.
> >>>>>>>>>>>>
> >>>>>>>>>>>> In retryWithBackoff for AbstractTaskCreator we have backoffTimeMs =
> >>>>>>>>>>>> 50L. If it does not get the lock, we simply increase the time by 10x
> >>>>>>>>>>>> and keep trying inside the while (true) loop.
> >>>>>>>>>>>>
> >>>>>>>>>>>> We need an upper bound for this backoffTimeMs. If the time is
> >>>>>>>>>>>> greater than MAX_POLL_INTERVAL_MS_CONFIG and it still hasn't got the
> >>>>>>>>>>>> lock, this thread's partitions have been moved somewhere else and it
> >>>>>>>>>>>> may not get the lock again.
> >>>>>>>>>>>>
> >>>>>>>>>>>> So I have added an upper bound check in that while loop.
> >>>>>>>>>>>>
> >>>>>>>>>>>> The commits are here:
> >>>>>>>>>>>> https://github.com/sjmittal/kafka/commit/6f04327c890c58cab9b1ae108af4ce5c4e3b89a1
> >>>>>>>>>>>>
> >>>>>>>>>>>> Please review and, if you feel they make sense, please merge them to
> >>>>>>>>>>>> the main branch.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks
> >>>>>>>>>>>> Sachin
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
> >
> >
>
>
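
The bounded retry proposed in the quoted message above (start at 50 ms,
multiply by 10 after each failed attempt, give up once an upper bound such
as the max poll interval has elapsed) could be sketched roughly as follows.
This is only an illustration under stated assumptions: the class
BoundedBackoff, the signature of retryWithBackoff and the BooleanSupplier
standing in for the lock attempt are all hypothetical and are not the actual
Kafka Streams code.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch; not the AbstractTaskCreator code from Kafka Streams.
final class BoundedBackoff {

    /**
     * Calls tryLock repeatedly, sleeping between attempts with a backoff
     * that grows 10x each time, until tryLock succeeds or the total time
     * slept reaches maxWaitMs. Returns true only if the lock was obtained.
     */
    static boolean retryWithBackoff(BooleanSupplier tryLock,
                                    long initialBackoffMs,
                                    long maxWaitMs) {
        if (initialBackoffMs <= 0 || maxWaitMs < 0) {
            throw new IllegalArgumentException("backoff must be > 0 and bound >= 0");
        }
        long backoffMs = initialBackoffMs;
        long waitedMs = 0L;
        while (true) {
            if (tryLock.getAsBoolean()) {
                return true;
            }
            if (waitedMs >= maxWaitMs) {
                // Upper bound reached: the caller should assume its
                // partitions were moved and rejoin the group.
                return false;
            }
            // Never sleep past the upper bound.
            long sleepMs = Math.min(backoffMs, maxWaitMs - waitedMs);
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                return false;
            }
            waitedMs += sleepMs;
            // Grow 10x, capped at the bound so the value cannot overflow.
            backoffMs = backoffMs > maxWaitMs / 10 ? maxWaitMs : backoffMs * 10;
        }
    }

    public static void main(String[] args) {
        // A lock that is never granted: the loop gives up after about 200 ms.
        boolean acquired = retryWithBackoff(() -> false, 50L, 200L);
        System.out.println("acquired=" + acquired);
    }
}
```

Returning false instead of looping forever leaves the decision to the
caller, which matches the idea in the thread that a thread which still has
no lock after the poll interval has most likely been evicted.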

Re: Fixing two critical bugs in kafka streams

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Thanks for your input. I now understand the first issue (and the fix).
I am still not sure about the second issue.

From my understanding, the deadlock is "caused" by your fix for problem
one. If the thread died, the lock would get released and no deadlock
would occur.

However, because the thread does not die, it also does not release its
locks. Thus, from my understanding, in case of a CommitFailedException,
the thread must release its locks, too. This would resolve the deadlock
-- we don't need any timeout for this.
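
As a rough illustration of the alternative described here (keep the thread
alive after a failed commit, but make it give up its locks), a sketch might
look like the following. Everything in it is an assumption made for the
example: CommitFailed is a local stand-in for the consumer's
CommitFailedException, and TaskLocks and onPartitionsRevoked are
hypothetical names, not the StreamThread implementation.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch; not the StreamThread code from Kafka Streams.
final class LockReleaseSketch {

    /** Local stand-in for the consumer's CommitFailedException. */
    static final class CommitFailed extends RuntimeException {
        CommitFailed(String message) {
            super(message);
        }
    }

    /** Hypothetical bookkeeping of the state locks one thread holds. */
    static final class TaskLocks {
        private final Set<String> held = new HashSet<>();

        void lock(String taskId) {
            held.add(taskId);
        }

        void releaseAll() {
            held.clear();
        }

        Set<String> held() {
            return held;
        }
    }

    /**
     * Commits offsets for the revoked partitions. A failed commit is
     * swallowed so the thread survives, but the locks are released in
     * either case so another thread can take over the state.
     * Returns true if the commit succeeded.
     */
    static boolean onPartitionsRevoked(Runnable commitOffsets, TaskLocks locks) {
        try {
            commitOffsets.run();
            return true;
        } catch (CommitFailed e) {
            // The group has already rebalanced; nothing left to commit.
            return false;
        } finally {
            locks.releaseAll();
        }
    }
}
```

With the locks released on this path, no retry loop elsewhere would be left
waiting on a lock that a live but evicted thread still holds.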

Or do I still misunderstand the issue?


-Matthias


On 3/6/17 11:41 AM, Guozhang Wang wrote:
> Hello Sachin,
> 
> Thanks for your finds!! Just to add to what Damian said regarding 1), in
> KIP-129, where we are introducing exactly-once processing semantics to
> Streams, we have also described different categories of error handling for
> exactly-once. Commit exceptions due to rebalance will be handled as
> "producer fenced", which will not be thrown to the users, as we already do
> in trunk.
> 
> As for the second issue you brought up, I agree it is indeed a bug; but
> just to clarify, it is the CREATION of the first task, including restoring
> stores, that can take longer than MAX_POLL_INTERVAL_MS_CONFIG, not
> processing it, right (we only start processing after all tasks have been
> created and initialized)?
> 
> 
> 
> Guozhang


Re: Fixing two critical bugs in kafka streams

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Sachin,

Thanks for your finds!! Just to add to what Damian said regarding 1), in
KIP-129, where we are introducing exactly-once processing semantics to
Streams, we have also described different categories of error handling for
exactly-once. Commit exceptions due to rebalance will be handled as
"producer fenced", which will not be thrown to the users, as we already do
in trunk.

As for the second issue you brought up, I agree it is indeed a bug; but
just to clarify, it is the CREATION of the first task, including restoring
stores, that can take longer than MAX_POLL_INTERVAL_MS_CONFIG, not
processing it, right (we only start processing after all tasks have been
created and initialized)?



Guozhang


On Mon, Mar 6, 2017 at 7:16 AM, Sachin Mittal <sj...@gmail.com> wrote:

> Please find the JIRA https://issues.apache.org/jira/browse/KAFKA-4848



-- 
-- Guozhang

Re: Fixing two critical bugs in kafka streams

Posted by Sachin Mittal <sj...@gmail.com>.
Please find the JIRA https://issues.apache.org/jira/browse/KAFKA-4848


On Mon, Mar 6, 2017 at 5:20 PM, Damian Guy <da...@gmail.com> wrote:

> Hi Sachin,
>
> If it is a bug then please file a JIRA for it, too.
>
> Thanks,
> Damian
>
> On Mon, 6 Mar 2017 at 11:23 Sachin Mittal <sj...@gmail.com> wrote:
>
> > Ok that's great.
> > So you have already fixed that issue.
> >
> > I have modified my PR to remove that change (which was done keeping
> > 0.10.2.0 in mind).
> >
> > However the other issue is still valid.
> >
> > Please review that change. https://github.com/apache/kafka/pull/2642
> >
> >
> > Thanks
> > Sachin
> >
> >
> > On Mon, Mar 6, 2017 at 3:56 PM, Damian Guy <da...@gmail.com> wrote:
> >
> > > On trunk the CommitFailedException isn't thrown anymore. The
> > > commitOffsets method doesn't throw an exception. It returns one if it
> > > was thrown. We used to throw this exception during
> > > suspendTasksAndState, but we don't anymore.
> > >
> > > On Mon, 6 Mar 2017 at 05:04 Sachin Mittal <sj...@gmail.com> wrote:
> > >
> > > > Hi
> > > > If CommitFailedException is thrown at onPartitionsRevoked, it gets
> > > > assigned to rebalanceException. This causes the stream thread to
> > > > shut down. I am not sure how we can resume the thread.
> > > >
> > > > Note that the thread is not in an invalid state, because it has
> > > > already been assigned new partitions and this exception happens when
> > > > trying to revoke old partitions that have been moved to some other
> > > > thread. So we need to swallow this exception on the StreamThread side
> > > > too, just like we swallow it in ConsumerCoordinator.java.
> > > >
> > > > Also, I fixed this against code base 0.10.2.0, and the difference
> > > > between that and the trunk code is these lines:
> > > > 0.10.2.0
> > > >         if (firstException.get() == null) {
> > > >             firstException.set(commitOffsets());
> > > >         }
> > > > vs trunk
> > > >         if (firstException.get() == null) {
> > > >             // TODO: currently commit failures will not be thrown to users
> > > >             // while suspending tasks; this need to be re-visit after KIP-98
> > > >             commitOffsets();
> > > >         }
> > > > Again I am not sure, since this part is still a TODO, but looking at
> > > > the code I see that commitOffsets can still throw the
> > > > CommitFailedException, which needs to be handled at
> > > > onPartitionsRevoked.
> > > >
> > > > Hope this makes sense.
> > > >
> > > > On the second issue, the deadlock is not caused by
> > > > CommitFailedException, but after fixing the deadlock we need to make
> > > > sure the thread does not die due to an unhandled
> > > > CommitFailedException at onPartitionsRevoked.
> > > > The deadlock issue is like this.
> > > > If a thread has two partitions and processing partition one takes
> > > > more than MAX_POLL_INTERVAL_MS_CONFIG, this thread is evicted from
> > > > the group and both partitions are migrated to some other thread.
> > > > When it then tries to process partition two, it tries to get the
> > > > RocksDB lock. It won't get the lock, since that partition has now
> > > > moved to some other thread. So it keeps increasing backoffTimeMs and
> > > > keeps trying to get the lock forever, which results in a deadlock.
> > > > To fix this we need an upper bound on how long it keeps trying to
> > > > get that lock. That upper bound has to be
> > > > MAX_POLL_INTERVAL_MS_CONFIG, because if it has not got the lock by
> > > > then, we know that this thread was evicted from the group and needs
> > > > to rejoin to get new partitions.
> > > >
> > > > On the JIRA issue, I can create one and attach the part of the logs
> > > > where it keeps trying to get the lock with increasing backoffTimeMs.
> > > >
> > > > Let me know if this makes sense. Right now this is the best way we
> > > > could come up with to handle stream thread failures.
> > > >
> > > > Also, on a side note, I feel we need more resilient streams. If we
> > > > have configured our streams application with, say, 4 threads and a
> > > > thread dies for whatever reason, the application should itself (or
> > > > via some exposed hooks) allow a new thread to be started (because in
> > > > Java I guess the same thread cannot be restarted), so that the number
> > > > of threads always stays at what one has configured.
> > > > I think exposed hooks would be the better option for this.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > >
> > > >
> > > >
> > > > On Mon, Mar 6, 2017 at 5:40 AM, Matthias J. Sax <matthias@confluent.io> wrote:
> > > >
> > > > > Sachin,
> > > > >
> > > > > thanks a lot for contributing!
> > > > >
> > > > > Right now, I am not sure if I understand the change. On
> > > > > CommitFailedException, why can we just resume the thread? To me, it
> > > > > seems that the thread will be in an invalid state and thus it's not
> > > > > safe to just swallow the exception and keep going. Can you shed some
> > > > > light?
> > > > >
> > > > > And from my understanding, the deadlock is "caused" by the change
> > > > > from above, right? So if it is safe to swallow the exception, we
> > > > > should do some "clean up" to avoid the deadlock in the first place,
> > > > > instead of applying an additional timeout.
> > > > >
> > > > > Also, if this is a bug, we should have a JIRA.
> > > > >
> > > > > -Matthias
> > > > >
> > > > >
> > > > > On 3/5/17 4:11 AM, Sachin Mittal wrote:
> > > > > > Hi,
> > > > > > Please find the new PR
> > > > > > https://github.com/apache/kafka/pull/2642/
> > > > > >
> > > > > > I see that in trunk there has been change which is different from
> > in
> > > > > 10.2.0
> > > > > >
> > > > > > 10.2.0
> > > > > >        if (firstException.get() == null) {
> > > > > >             firstException.set(commitOffsets());
> > > > > >        }
> > > > > >  vs trunk
> > > > > >         if (firstException.get() == null) {
> > > > > >             // TODO: currently commit failures will not be thrown
> > to
> > > > > users
> > > > > >             // while suspending tasks; this need to be re-visit
> > after
> > > > > KIP-98
> > > > > >             commitOffsets();
> > > > > >         }
> > > > > > I am not sure in view of this is is my part of the fix still
> valid.
> > > > Looks
> > > > > > like it is still valid.
> > > > > >
> > > > > > Also on side note what is the policy of closing a branch that is
> > just
> > > > > > released.
> > > > > >
> > > > > > Since you have release 10.2.0 we are using that and that is why
> > have
> > > > made
> > > > > > changes in that branch so that our changes just modify the needed
> > > code
> > > > > and
> > > > > > we don't mess up the other released code.
> > > > > >
> > > > > > Is the new release released off the branch 10.2.0, if yes then
> you
> > > > should
> > > > > > not close it as there can be patch fixes on them.
> > > > > >
> > > > > > Or is the release always made off the branch trunk. In that case
> > how
> > > > can
> > > > > we
> > > > > > pick up the code on which the release binaries were created so
> when
> > > we
> > > > > > build the binary we have exactly same code as released one, plus
> > any
> > > > > > changes (we or someone else) makes on it.
> > > > > >
> > > > > > Also if a branch is closed, then perhaps we should delete it or
> > mark
> > > it
> > > > > > closed or something.
> > > > > >
> > > > > > Please let us know how releases get created (off what codebase),
> so
> > > we
> > > > > are
> > > > > > more exact in applying our changes to.
> > > > > >
> > > > > > Thanks
> > > > > > Sachin
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Sun, Mar 5, 2017 at 3:40 PM, Eno Thereska <
> > eno.thereska@gmail.com
> > > >
> > > > > wrote:
> > > > > >
> > > > > >> Thanks Sachin, one thing before the review, 0.10.2 is closed
> now,
> > > this
> > > > > >> needs to target trunk.
> > > > > >>
> > > > > >> Thanks
> > > > > >> Eno
> > > > > >>> On 5 Mar 2017, at 09:10, Sachin Mittal <sj...@gmail.com>
> > wrote:
> > > > > >>>
> > > > > >>> Please review the PR and let me know if this makes sense.
> > > > > >>>
> > > > > >>> https://github.com/apache/kafka/pull/2640
> > > > > >>>
> > > > > >>> Thanks
> > > > > >>> Sachin
> > > > > >>>
> > > > > >>>
> > > > > >>> On Sun, Mar 5, 2017 at 1:49 PM, Eno Thereska <
> > > eno.thereska@gmail.com
> > > > >
> > > > > >> wrote:
> > > > > >>>
> > > > > >>>> Thanks Sachin for your contribution. Could you create a pull
> > > request
> > > > > out
> > > > > >>>> of the commit (so we can add comments, and also so you are
> > > > > acknowledged
> > > > > >>>> properly for your contribution)?
> > > > > >>>>
> > > > > >>>> Thanks
> > > > > >>>> Eno
> > > > > >>>>> On 5 Mar 2017, at 07:34, Sachin Mittal <sj...@gmail.com>
> > > wrote:
> > > > > >>>>>
> > > > > >>>>> Hi,
> > > > > >>>>> So far in our experiment we have encountered 2 critical bugs.
> > > > > >>>>> 1. If a thread takes more that MAX_POLL_INTERVAL_MS_CONFIG to
> > > > > compute a
> > > > > >>>>> cycle it gets evicted from group and rebalance takes place
> and
> > it
> > > > > gets
> > > > > >>>> new
> > > > > >>>>> assignment.
> > > > > >>>>> However when this thread tries to commit offsets for the
> > revoked
> > > > > >>>> partitions
> > > > > >>>>> in
> > > > > >>>>> onPartitionsRevoked it will again throw the
> > > CommitFailedException.
> > > > > >>>>>
> > > > > >>>>> This gets handled by ConsumerCoordinatorso there is no point
> to
> > > > > assign
> > > > > >>>> this
> > > > > >>>>> exception to
> > > > > >>>>> rebalanceException in StreamThread and stop it. It has
> already
> > > been
> > > > > >>>>> assigned new partitions and it can continue.
> > > > > >>>>>
> > > > > >>>>> So as fix in case on CommitFailedException I am not killing
> the
> > > > > >>>> StreamThrea.
> > > > > >>>>>
> > > > > >>>>> 2. Next we see a deadlock state when to process a task it
> takes
> > > > > longer
> > > > > >>>>> than MAX_POLL_INTERVAL_MS_CONFIG
> > > > > >>>>> time. Then this threads partitions are assigned to some other
> > > > thread
> > > > > >>>>> including rocksdb lock. When it tries to process the next
> task
> > it
> > > > > >> cannot
> > > > > >>>>> get rocks db lock and simply keeps waiting for that lock
> > forever.
> > > > > >>>>>
> > > > > >>>>> in retryWithBackoff for AbstractTaskCreator we have a
> > > > backoffTimeMs =
> > > > > >>>> 50L.
> > > > > >>>>> If it does not get lock the we simply increase the time by
> 10x
> > > and
> > > > > keep
> > > > > >>>>> trying inside the while true loop.
> > > > > >>>>>
> > > > > >>>>> We need to have a upper bound for this backoffTimeM. If the
> > time
> > > is
> > > > > >>>> greater
> > > > > >>>>> than  MAX_POLL_INTERVAL_MS_CONFIG and it still hasn't got the
> > > lock
> > > > > >> means
> > > > > >>>>> this thread's partitions are moved somewhere else and it may
> > not
> > > > get
> > > > > >> the
> > > > > >>>>> lock again.
> > > > > >>>>>
> > > > > >>>>> So I have added an upper bound check in that while loop.
> > > > > >>>>>
> > > > > >>>>> The commits are here:
> > > > > >>>>> https://github.com/sjmittal/kafka/commit/
> > > > > >> 6f04327c890c58cab9b1ae108af4ce
> > > > > >>>> 5c4e3b89a1
> > > > > >>>>>
> > > > > >>>>> please review and if you feel they make sense, please merge
> it
> > to
> > > > > main
> > > > > >>>>> branch.
> > > > > >>>>>
> > > > > >>>>> Thanks
> > > > > >>>>> Sachin
> > > > > >>>>
> > > > > >>>>
> > > > > >>
> > > > > >>
> > > > > >
> > > > >
> > > > >
> > > >
> > >
> >
>

Re: Fixing two critical bugs in kafka streams

Posted by Damian Guy <da...@gmail.com>.
Hi Sachin,

If it is a bug then please file a JIRA for it, too.

Thanks,
Damian

On Mon, 6 Mar 2017 at 11:23 Sachin Mittal <sj...@gmail.com> wrote:

> Ok that's great.
> So you have already fixed that issue.
>
> I have modified my PR to remove that change (which was done keeping
> 0.10.2.0 in mind).
>
> However the other issue is still valid.
>
> Please review that change. https://github.com/apache/kafka/pull/2642
>
>
> Thanks
> Sachin

Re: Fixing two critical bugs in kafka streams

Posted by Sachin Mittal <sj...@gmail.com>.
Ok that's great.
So you have already fixed that issue.

I have modified my PR to remove that change (which was done keeping
0.10.2.0 in mind).

However the other issue is still valid.

Please review that change. https://github.com/apache/kafka/pull/2642
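
For readers following the thread, the remaining issue is the unbounded lock retry. Below is a minimal sketch of a bounded retry with backoff. It is not the code in the PR or in StreamThread: the class, the tryLock callback and the maxWaitMs parameter (standing in for the max.poll.interval.ms setting) are hypothetical, and only the 50 ms starting value and the 10x growth come from the description in this thread.

```java
import java.util.function.BooleanSupplier;

// Illustrative only: class, method and parameter names are hypothetical.
class BoundedBackoffSketch {

    // Retries tryLock with a 10x growing backoff and gives up once the total
    // time slept reaches maxWaitMs. Returns true if the lock was acquired.
    static boolean retryWithBackoff(BooleanSupplier tryLock, long initialBackoffMs, long maxWaitMs) {
        long backoffTimeMs = Math.max(1L, initialBackoffMs); // avoid a zero-length sleep loop
        long waitedMs = 0L;
        while (true) {
            if (tryLock.getAsBoolean()) {
                return true;
            }
            if (waitedMs >= maxWaitMs) {
                return false; // bound reached: stop retrying so the caller can rejoin
            }
            long sleepMs = Math.min(backoffTimeMs, maxWaitMs - waitedMs);
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
            waitedMs += sleepMs;
            // Grow by 10x, but never past the bound (also avoids long overflow).
            if (backoffTimeMs <= maxWaitMs / 10) {
                backoffTimeMs *= 10;
            } else {
                backoffTimeMs = maxWaitMs;
            }
        }
    }

    public static void main(String[] args) {
        // A lock that is never released to us: the loop ends instead of spinning forever.
        boolean acquired = retryWithBackoff(() -> false, 50L, 300L);
        System.out.println("acquired=" + acquired);
    }
}
```

The bound is on total time slept rather than on the size of a single backoff step, so a caller passing the poll interval as maxWaitMs gets control back after roughly that long.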


Thanks
Sachin


On Mon, Mar 6, 2017 at 3:56 PM, Damian Guy <da...@gmail.com> wrote:

> On trunk the CommitFailedException isn't thrown anymore. The commitOffsets
> method doesn't throw an exception. It returns one if it was thrown. We used
> to throw this exception during suspendTasksAndState, but we don't anymore.

Re: Fixing two critical bugs in kafka streams

Posted by Damian Guy <da...@gmail.com>.
On trunk the CommitFailedException isn't thrown anymore. The commitOffsets
method doesn't throw an exception; it returns the exception if one occurred.
We used to throw this exception during suspendTasksAndState, but we don't
anymore.
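
To illustrate the difference described above, here is a minimal sketch. It uses hypothetical stand-ins, not the real StreamThread code: commitOffsets catches a commit failure and returns it, and the caller either records the returned exception (like the 0.10.2.0 lines quoted in this thread) or ignores the return value (like the trunk lines). IllegalStateException stands in for CommitFailedException so the example stays self-contained.

```java
import java.util.concurrent.atomic.AtomicReference;

// Illustrative only: every name here is a hypothetical stand-in.
class SuspendSketch {

    // Returns the failure instead of throwing it; null means the commit succeeded.
    static RuntimeException commitOffsets(boolean fail) {
        try {
            if (fail) {
                throw new IllegalStateException("commit failed: partitions already revoked");
            }
            return null;
        } catch (RuntimeException e) {
            return e;
        }
    }

    // 0.10.2.0-style caller: the first failure is recorded and can later stop the thread.
    static RuntimeException suspendRecording(boolean fail) {
        AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
        if (firstException.get() == null) {
            firstException.set(commitOffsets(fail));
        }
        return firstException.get();
    }

    // trunk-style caller: the returned failure is dropped, so nothing propagates.
    static RuntimeException suspendIgnoring(boolean fail) {
        AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
        if (firstException.get() == null) {
            commitOffsets(fail);
        }
        return firstException.get();
    }

    public static void main(String[] args) {
        System.out.println("recording caller sees: " + suspendRecording(true));
        System.out.println("ignoring caller sees: " + suspendIgnoring(true));
    }
}
```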

On Mon, 6 Mar 2017 at 05:04 Sachin Mittal <sj...@gmail.com> wrote:

> Hi
> If a CommitFailedException is thrown in onPartitionsRevoked, it gets
> assigned to rebalanceException.
> This causes the stream thread to shut down. I am not sure how we can resume
> the thread.
>
> Note that the thread is not in an invalid state, because it has already
> been assigned new partitions, and this exception happens while revoking
> old partitions that have been moved to some other thread. So we need to
> swallow this exception on the StreamThread side too, just like we swallow
> it in ConsumerCoordinator.java.
>
> Also, I fixed this against code base 0.10.2.0, and the difference between
> that and trunk is in these lines.
> 0.10.2.0:
>         if (firstException.get() == null) {
>             firstException.set(commitOffsets());
>         }
> vs trunk:
>         if (firstException.get() == null) {
>             // TODO: currently commit failures will not be thrown to users
>             // while suspending tasks; this need to be re-visit after KIP-98
>             commitOffsets();
>         }
> I am again not sure, since this part is still a TODO, but looking at the
> code I see that commitOffsets can still throw the CommitFailedException,
> which needs to be handled in onPartitionsRevoked.
>
> Hope this makes sense.
>
> On the second issue, the deadlock is not caused by CommitFailedException,
> but after fixing the deadlock we need to make sure the thread does not die
> due to an unhandled CommitFailedException in onPartitionsRevoked.
> The deadlock issue is like this.
> If a thread has two partitions, and processing partition one takes more
> than MAX_POLL_INTERVAL_MS_CONFIG, then this thread is evicted from the
> group and both partitions are migrated to some other thread.
> Now when it tries to process partition two, it tries to get the RocksDB
> lock. It won't get the lock, since that partition has now moved to some
> other thread. So it keeps increasing backoffTimeMs and keeps trying to
> get the lock forever, which results in a deadlock.
> To fix this we need an upper bound on how long it keeps trying to get
> that lock. That upper bound has to be MAX_POLL_INTERVAL_MS_CONFIG,
> because if it has not got the lock by then, we can conclude that this
> thread was evicted from the group and needs to rejoin to get new
> partitions.
>
> On the JIRA issue, I can create one and attach the part of the logs where
> it keeps trying to get the lock with increasing backoffTimeMs.
>
> Let me know if this makes sense. Right now this is the best way we could
> come up with to handle stream thread failures.
>
> Also, on a side note, I feel we need more resilient streams. If we have,
> say, configured our streams application with 4 threads and for whatever
> reason a thread dies, then the application should itself (or via some
> exposed hooks) start a new thread (because in Java, I guess, the same
> thread cannot be restarted), so that the number of threads always stays
> at what was configured.
> I think exposed hooks would be the better option for this.
>
> Thanks
> Sachin
>
>
>
>
> On Mon, Mar 6, 2017 at 5:40 AM, Matthias J. Sax <ma...@confluent.io>
> wrote:
>
> > Sachin,
> >
> > thanks a lot for contributing!
> >
> > Right now, I am not sure if I understand the change. On
> > CommitFailedException, why can we just resume the thread? To me, it
> > seems that the thread will be in an invalid state and thus it's not safe
> > to just swallow the exception and keep going. Can you shed some light?
> >
> > And from my understanding, the deadlock is "caused" by the change from
> > above, right? So if it is safe to swallow the exception, we should do
> > some "clean up" to avoid the deadlock in the first place, instead of
> > applying an additional timeout.
> >
> > Also, if this is a bug, we should have a JIRA.
> >
> > -Matthias
> >
> >
> > On 3/5/17 4:11 AM, Sachin Mittal wrote:
> > > Hi,
> > > Please find the new PR
> > > https://github.com/apache/kafka/pull/2642/
> > >
> > > I see that trunk has a change here that differs from 0.10.2.0.
> > >
> > > 0.10.2.0:
> > >         if (firstException.get() == null) {
> > >             firstException.set(commitOffsets());
> > >         }
> > > vs trunk:
> > >         if (firstException.get() == null) {
> > >             // TODO: currently commit failures will not be thrown to users
> > >             // while suspending tasks; this need to be re-visit after KIP-98
> > >             commitOffsets();
> > >         }
> > > In view of this, I am not sure whether my part of the fix is still
> > > valid. It looks like it is.
> > >
> > > Also, on a side note, what is the policy for closing a branch that has
> > > just been released?
> > >
> > > Since you have released 0.10.2.0, we are using that, and that is why we
> > > made our changes in that branch, so that they modify just the needed
> > > code and we don't mess up the other released code.
> > >
> > > Is the new release made off the 0.10.2.0 branch? If yes, then you should
> > > not close it, as there can be patch fixes on it.
> > >
> > > Or is the release always made off trunk? In that case, how can we pick
> > > up the code from which the release binaries were created, so that when
> > > we build the binary we have exactly the same code as the released one,
> > > plus any changes we (or someone else) make on it?
> > >
> > > Also, if a branch is closed, then perhaps we should delete it or mark it
> > > closed or something.
> > >
> > > Please let us know how releases get created (off what codebase), so we
> > > can be more exact about where we apply our changes.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > >
> > > On Sun, Mar 5, 2017 at 3:40 PM, Eno Thereska <en...@gmail.com>
> > wrote:
> > >
> > >> Thanks Sachin, one thing before the review, 0.10.2 is closed now, this
> > >> needs to target trunk.
> > >>
> > >> Thanks
> > >> Eno
> > >>> On 5 Mar 2017, at 09:10, Sachin Mittal <sj...@gmail.com> wrote:
> > >>>
> > >>> Please review the PR and let me know if this makes sense.
> > >>>
> > >>> https://github.com/apache/kafka/pull/2640
> > >>>
> > >>> Thanks
> > >>> Sachin
> > >>>
> > >>>
> > >>> On Sun, Mar 5, 2017 at 1:49 PM, Eno Thereska <eno.thereska@gmail.com
> >
> > >> wrote:
> > >>>
> > >>>> Thanks Sachin for your contribution. Could you create a pull request
> > out
> > >>>> of the commit (so we can add comments, and also so you are
> > acknowledged
> > >>>> properly for your contribution)?
> > >>>>
> > >>>> Thanks
> > >>>> Eno
> > >>>>> On 5 Mar 2017, at 07:34, Sachin Mittal <sj...@gmail.com> wrote:
> > >>>>>
> > >>>>> Hi,
> > >>>>> So far in our experiment we have encountered 2 critical bugs.
> > >>>>> 1. If a thread takes more that MAX_POLL_INTERVAL_MS_CONFIG to
> > compute a
> > >>>>> cycle it gets evicted from group and rebalance takes place and it
> > gets
> > >>>> new
> > >>>>> assignment.
> > >>>>> However when this thread tries to commit offsets for the revoked
> > >>>> partitions
> > >>>>> in
> > >>>>> onPartitionsRevoked it will again throw the CommitFailedException.
> > >>>>>
> > >>>>> This gets handled by ConsumerCoordinatorso there is no point to
> > assign
> > >>>> this
> > >>>>> exception to
> > >>>>> rebalanceException in StreamThread and stop it. It has already been
> > >>>>> assigned new partitions and it can continue.
> > >>>>>
> > >>>>> So as fix in case on CommitFailedException I am not killing the
> > >>>> StreamThrea.
> > >>>>>
> > >>>>> 2. Next we see a deadlock state when to process a task it takes
> > longer
> > >>>>> than MAX_POLL_INTERVAL_MS_CONFIG
> > >>>>> time. Then this threads partitions are assigned to some other
> thread
> > >>>>> including rocksdb lock. When it tries to process the next task it
> > >> cannot
> > >>>>> get rocks db lock and simply keeps waiting for that lock forever.
> > >>>>>
> > >>>>> in retryWithBackoff for AbstractTaskCreator we have a
> backoffTimeMs =
> > >>>> 50L.
> > >>>>> If it does not get lock the we simply increase the time by 10x and
> > keep
> > >>>>> trying inside the while true loop.
> > >>>>>
> > >>>>> We need to have a upper bound for this backoffTimeM. If the time is
> > >>>> greater
> > >>>>> than  MAX_POLL_INTERVAL_MS_CONFIG and it still hasn't got the lock
> > >> means
> > >>>>> this thread's partitions are moved somewhere else and it may not
> get
> > >> the
> > >>>>> lock again.
> > >>>>>
> > >>>>> So I have added an upper bound check in that while loop.
> > >>>>>
> > >>>>> The commits are here:
> > >>>>> https://github.com/sjmittal/kafka/commit/
> > >> 6f04327c890c58cab9b1ae108af4ce
> > >>>> 5c4e3b89a1
> > >>>>>
> > >>>>> please review and if you feel they make sense, please merge it to
> > main
> > >>>>> branch.
> > >>>>>
> > >>>>> Thanks
> > >>>>> Sachin
> > >>>>
> > >>>>
> > >>
> > >>
> > >
> >
> >
>

Re: Fixing two critical bugs in kafka streams

Posted by Sachin Mittal <sj...@gmail.com>.
Hi,
If a CommitFailedException is thrown in onPartitionsRevoked, it gets
assigned to rebalanceException.
This causes the stream thread to shut down. I am not sure how we can resume
the thread.

Note that the thread is not in an invalid state, because it has already been
assigned new partitions, and this exception happens when trying to revoke
old partitions which have been moved to some other thread. So we need to
swallow this exception on the StreamThread side too, just like we swallow
it in ConsumerCoordinator.java.
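
To illustrate the idea, here is a minimal sketch of a revoke callback that
ignores a failed commit but still records any other error. It is not the
actual StreamThread code: the CommitFailedException class below is a local
stand-in for the Kafka one, and onPartitionsRevoked, rebalanceException and
the commitOffsets argument are hypothetical names modelled on the discussion.

```java
import java.util.concurrent.atomic.AtomicReference;

public class RevokeHandlingSketch {
    // Local stand-in for Kafka's CommitFailedException (assumption, for illustration only).
    static class CommitFailedException extends RuntimeException {
        CommitFailedException(String message) {
            super(message);
        }
    }

    // First fatal error seen during a rebalance; a non-null value would stop the thread.
    static final AtomicReference<Throwable> rebalanceException = new AtomicReference<>();

    // Hypothetical revoke callback: commits offsets for the partitions being revoked.
    static void onPartitionsRevoked(Runnable commitOffsets) {
        try {
            commitOffsets.run();
        } catch (CommitFailedException e) {
            // The partitions already belong to another thread, so there is
            // nothing left to commit; log and carry on with the new assignment.
            System.out.println("Ignoring failed commit on revoked partitions: " + e.getMessage());
        } catch (RuntimeException e) {
            // Any other failure is still treated as fatal for this thread.
            rebalanceException.compareAndSet(null, e);
        }
    }

    public static void main(String[] args) {
        onPartitionsRevoked(() -> {
            throw new CommitFailedException("group has already rebalanced");
        });
        System.out.println("fatal error after commit failure: " + rebalanceException.get());

        onPartitionsRevoked(() -> {
            throw new IllegalStateException("simulated unrelated failure");
        });
        System.out.println("fatal error after other failure: " + rebalanceException.get());
    }
}
```

Whether it is really safe to continue after the swallowed exception depends
on the task state being cleaned up, which this sketch does not model.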

Also, I fixed this against the 0.10.2.0 code base, and the difference between
that and trunk is these lines:
0.10.2.0
        if (firstException.get() == null) {
            firstException.set(commitOffsets());
        }
vs trunk
        if (firstException.get() == null) {
            // TODO: currently commit failures will not be thrown to users
            // while suspending tasks; this need to be re-visit after KIP-98
            commitOffsets();
        }
I am again not sure, since this part is still a TODO, but looking at the code I
see that commitOffsets can still throw the CommitFailedException, which
needs to be handled in onPartitionsRevoked.

Hope this makes sense.

On the second issue, the deadlock is not caused by the CommitFailedException,
but after fixing the deadlock we need to make sure the thread does not die due
to an unhandled CommitFailedException in onPartitionsRevoked.
The deadlock happens like this.
If a thread has two partitions and processing partition one takes
more than MAX_POLL_INTERVAL_MS_CONFIG, then this thread is evicted
from the group and both partitions are migrated to some other thread.
Now when it tries to process partition two, it tries to get the RocksDB
lock. It won't get the lock, since that partition has moved to some
other thread. So it keeps increasing backoffTimeMs and keeps trying to
get the lock forever, thus reaching a deadlock.
To fix this we need an upper bound on how long it keeps trying to get
that lock. That upper bound has to be MAX_POLL_INTERVAL_MS_CONFIG,
because if it has not got the lock by that time, we can conclude that this
thread was evicted from the group and needs to rejoin to get new
partitions.
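
The kind of bound I mean can be sketched roughly as below. This is not the
code from the PR: retryWithBackoff here is a standalone hypothetical helper,
tryLock stands for the attempt to get the state directory lock, and maxWaitMs
plays the role of the MAX_POLL_INTERVAL_MS_CONFIG value. The sketch bounds
the total time waited, which is one possible reading of the upper bound.

```java
import java.util.function.BooleanSupplier;

public class BoundedBackoffSketch {
    /**
     * Retries tryLock with a backoff that grows 10x per attempt, and gives up
     * (returns false) once the next sleep would push the total wait past maxWaitMs.
     */
    static boolean retryWithBackoff(BooleanSupplier tryLock, long initialBackoffMs, long maxWaitMs) {
        long backoffMs = initialBackoffMs;
        long waitedMs = 0L;
        while (true) {
            if (tryLock.getAsBoolean()) {
                return true;
            }
            if (waitedMs + backoffMs > maxWaitMs) {
                // Waited long enough: assume the partitions have moved elsewhere.
                return false;
            }
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and stop retrying.
                Thread.currentThread().interrupt();
                return false;
            }
            waitedMs += backoffMs;
            backoffMs *= 10;
        }
    }

    public static void main(String[] args) {
        // A lock that is never released, e.g. because another thread now owns the task.
        boolean acquired = retryWithBackoff(() -> false, 5L, 100L);
        System.out.println("lock acquired: " + acquired);
    }
}
```

When the helper returns false, the caller would stop creating the task and
let the thread rejoin the group instead of looping forever.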

Regarding the JIRA issue, I can create one and attach the part of the logs
where it keeps trying to get the lock with increasing backoffTimeMs.

Let me know if this makes sense. Right now this is the best way we could
come up with to handle stream thread failures.

Also, on a side note, I feel we need more resilient streams. If we have, say,
configured our streams application with 4 threads and for whatever reason a
thread dies, then the application should itself (or via some exposed hooks)
start a new thread (because in Java, I guess, the same thread cannot
be restarted), so that the number of threads always stays at what one has
configured.
I think exposed hooks would be the better option for this.
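
As a rough illustration of such a hook, using only plain java.lang.Thread and
nothing from Kafka Streams, a handler can start a replacement when a worker
dies. The class and method names below (ThreadReplacementSketch, startWorker)
are made up for this sketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadReplacementSketch {
    // Counts every worker thread that has been started, including replacements.
    static final AtomicInteger started = new AtomicInteger();

    // Starts a worker for the task. If the worker dies from an uncaught
    // exception, the handler starts a brand-new Thread for the same task,
    // because a terminated Thread object cannot be started again.
    static Thread startWorker(Runnable task) {
        Thread worker = new Thread(task, "worker-" + started.incrementAndGet());
        worker.setUncaughtExceptionHandler((dead, error) -> startWorker(task));
        worker.start();
        return worker;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch recovered = new CountDownLatch(1);
        startWorker(() -> {
            if (runs.incrementAndGet() == 1) {
                throw new IllegalStateException("simulated failure on first run");
            }
            recovered.countDown();
        });
        recovered.await();
        System.out.println("threads started: " + started.get());
    }
}
```

A real hook would also need a limit on restarts, so that a task which always
fails does not spawn threads in a tight loop.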

Thanks
Sachin





Re: Fixing two critical bugs in kafka streams

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Sachin,

thanks a lot for contributing!

Right now, I am not sure if I understand the change. On
CommitFailedException, why can we just resume the thread? To me, it
seems that the thread will be in an invalid state and thus it's not safe
to just swallow the exception and keep going. Can you shed some light?

And from my understanding, the deadlock is "caused" by the change from
above, right? So if it is safe to swallow the exception, we should do
some "clean up" to avoid the deadlock in the first place, instead of
applying an additional timeout.

Also, if this is a bug, we should have a JIRA.

-Matthias




Re: Fixing two critical bugs in kafka streams

Posted by Sachin Mittal <sj...@gmail.com>.
Hi,
Please find the new PR
https://github.com/apache/kafka/pull/2642/

I see that trunk has a change here that differs from 0.10.2.0:

0.10.2.0
        if (firstException.get() == null) {
            firstException.set(commitOffsets());
        }
vs trunk
        if (firstException.get() == null) {
            // TODO: currently commit failures will not be thrown to users
            // while suspending tasks; this need to be re-visit after KIP-98
            commitOffsets();
        }
In view of this, I am not sure whether my part of the fix is still valid. It
looks like it is.

Also, on a side note, what is the policy for closing a branch that has just
been released?

Since you have released 0.10.2.0, we are using that, and that is why we made
our changes in that branch, so that they modify only the needed code and
we don't mess up the other released code.

Is a new release cut from the 0.10.2.0 branch? If yes, then you should
not close it, as there can be patch fixes on it.

Or is the release always made from trunk? In that case, how can we
pick up the code from which the release binaries were created, so that when we
build the binary we have exactly the same code as the released one, plus any
changes (we or someone else) make on it?

Also, if a branch is closed, then perhaps we should delete it or mark it
as closed.

Please let us know how releases get created (off what codebase), so that we
can be more exact about where we apply our changes.
Thanks
Sachin




Re: Fixing two critical bugs in kafka streams

Posted by Eno Thereska <en...@gmail.com>.
Thanks Sachin. One thing before the review: 0.10.2 is closed now, so this needs to target trunk.

Thanks
Eno


Re: Fixing two critical bugs in kafka streams

Posted by Sachin Mittal <sj...@gmail.com>.
Please review the PR and let me know if this makes sense.

https://github.com/apache/kafka/pull/2640

Thanks
Sachin



Re: Fixing two critical bugs in kafka streams

Posted by Eno Thereska <en...@gmail.com>.
Thanks Sachin for your contribution. Could you create a pull request out of the commit (so we can add comments, and also so you are acknowledged properly for your contribution)? 

Thanks
Eno