Posted to jira@kafka.apache.org by "Victoria Xia (Jira)" <ji...@apache.org> on 2023/02/15 19:12:00 UTC

[jira] [Created] (KAFKA-14723) Do not write expired store records to changelog

Victoria Xia created KAFKA-14723:
------------------------------------

             Summary: Do not write expired store records to changelog
                 Key: KAFKA-14723
                 URL: https://issues.apache.org/jira/browse/KAFKA-14723
             Project: Kafka
          Issue Type: Improvement
          Components: streams
            Reporter: Victoria Xia


Window stores and versioned stores both have concepts of "retention" and "expiration." Records that are expired are not written to the store, e.g., [this example|https://github.com/apache/kafka/blob/b9754747d6eaa029c4bb69b073d749ff8df15908/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java#L265-L266] for segmented stores. However, in the case of persistent stores, these expired records are still written to the changelog topic. This does not cause any correctness problems because the records are once again omitted from the store during restore, but it is inefficient. It'd be good to avoid writing expired records to the changelog topic in the first place. Another benefit is that doing so would allow us to simplify the restoration code for versioned stores (see [relevant discussion|https://github.com/apache/kafka/pull/13243#discussion_r1106568364]).

The reason expired records are still written to the changelog topic is that whether a record is expired is tracked only at the innermost store layer, and not in any of the outer store layers such as the changelogging layer. The innermost store layer keeps its own `observedStreamTime`, which is advanced on calls to put() and during restoration, and uses this variable to determine when a record is expired. Because the return type of put() is void, the changelogging layer has no way to tell whether the inner store's put() actually stored the record or dropped it as expired, so it always writes to the changelog topic regardless.
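
To make the layering issue concrete, here is a minimal sketch in plain Java. It is not the actual Kafka Streams code: `StoreSketch`, `InnerStoreSketch`, and `ChangeLoggingSketch` are hypothetical stand-ins, the changelog topic is modeled as an in-memory list, and the expiry rule (timestamp older than observed stream time minus retention) is a simplifying assumption, since the real segmented stores decide expiry per segment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the store layering described above.
// None of these types are real Kafka Streams classes.
class ExpiredChangelogDemo {
    public static void main(String[] args) {
        InnerStoreSketch inner = new InnerStoreSketch(10L);
        ChangeLoggingSketch store = new ChangeLoggingSketch(inner);
        store.put("a", "v1", 100L); // advances observedStreamTime to 100
        store.put("b", "v2", 50L);  // older than 100 - 10, so the inner store drops it
        System.out.println("store contents: " + inner.data);
        System.out.println("changelog:      " + store.changelog);
    }
}

interface StoreSketch {
    // void return type: the caller cannot tell whether the record was kept
    void put(String key, String value, long timestamp);
}

class InnerStoreSketch implements StoreSketch {
    final Map<String, String> data = new HashMap<>();
    final long retentionMs;
    long observedStreamTime = -1L;

    InnerStoreSketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    @Override
    public void put(String key, String value, long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        // Assumed expiry rule for this sketch only.
        if (timestamp < observedStreamTime - retentionMs) {
            return; // silently dropped as expired
        }
        data.put(key, value);
    }
}

class ChangeLoggingSketch implements StoreSketch {
    // Stands in for the changelog topic.
    final List<String> changelog = new ArrayList<>();
    private final StoreSketch inner;

    ChangeLoggingSketch(StoreSketch inner) {
        this.inner = inner;
    }

    @Override
    public void put(String key, String value, long timestamp) {
        inner.put(key, value, timestamp);
        // No way to know whether the inner store kept the record, so always log.
        changelog.add(key + "=" + value + "@" + timestamp);
    }
}
```

In this sketch the inner store ends up holding only the record for key "a", while the changelog list contains entries for both "a" and "b".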

In order to avoid this, we could:
 * update the put() interface to return a boolean indicating whether the record was actually put or not, or
 * move the logic for determining when a record is expired into an outer store layer, or
 * reorder/restructure the wrapped store layers.
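
As a rough illustration of the first option only, the sketch below has put() return a boolean that the changelogging layer checks before logging. All type names (`ReportingStore`, `ReportingInnerStore`, `GuardedChangeLogger`) are hypothetical, the changelog topic is modeled as an in-memory list, and the expiry rule is a simplifying assumption rather than the real per-segment logic. It does not address how such an interface change would be rolled out in the actual store interfaces.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of option 1; not the real Kafka Streams interfaces.
class ReturnFlagDemo {
    public static void main(String[] args) {
        GuardedChangeLogger store = new GuardedChangeLogger(new ReportingInnerStore(10L));
        System.out.println("a stored: " + store.put("a", "v1", 100L));
        System.out.println("b stored: " + store.put("b", "v2", 50L));
        System.out.println("changelog: " + store.changelog);
    }
}

interface ReportingStore {
    // Returns true if the record was stored, false if it was dropped as expired.
    boolean put(String key, String value, long timestamp);
}

class ReportingInnerStore implements ReportingStore {
    final Map<String, String> data = new HashMap<>();
    final long retentionMs;
    long observedStreamTime = -1L;

    ReportingInnerStore(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    @Override
    public boolean put(String key, String value, long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        // Assumed expiry rule for this sketch only.
        if (timestamp < observedStreamTime - retentionMs) {
            return false; // expired: tell the caller nothing was stored
        }
        data.put(key, value);
        return true;
    }
}

class GuardedChangeLogger implements ReportingStore {
    // Stands in for the changelog topic.
    final List<String> changelog = new ArrayList<>();
    private final ReportingStore inner;

    GuardedChangeLogger(ReportingStore inner) {
        this.inner = inner;
    }

    @Override
    public boolean put(String key, String value, long timestamp) {
        boolean stored = inner.put(key, value, timestamp);
        if (stored) {
            // Only records the inner store actually kept reach the changelog.
            changelog.add(key + "=" + value + "@" + timestamp);
        }
        return stored;
    }
}
```

With this shape, the expired record for key "b" is neither stored nor logged, while the inner store remains the single place that tracks `observedStreamTime`.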


