Posted to jira@kafka.apache.org by "Sophie Blee-Goldman (Jira)" <ji...@apache.org> on 2020/07/30 19:09:00 UTC
[jira] [Commented] (KAFKA-10322) InMemoryWindowStore restore keys format incompatibility (lack of sequenceNumber in keys on topic)
[ https://issues.apache.org/jira/browse/KAFKA-10322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168155#comment-17168155 ]
Sophie Blee-Goldman commented on KAFKA-10322:
---------------------------------------------
Hey [~tbradlo]
Thanks for the bug report. I agree, this is obviously incorrect. What I don't yet understand is why we don't run into this problem all the time -- the WindowKeySchema methods are used pretty heavily by RocksDBWindowStore as well, and at a glance they all seem to assume that the serialized bytes include a sequence number.
I'll try to set up a test to figure out the true extent of this problem. Would you be interested in submitting a PR with the fix?
> InMemoryWindowStore restore keys format incompatibility (lack of sequenceNumber in keys on topic)
> -------------------------------------------------------------------------------------------------
>
> Key: KAFKA-10322
> URL: https://issues.apache.org/jira/browse/KAFKA-10322
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.5.0
> Environment: windows/linux
> Reporter: Tomasz Bradło
> Priority: Major
>
> I have a regular groupBy-and-count stream configuration:
> {code:java}
>
> fun addStream(kStreamBuilder: StreamsBuilder) {
>     val storeSupplier = Stores.inMemoryWindowStore("count-store",
>         Duration.ofDays(10),
>         Duration.ofDays(1),
>         false)
>     val storeBuilder: StoreBuilder<WindowStore<CountableEvent, Long>> = Stores
>         .windowStoreBuilder(storeSupplier, JsonSerde(CountableEvent::class.java), Serdes.Long())
>     kStreamBuilder
>         .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>         .map { _, jsonRepresentation -> KeyValue(eventsCountingDeserializer.deserialize(jsonRepresentation), null) }
>         .groupByKey()
>         .windowedBy(TimeWindows.of(Duration.ofDays(1)))
>         .count(Materialized.with(JsonSerde(CountableEvent::class.java), Serdes.Long()))
>         .toStream()
>         .to("topic1-count")
>     val storeConsumed = Consumed.with(WindowedSerdes.TimeWindowedSerde(JsonSerde(CountableEvent::class.java), Duration.ofDays(1).toMillis()), Serdes.Long())
>     kStreamBuilder.addGlobalStore(storeBuilder, "topic1-count", storeConsumed, passThroughProcessorSupplier)
> }{code}
> When sending to "topic1-count", the key is serialized with [TimeWindowedSerializer|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java], which uses [WindowKeySchema.toBinary|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L112], so the message key format is:
> {code:java}
> real_grouping_key + timestamp(8bytes){code}
>
> Everything works, and I can get correct values from the state store. But in a recovery scenario, when [GlobalStateManagerImpl|https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L317] enters the offset < highWatermark loop, the [InMemoryWindowStore stateRestoreCallback|https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java#L105] reads from "topic1-count" and fails to extract a valid key and timestamp using [WindowKeySchema.extractStoreKeyBytes|https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L188] and [WindowKeySchema.extractStoreTimestamp|https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L201]. It fails because it expects the format:
> {code:java}
> real_grouping_key + timestamp(8bytes) + sequence_number(4bytes) {code}
> How is this supposed to work in this case?
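
To make the mismatch described above concrete, here is a minimal, self-contained sketch in plain Java. It is not the Kafka code: the class and helper names (WindowKeyLayoutSketch, topicKey, storeKey, readTimestampAssumingSeqnum, readKeyAssumingSeqnum) are hypothetical, and the only things taken from the report are the two layouts, key + timestamp(8 bytes) on the topic versus key + timestamp(8 bytes) + sequence_number(4 bytes) expected on restore. It shows that a reader which assumes a trailing 4-byte sequence number truncates the key and reads a shifted timestamp when given the topic layout.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative only: mimics the two byte layouts from the report, not Kafka's classes.
class WindowKeyLayoutSketch {
    static final int TIMESTAMP_SIZE = 8; // timestamp(8bytes), per the report
    static final int SEQNUM_SIZE = 4;    // sequence_number(4bytes), per the report

    // Layout written to the topic: key + timestamp.
    static byte[] topicKey(byte[] key, long windowStart) {
        return ByteBuffer.allocate(key.length + TIMESTAMP_SIZE)
                .put(key).putLong(windowStart).array();
    }

    // Layout expected on restore: key + timestamp + sequence number.
    static byte[] storeKey(byte[] key, long windowStart, int seqnum) {
        return ByteBuffer.allocate(key.length + TIMESTAMP_SIZE + SEQNUM_SIZE)
                .put(key).putLong(windowStart).putInt(seqnum).array();
    }

    // Reads the timestamp at the offset implied by a trailing sequence number.
    static long readTimestampAssumingSeqnum(byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey)
                .getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
    }

    // Returns the key bytes, again assuming timestamp + sequence number follow them.
    static byte[] readKeyAssumingSeqnum(byte[] binaryKey) {
        byte[] key = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE];
        System.arraycopy(binaryKey, 0, key, 0, key.length);
        return key;
    }

    public static void main(String[] args) {
        byte[] key = "user-1".getBytes(StandardCharsets.UTF_8);
        long windowStart = 1596067200000L;

        byte[] fromStore = storeKey(key, windowStart, 0);
        byte[] fromTopic = topicKey(key, windowStart);

        // Store layout round-trips: key and timestamp come back unchanged.
        System.out.println(new String(readKeyAssumingSeqnum(fromStore), StandardCharsets.UTF_8)
                + " @ " + readTimestampAssumingSeqnum(fromStore));
        // Topic layout does not: the key loses its last 4 bytes and the timestamp is shifted.
        System.out.println(new String(readKeyAssumingSeqnum(fromTopic), StandardCharsets.UTF_8)
                + " @ " + readTimestampAssumingSeqnum(fromTopic));
    }
}
```

In this sketch the second read does not throw; it silently yields a shorter key and a different timestamp, because the last 4 bytes of the key and the first 4 bytes of the timestamp get interpreted as the timestamp. Whether the real restore path fails loudly or quietly may differ.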