Posted to jira@kafka.apache.org by "vcrfxia (via GitHub)" <gi...@apache.org> on 2023/02/22 23:19:17 UTC

[GitHub] [kafka] vcrfxia opened a new pull request, #13292: KAFKA-14491: [14/N] Set changelog topic configs for versioned stores

vcrfxia opened a new pull request, #13292:
URL: https://github.com/apache/kafka/pull/13292

   (This PR is stacked on https://github.com/apache/kafka/pull/13274. Only the last commit needs to be reviewed separately.)
   
   This PR sets the correct topic configs for changelog topics for versioned stores introduced in [KIP-889](https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores). Changelog topics for versioned stores differ from those for non-versioned stores only in that `min.compaction.lag.ms` needs to be set in order to prevent version history from being compacted prematurely. 
   
   The value for `min.compaction.lag.ms` is equal to the store's history retention plus some buffer to account for the broker's use of wall-clock time in performing compactions. This buffer is analogous to the `windowstore.changelog.additional.retention.ms` value for window store changelog topic retention time, and uses the same default of 24 hours. In the future, we can propose a KIP to expose a config such as `versionedstore.changelog.additional.compaction.lag.ms` to allow users to tune this value.
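A minimal sketch of the rule described above, assuming a versioned-store changelog needs only `cleanup.policy=compact` plus `min.compaction.lag.ms = history retention + buffer`, with the buffer defaulting to 24 hours. `VersionedChangelogConfigs` and `changelogConfigs` are hypothetical names, not Kafka Streams' internal API, and plain string keys stand in for Kafka's topic-config constants so the example has no dependencies.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not Kafka Streams code): builds the topic configs a
// versioned-store changelog would need under the scheme described in the PR.
final class VersionedChangelogConfigs {

    // Buffer for the broker's use of wall-clock time during compaction; mirrors
    // the 24h default of windowstore.changelog.additional.retention.ms.
    static final long DEFAULT_ADDITIONAL_COMPACTION_LAG_MS = Duration.ofHours(24).toMillis();

    static Map<String, String> changelogConfigs(final long historyRetentionMs,
                                                final long additionalCompactionLagMs) {
        if (historyRetentionMs < 0 || additionalCompactionLagMs < 0) {
            throw new IllegalArgumentException("durations must be non-negative");
        }
        final Map<String, String> configs = new HashMap<>();
        // Versioned-store changelogs are compacted only (no time-based deletion).
        configs.put("cleanup.policy", "compact");
        // Keep every record uncompacted for at least history retention + buffer,
        // so older versions are not compacted away prematurely.
        configs.put("min.compaction.lag.ms",
                    Long.toString(Math.addExact(historyRetentionMs, additionalCompactionLagMs)));
        return configs;
    }

    public static void main(final String[] args) {
        final long historyRetentionMs = Duration.ofHours(1).toMillis();
        // 1h history retention + 24h buffer
        System.out.println(changelogConfigs(historyRetentionMs, DEFAULT_ADDITIONAL_COMPACTION_LAG_MS)
            .get("min.compaction.lag.ms")); // prints 90000000
    }
}
```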
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #13292: KAFKA-14491: [14/N] Set changelog topic configs for versioned stores

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13292:
URL: https://github.com/apache/kafka/pull/13292#discussion_r1142842326


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -1293,12 +1306,16 @@ private void setRegexMatchedTopicToStateStore() {
 
     private <S extends StateStore> InternalTopicConfig createChangelogTopicConfig(final StateStoreFactory<S> factory,
                                                                                   final String name) {
-        if (factory.isWindowStore()) {
+        if (factory.isVersionedStore()) {
+            final VersionedChangelogTopicConfig config = new VersionedChangelogTopicConfig(name, factory.logConfig());
+            config.setMinCompactionLagMs(factory.historyRetention());

Review Comment:
   >  It's not strictly necessary though.
   
   If it's not necessary, there's no need to do anything. I just wanted to check whether we need to.




[GitHub] [kafka] vcrfxia commented on a diff in pull request #13292: KAFKA-14491: [14/N] Set changelog topic configs for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13292:
URL: https://github.com/apache/kafka/pull/13292#discussion_r1127273827


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -1293,12 +1306,16 @@ private void setRegexMatchedTopicToStateStore() {
 
     private <S extends StateStore> InternalTopicConfig createChangelogTopicConfig(final StateStoreFactory<S> factory,
                                                                                   final String name) {
-        if (factory.isWindowStore()) {
+        if (factory.isVersionedStore()) {
+            final VersionedChangelogTopicConfig config = new VersionedChangelogTopicConfig(name, factory.logConfig());
+            config.setMinCompactionLagMs(factory.historyRetention());

Review Comment:
   > Why are we setting this "outside" but not via the constructor? (I see that we do the same thing for WindowedChangelogTopicConfig but I am not sure why either.)
   
   No reason, AFAICT -- I was just following precedent. Would you prefer passing `factory.historyRetention()` into the constructor and setting it there? I can refactor `WindowedChangelogTopicConfig` as well if so.
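One way the constructor-based refactor discussed here could look, as a sketch only. `VersionedChangelogTopicConfigSketch` and its `properties(...)` method are hypothetical; the real class in Kafka Streams may be shaped differently. With a constructor like this, the call in `createChangelogTopicConfig` would pass `factory.historyRetention()` as a third argument instead of calling `setMinCompactionLagMs(...)` afterwards.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a versioned changelog topic config class.
final class VersionedChangelogTopicConfigSketch {
    private final String name;
    private final Map<String, String> topicConfigs;
    private final long minCompactionLagMs;

    // The lag is a required, immutable constructor argument rather than something
    // set later through a setter, so an instance can never exist without it.
    VersionedChangelogTopicConfigSketch(final String name,
                                        final Map<String, String> topicConfigs,
                                        final long minCompactionLagMs) {
        if (minCompactionLagMs < 0) {
            throw new IllegalArgumentException("minCompactionLagMs must be non-negative");
        }
        this.name = name;
        this.topicConfigs = new HashMap<>(topicConfigs);
        this.minCompactionLagMs = minCompactionLagMs;
    }

    String name() {
        return name;
    }

    // Merges user-supplied configs with the computed compaction lag. In this sketch
    // the computed lag (history retention + buffer) overrides any user value.
    Map<String, String> properties(final long additionalCompactionLagMs) {
        final Map<String, String> props = new HashMap<>(topicConfigs);
        props.put("cleanup.policy", "compact");
        props.put("min.compaction.lag.ms",
                  Long.toString(Math.addExact(minCompactionLagMs, additionalCompactionLagMs)));
        return props;
    }

    public static void main(final String[] args) {
        final VersionedChangelogTopicConfigSketch config =
            new VersionedChangelogTopicConfigSketch("app-store-changelog", Map.of(), 3_600_000L);
        System.out.println(config.name() + " " + config.properties(86_400_000L));
    }
}
```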




[GitHub] [kafka] vcrfxia commented on a diff in pull request #13292: KAFKA-14491: [14/N] Set changelog topic configs for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13292:
URL: https://github.com/apache/kafka/pull/13292#discussion_r1136249663


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -1293,12 +1306,16 @@ private void setRegexMatchedTopicToStateStore() {
 
     private <S extends StateStore> InternalTopicConfig createChangelogTopicConfig(final StateStoreFactory<S> factory,
                                                                                   final String name) {
-        if (factory.isWindowStore()) {
+        if (factory.isVersionedStore()) {
+            final VersionedChangelogTopicConfig config = new VersionedChangelogTopicConfig(name, factory.logConfig());
+            config.setMinCompactionLagMs(factory.historyRetention());

Review Comment:
   > Yes, I think it would be cleaner this way.
   
   OK, made the updates.
   
   > `delete.retention.ms` is not for retention-based topics; it's for compacted topics
   
   Ah, I see. My Scala's not the best, but it looks like `min.compaction.lag.ms` guarantees that any record within `min.compaction.lag.ms` of "now" will not be compacted, regardless of `delete.retention.ms`. That is the important property for us: it guarantees that older record versions are not compacted prematurely.
   
   Interestingly, we might have a case for setting `delete.retention.ms = 0` (rather than using the default of 24 hours), since we know we no longer need the older tombstones once `min.compaction.lag.ms` has expired. It's not strictly necessary, though. Do you think we should set it for completeness? I'm fine either way.
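A simplified model of the guarantee described above, assuming per-record eligibility: a superseded version may be compacted only once its age exceeds `min.compaction.lag.ms`, and `delete.retention.ms` plays no part in that decision. The real broker cleans whole segments and never cleans the active segment, so this is an illustration, not broker logic; `CompactionModel` and `mayBeCompacted` are hypothetical names.

```java
// Simplified, per-record model of log compaction eligibility (hypothetical, not
// Kafka broker code; the real log cleaner operates on whole segments).
final class CompactionModel {

    // A record superseded by a newer version of the same key may be removed only
    // once it is at least min.compaction.lag.ms old (by the broker's wall clock).
    // Note that delete.retention.ms does not appear here at all.
    static boolean mayBeCompacted(final long recordTimestampMs,
                                  final boolean supersededByNewerVersion,
                                  final long nowMs,
                                  final long minCompactionLagMs) {
        return supersededByNewerVersion && nowMs - recordTimestampMs >= minCompactionLagMs;
    }

    public static void main(final String[] args) {
        final long hour = 3_600_000L;
        final long minLag = 25 * hour; // e.g. 1h history retention + 24h buffer
        final long now = 100 * hour;
        // Older version written 2h ago: still protected.
        System.out.println(mayBeCompacted(now - 2 * hour, true, now, minLag));  // false
        // Older version written 30h ago: eligible for compaction.
        System.out.println(mayBeCompacted(now - 30 * hour, true, now, minLag)); // true
    }
}
```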




[GitHub] [kafka] mjsax merged pull request #13292: KAFKA-14491: [14/N] Set changelog topic configs for versioned stores

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax merged PR #13292:
URL: https://github.com/apache/kafka/pull/13292



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13292: KAFKA-14491: [14/N] Set changelog topic configs for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13292:
URL: https://github.com/apache/kafka/pull/13292#discussion_r1116012004


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedChangelogTopicConfig.java:
##########
@@ -25,7 +25,7 @@
 
 /**
  * UnwindowedChangelogTopicConfig captures the properties required for configuring
- * the un-windowed store changelog topics.
+ * the un-windowed, un-versioned store changelog topics.

Review Comment:
   This class name `UnwindowedChangelogTopicConfig` is a bit outdated now and should really be `UnwindowedUnversionedChangelogTopicConfig`. A bit clunky but I can't think of anything better. Will rename for now and can update if others have suggestions.




[GitHub] [kafka] mjsax commented on a diff in pull request #13292: KAFKA-14491: [14/N] Set changelog topic configs for versioned stores

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13292:
URL: https://github.com/apache/kafka/pull/13292#discussion_r1132987512


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -1293,12 +1306,16 @@ private void setRegexMatchedTopicToStateStore() {
 
     private <S extends StateStore> InternalTopicConfig createChangelogTopicConfig(final StateStoreFactory<S> factory,
                                                                                   final String name) {
-        if (factory.isWindowStore()) {
+        if (factory.isVersionedStore()) {
+            final VersionedChangelogTopicConfig config = new VersionedChangelogTopicConfig(name, factory.logConfig());
+            config.setMinCompactionLagMs(factory.historyRetention());

Review Comment:
   > Would you prefer passing factory.historyRetention() into the constructor and setting it there? I can refactor WindowedChangelogTopicConfig as well if so.
   
   Yes, I think it would be cleaner this way.
   
   > Nope, the changelog topic for versioned stores is only compacted, so delete.retention.ms is unused. (In contrast to the windowed changelog topic case, where both compaction and retention-based deletion are enabled)
   
   `delete.retention.ms` is not for retention-based topics; it's for compacted topics. It defines how long tombstones are retained when a tombstone is the last message for a key. That is, a tombstone marks a "soft delete" and is preserved for a while instead of being removed as soon as compaction runs. The goal is to ensure that a consumer that is temporarily offline does not miss a tombstone, which would lead to a "data leak" since it might never clean up the deleted record.
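A simplified model of the tombstone behavior described above, assuming the tombstone's retention clock starts when the log cleaner first processes it and that it survives until `delete.retention.ms` has elapsed. The broker's actual bookkeeping is more involved and has changed across versions; `TombstoneModel` and `tombstoneStillInLog` are hypothetical names. The 24-hour value is Kafka's documented default for `delete.retention.ms`.

```java
// Simplified, hypothetical model of tombstone retention on a compacted topic.
final class TombstoneModel {

    // Kafka's documented default for delete.retention.ms (24 hours).
    static final long DEFAULT_DELETE_RETENTION_MS = 86_400_000L;

    // In this model a tombstone survives compaction until delete.retention.ms has
    // elapsed since the cleaner first processed it. A consumer that reaches the
    // tombstone's offset at nowMs observes the delete only if this returns true;
    // otherwise it never learns about the delete (the "data leak").
    static boolean tombstoneStillInLog(final long firstCleanedAtMs,
                                       final long nowMs,
                                       final long deleteRetentionMs) {
        return nowMs - firstCleanedAtMs < deleteRetentionMs;
    }

    public static void main(final String[] args) {
        final long hour = 3_600_000L;
        final long cleanedAt = 0L;
        // Consumer catches up after 2 hours: tombstone still present.
        System.out.println(tombstoneStillInLog(cleanedAt, 2 * hour, DEFAULT_DELETE_RETENTION_MS));  // true
        // Consumer catches up after 30 hours: tombstone already removed.
        System.out.println(tombstoneStillInLog(cleanedAt, 30 * hour, DEFAULT_DELETE_RETENTION_MS)); // false
    }
}
```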





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13292: KAFKA-14491: [14/N] Set changelog topic configs for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13292:
URL: https://github.com/apache/kafka/pull/13292#discussion_r1116012004


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/UnwindowedChangelogTopicConfig.java:
##########
@@ -25,7 +25,7 @@
 
 /**
  * UnwindowedChangelogTopicConfig captures the properties required for configuring
- * the un-windowed store changelog topics.
+ * the un-windowed, un-versioned store changelog topics.

Review Comment:
   This class name `UnwindowedChangelogTopicConfig` is a bit outdated now and should really be `UnwindowedUnversionedChangelogTopicConfig`. A bit clunky but I can't think of anything better. Curious if reviewers think it's worth the rename.




[GitHub] [kafka] vcrfxia commented on pull request #13292: KAFKA-14491: [14/N] Set changelog topic configs for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on PR #13292:
URL: https://github.com/apache/kafka/pull/13292#issuecomment-1468887946

   Thanks @mjsax for your review! I made the refactor you suggested (including to the existing WindowedChangelogTopicConfig) and also pushed updates to InternalTopicManager. InternalTopicManager is currently unused code but I figured it'd be good to update it for the new type of internal topic anyway for completeness. 



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13292: KAFKA-14491: [14/N] Set changelog topic configs for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13292:
URL: https://github.com/apache/kafka/pull/13292#discussion_r1127274296


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -1293,12 +1306,16 @@ private void setRegexMatchedTopicToStateStore() {
 
     private <S extends StateStore> InternalTopicConfig createChangelogTopicConfig(final StateStoreFactory<S> factory,
                                                                                   final String name) {
-        if (factory.isWindowStore()) {
+        if (factory.isVersionedStore()) {
+            final VersionedChangelogTopicConfig config = new VersionedChangelogTopicConfig(name, factory.logConfig());
+            config.setMinCompactionLagMs(factory.historyRetention());

Review Comment:
   > Do we also need to increase `delete.retention.ms` for this case?
   
   Nope, the changelog topic for versioned stores is only compacted, so `delete.retention.ms` is unused. (In contrast to the windowed changelog topic case, where both compaction and retention-based deletion are enabled)
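A sketch of the contrast drawn here between compact-only and `compact,delete` changelogs, assuming windowed changelogs get `retention.ms = store retention + additional retention` and versioned changelogs get `min.compaction.lag.ms = history retention + buffer`. `ChangelogPolicies`, `StoreType`, and `changelogConfigs` are hypothetical; they mirror the `isVersionedStore()` / `isWindowStore()` branching in the diff but are not Kafka Streams' API. As noted later in the thread, `delete.retention.ms` still applies to compacted topics, but only to how long tombstones are kept.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration: choose changelog topic configs by store type.
final class ChangelogPolicies {

    enum StoreType { KEY_VALUE, WINDOWED, VERSIONED }

    static Map<String, String> changelogConfigs(final StoreType type,
                                                final long storeRetentionMs,
                                                final long additionalMs) {
        final Map<String, String> configs = new HashMap<>();
        switch (type) {
            case VERSIONED:
                // Compaction only; old versions are protected by min.compaction.lag.ms.
                configs.put("cleanup.policy", "compact");
                configs.put("min.compaction.lag.ms",
                            Long.toString(Math.addExact(storeRetentionMs, additionalMs)));
                break;
            case WINDOWED:
                // Compaction plus time-based deletion of expired windows.
                configs.put("cleanup.policy", "compact,delete");
                configs.put("retention.ms",
                            Long.toString(Math.addExact(storeRetentionMs, additionalMs)));
                break;
            default:
                // Plain key-value store: compaction only, no extra lag or retention.
                configs.put("cleanup.policy", "compact");
                break;
        }
        return configs;
    }

    public static void main(final String[] args) {
        final long hour = 3_600_000L;
        for (final StoreType type : StoreType.values()) {
            System.out.println(type + " -> " + changelogConfigs(type, hour, 24 * hour));
        }
    }
}
```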




[GitHub] [kafka] mjsax commented on a diff in pull request #13292: KAFKA-14491: [14/N] Set changelog topic configs for versioned stores

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13292:
URL: https://github.com/apache/kafka/pull/13292#discussion_r1127245620


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -1293,12 +1306,16 @@ private void setRegexMatchedTopicToStateStore() {
 
     private <S extends StateStore> InternalTopicConfig createChangelogTopicConfig(final StateStoreFactory<S> factory,
                                                                                   final String name) {
-        if (factory.isWindowStore()) {
+        if (factory.isVersionedStore()) {
+            final VersionedChangelogTopicConfig config = new VersionedChangelogTopicConfig(name, factory.logConfig());
+            config.setMinCompactionLagMs(factory.historyRetention());

Review Comment:
   Do we also need to increase `delete.retention.ms` for this case? Not sure how the two configs "interact".



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -1293,12 +1306,16 @@ private void setRegexMatchedTopicToStateStore() {
 
     private <S extends StateStore> InternalTopicConfig createChangelogTopicConfig(final StateStoreFactory<S> factory,
                                                                                   final String name) {
-        if (factory.isWindowStore()) {
+        if (factory.isVersionedStore()) {
+            final VersionedChangelogTopicConfig config = new VersionedChangelogTopicConfig(name, factory.logConfig());
+            config.setMinCompactionLagMs(factory.historyRetention());

Review Comment:
   Why are we setting this "outside" but not via the constructor? (I see that we do the same thing for `WindowedChangelogTopicConfig` but I am not sure why either.)


