You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "mjsax (via GitHub)" <gi...@apache.org> on 2023/04/19 00:55:38 UTC

[GitHub] [kafka] mjsax commented on a diff in pull request #13609: KAFKA-14834: [11/N] Update table joins to identify out-of-order records with `isLatest`

mjsax commented on code in PR #13609:
URL: https://github.com/apache/kafka/pull/13609#discussion_r1170677125


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java:
##########
@@ -97,6 +95,27 @@ public KTableKTableJoinMerger<K, VR> joinMerger() {
         return (KTableKTableJoinMerger<K, VR>) kChangeProcessorSupplier;
     }
 
+    @Override
+    public void enableVersionedSemantics(final boolean useVersionedSemantics, final String parentNodeName) {
+        enableVersionedSemantics(thisProcessorParameters(), useVersionedSemantics, parentNodeName);
+        enableVersionedSemantics(otherProcessorParameters(), useVersionedSemantics, parentNodeName);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void enableVersionedSemantics(final ProcessorParameters<K, ?, ?, ?> processorParameters,
+                                          final boolean useVersionedSemantics,
+                                          final String parentNodeName) {
+        final ProcessorSupplier<K, ?, ?, ?> processorSupplier = processorParameters.processorSupplier();
+        if (!(processorSupplier instanceof KTableKTableAbstractJoin)) {
+            throw new IllegalStateException("Unexpected processor type for table-table join: " + processorSupplier.getClass().getName());
+        }
+        final KTableKTableAbstractJoin<K, ?, ?, ?> tableJoin = (KTableKTableAbstractJoin<K, ?, ?, ?>) processorSupplier;
+
+        if (parentNodeName.equals(tableJoin.joinThisParentNodeName())) {

Review Comment:
   Not sure if I understand this condition? Can you elaborate? It seems to be the only place when we sue the newly added `parentNodeName` -- why do we not use it elsewhere?



##########
streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java:
##########
@@ -215,14 +226,19 @@ public void testInnerWithVersionedStores() {
             null,
             null,
             null,
-            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null,  15L)),
+            null,
+            null,
+            null,
+            null,
+            null,

Review Comment:
   It think there is one `null` line too many?



##########
streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java:
##########
@@ -446,14 +482,18 @@ public void testInnerWithRightVersionedOnly() throws Exception {
                 null,
                 null,
                 null,
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null,  15L)),
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null,  14L)),
                 null,
                 null,
-                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-d", null,  14L))
+                null,
+                null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null,  15L)),
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null,  14L)),
+                null,
+                null,
+                Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null,  14L))

Review Comment:
   Should the last two result row flip: we first get `F-e` when we process left hand `F` and get nothing when we process right hand `f`?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##########
@@ -804,6 +806,7 @@ private <VO, VR> KTable<K, VR> doJoin(final KTable<K, VO> other,
         kTableKTableJoinNode.setOutputVersioned(isOutputVersioned);
 
         builder.addGraphNode(this.graphNode, kTableKTableJoinNode);
+        builder.addGraphNode(((KTableImpl<?, ?, ?>) other).graphNode, kTableKTableJoinNode);

Review Comment:
   Yeah. Seems to be incorrect, but did apparently not surface as a bug. Nice fix!



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##########
@@ -1098,7 +1101,7 @@ private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> forei
         //not be done needlessly.
         ((KTableImpl<?, ?, ?>) foreignKeyTable).enableSendingOldValues(true);
 
-        //Old values must be sent such that the ForeignJoinSubscriptionSendProcessorSupplier can propagate deletions to the correct node.
+        //Old values must be sent such that the SubscriptionSendProcessorSupplier can propagate deletions to the correct node.

Review Comment:
   Thank you! -- Could we do a small follow up for 3.4 branch to get it backported?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org