Posted to jira@kafka.apache.org by "Sophie Blee-Goldman (Jira)" <ji...@apache.org> on 2020/05/06 18:51:00 UTC
[jira] [Resolved] (KAFKA-6063) StreamsException is thrown after the changing `partitions`
[ https://issues.apache.org/jira/browse/KAFKA-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sophie Blee-Goldman resolved KAFKA-6063.
----------------------------------------
Resolution: Not A Problem
> StreamsException is thrown after the changing `partitions`
> ----------------------------------------------------------
>
> Key: KAFKA-6063
> URL: https://issues.apache.org/jira/browse/KAFKA-6063
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.11.0.0
> Environment: macOS 10.12
> kafka 0.11.0.1
> Reporter: Akihito Nakano
> Priority: Trivial
> Labels: user-experience
>
> Hi.
> "org.apache.kafka.streams.errors.StreamsException" is thrown in the following case.
> h3. Create topic
> {code:java}
> $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 6 --topic word-count-input
> {code}
> h3. Create Kafka Streams Application
> {code:java}
> public class WordCountApp {
>     public static void main(String[] args) {
>         Properties config = new Properties();
>         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
>         ...
>         ...
> {code}
> h3. Ensure that it works fine
> {code:java}
> $ java -jar wordcount.jar
> KafkaStreams processID: b4a559cb-7075-4ece-a718-5043a432900b
> StreamsThread appId: wordcount-application
> ...
> ...
> {code}
> h3. Change "partitions"
> {code:java}
> $ bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 8 --topic word-count-input
> Adding partitions succeeded!
> {code}
> h3. When I start the application, StreamsException is thrown
> {code:java}
> $ java -jar wordcount.jar
> KafkaStreams processID: 8a9cbf03-b841-4cb2-9d44-6456b4520522
> StreamsThread appId: wordcount-applicationn
> StreamsThread clientId: wordcount-applicationn-8a9cbf03-b841-4cb2-9d44-6456b4520522
> StreamsThread threadId: wordcount-applicationn-8a9cbf03-b841-4cb2-9d44-6456b4520522-StreamThread-1
> Active tasks:
> Running:
> Suspended:
> Restoring:
> New:
> Standby tasks:
> Running:
> Suspended:
> Restoring:
> New:
> Exception in thread "wordcount-application-8a9cbf03-b841-4cb2-9d44-6456b4520522-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Could not create internal topics.
> at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:82)
> at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.prepareTopic(StreamPartitionAssignor.java:660)
> at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:398)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:365)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:522)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:93)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:472)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:455)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
> at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:168)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
> {code}
> If I change the application ID, the application works again.
> Thank you.
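
The workaround the reporter mentions (changing the application ID) can be sketched as below. A likely reason it helps, stated here as an assumption rather than something this thread confirms: Kafka Streams prefixes its internal changelog and repartition topic names with the application ID, so a new ID leads to a fresh set of internal topics instead of reusing ones created while the input topic still had 6 partitions. The sketch uses only java.util.Properties; "application.id" is the key behind StreamsConfig.APPLICATION_ID_CONFIG, while the class name, the helper methods, the store name "Counts", and the new ID "wordcount-application-v2" are hypothetical and do not come from the report.

```java
import java.util.Properties;

public class AppIdSketch {
    // Config key behind StreamsConfig.APPLICATION_ID_CONFIG.
    static final String APPLICATION_ID_KEY = "application.id";

    // Hypothetical helper: builds a config that carries only the application ID.
    static Properties configFor(String applicationId) {
        Properties config = new Properties();
        config.put(APPLICATION_ID_KEY, applicationId);
        return config;
    }

    // Changelog topics are named <application.id>-<storeName>-changelog.
    // The store name passed in is an assumption made for illustration.
    static String changelogTopic(Properties config, String storeName) {
        return config.getProperty(APPLICATION_ID_KEY) + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        Properties before = configFor("wordcount-application");
        Properties after = configFor("wordcount-application-v2"); // hypothetical new ID

        // The two IDs map to different internal topic names, so the second run
        // would not reuse the internal topics created by the first.
        System.out.println(changelogTopic(before, "Counts"));
        System.out.println(changelogTopic(after, "Counts"));
    }
}
```

A new application ID also means a new consumer group and empty local state, so the application starts without its previously committed offsets. That makes this workaround a better fit for a test setup like the one described than for a production deployment.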