You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Navinder Brar <na...@yahoo.com.INVALID> on 2019/11/14 21:00:29 UTC

Potential Bug in 2.3 version (leading to deletion of state directories)

Hi,
We are facing a peculiar situation in the 2.3 version of Kafka Streams. First of all, I want to clarify if it is possible that a Stream Thread (say Stream Thread-1) which had got an assignment for a standby task (say 0_0) can change to Stream Thread-2 on the same host post rebalancing. The issue we are facing is this is happening for us and post rebalancing since the Stream Thread-1 had 0_0 and is not assigned back to it, it closes that task and marks it for deletion(after cleanup delay time), and meanwhile, the task gets assigned to Stream Thread-2. When the Stream Thread-2 tries to transition this task to Running, it gets a LockException which is caught in AssignedTasks#initializeNewTasks(). This makes 0_0 stay in Created state on Stream Thread-2 and after the cleanup delay is over the task directories for 0_0 get deleted.
Can someone please comment on this behavior.
Thanks,Navinder

Re: Potential Bug in 2.3 version (leading to deletion of state directories)

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Did we create tickets?

On 11/15/19 9:32 AM, John Roesler wrote:
> Hi Navinder and Giridhar,
> 
> Thanks for the clarification. I think the motivation is reasonable.
> Although, I'm still a little unsure how this condition occurs. In
> addition to what I mentioned before, I'd forgotten one other important
> detail, that the "cleaner thread" should only run when the
> KafkaStreams state is RUNNING. That is, it shouldn't be able to
> "steal" a state directory while a rebalance is ongoing. Perhaps what
> is happening is that the cleaner checks the state before the
> rebalance, then the rebalance starts while the cleaner is iterating
> over the stores, so the task happens to unlock the directory just
> before the cleaner locks it?
> 
> Entry point, in KafkaStreams:
> 
> stateDirCleaner.scheduleAtFixedRate(() -> {
>     // we do not use lock here since we only read on the value and act on it
>     if (state == State.RUNNING) {
>         stateDirectory.cleanRemovedTasks(cleanupDelay);
>     }
> }, cleanupDelay, cleanupDelay, TimeUnit.MILLISECONDS);
> 
> Perhaps instead of checking the state of Streams _before_ the looping
> over the tasks, we should check it:
> 1. before the loop (short circuit)
> 2. before grabbing the lock (again, short circuit)
> 3. after grabbing the lock (this is the critical one, to ensure we
> never try to delete a directory during rebalance)
> 
> (3) guarantees correctness because tasks hold their lock the entire
> time they are running. By making sure that the state is still RUNNING
> after the cleaner acquires the lock, we can be sure that all tasks
> have been assigned and their locks are already all held before we
> proceed to delete anything.
> 
> ----
> 
> In addition to that apparent race condition, if a task can get stuck
> indefinitely in "Created", that would also be a bug. AFAIK, the
> StreamThread should just keep attempting to transition all tasks to
> running. I.e., it should never leave a task behind. The alternative is
> that a task could raise a fatal exception and kill the StreamThread
> itself, which should then cause the whole KafkaStreams instance to
> shut down.
> 
> I think that Giridhar has identified the problematic block, and
> actually, I see that this seems fixed as of 2.1:
> 
> if (active.allTasksRunning()) {
>     final Set<TopicPartition> assignment = consumer.assignment();
>     log.trace("Resuming partitions {}", assignment);
>     consumer.resume(assignment);
>     assignStandbyPartitions();
>     // in 1.1, this says "return true"
>     return standby.allTasksRunning();
> }
> return false;
> 
> Is there any chance you could upgrade at least to 2.1 and see what you think?
> 
> ----
> 
> Regarding the comment about spamming the logs, this seems to be where
> we're handling the lock exception and just deferring until the next
> attempt at initializing the task. If we printed the log every time
> this happens, it would print at a very high rate during the rebalance,
> while the task is contending with the cleaner. But, since we _don't_
> actually try again in 1.1, I guess this comment is incorrect. However,
> it becomes true in 2.1, where we _do_ retry in StreamThread.
> 
> ----
> 
> As for why the retries are removed, it _looks_ like they are removed
> in favor of retrying at a higher level. This makes sense, as
> busy-waiting on the lock itself would block further progress at the
> top level, potentially causing Streams to fail its heartbeat, fall out
> of the group, and cause another round of rebalances.
> 
> Clearly, though, the "retry at the top level" part isn't actually
> functioning in your version of Streams.
> 
> ---
> 
> So, in conclusion, yes, there seems to be a bug that could leave a
> standby task stuck in Created in 1.1, which was fixed in 2.1.
> 
> Additionally, there's another concurrency bug that we should fix to
> really prevent the cleaner from deleting stuff during rebalances.
> Since you're the ones to identify the bug, it's generally better for
> you to open the bug ticket.
> 
> Finally, in terms of a workaround, I recommend you increase the state
> cleaner delay so that it won't compete as much with tasks during
> rebalance.
> 
> Thanks so much to you both for identifying this and looking so deeply into it!
> -John
> 
> On Thu, Nov 14, 2019 at 11:48 PM Giridhar Addepalli
> <gi...@gmail.com> wrote:
>>
>> Hi John,
>>
>> Can you please point us to code where Thread-2 will be able to recreate the state directory once cleaner is done ?
>>
>> Also, we see that in https://issues.apache.org/jira/browse/KAFKA-6122, retries around locks is removed. Please let us know why retry mechanism is removed?
>>
>> Also can you please explain below comment in AssignedTasks.java#initializeNewTasks function
>> catch (final LockException e) {
>>                     // made this trace as it will spam the logs in the poll loop.
>>                     log.trace("Could not create {} {} due to {}; will retry", taskTypeName, entry.getKey(), e.getMessage());
>>                 }
>> Thanks,
>> Giridhar.
>>
>> On 2019/11/14 21:25:28, John Roesler <jo...@confluent.io> wrote:
>>> Hey Navinder,
>>>
>>> I think what's happening is a little different. Let's see if my
>>> worldview also explains your experiences.
>>>
>>> There is no such thing as "mark for deletion". When a thread loses a
>>> task, it simply releases its lock on the directory. If no one else on
>>> the instance claims that lock within `state.cleanup.delay.ms` amount
>>> of milliseconds, then the state cleaner will itself grab the lock and
>>> delete the directory. On the other hand, if another thread (or the
>>> same thread) gets the task back and claims the lock before the
>>> cleaner, it will be able to re-open the store and use it.
>>>
>>> The default for `state.cleanup.delay.ms` is 10 minutes, which is
>>> actually short enough that it could pass during a single rebalance (if
>>> Streams starts recovering a lot of state). I recommend you increase
>>> `state.cleanup.delay.ms` by a lot, like maybe set it to one hour.
>>>
>>> One thing I'm curious about... You didn't mention if Thread-2
>>> eventually is able to re-create the state directory (after the cleaner
>>> is done) and transition to RUNNING. This should be the case. If not, I
>>> would consider it a bug.
>>>
>>> Thanks,
>>> -John
>>>
>>> On Thu, Nov 14, 2019 at 3:02 PM Navinder Brar
>>> <na...@yahoo.com.invalid> wrote:
>>>>
>>>> Hi,
>>>> We are facing a peculiar situation in the 2.3 version of Kafka Streams. First of all, I want to clarify if it is possible that a Stream Thread (say Stream Thread-1) which had got an assignment for a standby task (say 0_0) can change to Stream Thread-2 on the same host post rebalancing. The issue we are facing is this is happening for us and post rebalancing since the Stream Thread-1 had 0_0 and is not assigned back to it, it closes that task and marks it for deletion(after cleanup delay time), and meanwhile, the task gets assigned to Stream Thread-2. When the Stream Thread-2 tries to transition this task to Running, it gets a LockException which is caught in AssignedTasks#initializeNewTasks(). This makes 0_0 stay in Created state on Stream Thread-2 and after the cleanup delay is over the task directories for 0_0 get deleted.
>>>> Can someone please comment on this behavior.
>>>> Thanks,Navinder
>>>


Re: Potential Bug in 2.3 version (leading to deletion of state directories)

Posted by John Roesler <jo...@confluent.io>.
Hi Navinder and Giridhar,

Thanks for the clarification. I think the motivation is reasonable.
Although, I'm still a little unsure how this condition occurs. In
addition to what I mentioned before, I'd forgotten one other important
detail, that the "cleaner thread" should only run when the
KafkaStreams state is RUNNING. That is, it shouldn't be able to
"steal" a state directory while a rebalance is ongoing. Perhaps what
is happening is that the cleaner checks the state before the
rebalance, then the rebalance starts while the cleaner is iterating
over the stores, so the task happens to unlock the directory just
before the cleaner locks it?

Entry point, in KafkaStreams:

stateDirCleaner.scheduleAtFixedRate(() -> {
    // we do not use lock here since we only read on the value and act on it
    if (state == State.RUNNING) {
        stateDirectory.cleanRemovedTasks(cleanupDelay);
    }
}, cleanupDelay, cleanupDelay, TimeUnit.MILLISECONDS);

Perhaps instead of checking the state of Streams _before_ the looping
over the tasks, we should check it:
1. before the loop (short circuit)
2. before grabbing the lock (again, short circuit)
3. after grabbing the lock (this is the critical one, to ensure we
never try to delete a directory during rebalance)

(3) guarantees correctness because tasks hold their lock the entire
time they are running. By making sure that the state is still RUNNING
after the cleaner acquires the lock, we can be sure that all tasks
have been assigned and their locks are already all held before we
proceed to delete anything.

----

In addition to that apparent race condition, if a task can get stuck
indefinitely in "Created", that would also be a bug. AFAIK, the
StreamThread should just keep attempting to transition all tasks to
running. I.e., it should never leave a task behind. The alternative is
that a task could raise a fatal exception and kill the StreamThread
itself, which should then cause the whole KafkaStreams instance to
shut down.

I think that Giridhar has identified the problematic block, and
actually, I see that this seems fixed as of 2.1:

if (active.allTasksRunning()) {
    final Set<TopicPartition> assignment = consumer.assignment();
    log.trace("Resuming partitions {}", assignment);
    consumer.resume(assignment);
    assignStandbyPartitions();
    // in 1.1, this says "return true"
    return standby.allTasksRunning();
}
return false;

Is there any chance you could upgrade at least to 2.1 and see what you think?

----

Regarding the comment about spamming the logs, this seems to be where
we're handling the lock exception and just deferring until the next
attempt at initializing the task. If we printed the log every time
this happens, it would print at a very high rate during the rebalance,
while the task is contending with the cleaner. But, since we _don't_
actually try again in 1.1, I guess this comment is incorrect. However,
it becomes true in 2.1, where we _do_ retry in StreamThread.

----

As for why the retries are removed, it _looks_ like they are removed
in favor of retrying at a higher level. This makes sense, as
busy-waiting on the lock itself would block further progress at the
top level, potentially causing Streams to fail its heartbeat, fall out
of the group, and cause another round of rebalances.

Clearly, though, the "retry at the top level" part isn't actually
functioning in your version of Streams.

---

So, in conclusion, yes, there seems to be a bug that could leave a
standby task stuck in Created in 1.1, which was fixed in 2.1.

Additionally, there's another concurrency bug that we should fix to
really prevent the cleaner from deleting stuff during rebalances.
Since you're the ones to identify the bug, it's generally better for
you to open the bug ticket.

Finally, in terms of a workaround, I recommend you increase the state
cleaner delay so that it won't compete as much with tasks during
rebalance.

Thanks so much to you both for identifying this and looking so deeply into it!
-John

On Thu, Nov 14, 2019 at 11:48 PM Giridhar Addepalli
<gi...@gmail.com> wrote:
>
> Hi John,
>
> Can you please point us to code where Thread-2 will be able to recreate the state directory once cleaner is done ?
>
> Also, we see that in https://issues.apache.org/jira/browse/KAFKA-6122, retries around locks is removed. Please let us know why retry mechanism is removed?
>
> Also can you please explain below comment in AssignedTasks.java#initializeNewTasks function
> catch (final LockException e) {
>                     // made this trace as it will spam the logs in the poll loop.
>                     log.trace("Could not create {} {} due to {}; will retry", taskTypeName, entry.getKey(), e.getMessage());
>                 }
> Thanks,
> Giridhar.
>
> On 2019/11/14 21:25:28, John Roesler <jo...@confluent.io> wrote:
> > Hey Navinder,
> >
> > I think what's happening is a little different. Let's see if my
> > worldview also explains your experiences.
> >
> > There is no such thing as "mark for deletion". When a thread loses a
> > task, it simply releases its lock on the directory. If no one else on
> > the instance claims that lock within `state.cleanup.delay.ms` amount
> > of milliseconds, then the state cleaner will itself grab the lock and
> > delete the directory. On the other hand, if another thread (or the
> > same thread) gets the task back and claims the lock before the
> > cleaner, it will be able to re-open the store and use it.
> >
> > The default for `state.cleanup.delay.ms` is 10 minutes, which is
> > actually short enough that it could pass during a single rebalance (if
> > Streams starts recovering a lot of state). I recommend you increase
> > `state.cleanup.delay.ms` by a lot, like maybe set it to one hour.
> >
> > One thing I'm curious about... You didn't mention if Thread-2
> > eventually is able to re-create the state directory (after the cleaner
> > is done) and transition to RUNNING. This should be the case. If not, I
> > would consider it a bug.
> >
> > Thanks,
> > -John
> >
> > On Thu, Nov 14, 2019 at 3:02 PM Navinder Brar
> > <na...@yahoo.com.invalid> wrote:
> > >
> > > Hi,
> > > We are facing a peculiar situation in the 2.3 version of Kafka Streams. First of all, I want to clarify if it is possible that a Stream Thread (say Stream Thread-1) which had got an assignment for a standby task (say 0_0) can change to Stream Thread-2 on the same host post rebalancing. The issue we are facing is this is happening for us and post rebalancing since the Stream Thread-1 had 0_0 and is not assigned back to it, it closes that task and marks it for deletion(after cleanup delay time), and meanwhile, the task gets assigned to Stream Thread-2. When the Stream Thread-2 tries to transition this task to Running, it gets a LockException which is caught in AssignedTasks#initializeNewTasks(). This makes 0_0 stay in Created state on Stream Thread-2 and after the cleanup delay is over the task directories for 0_0 get deleted.
> > > Can someone please comment on this behavior.
> > > Thanks,Navinder
> >

Re: Potential Bug in 2.3 version (leading to deletion of state directories)

Posted by Giridhar Addepalli <gi...@gmail.com>.
Hi John,

Can you please point us to code where Thread-2 will be able to recreate the state directory once cleaner is done ?

Also, we see that in https://issues.apache.org/jira/browse/KAFKA-6122, retries around locks is removed. Please let us know why retry mechanism is removed?

Also can you please explain below comment in AssignedTasks.java#initializeNewTasks function
catch (final LockException e) {
                    // made this trace as it will spam the logs in the poll loop.
                    log.trace("Could not create {} {} due to {}; will retry", taskTypeName, entry.getKey(), e.getMessage());
                }
Thanks,
Giridhar.

On 2019/11/14 21:25:28, John Roesler <jo...@confluent.io> wrote: 
> Hey Navinder,
> 
> I think what's happening is a little different. Let's see if my
> worldview also explains your experiences.
> 
> There is no such thing as "mark for deletion". When a thread loses a
> task, it simply releases its lock on the directory. If no one else on
> the instance claims that lock within `state.cleanup.delay.ms` amount
> of milliseconds, then the state cleaner will itself grab the lock and
> delete the directory. On the other hand, if another thread (or the
> same thread) gets the task back and claims the lock before the
> cleaner, it will be able to re-open the store and use it.
> 
> The default for `state.cleanup.delay.ms` is 10 minutes, which is
> actually short enough that it could pass during a single rebalance (if
> Streams starts recovering a lot of state). I recommend you increase
> `state.cleanup.delay.ms` by a lot, like maybe set it to one hour.
> 
> One thing I'm curious about... You didn't mention if Thread-2
> eventually is able to re-create the state directory (after the cleaner
> is done) and transition to RUNNING. This should be the case. If not, I
> would consider it a bug.
> 
> Thanks,
> -John
> 
> On Thu, Nov 14, 2019 at 3:02 PM Navinder Brar
> <na...@yahoo.com.invalid> wrote:
> >
> > Hi,
> > We are facing a peculiar situation in the 2.3 version of Kafka Streams. First of all, I want to clarify if it is possible that a Stream Thread (say Stream Thread-1) which had got an assignment for a standby task (say 0_0) can change to Stream Thread-2 on the same host post rebalancing. The issue we are facing is this is happening for us and post rebalancing since the Stream Thread-1 had 0_0 and is not assigned back to it, it closes that task and marks it for deletion(after cleanup delay time), and meanwhile, the task gets assigned to Stream Thread-2. When the Stream Thread-2 tries to transition this task to Running, it gets a LockException which is caught in AssignedTasks#initializeNewTasks(). This makes 0_0 stay in Created state on Stream Thread-2 and after the cleanup delay is over the task directories for 0_0 get deleted.
> > Can someone please comment on this behavior.
> > Thanks,Navinder
> 

Re: Potential Bug in 2.3 version (leading to deletion of state directories)

Posted by Giridhar Addepalli <gi...@gmail.com>.
Hi John,

i see in https://github.com/apache/kafka/pull/3653
there is discussion around swallowing of LockException and retry not being there.
but dguy replied saying that "The retry doesn't happen in this block of code. It will happen the next time the runLoop executes."

but state of thread is being changed to RUNNING, hence updateNewAndRestoringTasks won't be called again inside runOnce of StreamThread

In TaskManager#updateNewAndRestoringTasks at the end, there is IF condition which checks whether all active tasks are running.

Do you we should change 
from 
if (active.allTasksRunning()) { ... } 
to
if (active.allTasksRunning() && standby.allTasksRunning()) { ... }

Thanks,
Giridhar.

On 2019/11/15 03:09:17, Navinder Brar <na...@yahoo.com.INVALID> wrote: 
> Hi John,
> Thanks for the response. Yeah, by "marked for deletion" I meant the unlocking of the store(by which in a way it is marked for deletion). From what I have seen the standby task gets stuck in Created state and doesn't move to Running and is not able to recreate the directory. Also, the point is not just that. With the new KIP to support serving from replicas we want to have very less downtime on replicas and in this case we already have a completely built state directory which is getting deleted just because of the assignment change on the thread(the host is still same). We have StreamsMetadataState#allMetadata() which would be common for all threads of all instances. Can't we have a conditional check during unlocking which checks allMetadata and finds out that the partition we are about to unlock is assigned to this host(we don't care which thread of this host) and then we don't unlock the task, meanwhile the Stream Thread-2 will take the lock on its own when it moves to Running.
> Thanks,Navinder
>     On Friday, 15 November, 2019, 02:55:40 am IST, John Roesler <jo...@confluent.io> wrote:  
>  
>  Hey Navinder,
> 
> I think what's happening is a little different. Let's see if my
> worldview also explains your experiences.
> 
> There is no such thing as "mark for deletion". When a thread loses a
> task, it simply releases its lock on the directory. If no one else on
> the instance claims that lock within `state.cleanup.delay.ms` amount
> of milliseconds, then the state cleaner will itself grab the lock and
> delete the directory. On the other hand, if another thread (or the
> same thread) gets the task back and claims the lock before the
> cleaner, it will be able to re-open the store and use it.
> 
> The default for `state.cleanup.delay.ms` is 10 minutes, which is
> actually short enough that it could pass during a single rebalance (if
> Streams starts recovering a lot of state). I recommend you increase
> `state.cleanup.delay.ms` by a lot, like maybe set it to one hour.
> 
> One thing I'm curious about... You didn't mention if Thread-2
> eventually is able to re-create the state directory (after the cleaner
> is done) and transition to RUNNING. This should be the case. If not, I
> would consider it a bug.
> 
> Thanks,
> -John
> 
> On Thu, Nov 14, 2019 at 3:02 PM Navinder Brar
> <na...@yahoo.com.invalid> wrote:
> >
> > Hi,
> > We are facing a peculiar situation in the 2.3 version of Kafka Streams. First of all, I want to clarify if it is possible that a Stream Thread (say Stream Thread-1) which had got an assignment for a standby task (say 0_0) can change to Stream Thread-2 on the same host post rebalancing. The issue we are facing is this is happening for us and post rebalancing since the Stream Thread-1 had 0_0 and is not assigned back to it, it closes that task and marks it for deletion(after cleanup delay time), and meanwhile, the task gets assigned to Stream Thread-2. When the Stream Thread-2 tries to transition this task to Running, it gets a LockException which is caught in AssignedTasks#initializeNewTasks(). This makes 0_0 stay in Created state on Stream Thread-2 and after the cleanup delay is over the task directories for 0_0 get deleted.
> > Can someone please comment on this behavior.
> > Thanks,Navinder  

Re: Potential Bug in 2.3 version (leading to deletion of state directories)

Posted by Giridhar Addepalli <gi...@gmail.com>.
Hi John,

i see in https://github.com/apache/kafka/pull/3653
there is discussion around swallowing of LockException and retry not being there.
but dguy replied saying that "The retry doesn't happen in this block of code. It will happen the next time the runLoop executes."

but state of thread is being changed to RUNNING, hence updateNewAndRestoringTasks won't be called again inside runOnce of StreamThread

In TaskManager#updateNewAndRestoringTasks at the end, there is IF condition which checks whether all active tasks are running.

Do you we should change 
from 
if (active.allTasksRunning()) { ... } 
to
if (active.allTasksRunning() && standby.allTasksRunning()) { ... }

Thanks,
Giridhar.

On 2019/11/15 03:09:17, Navinder Brar <na...@yahoo.com.INVALID> wrote: 
> Hi John,
> Thanks for the response. Yeah, by "marked for deletion" I meant the unlocking of the store(by which in a way it is marked for deletion). From what I have seen the standby task gets stuck in Created state and doesn't move to Running and is not able to recreate the directory. Also, the point is not just that. With the new KIP to support serving from replicas we want to have very less downtime on replicas and in this case we already have a completely built state directory which is getting deleted just because of the assignment change on the thread(the host is still same). We have StreamsMetadataState#allMetadata() which would be common for all threads of all instances. Can't we have a conditional check during unlocking which checks allMetadata and finds out that the partition we are about to unlock is assigned to this host(we don't care which thread of this host) and then we don't unlock the task, meanwhile the Stream Thread-2 will take the lock on its own when it moves to Running.
> Thanks,Navinder
>     On Friday, 15 November, 2019, 02:55:40 am IST, John Roesler <jo...@confluent.io> wrote:  
>  
>  Hey Navinder,
> 
> I think what's happening is a little different. Let's see if my
> worldview also explains your experiences.
> 
> There is no such thing as "mark for deletion". When a thread loses a
> task, it simply releases its lock on the directory. If no one else on
> the instance claims that lock within `state.cleanup.delay.ms` amount
> of milliseconds, then the state cleaner will itself grab the lock and
> delete the directory. On the other hand, if another thread (or the
> same thread) gets the task back and claims the lock before the
> cleaner, it will be able to re-open the store and use it.
> 
> The default for `state.cleanup.delay.ms` is 10 minutes, which is
> actually short enough that it could pass during a single rebalance (if
> Streams starts recovering a lot of state). I recommend you increase
> `state.cleanup.delay.ms` by a lot, like maybe set it to one hour.
> 
> One thing I'm curious about... You didn't mention if Thread-2
> eventually is able to re-create the state directory (after the cleaner
> is done) and transition to RUNNING. This should be the case. If not, I
> would consider it a bug.
> 
> Thanks,
> -John
> 
> On Thu, Nov 14, 2019 at 3:02 PM Navinder Brar
> <na...@yahoo.com.invalid> wrote:
> >
> > Hi,
> > We are facing a peculiar situation in the 2.3 version of Kafka Streams. First of all, I want to clarify if it is possible that a Stream Thread (say Stream Thread-1) which had got an assignment for a standby task (say 0_0) can change to Stream Thread-2 on the same host post rebalancing. The issue we are facing is this is happening for us and post rebalancing since the Stream Thread-1 had 0_0 and is not assigned back to it, it closes that task and marks it for deletion(after cleanup delay time), and meanwhile, the task gets assigned to Stream Thread-2. When the Stream Thread-2 tries to transition this task to Running, it gets a LockException which is caught in AssignedTasks#initializeNewTasks(). This makes 0_0 stay in Created state on Stream Thread-2 and after the cleanup delay is over the task directories for 0_0 get deleted.
> > Can someone please comment on this behavior.
> > Thanks,Navinder  

Re: Potential Bug in 2.3 version (leading to deletion of state directories)

Posted by Navinder Brar <na...@yahoo.com.INVALID>.
Hi John,
Thanks for the response. Yeah, by "marked for deletion" I meant the unlocking of the store(by which in a way it is marked for deletion). From what I have seen the standby task gets stuck in Created state and doesn't move to Running and is not able to recreate the directory. Also, the point is not just that. With the new KIP to support serving from replicas we want to have very less downtime on replicas and in this case we already have a completely built state directory which is getting deleted just because of the assignment change on the thread(the host is still same). We have StreamsMetadataState#allMetadata() which would be common for all threads of all instances. Can't we have a conditional check during unlocking which checks allMetadata and finds out that the partition we are about to unlock is assigned to this host(we don't care which thread of this host) and then we don't unlock the task, meanwhile the Stream Thread-2 will take the lock on its own when it moves to Running.
Thanks,Navinder
    On Friday, 15 November, 2019, 02:55:40 am IST, John Roesler <jo...@confluent.io> wrote:  
 
 Hey Navinder,

I think what's happening is a little different. Let's see if my
worldview also explains your experiences.

There is no such thing as "mark for deletion". When a thread loses a
task, it simply releases its lock on the directory. If no one else on
the instance claims that lock within `state.cleanup.delay.ms` amount
of milliseconds, then the state cleaner will itself grab the lock and
delete the directory. On the other hand, if another thread (or the
same thread) gets the task back and claims the lock before the
cleaner, it will be able to re-open the store and use it.

The default for `state.cleanup.delay.ms` is 10 minutes, which is
actually short enough that it could pass during a single rebalance (if
Streams starts recovering a lot of state). I recommend you increase
`state.cleanup.delay.ms` by a lot, like maybe set it to one hour.

One thing I'm curious about... You didn't mention if Thread-2
eventually is able to re-create the state directory (after the cleaner
is done) and transition to RUNNING. This should be the case. If not, I
would consider it a bug.

Thanks,
-John

On Thu, Nov 14, 2019 at 3:02 PM Navinder Brar
<na...@yahoo.com.invalid> wrote:
>
> Hi,
> We are facing a peculiar situation in the 2.3 version of Kafka Streams. First of all, I want to clarify if it is possible that a Stream Thread (say Stream Thread-1) which had got an assignment for a standby task (say 0_0) can change to Stream Thread-2 on the same host post rebalancing. The issue we are facing is this is happening for us and post rebalancing since the Stream Thread-1 had 0_0 and is not assigned back to it, it closes that task and marks it for deletion(after cleanup delay time), and meanwhile, the task gets assigned to Stream Thread-2. When the Stream Thread-2 tries to transition this task to Running, it gets a LockException which is caught in AssignedTasks#initializeNewTasks(). This makes 0_0 stay in Created state on Stream Thread-2 and after the cleanup delay is over the task directories for 0_0 get deleted.
> Can someone please comment on this behavior.
> Thanks,Navinder  

Re: Potential Bug in 2.3 version (leading to deletion of state directories)

Posted by John Roesler <jo...@confluent.io>.
Hey Navinder,

I think what's happening is a little different. Let's see if my
worldview also explains your experiences.

There is no such thing as "mark for deletion". When a thread loses a
task, it simply releases its lock on the directory. If no one else on
the instance claims that lock within `state.cleanup.delay.ms` amount
of milliseconds, then the state cleaner will itself grab the lock and
delete the directory. On the other hand, if another thread (or the
same thread) gets the task back and claims the lock before the
cleaner, it will be able to re-open the store and use it.

The default for `state.cleanup.delay.ms` is 10 minutes, which is
actually short enough that it could pass during a single rebalance (if
Streams starts recovering a lot of state). I recommend you increase
`state.cleanup.delay.ms` by a lot, like maybe set it to one hour.

One thing I'm curious about... You didn't mention if Thread-2
eventually is able to re-create the state directory (after the cleaner
is done) and transition to RUNNING. This should be the case. If not, I
would consider it a bug.

Thanks,
-John

On Thu, Nov 14, 2019 at 3:02 PM Navinder Brar
<na...@yahoo.com.invalid> wrote:
>
> Hi,
> We are facing a peculiar situation in the 2.3 version of Kafka Streams. First of all, I want to clarify if it is possible that a Stream Thread (say Stream Thread-1) which had got an assignment for a standby task (say 0_0) can change to Stream Thread-2 on the same host post rebalancing. The issue we are facing is this is happening for us and post rebalancing since the Stream Thread-1 had 0_0 and is not assigned back to it, it closes that task and marks it for deletion(after cleanup delay time), and meanwhile, the task gets assigned to Stream Thread-2. When the Stream Thread-2 tries to transition this task to Running, it gets a LockException which is caught in AssignedTasks#initializeNewTasks(). This makes 0_0 stay in Created state on Stream Thread-2 and after the cleanup delay is over the task directories for 0_0 get deleted.
> Can someone please comment on this behavior.
> Thanks,Navinder