You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/10/21 15:19:58 UTC

[jira] [Commented] (KAFKA-4311) Multi layer cache eviction causes forwarding to incorrect ProcessorNode

    [ https://issues.apache.org/jira/browse/KAFKA-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15595401#comment-15595401 ] 

ASF GitHub Bot commented on KAFKA-4311:
---------------------------------------

GitHub user dguy opened a pull request:

    https://github.com/apache/kafka/pull/2051

    KAFKA-4311: Multi layer cache eviction causes forwarding to incorrect ProcessorNode

    Given a topology like the one below. If a record arriving in `tableOne` causes a cache eviction, it will trigger the `leftJoin` that will do a `get` from `reducer-store`. If the key is not currently cached in `reducer-store`, but is in the backing store, it will be put into the cache, and it may also trigger an eviction. If it does trigger an eviction and the eldest entry is dirty it will flush the dirty keys. It is at this point that a ClassCastException is thrown. This occurs because the ProcessorContext is still set to the context of the `leftJoin` and the next child in the topology is `mapValues`.
    We need to set the correct `ProcessorNode`, on the context, in the `ForwardingCacheFlushListener` prior to calling `context.forward`. We also need to set remember to reset the `ProcessorNode` to the previous node once `context.forward` has completed.
    
    ```
           final KTable<String, String> one = builder.table(Serdes.String(), Serdes.String(), tableOne, tableOne);
            final KTable<Long, String> two = builder.table(Serdes.Long(), Serdes.String(), tableTwo, tableTwo);
            final KTable<String, Long> reduce = two.groupBy(new KeyValueMapper<Long, String, KeyValue<String, Long>>() {
                @Override
                public KeyValue<String, Long> apply(final Long key, final String value) {
                    return new KeyValue<>(value, key);
                }
            }, Serdes.String(), Serdes.Long())
                    .reduce(new Reducer<Long>() {..}, new Reducer<Long>() {..}, "reducer-store");
                    
        one.leftJoin(reduce, new ValueJoiner<String, Long, String>() {..})
            .mapValues(new ValueMapper<String, String>() {..});
                       
    ```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dguy/kafka kafka-4311

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/2051.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2051
    
----
commit 5c8896a9d2aaca6bf5b9fb8de4de8140919fb280
Author: Damian Guy <da...@gmail.com>
Date:   2016-10-21T11:27:17Z

    Save and set the current processor node in FlushingCacheListener.

----


> Multi layer cache eviction causes forwarding to incorrect ProcessorNode 
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-4311
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4311
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.0
>            Reporter: Damian Guy
>            Assignee: Damian Guy
>             Fix For: 0.10.1.1
>
>
> The two exceptions below were reported by Frank on the dev mailing list. After investigation, the root cause is multiple cache evictions happening in the same topology. 
> Given a topology like the one below. If a record arriving in `tableOne` causes a cache eviction, it will trigger the `leftJoin` that will do a `get` from `reducer-store`. If the key is not currently cached in `reducer-store`, but is in the backing store, it will be put into the cache, and it may also trigger an eviction. If it does trigger an eviction and the eldest entry is dirty it will flush the dirty keys. It is at this point that the exception in the comment happens (ClassCastException). This occurs because the ProcessorContext is still set to the context of the `leftJoin` and the next child in the topology is `mapValues`.
> We need to set the correct `ProcessorNode`, on the context,  in the `ForwardingCacheFlushListener` prior to calling `context.forward`. We also need to set remember to reset the `ProcessorNode` to the previous node once `context.forward` has completed.
> {code}
>         final KTable<String, String> one = builder.table(Serdes.String(), Serdes.String(), tableOne, tableOne);
>         final KTable<Long, String> two = builder.table(Serdes.Long(), Serdes.String(), tableTwo, tableTwo);
>         final KTable<String, Long> reduce = two.groupBy(new KeyValueMapper<Long, String, KeyValue<String, Long>>() {
>             @Override
>             public KeyValue<String, Long> apply(final Long key, final String value) {
>                 return new KeyValue<>(value, key);
>             }
>         }, Serdes.String(), Serdes.Long())
>                 .reduce(new Reducer<Long>() {
>                     @Override
>                     public Long apply(final Long value1, final Long value2) {
>                         return value1 + value2;
>                     }
>                 }, new Reducer<Long>() {
>                     @Override
>                     public Long apply(final Long value1, final Long value2) {
>                         return value1 - value2;
>                     }
>                 }, "reducer-store");
>     one.leftJoin(reduce, new ValueJoiner<String, Long, String>() {
>             @Override
>             public String apply(final String value1, final Long value2) {
>                 return value1 + ":" + value2;
>             }
>         })
>         .mapValues(new ValueMapper<String, String>() {
>                     @Override
>                     public String apply(final String value) {
>                         return value;
>                     }
>                 });
> {code}
> This exception is actually a symptom of the exception reported below in the comment. After the first exception is thrown, the StreamThread triggers a shutdown that then throws this exception.
> [StreamThread-1] ERROR
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [StreamThread-1] Failed to close state manager for StreamTask 0_0:
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed
> to close state store addr-organization
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:342)
> at
> org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:121)
> at
> org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:341)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:322)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:338)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:299)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:262)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:245)
> Caused by: java.lang.IllegalStateException: Key found in dirty key set, but
> entry is null
> at
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:112)
> at
> org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
> at
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:111)
> at
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:117)
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:340)
> ... 7 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)