You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "SeaAndHill (Jira)" <ji...@apache.org> on 2020/10/20 07:07:00 UTC

[jira] [Commented] (KAFKA-8602) StreamThread Dies Because Restore Consumer is not Subscribed to Any Topic

    [ https://issues.apache.org/jira/browse/KAFKA-8602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17217365#comment-17217365 ] 

SeaAndHill commented on KAFKA-8602:
-----------------------------------

[~cadonna] where can i down [1.0.3|https://issues.apache.org/jira/issues/?jql=project+%3D+KAFKA+AND+fixVersion+%3D+1.0.3] version ?

> StreamThread Dies Because Restore Consumer is not Subscribed to Any Topic
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-8602
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8602
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.3.0, 2.2.1
>            Reporter: Bruno Cadonna
>            Assignee: Bruno Cadonna
>            Priority: Critical
>             Fix For: 1.0.3, 1.1.2, 2.0.2, 2.1.2, 2.2.2, 2.4.0, 2.3.1
>
>
> StreamThread dies with the following exception:
> {code:java}
> java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1199)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> {code}
> The reason is that the restore consumer is not subscribed to any topic. This happens when a StreamThread gets assigned standby tasks for sub-topologies with just state stores with disabled logging.
> To reproduce the bug start two applications with one StreamThread and one standby replica each and the following topology. The input topic should have two partitions:
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final String stateStoreName = "myTransformState";
> final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
>     Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
>         Serdes.Integer(),
>         Serdes.Integer())
>         .withLoggingDisabled();
> builder.addStateStore(keyValueStoreBuilder);
> builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()))
>     .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
>         private KeyValueStore<Integer, Integer> state;
>             @SuppressWarnings("unchecked")
>             @Override
>             public void init(final ProcessorContext context) {
>                 state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
>             }
>             @Override
>             public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
>                 final KeyValue<Integer, Integer> result = new KeyValue<>(key, value);
>                 return result;
>             }
>             @Override
>             public void close() {}
>         }, stateStoreName)
>         .to(OUTPUT_TOPIC);
> {code}
> Both StreamThreads should die with the above exception.
> The root cause is that standby tasks are created although all state stores of the sub-topology have logging disabled.  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)