Posted to jira@kafka.apache.org by "Mostafa Asgari (JIRA)" <ji...@apache.org> on 2017/12/26 10:42:02 UTC

[jira] [Created] (KAFKA-6401) InvalidStateStoreException immediately after starting streams

Mostafa Asgari created KAFKA-6401:
-------------------------------------

             Summary: InvalidStateStoreException immediately after starting streams
                 Key: KAFKA-6401
                 URL: https://issues.apache.org/jira/browse/KAFKA-6401
             Project: Kafka
          Issue Type: Bug
          Components: streams
         Environment: ubuntu 14.04
            Reporter: Mostafa Asgari
            Priority: Minor
         Attachments: Test.java

Hi
I wrote a simple Kafka Streams application. If I call KafkaStreams.store() immediately after starting the streams instance, I get an InvalidStateStoreException:

{code:java}
org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, my-table, may have migrated to another instance.
{code}

Here is the complete code:

{code:java}
        final StreamsBuilder builder = new StreamsBuilder();

        final KTable<String, Integer> table = builder.table(TOPIC_NAME , Consumed.with(Serdes.String(), Serdes.Integer(),
                new FailOnInvalidTimestamp(), Topology.AutoOffsetReset.EARLIEST), Materialized.as("my-table"));

        Topology topology = builder.build();
        Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG,"my-streams-app");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,"10000");
        final KafkaStreams streams = new KafkaStreams( topology , props );
        Runtime.getRuntime().addShutdownHook(new Thread(){
            @Override
            public void run() {
                streams.close();
            }
        });
        streams.start();
        ReadOnlyKeyValueStore<String, Integer> store = streams.store(table.queryableStoreName(), QueryableStoreTypes.keyValueStore());

{code}
However, if I add Thread.sleep( SOME_AMOUNT ) after the start() call, I no longer get the exception.

I wonder whether this is a bug or whether I did something wrong.
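
For reference, the fixed Thread.sleep could be replaced by a bounded retry around the store lookup. The following is only a minimal sketch, not something taken from the Kafka code base: the StoreRetry class, its retry method, and the attempt and backoff values are hypothetical names chosen for illustration. It assumes the exception is transient while the streams instance is still starting up, and it is written against plain java.util.function.Supplier so that it compiles without Kafka on the classpath.

```java
import java.util.function.Supplier;

// Hypothetical helper (not part of Kafka): retries a lookup while it keeps
// throwing one specific kind of RuntimeException.
public final class StoreRetry {
    private StoreRetry() {}

    /**
     * Calls lookup until it returns or maxAttempts is reached, sleeping
     * backoffMs between attempts. Only exceptions that are instances of
     * retryOn are retried; any other exception propagates immediately.
     */
    public static <T> T retry(Supplier<T> lookup,
                              Class<? extends RuntimeException> retryOn,
                              int maxAttempts,
                              long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.get();
            } catch (RuntimeException e) {
                if (!retryOn.isInstance(e)) {
                    throw e; // not the transient failure we are waiting out
                }
                last = e;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting", ie);
                }
            }
        }
        throw last; // every attempt failed with the retryable exception
    }
}

// Possible usage with the names from the code above (assumed, untested here):
// ReadOnlyKeyValueStore<String, Integer> store = StoreRetry.retry(
//         () -> streams.store(table.queryableStoreName(),
//                 QueryableStoreTypes.<String, Integer>keyValueStore()),
//         InvalidStateStoreException.class, 50, 100L);
```

Unlike a single sleep, this returns as soon as the lookup succeeds and still fails with the original exception if the store never becomes available within the attempt budget.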




