Posted to jira@kafka.apache.org by "Matthias J. Sax (JIRA)" <ji...@apache.org> on 2017/12/26 18:33:00 UTC
[jira] [Resolved] (KAFKA-6401) InvalidStateStoreException immediately after starting streams
[ https://issues.apache.org/jira/browse/KAFKA-6401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax resolved KAFKA-6401.
------------------------------------
Resolution: Not A Problem
> InvalidStateStoreException immediately after starting streams
> -------------------------------------------------------------
>
> Key: KAFKA-6401
> URL: https://issues.apache.org/jira/browse/KAFKA-6401
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Environment: ubuntu 14.04
> Reporter: Mostafa Asgari
> Priority: Minor
> Attachments: Test.java
>
>
> Hi
> I wrote a simple Kafka Streams application. If I call KafkaStreams.store immediately after starting the stream, I get an InvalidStateStoreException:
> {code:java}
> org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, my-table, may have migrated to another instance.
> {code}
> Here is the complete code:
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final KTable<String, Integer> table = builder.table(TOPIC_NAME, Consumed.with(Serdes.String(), Serdes.Integer(),
>         new FailOnInvalidTimestamp(), Topology.AutoOffsetReset.EARLIEST), Materialized.as("my-table"));
> Topology topology = builder.build();
> Properties props = new Properties();
> props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
> props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10000");
> final KafkaStreams streams = new KafkaStreams(topology, props);
> Runtime.getRuntime().addShutdownHook(new Thread() {
>     @Override
>     public void run() {
>         streams.close();
>     }
> });
> streams.start();
> // Querying the store right after start() is what throws InvalidStateStoreException.
> ReadOnlyKeyValueStore<String, Integer> store = streams.store(table.queryableStoreName(), QueryableStoreTypes.keyValueStore());
> {code}
> However, if I add Thread.sleep(SOME_AMOUNT) after the start() call, the exception no longer occurs.
> Is this a bug, or did I do something wrong?
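
The behaviour described above (the exception disappears after a sleep) suggests that the store simply is not queryable yet when store() is called right after start(). One way to handle that without a fixed sleep is to retry the lookup until it succeeds. The following is a minimal sketch only, not the resolution given in the ticket: the class StoreRetry, the method waitUntilAvailable, and its parameters are hypothetical names introduced for illustration, and the helper is kept free of Kafka imports so that it compiles on its own.

```java
import java.util.function.Supplier;

// Hypothetical helper (not part of Kafka Streams): retries a lookup while it
// keeps failing with one specific kind of RuntimeException.
final class StoreRetry {

    private StoreRetry() {
    }

    static <T> T waitUntilAvailable(Supplier<T> lookup,
                                    Class<? extends RuntimeException> retryOn,
                                    int maxAttempts,
                                    long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.get();
            } catch (RuntimeException e) {
                // Anything other than the expected "not ready" exception is rethrown at once.
                if (!retryOn.isInstance(e)) {
                    throw e;
                }
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(backoffMs);
                    } catch (InterruptedException ie) {
                        // Restore the interrupt flag and stop waiting.
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("interrupted while waiting", ie);
                    }
                }
            }
        }
        // All attempts failed with the retryable exception; surface the last one.
        throw last;
    }
}

// Assumed usage with the code from the report (needs the Kafka Streams imports):
//
// ReadOnlyKeyValueStore<String, Integer> store = StoreRetry.waitUntilAvailable(
//         () -> streams.store(table.queryableStoreName(), QueryableStoreTypes.keyValueStore()),
//         InvalidStateStoreException.class, 50, 100L);
```

The attempt count and back-off in the usage comment are arbitrary placeholder values. A bounded number of attempts is used so that a store name that never becomes available still fails with the original exception instead of blocking forever.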