You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "James Cheng (JIRA)" <ji...@apache.org> on 2018/05/29 21:28:00 UTC

[jira] [Commented] (KAFKA-6967) TopologyTestDriver does not allow pre-populating state stores that have change logging

    [ https://issues.apache.org/jira/browse/KAFKA-6967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16494299#comment-16494299 ] 

James Cheng commented on KAFKA-6967:
------------------------------------

If the very first call to the state store is a `store.put()`, it has no timestamp (or context), so the `put()` fails with the above stack trace.

If you instead run some data through topology first, and then afterwards manipulate the state store via `store.put()`, it has a context, and so the call works fine.

> TopologyTestDriver does not allow pre-populating state stores that have change logging
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6967
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6967
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: James Cheng
>            Assignee: Matthias J. Sax
>            Priority: Major
>
> TopologyTestDriver does not allow pre-populating a state store that has logging enabled. If you try to do it, you will get the following error message:
>  
> {code:java}
> java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
> 	at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:153)
> 	at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
> 	at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
> 	at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:29)
> 	at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:198)
> 	at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
> {code}
> Also see:
> https://github.com/apache/kafka/blob/trunk/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java#L723-L740



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)