You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by saiprasad mishra <sa...@gmail.com> on 2016/10/25 21:43:19 UTC

issue with custom processor flush to rocksdb store

Hi
This is with version 10.1.0 kafka streams (server running in remote and
streams app running local in my laptop).



I have a kafka stream pipeline like this

source topic(with 10 partitions) stream -> filter for null value ->map to
make it keyed by id ->custom processor to mystore(persistent)

I am getting the below exception. This happens when the flush happens.
If I restart the app the data i sent is actually present in rocksdb store.
I see the message of the keyed stream went to partition 0 on which flush
happened correctly i guess as I see below partition 9 task failed to flush
not sure about the complain about timestamp() here.

Can somebody explain what does this mean.


Not sure if it has something to do with below timestamp extractor property
i am setting or any other time like producer create time ???

        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
ConsumerRecordTimestampExtractor.class);


Regards
Sai


2016-10-25 14:31:29.822000
org.apache.kafka.streams.processor.internals.StreamThread StreamThread-1
ERROR stream-thread [StreamThread-1] Failed to commit StreamTask 0_9 state:


org.apache.kafka.streams.errors.ProcessorStateException: task [0_9] Failed
to flush state store Products

at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:331)
~[kafka-streams-0.10.1.0.jar!/:?]

at
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:275)
~[kafka-streams-0.10.1.0.jar!/:?]

at
org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:576)
[kafka-streams-0.10.1.0.jar!/:?]

at
org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:562)
[kafka-streams-0.10.1.0.jar!/:?]

at
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:538)
[kafka-streams-0.10.1.0.jar!/:?]

at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:456)
[kafka-streams-0.10.1.0.jar!/:?]

at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
[kafka-streams-0.10.1.0.jar!/:?]

Caused by: java.lang.IllegalStateException: This should not happen as
timestamp() should only be called while a record is processed

at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.timestamp(ProcessorContextImpl.java:192)
~[kafka-streams-0.10.1.0.jar!/:?]

at
org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:112)
~[kafka-streams-0.10.1.0.jar!/:?]

at
org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:375)
~[kafka-streams-0.10.1.0.jar!/:?]

at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:175)
~[kafka-streams-0.10.1.0.jar!/:?]

at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:329)
~[kafka-streams-0.10.1.0.jar!/:?]

... 6 more

Re: issue with custom processor flush to rocksdb store

Posted by saiprasad mishra <sa...@gmail.com>.
Yes this is similar meaning it was all about KafkaStreams not started
correctly in my spring app and NOT a bug in KafkaStreams.
Inside the comments in the JIRA I have mentioned what I was doing wrong.

These type of exceptions largely indicate kafka streams was not started
correctly

Thanks for your valuable time on this
Regards
Sai

On Wed, Oct 26, 2016 at 2:34 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Is it a similar report as https://issues.apache.org/jira/browse/KAFKA-4344
> ?
>
> On Tue, Oct 25, 2016 at 2:43 PM, saiprasad mishra <
> saiprasadmishra@gmail.com
> > wrote:
>
> > Hi
> > This is with version 10.1.0 kafka streams (server running in remote and
> > streams app running local in my laptop).
> >
> >
> >
> > I have a kafka stream pipeline like this
> >
> > source topic(with 10 partitions) stream -> filter for null value ->map to
> > make it keyed by id ->custom processor to mystore(persistent)
> >
> > I am getting the below exception. This happens when the flush happens.
> > If I restart the app the data i sent is actually present in rocksdb
> store.
> > I see the message of the keyed stream went to partition 0 on which flush
> > happened correctly i guess as I see below partition 9 task failed to
> flush
> > not sure about the complain about timestamp() here.
> >
> > Can somebody explain what does this mean.
> >
> >
> > Not sure if it has something to do with below timestamp extractor
> property
> > i am setting or any other time like producer create time ???
> >
> >         props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> > ConsumerRecordTimestampExtractor.class);
> >
> >
> > Regards
> > Sai
> >
> >
> > 2016-10-25 14:31:29.822000
> > org.apache.kafka.streams.processor.internals.StreamThread StreamThread-1
> > ERROR stream-thread [StreamThread-1] Failed to commit StreamTask 0_9
> state:
> >
> >
> > org.apache.kafka.streams.errors.ProcessorStateException: task [0_9]
> Failed
> > to flush state store Products
> >
> > at
> > org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.flush(
> > ProcessorStateManager.java:331)
> > ~[kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.processor.internals.
> StreamTask.commit(StreamTask.
> > java:275)
> > ~[kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(
> > StreamThread.java:576)
> > [kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(
> > StreamThread.java:562)
> > [kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(
> > StreamThread.java:538)
> > [kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > StreamThread.java:456)
> > [kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:242)
> > [kafka-streams-0.10.1.0.jar!/:?]
> >
> > Caused by: java.lang.IllegalStateException: This should not happen as
> > timestamp() should only be called while a record is processed
> >
> > at
> > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.
> > timestamp(ProcessorContextImpl.java:192)
> > ~[kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(
> > StoreChangeLogger.java:112)
> > ~[kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.state.internals.RocksDBStore.
> > flush(RocksDBStore.java:375)
> > ~[kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(
> > MeteredKeyValueStore.java:175)
> > ~[kafka-streams-0.10.1.0.jar!/:?]
> >
> > at
> > org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.flush(
> > ProcessorStateManager.java:329)
> > ~[kafka-streams-0.10.1.0.jar!/:?]
> >
> > ... 6 more
> >
>
>
>
> --
> -- Guozhang
>

Re: issue with custom processor flush to rocksdb store

Posted by Guozhang Wang <wa...@gmail.com>.
Is it a similar report as https://issues.apache.org/jira/browse/KAFKA-4344?

On Tue, Oct 25, 2016 at 2:43 PM, saiprasad mishra <saiprasadmishra@gmail.com
> wrote:

> Hi
> This is with version 10.1.0 kafka streams (server running in remote and
> streams app running local in my laptop).
>
>
>
> I have a kafka stream pipeline like this
>
> source topic(with 10 partitions) stream -> filter for null value ->map to
> make it keyed by id ->custom processor to mystore(persistent)
>
> I am getting the below exception. This happens when the flush happens.
> If I restart the app the data i sent is actually present in rocksdb store.
> I see the message of the keyed stream went to partition 0 on which flush
> happened correctly i guess as I see below partition 9 task failed to flush
> not sure about the complain about timestamp() here.
>
> Can somebody explain what does this mean.
>
>
> Not sure if it has something to do with below timestamp extractor property
> i am setting or any other time like producer create time ???
>
>         props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> ConsumerRecordTimestampExtractor.class);
>
>
> Regards
> Sai
>
>
> 2016-10-25 14:31:29.822000
> org.apache.kafka.streams.processor.internals.StreamThread StreamThread-1
> ERROR stream-thread [StreamThread-1] Failed to commit StreamTask 0_9 state:
>
>
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_9] Failed
> to flush state store Products
>
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(
> ProcessorStateManager.java:331)
> ~[kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.
> java:275)
> ~[kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.processor.internals.StreamThread.commitOne(
> StreamThread.java:576)
> [kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(
> StreamThread.java:562)
> [kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(
> StreamThread.java:538)
> [kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:456)
> [kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:242)
> [kafka-streams-0.10.1.0.jar!/:?]
>
> Caused by: java.lang.IllegalStateException: This should not happen as
> timestamp() should only be called while a record is processed
>
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.
> timestamp(ProcessorContextImpl.java:192)
> ~[kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(
> StoreChangeLogger.java:112)
> ~[kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.state.internals.RocksDBStore.
> flush(RocksDBStore.java:375)
> ~[kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(
> MeteredKeyValueStore.java:175)
> ~[kafka-streams-0.10.1.0.jar!/:?]
>
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(
> ProcessorStateManager.java:329)
> ~[kafka-streams-0.10.1.0.jar!/:?]
>
> ... 6 more
>



-- 
-- Guozhang