Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/04/28 16:47:01 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #8574: KAFKA-9925: decorate pseudo-topics with app id

vvcephei commented on a change in pull request #8574:
URL: https://github.com/apache/kafka/pull/8574#discussion_r416764158



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
##########
@@ -76,6 +78,10 @@ public void setIfUnset(final Serializer<K> defaultSerializer) {
                 throw new UnsupportedVersionException("SubscriptionWrapper version is larger than maximum supported 0x7F");
             }
 
+            if (primaryKeySerializationPseudoTopic == null) {
+                primaryKeySerializationPseudoTopic = primaryKeySerializationPseudoTopicSupplier.get();
+            }

Review comment:
       This (and the similar change below) is a bit awkward.
   
   Our only requirement is that the supplier not be called until the app has started; any time after that is fine.
   
   The natural place would be in `configure`, but unfortunately, that method is basically useless for our internal serdes. The reason is that we previously decided that `configure` should be called externally to the DSL, but our internal serdes are constructed _internal_ to the DSL. Plus, `configure` must be called at run time (when the config is available), but by run time, we can no longer tell whether our serde is "internal" or not. So, there's no good place where we can call `configure` for our internal serdes.
   
   I'm side-stepping the problem here by just invoking the supplier when we first need to use it, which is also at run time.
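
As a rough illustration of the "invoke the supplier on first use" approach described above, here is a minimal, self-contained sketch. It is not the code from this PR: `LazyTopicSerializer`, `serializeWithTopic`, and the topic string are hypothetical names made up for the example, and the real `SubscriptionWrapperSerde` does considerably more.

```java
import java.util.function.Supplier;

public class LazyPseudoTopicExample {

    /** Hypothetical stand-in for an internal serializer that needs a decorated pseudo-topic name. */
    public static final class LazyTopicSerializer {
        private final Supplier<String> pseudoTopicSupplier;
        private String pseudoTopic; // stays null until the first call at run time

        public LazyTopicSerializer(final Supplier<String> pseudoTopicSupplier) {
            this.pseudoTopicSupplier = pseudoTopicSupplier;
        }

        /** Resolves the pseudo-topic on first use, then reuses the cached value. */
        public String serializeWithTopic(final String key) {
            if (pseudoTopic == null) {
                pseudoTopic = pseudoTopicSupplier.get();
            }
            return pseudoTopic + "/" + key;
        }
    }

    public static void main(final String[] args) {
        // The application id is unknown while the topology is being built.
        final String[] applicationId = new String[1];

        // Build time: only the supplier is handed over, nothing is resolved yet.
        final LazyTopicSerializer serializer =
            new LazyTopicSerializer(() -> applicationId[0] + "-example-topic-pk");

        // Run time: the config is now available, so the first call can resolve the name.
        applicationId[0] = "my-app";
        System.out.println(serializer.serializeWithTopic("k1"));
        System.out.println(serializer.serializeWithTopic("k2"));
    }
}
```

The null check is not thread-safe as written; the sketch assumes each serializer instance is used from a single thread.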

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -972,13 +974,26 @@ boolean sendingOldValueEnabled() {
         //This occurs whenever the extracted foreignKey changes values.
         enableSendingOldValues();
 
+        final NamedInternal renamed = new NamedInternal(joinName);
+
+        final String subscriptionTopicName = renamed.suffixWithOrElseGet(
+            "-subscription-registration",
+            builder,
+            SUBSCRIPTION_REGISTRATION
+        ) + TOPIC_SUFFIX;
 
+        // the decoration can't be performed until we have the configuration available when the app runs,
+        // so we pass Suppliers into the components, which they can call at run time

Review comment:
       Hopefully, this explains what's going on here.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -1224,6 +1224,10 @@ private static Pattern buildPattern(final Collection<String> sourceTopics,
         return decoratedTopics;
     }
 
+    public String decoratePseudoTopic(final String topic) {

Review comment:
       I'm adding a new public method for our specific use case here, to document that we should _only_ need to invoke this method publicly for "pseudo" topics.
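
For readers without the full diff, a hedged sketch of what decorating a pseudo-topic could look like. The body of `decoratePseudoTopic` is not shown in this excerpt, so everything below is an assumption: `TopicDecorator` and `setApplicationId` are hypothetical, and the `applicationId + "-" + topic` format is inferred only from the expected names in the test change in this PR.

```java
import java.util.Objects;

public class PseudoTopicDecorationExample {

    /** Hypothetical stand-in for the builder; not the real InternalTopologyBuilder. */
    public static final class TopicDecorator {
        private String applicationId; // only known once the app config is available

        public void setApplicationId(final String applicationId) {
            this.applicationId = Objects.requireNonNull(applicationId, "applicationId can't be null");
        }

        /** Assumed behaviour: prefix the pseudo-topic with the application id and a dash. */
        public String decoratePseudoTopic(final String topic) {
            if (applicationId == null) {
                throw new IllegalStateException("application id has not been set yet");
            }
            return applicationId + "-" + topic;
        }
    }

    public static void main(final String[] args) {
        final TopicDecorator decorator = new TopicDecorator();
        decorator.setApplicationId("my-app");
        System.out.println(decorator.decoratePseudoTopic("example-join-topic-pk"));
    }
}
```

Failing fast when the application id is missing is a choice made for this sketch; it makes a too-early call visible instead of silently producing an undecorated name.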

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
##########
@@ -218,19 +220,19 @@ public void shouldUseExpectedTopicsWithSerde() {
         }
         // verifying primarily that no extra pseudo-topics were used, but it's nice to also verify the rest of the
         // topics our serdes serialize data for
-        assertThat(serdeScope.registeredTopics(), CoreMatchers.is(mkSet(
+        assertThat(serdeScope.registeredTopics(), is(mkSet(
             // expected pseudo-topics
-            "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-fk--key",
-            "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-pk--key",
-            "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-vh--value",
+            applicationId + "-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-fk--key",
+            applicationId + "-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-pk--key",
+            applicationId + "-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-vh--value",

Review comment:
       This verifies the fix: the pseudo-topics should also be prefixed with the application id. I should have noticed before that they weren't.



