You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/17 20:52:55 UTC

[GitHub] [kafka] wcarlson5 opened a new pull request #11611: MINOR: prefix topics if internal config is set

wcarlson5 opened a new pull request #11611:
URL: https://github.com/apache/kafka/pull/11611


   In order to move a topology to another runtime without having to copy over the internal topics it would be good to have the option to not prefix the internal topics with the application ID. So this change will introduce a new config that if set will be the internal topic prefix
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #11611: MINOR: prefix topics if internal config is set

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11611:
URL: https://github.com/apache/kafka/pull/11611#discussion_r772623797



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -951,6 +951,8 @@
         public static final String IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED = "__iq.consistency.offset"
             + ".vector.enabled__";
 
+        // Private API used to control the usage of consistency offset vectors

Review comment:
       This comment seems irrelevant? It's not for the offset vectors right?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##########
@@ -85,8 +86,13 @@ public void init(final StateStoreContext context, final StateStore root) {
     }
 
     private void initInternal(final InternalProcessorContext<?, ?> context) {
+        final String prefix = StreamsConfig.InternalConfig.getString(

Review comment:
       Could we use `ProcessorContextUtils#changelogFor` here as well? Ditto below




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #11611: MINOR: prefix topics if internal config is set

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11611:
URL: https://github.com/apache/kafka/pull/11611#issuecomment-999794655


   Though this comment is not for AK, I'd have to raise to our attention: in ksql we have hard-coded rules to detect internal topics to be cleaned when terminating a query, which is like this today:
   
   ```
     private static boolean isInternalTopic(final String topicName, final String applicationId) {
       final boolean prefixMatches = topicName.startsWith(applicationId + "-");
       final boolean suffixMatches = topicName.endsWith(KsqlConstants.STREAMS_CHANGELOG_TOPIC_SUFFIX)
           || topicName.endsWith(KsqlConstants.STREAMS_REPARTITION_TOPIC_SUFFIX)
           || topicName.matches(KsqlConstants.STREAMS_JOIN_REGISTRATION_TOPIC_PATTERN)
           || topicName.matches(KsqlConstants.STREAMS_JOIN_RESPONSE_TOPIC_PATTERN);
       return prefixMatches && suffixMatches;
     }
   ```
   
   With this change we have to remember updating this function otherwise the cleanup logic would break.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #11611: MINOR: prefix topics if internal config is set

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11611:
URL: https://github.com/apache/kafka/pull/11611#issuecomment-1007795951


   LGTM! Re-triggering the jenkins tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #11611: MINOR: prefix topics if internal config is set

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11611:
URL: https://github.com/apache/kafka/pull/11611#issuecomment-1007796312


   @wcarlson5 could you double check if the failed tests are relevant? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on pull request #11611: MINOR: prefix topics if internal config is set

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #11611:
URL: https://github.com/apache/kafka/pull/11611#issuecomment-1007578613


   @guozhangwang yes that is a good point. When we go to use this we will have to set the config and we can updated the clean up logic then


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang merged pull request #11611: MINOR: prefix topics if internal config is set

Posted by GitBox <gi...@apache.org>.
guozhangwang merged pull request #11611:
URL: https://github.com/apache/kafka/pull/11611


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #11611: MINOR: prefix topics if internal config is set

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11611:
URL: https://github.com/apache/kafka/pull/11611#issuecomment-1007795951






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #11611: MINOR: prefix topics if internal config is set

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #11611:
URL: https://github.com/apache/kafka/pull/11611#discussion_r781511282



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##########
@@ -85,8 +86,13 @@ public void init(final StateStoreContext context, final StateStore root) {
     }
 
     private void initInternal(final InternalProcessorContext<?, ?> context) {
+        final String prefix = StreamsConfig.InternalConfig.getString(

Review comment:
       https://issues.apache.org/jira/browse/KAFKA-13588




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on pull request #11611: MINOR: prefix topics if internal config is set

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #11611:
URL: https://github.com/apache/kafka/pull/11611#issuecomment-997025039


   @ableegoldman This should make it so we can have topics prefixed with whatever is needed


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on pull request #11611: MINOR: prefix topics if internal config is set

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #11611:
URL: https://github.com/apache/kafka/pull/11611#issuecomment-1007578613


   @guozhangwang yes that is a good point. When we go to use this we will have to set the config and we can updated the clean up logic then


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #11611: MINOR: prefix topics if internal config is set

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #11611:
URL: https://github.com/apache/kafka/pull/11611#discussion_r780402975



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##########
@@ -85,8 +86,13 @@ public void init(final StateStoreContext context, final StateStore root) {
     }
 
     private void initInternal(final InternalProcessorContext<?, ?> context) {
+        final String prefix = StreamsConfig.InternalConfig.getString(

Review comment:
       It could either be a `StateStoreContext` of a `ProcessorContext` and there are different methods for each. I am not sure why as they seem to do similar things. but I didn't want to make reaching changes here. I can just file a ticket to consolidate those methods into one maybe?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #11611: MINOR: prefix topics if internal config is set

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11611:
URL: https://github.com/apache/kafka/pull/11611#discussion_r780568663



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##########
@@ -85,8 +86,13 @@ public void init(final StateStoreContext context, final StateStore root) {
     }
 
     private void initInternal(final InternalProcessorContext<?, ?> context) {
+        final String prefix = StreamsConfig.InternalConfig.getString(

Review comment:
       In the future we would remove the deprecated `init(final ProcessorContext context, final StateStore root)` and then we only need `ProcessorContextUtils#changelogFor(StateStoreContext..)`.
   
   Sounds good for filing a JIRA for consolidating to the `changelogFor`, we can do that when we remove the deprecated `init` function.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #11611: MINOR: prefix topics if internal config is set

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #11611:
URL: https://github.com/apache/kafka/pull/11611#discussion_r780402975



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##########
@@ -85,8 +86,13 @@ public void init(final StateStoreContext context, final StateStore root) {
     }
 
     private void initInternal(final InternalProcessorContext<?, ?> context) {
+        final String prefix = StreamsConfig.InternalConfig.getString(

Review comment:
       It could either be a `StateStoreContext` of a `ProcessorContext` and there are different methods for each. I am not sure why as they seem to do similar things. but I didn't want to make reaching changes here. I can just file a ticket to consolidate those methods into one maybe?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #11611: MINOR: prefix topics if internal config is set

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11611:
URL: https://github.com/apache/kafka/pull/11611#discussion_r780568663



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
##########
@@ -85,8 +86,13 @@ public void init(final StateStoreContext context, final StateStore root) {
     }
 
     private void initInternal(final InternalProcessorContext<?, ?> context) {
+        final String prefix = StreamsConfig.InternalConfig.getString(

Review comment:
       In the future we would remove the deprecated `init(final ProcessorContext context, final StateStore root)` and then we only need `ProcessorContextUtils#changelogFor(StateStoreContext..)`.
   
   Sounds good for filing a JIRA for consolidating to the `changelogFor`, we can do that when we remove the deprecated `init` function.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on pull request #11611: MINOR: prefix topics if internal config is set

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #11611:
URL: https://github.com/apache/kafka/pull/11611#issuecomment-1009368253


   @guozhangwang Yeah I needed update a few tests. It should be good now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org