You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "SeaAndHill (Jira)" <ji...@apache.org> on 2020/10/20 07:07:00 UTC
[jira] [Commented] (KAFKA-8602) StreamThread Dies Because Restore
Consumer is not Subscribed to Any Topic
[ https://issues.apache.org/jira/browse/KAFKA-8602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17217365#comment-17217365 ]
SeaAndHill commented on KAFKA-8602:
-----------------------------------
[~cadonna] where can i down [1.0.3|https://issues.apache.org/jira/issues/?jql=project+%3D+KAFKA+AND+fixVersion+%3D+1.0.3] version ?
> StreamThread Dies Because Restore Consumer is not Subscribed to Any Topic
> -------------------------------------------------------------------------
>
> Key: KAFKA-8602
> URL: https://issues.apache.org/jira/browse/KAFKA-8602
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.3.0, 2.2.1
> Reporter: Bruno Cadonna
> Assignee: Bruno Cadonna
> Priority: Critical
> Fix For: 1.0.3, 1.1.2, 2.0.2, 2.1.2, 2.2.2, 2.4.0, 2.3.1
>
>
> StreamThread dies with the following exception:
> {code:java}
> java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1199)
> at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> {code}
> The reason is that the restore consumer is not subscribed to any topic. This happens when a StreamThread gets assigned standby tasks for sub-topologies with just state stores with disabled logging.
> To reproduce the bug start two applications with one StreamThread and one standby replica each and the following topology. The input topic should have two partitions:
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final String stateStoreName = "myTransformState";
> final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
> Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
> Serdes.Integer(),
> Serdes.Integer())
> .withLoggingDisabled();
> builder.addStateStore(keyValueStoreBuilder);
> builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()))
> .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
> private KeyValueStore<Integer, Integer> state;
> @SuppressWarnings("unchecked")
> @Override
> public void init(final ProcessorContext context) {
> state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
> }
> @Override
> public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
> final KeyValue<Integer, Integer> result = new KeyValue<>(key, value);
> return result;
> }
> @Override
> public void close() {}
> }, stateStoreName)
> .to(OUTPUT_TOPIC);
> {code}
> Both StreamThreads should die with the above exception.
> The root cause is that standby tasks are created although all state stores of the sub-topology have logging disabled.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)