You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Bruno Cadonna (JIRA)" <ji...@apache.org> on 2019/08/01 12:11:01 UTC
[jira] [Updated] (KAFKA-8602) StreamThread Dies Because Restore
Consumer is not Subscribed to Any Topic
[ https://issues.apache.org/jira/browse/KAFKA-8602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bruno Cadonna updated KAFKA-8602:
---------------------------------
Affects Version/s: 1.0.3
1.0.0
1.0.1
1.0.2
1.1.0
1.1.1
2.0.0
2.0.1
2.1.0
2.2.0
2.3.0
2.2.1
> StreamThread Dies Because Restore Consumer is not Subscribed to Any Topic
> -------------------------------------------------------------------------
>
> Key: KAFKA-8602
> URL: https://issues.apache.org/jira/browse/KAFKA-8602
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.0.3, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.3.0, 2.2.1
> Reporter: Bruno Cadonna
> Assignee: Bruno Cadonna
> Priority: Critical
> Fix For: 2.4.0
>
>
> StreamThread dies with the following exception:
> {code:java}
> java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1199)
> at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> {code}
> The reason is that the restore consumer is not subscribed to any topic. This happens when a StreamThread gets assigned standby tasks for sub-topologies with just state stores with disabled logging.
> To reproduce the bug start two applications with one StreamThread and one standby replica each and the following topology. The input topic should have two partitions:
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final String stateStoreName = "myTransformState";
> final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
> Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
> Serdes.Integer(),
> Serdes.Integer())
> .withLoggingDisabled();
> builder.addStateStore(keyValueStoreBuilder);
> builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()))
> .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
> private KeyValueStore<Integer, Integer> state;
> @SuppressWarnings("unchecked")
> @Override
> public void init(final ProcessorContext context) {
> state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
> }
> @Override
> public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
> final KeyValue<Integer, Integer> result = new KeyValue<>(key, value);
> return result;
> }
> @Override
> public void close() {}
> }, stateStoreName)
> .to(OUTPUT_TOPIC);
> {code}
> Both StreamThreads should die with the above exception.
> The root cause is that standby tasks are created although all state stores of the sub-topology have logging disabled.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)