Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/04/28 16:26:58 UTC

[GitHub] [kafka] vvcephei opened a new pull request #8574: KAFKA-9925: decorate pseudo-topics with app id

vvcephei opened a new pull request #8574:
URL: https://github.com/apache/kafka/pull/8574


   * ensure that pseudo-topics get correctly prefixed with the app id at run time
   * update test to expect the app id prefix
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8574: KAFKA-9925: decorate pseudo-topics with app id

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8574:
URL: https://github.com/apache/kafka/pull/8574#issuecomment-620971180


   Thanks, @arkins ! Shame is a powerful motivator :)





[GitHub] [kafka] vvcephei commented on a change in pull request #8574: KAFKA-9925: decorate pseudo-topics with app id

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8574:
URL: https://github.com/apache/kafka/pull/8574#discussion_r416764158



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
##########
@@ -76,6 +78,10 @@ public void setIfUnset(final Serializer<K> defaultSerializer) {
                 throw new UnsupportedVersionException("SubscriptionWrapper version is larger than maximum supported 0x7F");
             }
 
+            if (primaryKeySerializationPseudoTopic == null) {
+                primaryKeySerializationPseudoTopic = primaryKeySerializationPseudoTopicSupplier.get();
+            }

Review comment:
       This (and below) is a bit awkward.
   
   Our only requirement is that the supplier not be called before the app starts; any time after that is fine.
   
   The natural place would be in `configure`, but unfortunately, that method is basically useless for our internal serdes. The reason is that we previously decided that `configure` should be called externally to the DSL, but our internal serdes are constructed _internal_ to the DSL. Plus, `configure` must be called at run time (when the config is available), but by run time, we can no longer tell whether our serde is "internal" or not. So, there's no good place where we can call `configure` for our internal serdes.
   
   I'm side-stepping the problem here by just invoking the supplier when we first need to use it, which is also at run time.
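
The "invoke the supplier when we first need it" approach described above can be sketched as follows. This is a minimal illustration, not the actual `SubscriptionWrapperSerde` code: the `LazySerializer` class, the topic string, and the `AtomicReference` standing in for run-time configuration are all hypothetical names made up for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical illustration only; these classes are not part of Kafka.
public class LazyTopicNameExample {

    // A serializer-like component that receives a Supplier instead of a topic name.
    static final class LazySerializer {
        private final Supplier<String> topicSupplier;
        private String topic; // stays null until the first serialize() call

        LazySerializer(final Supplier<String> topicSupplier) {
            this.topicSupplier = topicSupplier;
        }

        byte[] serialize(final String data) {
            // Resolve the name on first use, which can only happen at run time.
            if (topic == null) {
                topic = topicSupplier.get();
            }
            return (topic + ":" + data).getBytes(StandardCharsets.UTF_8);
        }
    }

    public static void main(final String[] args) {
        // Stand-in for configuration that is unknown while the topology is built.
        final AtomicReference<String> appId = new AtomicReference<>();

        // "Build time": the component is constructed, but the supplier is not called.
        final LazySerializer serializer =
            new LazySerializer(() -> appId.get() + "-join-topic-pk");

        // "Run time": the configuration becomes available before first use.
        appId.set("my-app");
        System.out.println(new String(serializer.serialize("k1"), StandardCharsets.UTF_8));
    }
}
```

The sketch makes no attempt at thread safety; it only shows that the supplier is untouched until the first call to `serialize`, so no `configure` hook is needed.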

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -972,13 +974,26 @@ boolean sendingOldValueEnabled() {
         //This occurs whenever the extracted foreignKey changes values.
         enableSendingOldValues();
 
+        final NamedInternal renamed = new NamedInternal(joinName);
+
+        final String subscriptionTopicName = renamed.suffixWithOrElseGet(
+            "-subscription-registration",
+            builder,
+            SUBSCRIPTION_REGISTRATION
+        ) + TOPIC_SUFFIX;
 
+        // the decoration can't be performed until we have the configuration available when the app runs,
+        // so we pass Suppliers into the components, which they can call at run time

Review comment:
       Hopefully, this explains what's going on here.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -1224,6 +1224,10 @@ private static Pattern buildPattern(final Collection<String> sourceTopics,
         return decoratedTopics;
     }
 
+    public String decoratePseudoTopic(final String topic) {

Review comment:
       I'm adding a new public method for our specific use case here, to document that we should _only_ need to invoke this method publicly for "pseudo" topics.
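
A rough sketch of how such a decoration method and the Suppliers mentioned earlier might fit together. The `TopologyNames` class is hypothetical and is not `InternalTopologyBuilder`; the only behavior taken from this PR is that the decorated name is the application id, a hyphen, and the undecorated name (as the updated test expects). Throwing when the application id is unset is an assumption made for the example.

```java
import java.util.function.Supplier;

// Hypothetical illustration only; TopologyNames is not a Kafka class.
public class PseudoTopicDecorationExample {

    static final class TopologyNames {
        private String applicationId; // unknown until the app is configured

        void setApplicationId(final String applicationId) {
            this.applicationId = applicationId;
        }

        // Prefix a pseudo-topic name with the application id.
        String decoratePseudoTopic(final String topic) {
            if (applicationId == null) {
                // Assumed behavior: fail loudly rather than emit an undecorated name.
                throw new IllegalStateException("applicationId has not been set");
            }
            return applicationId + "-" + topic;
        }
    }

    public static void main(final String[] args) {
        final TopologyNames names = new TopologyNames();

        // Built while the topology is being defined; nothing is decorated yet.
        final Supplier<String> pkTopic = () -> names.decoratePseudoTopic(
            "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-pk");

        // Later, when the app starts and the configuration is known.
        names.setApplicationId("my-app");
        System.out.println(pkTopic.get());
    }
}
```

Because the lambda captures the builder-like object rather than a finished string, the component that holds the Supplier never needs to know when the application id became available.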

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
##########
@@ -218,19 +220,19 @@ public void shouldUseExpectedTopicsWithSerde() {
         }
         // verifying primarily that no extra pseudo-topics were used, but it's nice to also verify the rest of the
         // topics our serdes serialize data for
-        assertThat(serdeScope.registeredTopics(), CoreMatchers.is(mkSet(
+        assertThat(serdeScope.registeredTopics(), is(mkSet(
             // expected pseudo-topics
-            "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-fk--key",
-            "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-pk--key",
-            "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-vh--value",
+            applicationId + "-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-fk--key",
+            applicationId + "-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-pk--key",
+            applicationId + "-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-vh--value",

Review comment:
       This verifies the fix: the pseudo topics should also be prefixed. I should have noticed before that they weren't.







[GitHub] [kafka] vvcephei commented on pull request #8574: KAFKA-9925: decorate pseudo-topics with app id

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8574:
URL: https://github.com/apache/kafka/pull/8574#issuecomment-621467955


   Thanks, all! I'll go ahead and merge this.





[GitHub] [kafka] vvcephei commented on pull request #8574: KAFKA-9925: decorate pseudo-topics with app id

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8574:
URL: https://github.com/apache/kafka/pull/8574#issuecomment-621496689


   cherry-picked to 2.5





[GitHub] [kafka] guozhangwang commented on pull request #8574: KAFKA-9925: decorate pseudo-topics with app id

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8574:
URL: https://github.com/apache/kafka/pull/8574#issuecomment-620787049


   cc @abbccdda @mjsax to take a look?





[GitHub] [kafka] abbccdda commented on a change in pull request #8574: KAFKA-9925: decorate pseudo-topics with app id

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8574:
URL: https://github.com/apache/kafka/pull/8574#discussion_r417578557



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -972,13 +974,26 @@ boolean sendingOldValueEnabled() {
         //This occurs whenever the extracted foreignKey changes values.
         enableSendingOldValues();
 
+        final NamedInternal renamed = new NamedInternal(joinName);
+
+        final String subscriptionTopicName = renamed.suffixWithOrElseGet(
+            "-subscription-registration",
+            builder,
+            SUBSCRIPTION_REGISTRATION
+        ) + TOPIC_SUFFIX;
 
+        // the decoration can't be performed until we have the configuration available when the app runs,
+        // so we pass Suppliers into the components, which they can call at run time

Review comment:
       Understood.







[GitHub] [kafka] vvcephei commented on pull request #8574: KAFKA-9925: decorate pseudo-topics with app id

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8574:
URL: https://github.com/apache/kafka/pull/8574#issuecomment-621501336


   cherry-picked to 2.4

