You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "A. Sophie Blee-Goldman (Jira)" <ji...@apache.org> on 2021/03/18 23:00:02 UTC

[jira] [Commented] (KAFKA-12503) Resizing the thread cache in a non thread safe way can cause records to be redirected throughout the topology

    [ https://issues.apache.org/jira/browse/KAFKA-12503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304518#comment-17304518 ] 

A. Sophie Blee-Goldman commented on KAFKA-12503:
------------------------------------------------

Thanks for the writeup! If anyone reading this wants to check out the code for themselves, the methods and classes of note are (1) the ProcessorContext, which tracks a "current node" and forwards records to that note as they flow through the topology, (2) StreamTask#process where a StreamThread picks up the next record, sets the current node on the context to the SourceNode, processes the record through the subtopology, and then sets the current node back to null, and (3) TimestampedCacheFlushListener which is tied to a cache and the processor node immediately downstream of that cache. The listener receives records that have been evicted, saves the current node on the context as the `prevNode`, then sets the current node on the context, forwards the record, and resets the current node to `prevNode`.

The `StreamsException: Current node is unknown` must have occurred because the StreamThread owning this task had finished processing the record and set the current node to null when another thread hit the injected exception and tried to resize the cache. When the evicted record ended up in TimestampedCacheFlushListener#apply, the listener saved "null" as it's previous node, processed the record with its own node, and then reset it to null. If the other StreamThread had begun processing this task in the meantime, it would suddenly find its current node to be null and throw the exception we see.

The ClassCastException is similar, but instead of the listener setting the current node to null while the StreamTask was in the middle of processing, it must have set the current node to some other node elsewhere in the topology. If this other node has different input/output types, we would run into a ClassCastException as it forwards eg a Long to the downstream node which was expecting a Change<Long>.

Of course these are the race conditions under which we ran into visible symptoms, but as Walker mentioned the more dangerous possibility is all the other times when a foreign processor is inserted into the middle of the subtopology but the data types match and therefore no exception is thrown. In these cases we would be silently corrupting the data.

Obviously one followup question we should ask is why it was so easy for one thread to process records that belong to another in the first place? We should consider some kind of safety mechanism to ensure single-threaded access to the ProcessorContext and task directories -- turns out the locking mechanism doesn't actually protect against multithreaded access unless we explicitly ask it whether we can lock it or not.

> Resizing the thread cache in a non thread safe way can cause records to be redirected throughout the topology
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12503
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12503
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0
>            Reporter: Walker Carlson
>            Priority: Blocker
>             Fix For: 2.8.0
>
>
> When a thread is added, removed or replaced the cache is resized. When the thread cache was resized it was being done so from the thread initiating these calls. This can cause the record to be redirected to the wrong processor via the call to `evict` in the cache. The evict flushes records downstream to the next processor after the cache. But if this is on the wrong thread the wrong processor receives them. 
> This can cause 3 problems.
> 1) When the owner finishes processing the record it set the current node to null in the processor context a this then causes the other processor to throw an exception `StreamsException: Current node is unknown.`. 
> 2) Depending on the type it can cause a class cast exception as the record is a different type. Mostly this happened when the value types were different inside of the map node from the toStream method
> 3) A silent issue is it could cause data to be processed by the wrong node and cause data corruption. We have not been able to confirm this last one but it is the most dangerous in many ways.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)