You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Mostafa Asgari (JIRA)" <ji...@apache.org> on 2017/12/26 10:42:02 UTC
[jira] [Created] (KAFKA-6401) InvalidStateStoreException
immediately after starting streams
Mostafa Asgari created KAFKA-6401:
-------------------------------------
Summary: InvalidStateStoreException immediately after starting streams
Key: KAFKA-6401
URL: https://issues.apache.org/jira/browse/KAFKA-6401
Project: Kafka
Issue Type: Bug
Components: streams
Environment: ubuntu 14.04
Reporter: Mostafa Asgari
Priority: Minor
Attachments: Test.java
Hi
I wrote a simple kafka streams application. After I start the stream, if I call KafkaStreams.store immediately, I will get InvalidStateStoreException:
{code:java}
org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, my-table, may have migrated to another instance.
{code}
Here is the complete code :
{code:java}
final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, Integer> table = builder.table(TOPIC_NAME , Consumed.with(Serdes.String(), Serdes.Integer(),
new FailOnInvalidTimestamp(), Topology.AutoOffsetReset.EARLIEST), Materialized.as("my-table"));
Topology topology = builder.build();
Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG,"my-streams-app");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,"10000");
final KafkaStreams streams = new KafkaStreams( topology , props );
Runtime.getRuntime().addShutdownHook(new Thread(){
@Override
public void run() {
streams.close();
}
});
streams.start();
ReadOnlyKeyValueStore<String, Integer> store streams.store(table.queryableStoreName(), QueryableStoreTypes.keyValueStore());
{code}
However if after start() method, I write Thread.sleep( SOME_AMOUNT ) I will not get the exception any more.
I wonder if it is a bug or I did something wrong.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)