Posted to jira@kafka.apache.org by "A. Sophie Blee-Goldman (Jira)" <ji...@apache.org> on 2022/12/07 04:14:00 UTC

[jira] [Commented] (KAFKA-14260) InMemoryKeyValueStore iterator still throws ConcurrentModificationException

    [ https://issues.apache.org/jira/browse/KAFKA-14260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17644129#comment-17644129 ] 

A. Sophie Blee-Goldman commented on KAFKA-14260:
------------------------------------------------

Ok, I did merge a patch fixing the places where we forgot to synchronize, which is certainly a bug that can lead to a CME. But I realize that's not what this ticket was about, so I want to explain why I resolved it: i.e., synchronization is sufficient to avoid CMEs. I do think you pointed out something of note here, though, which is worth following up on, perhaps tracked separately.

In the IMKVIterator constructor from [~guozhang]'s snippet above, it's true that we get an iterator based on the original map, but it iterates over a copy of that map's keys. So this iterator doesn't pin any part of the original map; it just returns the set of keys that were in the original map when the range API was invoked. There's no way to modify the contents of this copy, as it's internal to the (also internal) iterator. And even if you delete a record with a given key from the store, the key object itself still exists in the copy (and can/will still be returned by that iterator).

So I really don't see how a CME is possible if we properly synchronize the APIs to enforce single-threaded access while that copy is being made, which we now do since merging [~Cerchie]'s PR.
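A minimal sketch of this argument, using a hypothetical, simplified stand-in for the store ({{SnapshotStore}} and {{rangeKeys()}} are illustrative names, not Kafka's {{InMemoryKeyValueStore}} code): every method is {{synchronized}}, and {{rangeKeys()}} copies the matching keys of the backing {{TreeMap}} into a private {{TreeSet}} while holding the lock. Since no writer can run during the copy, the live key view is never modified mid-iteration, and iterating the copy afterwards never touches the map's own iterators. The first part of the demo also shows the snapshot semantics described above: a key deleted after the range was opened is still returned, and its lookup yields null.

```java
import java.util.Iterator;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical, simplified stand-in for an in-memory store; not the real Kafka class.
class SnapshotStore {
    private final TreeMap<String, String> map = new TreeMap<>();

    public synchronized void put(String key, String value) { map.put(key, value); }

    public synchronized void delete(String key) { map.remove(key); }

    public synchronized String get(String key) { return map.get(key); }

    // The live subMap key view is copied while holding the store's lock,
    // so no concurrent put/delete can modify the TreeMap during the copy.
    public synchronized Iterator<String> rangeKeys(String from, String to) {
        return new TreeSet<>(map.subMap(from, true, to, true).keySet()).iterator();
    }

    public static void main(String[] args) throws InterruptedException {
        SnapshotStore store = new SnapshotStore();

        // Snapshot semantics: a key deleted after rangeKeys() is still returned.
        store.put("a", "1");
        store.put("b", "2");
        Iterator<String> snapshot = store.rangeKeys("a", "b");
        store.delete("b");
        while (snapshot.hasNext()) {
            String key = snapshot.next();
            System.out.println(key + " -> " + store.get(key)); // a -> 1, then b -> null
        }

        // Concurrency: a writer alternates rounds of inserting and deleting 1000 keys
        // while the main thread keeps opening and draining range scans.
        Thread writer = new Thread(() -> {
            for (int i = 0; i < 200_000; i++) {
                String key = "k" + (i % 1000);
                if ((i / 1000) % 2 == 0) store.put(key, "v" + i); else store.delete(key);
            }
        });
        writer.start();
        while (writer.isAlive()) {
            Iterator<String> it = store.rangeKeys("k0", "k999");
            while (it.hasNext()) store.get(it.next()); // value may be null if deleted since the copy
        }
        writer.join();
        System.out.println("no ConcurrentModificationException");
    }
}
```

The cost of this design is the one the reporter points out: each range call copies the matching keys, and the lock is held for the duration of that copy.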

That said, it still feels a bit awkward, because the keyset-copy iterator can return keys that no longer exist in the actual store. In this case, when we issue a get on that key it'll return null, and the range read will contain an entry with a null value. Technically, Streams makes no guarantees about whether a range scan reflects only the original state store contents, only the latest contents, or anything in between, and I'm not sure there's even a "right" answer there.

Still, returning a KeyValue("key1", null) is pretty awkward and likely unexpected by most users, so I _can_ see this resulting in an NPE. Fortunately, that's a much easier fix: we can just toss out that result and return whatever is next. I think it's worth filing a separate ticket for that one, though.
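A sketch of what "toss out that result and return whatever is next" could look like. {{LiveEntryIterator}} is a hypothetical wrapper, not Kafka's code, and {{Map.Entry}} stands in for Kafka's {{KeyValue}}. It walks a key snapshot, looks each key up in the live map, and skips keys whose lookup now returns null, assuming (as the comment above implies) that a null from the point lookup means the key has been deleted.

```java
import java.util.AbstractMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Function;

// Hypothetical iterator over a key snapshot that drops keys deleted after the snapshot was taken.
final class LiveEntryIterator<K, V> implements Iterator<Map.Entry<K, V>> {
    private final Iterator<K> keys;      // snapshot of keys taken when the range was opened
    private final Function<K, V> lookup; // point lookup against the live store
    private Map.Entry<K, V> next;        // prefetched entry with a non-null value, or null

    LiveEntryIterator(Iterator<K> keys, Function<K, V> lookup) {
        this.keys = keys;
        this.lookup = lookup;
    }

    @Override
    public boolean hasNext() {
        // Advance past snapshot keys whose value has disappeared from the live store.
        while (next == null && keys.hasNext()) {
            K key = keys.next();
            V value = lookup.apply(key);
            if (value != null) {
                next = new AbstractMap.SimpleImmutableEntry<>(key, value);
            }
        }
        return next != null;
    }

    @Override
    public Map.Entry<K, V> next() {
        if (!hasNext()) throw new NoSuchElementException();
        Map.Entry<K, V> result = next;
        next = null;
        return result;
    }
}

class LiveEntryDemo {
    public static void main(String[] args) {
        TreeMap<String, String> map = new TreeMap<>();
        map.put("key0", "a");
        map.put("key1", "b");
        map.put("key2", "c");
        Iterator<String> snapshot = new TreeSet<>(map.keySet()).iterator();
        map.remove("key1"); // deleted after the snapshot was taken
        Iterator<Map.Entry<String, String>> it = new LiveEntryIterator<>(snapshot, map::get);
        while (it.hasNext()) {
            System.out.println(it.next()); // prints key0=a, then key2=c
        }
    }
}
```

Prefetching in {{hasNext()}} keeps {{hasNext()}} honest: it only reports true when a live entry is actually available, so callers never see the null-valued entry.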

[~aviperksy], thoughts? Did I miss something obvious here? Also note that the code has changed a lot over the years, so it's possible that what you described does affect some older branch(es).

> InMemoryKeyValueStore iterator still throws ConcurrentModificationException
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-14260
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14260
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.1, 3.2.3
>            Reporter: Avi Cherry
>            Assignee: Lucia Cerchie
>            Priority: Major
>             Fix For: 3.4.0
>
>
> This is the same bug as KAFKA-7912 which was then re-introduced by KAFKA-8802.
> Any iterator returned from {{InMemoryKeyValueStore}} may end up throwing a ConcurrentModificationException because the backing map is not concurrency-safe. I expect that this only happens when the store is retrieved via {{KafkaStreams.store()}} from outside of the topology, since any usage of the store from inside the topology should be naturally single-threaded.
> To start off, a reminder that this behaviour explicitly violates the interface contract for {{ReadOnlyKeyValueStore}}, which states:
> {quote}The returned iterator must be safe from java.util.ConcurrentModificationExceptions
> {quote}
> It is often hard to write code that demonstrates concurrency bugs, but thankfully it is trivial to reason through the source code in {{InMemoryKeyValueStore.java}} to show why this happens:
>  * All of the InMemoryKeyValueStore methods that return iterators do so by passing a keySet based on the backing TreeMap to the InMemoryKeyValueIterator constructor.
>  * These keySets are all VIEWS of the backing map, not copies.
>  * The InMemoryKeyValueIterator then makes a private copy of the keySet by passing the original keySet into the constructor for TreeSet. This copying was implemented in KAFKA-8802 with the mistaken intent of fixing the concurrency problem.
>  * TreeSet then iterates over the keySet to make a copy. If the original backing TreeMap in InMemoryKeyValueStore is changed while this copy is being created, the iteration will fail fast with a ConcurrentModificationException.
> This bug could be trivially fixed by replacing the backing TreeMap with a ConcurrentSkipListMap, but here's the rub:
> This bug was already found in KAFKA-7912, and the TreeMap was replaced with a ConcurrentSkipListMap. It was then reverted to a TreeMap in KAFKA-8802 because of the performance regression. I can [see from one of the PRs|https://github.com/apache/kafka/pull/7212/commits/384c12e40f3a59591f897d916f92253e126820ed] that it was believed the concurrency problem with the TreeMap implementation was fixed by copying the keyset when the iterator is created, but the problem remains, and the fix also creates an extra in-memory copy of the iterated portion of the set.
> For what it's worth, the performance difference between TreeMap and ConcurrentSkipListMap does not extend to asymptotic complexity. TreeMap enjoys a consistent ~2x speed advantage across all operations and data sizes, but at the cost of what turned out to be an easy-to-encounter bug.
> This is all unfortunate, since the only time the state stores ever get accessed concurrently is through the {{KafkaStreams.store()}} mechanism, but I would imagine that "correct and slightly slower" is better than "incorrect and faster".
> Too bad BoilerBay's AirConcurrentMap is closed-source and patented.
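The reporter's two JDK-level claims, that {{keySet()}} is a live view rather than a copy and that TreeMap and ConcurrentSkipListMap iterators react differently to modification, can be checked with the JDK alone. The sketch below is a single-threaded stand-in for the multi-threaded race ({{ViewIterationDemo}} is a hypothetical name): it structurally modifies the map while an iterator over the key view is still open, as a concurrent writer could while the TreeSet copy constructor is iterating. TreeMap's iterators are fail-fast; ConcurrentSkipListMap's are documented as weakly consistent and do not throw ConcurrentModificationException. The JDK describes fail-fast behavior as best-effort, so under a real multi-threaded race a CME is likely but not guaranteed.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical demo: modify a map while an iterator over its keySet() view is open.
class ViewIterationDemo {
    static boolean throwsCme(NavigableMap<Integer, String> map) {
        for (int i = 0; i < 10; i++) map.put(i, "v" + i);
        Iterator<Integer> view = map.keySet().iterator(); // iterates the live map, not a copy
        try {
            while (view.hasNext()) {
                int key = view.next();
                if (key == 5) map.put(100, "late"); // structural change mid-iteration
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // TreeMap iterators are fail-fast: prints "TreeMap throws CME: true".
        System.out.println("TreeMap throws CME: " + throwsCme(new TreeMap<>()));
        // ConcurrentSkipListMap iterators are weakly consistent: prints "... false".
        System.out.println("ConcurrentSkipListMap throws CME: " + throwsCme(new ConcurrentSkipListMap<>()));
    }
}
```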



--
This message was sent by Atlassian Jira
(v8.20.10#820010)