Posted to jira@kafka.apache.org by "Victoria Xia (Jira)" <ji...@apache.org> on 2023/02/15 19:13:00 UTC

[jira] [Updated] (KAFKA-14723) Do not write expired store records to changelog

     [ https://issues.apache.org/jira/browse/KAFKA-14723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Victoria Xia updated KAFKA-14723:
---------------------------------
    Description: 
Window stores and versioned stores both have concepts of "retention" and "expiration." Expired records are not written to the store; see, e.g., [this example|https://github.com/apache/kafka/blob/b9754747d6eaa029c4bb69b073d749ff8df15908/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java#L265-L266] for segmented stores. However, in the case of persistent stores, these expired records are still written to the changelog topic. This does not cause any problems, because the records are once again omitted from the store during restoration, but it is inefficient. It would be good to avoid writing expired records to the changelog topic in the first place. Doing so would also allow us to simplify the restoration code for versioned stores (see [relevant discussion|https://github.com/apache/kafka/pull/13243#discussion_r1106568364]).

Expired records are still written to the changelog topic because whether a record is expired is tracked only at the innermost store layer, and not at any of the outer store layers such as the changelogging layer. The innermost store layer keeps its own `observedStreamTime`, which is advanced on calls to put() and during restoration, and uses this variable to determine when a record is expired. Because the return type of put() is void, the changelogging layer has no way to tell whether the inner store's put() actually stored the record or dropped it as expired, so it always writes to the changelog topic.
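
To make the layering problem concrete, here is a minimal sketch. It does not use the actual Kafka Streams classes: `SimpleStore`, `InnerStore`, `ChangeLoggingStore`, the in-memory `changelog` list, and the simplified expiration check (`timestamp < observedStreamTime - retentionMs`) are all hypothetical stand-ins chosen for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified store interface; put() returns void as described above.
interface SimpleStore {
    void put(String key, String value, long timestamp);
}

// Innermost layer: tracks stream time and silently drops expired records.
class InnerStore implements SimpleStore {
    private final long retentionMs;
    private long observedStreamTime = Long.MIN_VALUE;
    final Map<String, String> data = new HashMap<>();

    InnerStore(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    @Override
    public void put(String key, String value, long timestamp) {
        // Stream time only ever advances.
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        if (timestamp < observedStreamTime - retentionMs) {
            return; // expired: dropped here, but the caller cannot tell
        }
        data.put(key, value);
    }
}

// Changelogging layer: has no signal from the inner store, so it logs every put.
class ChangeLoggingStore implements SimpleStore {
    private final SimpleStore inner;
    final List<String> changelog = new ArrayList<>(); // stand-in for the changelog topic

    ChangeLoggingStore(SimpleStore inner) {
        this.inner = inner;
    }

    @Override
    public void put(String key, String value, long timestamp) {
        inner.put(key, value, timestamp);
        changelog.add(key); // written regardless of whether the record was stored
    }
}

class ExpiredRecordDemo {
    public static void main(String[] args) {
        InnerStore inner = new InnerStore(10L);
        ChangeLoggingStore store = new ChangeLoggingStore(inner);
        store.put("a", "1", 100L); // advances stream time to 100
        store.put("b", "2", 50L);  // older than 100 - 10, so it is expired
        System.out.println("stored=" + inner.data.size()
                + " logged=" + store.changelog.size());
    }
}
```

In this sketch the second record never reaches the store, yet it still appears in the changelog, which is the inefficiency described above.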

To avoid this, we could:
 * update the put() interface to return a boolean indicating whether the record was actually put or not, or
 * move the logic for determining when a record is expired into an outer store layer, or
 * reorder/restructure the wrapped store layers.
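
As an illustration of the first option only, the following sketch has put() return a boolean so that the changelogging layer can skip expired records. The names `ReportingStore`, `ExpiringStore`, and `SelectiveChangeLoggingStore` are hypothetical, the expiration check is simplified, and this is not a proposal for the actual Kafka Streams interfaces.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical interface: put() reports whether the record was actually stored.
interface ReportingStore {
    boolean put(String key, String value, long timestamp);
}

// Innermost layer: same expiration logic as before, but the outcome is returned.
class ExpiringStore implements ReportingStore {
    private final long retentionMs;
    private long observedStreamTime = Long.MIN_VALUE;
    final Map<String, String> data = new HashMap<>();

    ExpiringStore(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    @Override
    public boolean put(String key, String value, long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        if (timestamp < observedStreamTime - retentionMs) {
            return false; // expired: tell the caller the record was dropped
        }
        data.put(key, value);
        return true;
    }
}

// Changelogging layer: only logs records that the inner store accepted.
class SelectiveChangeLoggingStore implements ReportingStore {
    private final ReportingStore inner;
    final List<String> changelog = new ArrayList<>(); // stand-in for the changelog topic

    SelectiveChangeLoggingStore(ReportingStore inner) {
        this.inner = inner;
    }

    @Override
    public boolean put(String key, String value, long timestamp) {
        boolean stored = inner.put(key, value, timestamp);
        if (stored) {
            changelog.add(key);
        }
        return stored;
    }
}

class SelectiveLoggingDemo {
    public static void main(String[] args) {
        ExpiringStore inner = new ExpiringStore(10L);
        SelectiveChangeLoggingStore store = new SelectiveChangeLoggingStore(inner);
        store.put("a", "1", 100L); // stored and logged
        store.put("b", "2", 50L);  // expired: neither stored nor logged
        System.out.println("stored=" + inner.data.size()
                + " logged=" + store.changelog.size());
    }
}
```

The trade-off in this sketch is that every implementation of the store interface has to change to report the outcome, whereas the other two options keep put() as void and instead change where the expiration decision is made.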


> Do not write expired store records to changelog
> -----------------------------------------------
>
>                 Key: KAFKA-14723
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14723
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Victoria Xia
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)