You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2019/08/01 09:14:00 UTC

[jira] [Commented] (KAFKA-8602) StreamThread Dies Because Restore Consumer is not Subscribed to Any Topic

    [ https://issues.apache.org/jira/browse/KAFKA-8602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16897900#comment-16897900 ] 

ASF GitHub Bot commented on KAFKA-8602:
---------------------------------------

cadonna commented on pull request #7145: KAFKA-8602: Backport bugfix for standby task creation
URL: https://github.com/apache/kafka/pull/7145
 
 
   Backports bugfix in standby task creation from PR #7008.
   A separate PR is needed because some tests in the original PR
   use topology optimizations and mocks that were introduced afterwards.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> StreamThread Dies Because Restore Consumer is not Subscribed to Any Topic
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-8602
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8602
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.1.1
>            Reporter: Bruno Cadonna
>            Assignee: Bruno Cadonna
>            Priority: Critical
>             Fix For: 2.4.0
>
>
> StreamThread dies with the following exception:
> {code:java}
> java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1199)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> {code}
> The reason is that the restore consumer is not subscribed to any topic. This happens when a StreamThread gets assigned standby tasks for sub-topologies with just state stores with disabled logging.
> To reproduce the bug start two applications with one StreamThread and one standby replica each and the following topology. The input topic should have two partitions:
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final String stateStoreName = "myTransformState";
> final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
>     Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
>         Serdes.Integer(),
>         Serdes.Integer())
>         .withLoggingDisabled();
> builder.addStateStore(keyValueStoreBuilder);
> builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()))
>     .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
>         private KeyValueStore<Integer, Integer> state;
>             @SuppressWarnings("unchecked")
>             @Override
>             public void init(final ProcessorContext context) {
>                 state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
>             }
>             @Override
>             public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
>                 final KeyValue<Integer, Integer> result = new KeyValue<>(key, value);
>                 return result;
>             }
>             @Override
>             public void close() {}
>         }, stateStoreName)
>         .to(OUTPUT_TOPIC);
> {code}
> Both StreamThreads should die with the above exception.
> The root cause is that standby tasks are created although all state stores of the sub-topology have logging disabled.  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)