Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/01/22 12:33:11 UTC

[GitHub] [kafka] sayantanu-dey opened a new pull request #11703: KAFKA-13588: consolidate `changelogFor` methods to simplify the generation of internal topic names

sayantanu-dey opened a new pull request #11703:
URL: https://github.com/apache/kafka/pull/11703




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] sayantanu-dey commented on a change in pull request #11703: KAFKA-13588: consolidate `changelogFor` methods to simplify the generation of internal topic names

Posted by GitBox <gi...@apache.org>.
sayantanu-dey commented on a change in pull request #11703:
URL: https://github.com/apache/kafka/pull/11703#discussion_r790776516



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -171,42 +169,24 @@ private void registerMetrics() {
     @Deprecated
     private void initStoreSerde(final ProcessorContext context) {
         final String storeName = name();
-        final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
-        final String prefix = getPrefix(context.appConfigs(), context.applicationId());
+        final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);

Review comment:
       Hi, after going through the code I realized that classes like CachingWindowStore and InMemoryTimeOrderedKeyValueBuffer always need to create a new topic, whereas the MeteredXYZStore classes have a null-checking code flow.
   Since there are two types of flow, I decided to add an explicit boolean parameter so that both flows can live in the same function.
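
The two flows described in this comment can be sketched in isolation. This is a minimal, self-contained illustration and not the Kafka code: the class `ChangelogFlagSketch`, the `registered` map, and the helper `buildTopicName` are hypothetical stand-ins for `InternalProcessorContext.changelogFor` and `ProcessorStateManager.storeChangelogTopic`, and the naming scheme in `buildTopicName` is assumed for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

public class ChangelogFlagSketch {

    // Hypothetical stand-in for ProcessorStateManager.storeChangelogTopic.
    // The "<prefix>-<store>-changelog" scheme is an assumption for this sketch.
    static String buildTopicName(final String prefix, final String storeName) {
        return prefix + "-" + storeName + "-changelog";
    }

    // `registered` is a hypothetical stand-in for the store -> changelog
    // mapping that the internal processor context would expose.
    static String changelogFor(final Map<String, String> registered,
                               final String prefix,
                               final String storeName,
                               final boolean newChangelogTopic) {
        if (!newChangelogTopic) {
            // Flow 1: look up the registered topic and use it if present.
            final String changelogTopic = registered.get(storeName);
            if (changelogTopic != null) {
                return changelogTopic;
            }
        }
        // Flow 2 (and the fallback of flow 1): always build the name.
        return buildTopicName(prefix, storeName);
    }

    public static void main(final String[] args) {
        final Map<String, String> registered = new HashMap<>();
        registered.put("counts", "custom-counts-topic");

        System.out.println(changelogFor(registered, "app", "counts", false));
        System.out.println(changelogFor(registered, "app", "counts", true));
        System.out.println(changelogFor(registered, "app", "other", false));
    }
}
```

In this sketch the flag only changes the result when a topic is already registered for the store; for an unregistered store both flag values produce the same generated name.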







[GitHub] [kafka] sayantanu-dey commented on pull request #11703: KAFKA-13588: consolidate `changelogFor` methods to simplify the generation of internal topic names

Posted by GitBox <gi...@apache.org>.
sayantanu-dey commented on pull request #11703:
URL: https://github.com/apache/kafka/pull/11703#issuecomment-1020797877


   > One meta comment regarding consolidating the functions: we have `getPrefix` in multiple classes (and thanks @sayantanu-dey your PRs remove some already), but there are still two duplicated functions in `InternalTopologyBuilder` and `ProcessorContextUtils` as well, could we just keep one and have the other reference it too?
   
   Hi @guozhangwang, I have removed the duplicate `getPrefix` function in `InternalTopologyBuilder` in my latest commit. Now there exists only one `getPrefix` function, in `ProcessorContextUtils`.






[GitHub] [kafka] guozhangwang commented on a change in pull request #11703: KAFKA-13588: consolidate `changelogFor` methods to simplify the generation of internal topic names

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11703:
URL: https://github.com/apache/kafka/pull/11703#discussion_r790371892



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
##########
@@ -56,18 +56,26 @@ public static StreamsMetricsImpl getMetricsImpl(final StateStoreContext context)
         return (StreamsMetricsImpl) context.metrics();
     }
 
-    public static String changelogFor(final ProcessorContext context, final String storeName) {
+    public static String changelogFor(final ProcessorContext context, final String storeName, final Boolean newChangelogTopic) {
         final String prefix = getPrefix(context.appConfigs(), context.applicationId());
-        return context instanceof InternalProcessorContext
-            ? ((InternalProcessorContext) context).changelogFor(storeName)
-            : ProcessorStateManager.storeChangelogTopic(prefix, storeName, context.taskId().topologyName());
+        if (context instanceof InternalProcessorContext && !newChangelogTopic) {
+            final String changelogTopic = ((InternalProcessorContext) context).changelogFor(storeName);
+            if (changelogTopic != null)
+                return changelogTopic;
+
+        }
+        return ProcessorStateManager.storeChangelogTopic(prefix, storeName, context.taskId().topologyName());
     }
 
-    public static String changelogFor(final StateStoreContext context, final String storeName) {
+    public static String changelogFor(final StateStoreContext context, final String storeName, final Boolean newChangelogTopic) {
         final String prefix = getPrefix(context.appConfigs(), context.applicationId());
-        return context instanceof InternalProcessorContext
-            ? ((InternalProcessorContext) context).changelogFor(storeName)
-            : ProcessorStateManager.storeChangelogTopic(prefix, storeName, context.taskId().topologyName());
+        if (context instanceof InternalProcessorContext && !newChangelogTopic) {
+            final String changelogTopic = ((InternalProcessorContext) context).changelogFor(storeName);

Review comment:
       I checked the code in `InternalTopologyBuilder` and I think that by the time `ProcessorStateManager` is constructed, all store->changelog pairs should already be inside the `Map<String, String> storeToChangelogTopic`. Assuming that in all mocks/tests we let the mock of `InternalProcessorContext` return non-null strings, we should never get `null` back from `changelogFor`. @cadonna could you help me double-check here, since in the original PR (https://github.com/apache/kafka/pull/8902/files#r452148874) your intention was to still keep the null check?
   
   If that's the case, we can further simplify the logic here:
   
   1) we would never need to fall back to `ProcessorStateManager.storeChangelogTopic`, since the changelog string should always be constructed via that function and added into the map in `InternalTopologyBuilder`.
   2) hence we do not need the third param, since we would never need to create a new changelog.
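
The simplification proposed in the two points above might look roughly like the following sketch. It is not the Kafka implementation: `SimplifiedChangelogLookup` and its `changelogFor` are hypothetical, the map parameter stands in for `storeToChangelogTopic`, and the `Objects.requireNonNull` guard is an added assumption to make a violated invariant fail loudly rather than something the comment asks for.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class SimplifiedChangelogLookup {

    // Hypothetical simplified lookup: no boolean flag and no fallback that
    // builds a topic name, on the assumption that every store was registered
    // in the map before any lookup happens.
    static String changelogFor(final Map<String, String> storeToChangelogTopic,
                               final String storeName) {
        return Objects.requireNonNull(
            storeToChangelogTopic.get(storeName),
            "no changelog topic registered for store " + storeName);
    }

    public static void main(final String[] args) {
        final Map<String, String> storeToChangelogTopic = new HashMap<>();
        storeToChangelogTopic.put("counts", "app-counts-changelog");

        System.out.println(changelogFor(storeToChangelogTopic, "counts"));
    }
}
```

Whether such a guard is appropriate depends on the open question in this thread, namely whether `changelogFor` can ever return `null` in practice.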

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -171,42 +169,24 @@ private void registerMetrics() {
     @Deprecated
     private void initStoreSerde(final ProcessorContext context) {
         final String storeName = name();
-        final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
-        final String prefix = getPrefix(context.appConfigs(), context.applicationId());
+        final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);

Review comment:
       Only three MeteredXYZStore classes' `initStoreSerde` set the third param to false, but it seems that if the returned value is `null` we would still fall back to `ProcessorStateManager.storeChangelogTopic(prefix, storeName, taskId.topologyName())` to create the new topic, so shouldn't it be true as well?
   
   BTW, if my other comment is valid, then we can remove this param anyway.







[GitHub] [kafka] sayantanu-dey commented on a change in pull request #11703: KAFKA-13588: consolidate `changelogFor` methods to simplify the generation of internal topic names

Posted by GitBox <gi...@apache.org>.
sayantanu-dey commented on a change in pull request #11703:
URL: https://github.com/apache/kafka/pull/11703#discussion_r790421944



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -171,42 +169,24 @@ private void registerMetrics() {
     @Deprecated
     private void initStoreSerde(final ProcessorContext context) {
         final String storeName = name();
-        final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
-        final String prefix = getPrefix(context.appConfigs(), context.applicationId());
+        final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);

Review comment:
       What I understood from going through the code is that in the MeteredXYZStore classes we first check whether the returned value is null and only then create a new topic, whereas in classes like CachingWindowStore and InMemoryTimeOrderedKeyValueBuffer we always need to create a new topic.
   Hence I introduced the third param: when it is false, the method goes through the whole procedure of checking for null and then creating a new topic; when it is true, it just creates a new topic.







[GitHub] [kafka] guozhangwang commented on a change in pull request #11703: KAFKA-13588: consolidate `changelogFor` methods to simplify the generation of internal topic names

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11703:
URL: https://github.com/apache/kafka/pull/11703#discussion_r799128273



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -171,42 +169,24 @@ private void registerMetrics() {
     @Deprecated
     private void initStoreSerde(final ProcessorContext context) {
         final String storeName = name();
-        final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
-        final String prefix = getPrefix(context.appConfigs(), context.applicationId());
+        final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);

Review comment:
       Yes, but I'm wondering if the two types of flow are really necessary. It seems to me that the topic should always have been created during the `init` phase already, and hence the null check is not necessary.
   
   @cadonna I would appreciate your quick thoughts on this.



