Posted to jira@kafka.apache.org by "vcrfxia (via GitHub)" <gi...@apache.org> on 2023/04/12 22:56:31 UTC

[GitHub] [kafka] vcrfxia opened a new pull request, #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

vcrfxia opened a new pull request, #13552:
URL: https://github.com/apache/kafka/pull/13552

   This PR adds a method to GraphNode to help track whether tables are materialized as versioned or unversioned stores. This is needed so that processors that behave differently on versioned and unversioned tables can apply the correct semantics. For the full definition of when a table is considered "versioned" and which processors behave differently on versioned tables, see [KIP-914](https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+DSL+Processor+Semantics+for+Versioned+Stores). 
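To make the tracking rules concrete, here is a hedged sketch that models them with hypothetical classes. `Node`, `Kind`, `isOutputVersioned`, and `useVersionedSemantics` are illustrative names, not Kafka Streams internals. The rules are inferred from the unit tests later in this thread: an explicitly materialized table is versioned if and only if its store is versioned; unmaterialized pass-through operations such as filter and mapValues inherit from their parent; and joins, aggregations, and `toStream().toTable()` without a versioned store are treated as unversioned.

```java
// Hypothetical, simplified model of versioned-table tracking. Names are
// illustrative only; this is not how Kafka Streams implements GraphNode.
public class VersionedTrackingSketch {

    enum Kind { SOURCE, PASS_THROUGH, STATEFUL }

    static final class Node {
        final Kind kind;
        final Node parent;                      // null for source tables
        final Boolean materializedAsVersioned;  // null = not explicitly materialized

        Node(Kind kind, Node parent, Boolean materializedAsVersioned) {
            this.kind = kind;
            this.parent = parent;
            this.materializedAsVersioned = materializedAsVersioned;
        }

        // Is the table produced by this node versioned?
        boolean isOutputVersioned() {
            if (materializedAsVersioned != null) {
                return materializedAsVersioned;      // explicit materialization wins
            }
            if (kind == Kind.PASS_THROUGH && parent != null) {
                return parent.isOutputVersioned();   // e.g. unmaterialized filter/mapValues
            }
            return false;  // joins, aggregations, toTable() without a versioned store
        }

        // Should a processor at this node (e.g. a filter) use versioned semantics?
        boolean useVersionedSemantics() {
            return parent != null && parent.isOutputVersioned();
        }
    }

    public static void main(String[] args) {
        Node source = new Node(Kind.SOURCE, null, true);          // versioned source table
        Node mapped = new Node(Kind.PASS_THROUGH, source, null);  // unmaterialized mapValues
        Node filter = new Node(Kind.PASS_THROUGH, mapped, null);
        System.out.println(filter.useVersionedSemantics());       // true

        Node count = new Node(Kind.STATEFUL, source, null);       // count() without a versioned store
        Node filter2 = new Node(Kind.PASS_THROUGH, count, null);
        System.out.println(filter2.useVersionedSemantics());      // false
    }
}
```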
   
   Unit tests will be added shortly, and integration tests will be in a follow-up PR.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1170924158


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##########
@@ -1294,4 +1307,11 @@ private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> forei
             builder
         );
     }
+
+    private static void maybeSetOutputVersioned(final GraphNode tableNode,

Review Comment:
   Hm, this method is used only for table operations for which it's valid for `materializedInternal` to be null, e.g., table filter, mapValues, etc., where the result is not necessarily materialized. I don't think there's a way to rewrite this code so that `materializedInternal` is never null in these cases, because a non-null `materializedInternal` means that the result is materialized, which isn't necessarily the case?
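A minimal sketch of the null-handling pattern described above, assuming a null materialization simply means "not materialized here, keep whatever is inherited from upstream". `Materialization`, `TableNode`, and this `maybeSetOutputVersioned` are hypothetical stand-ins; the real method's body is not shown in this diff.

```java
// Hypothetical stand-ins; not Kafka Streams types.
public class MaybeSetOutputVersionedSketch {

    // Stand-in for a materialization request; a null reference means "not materialized".
    static final class Materialization {
        final boolean versionedStore;
        Materialization(boolean versionedStore) { this.versionedStore = versionedStore; }
    }

    // Stand-in for a table graph node.
    static final class TableNode {
        private Boolean outputVersioned;  // null: nothing set here, inherit from upstream

        Boolean outputVersioned() { return outputVersioned; }
    }

    // A null materialization (e.g. an unmaterialized filter or mapValues)
    // leaves the node untouched; otherwise record the store type.
    static void maybeSetOutputVersioned(TableNode node, Materialization materialization) {
        if (materialization != null) {
            node.outputVersioned = materialization.versionedStore;
        }
    }

    public static void main(String[] args) {
        TableNode unmaterialized = new TableNode();
        maybeSetOutputVersioned(unmaterialized, null);
        System.out.println(unmaterialized.outputVersioned());  // null

        TableNode versioned = new TableNode();
        maybeSetOutputVersioned(versioned, new Materialization(true));
        System.out.println(versioned.outputVersioned());       // true
    }
}
```

Keeping "not materialized" as a distinct state, rather than defaulting it to `false`, is what lets an unmaterialized filter or mapValues inherit versioned-ness from its parent.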



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1164980568


##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java:
##########
@@ -705,12 +711,304 @@ public void shouldNotOptimizeJoinWhenNotInConfig() {
         assertEquals(count.get(), 2);
     }
 
+    @Test
+    public void shouldSetUseVersionedSemanticsOnTableFilter() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("store", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, materializedInternal);
+        table1.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateNode() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.mapValues(v -> v != null ? v + v : null);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithMaterializedIntermediateNode() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> unversionedMaterialize =
+            new MaterializedInternal<>(Materialized.as("unversioned"), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.mapValues(v -> v != null ? v + v : null, unversionedMaterialize);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateNodeMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.mapValues(v -> v != null ? v + v : null, versionedMaterialize2);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithIntermediateAggregation() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, Long> table2 = table1.groupBy(KeyValue::new).count();
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateAggregationMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, Long, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, Long> table2 = table1.groupBy(KeyValue::new).count(versionedMaterialize2);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithIntermediateJoin() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, versionedMaterialize2);
+        final KTable<String, String> table3 = table1.join(table2, (v1, v2) -> v1 + v2);
+        table3.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    // not recommended to materialize join result as versioned since semantics are not correct,
+    // but this test is included anyway for completeness
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateJoinMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize3 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned3", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, versionedMaterialize2);
+        final KTable<String, String> table3 = table1.join(table2, (v1, v2) -> v1 + v2, versionedMaterialize3);
+        table3.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithIntermediateForeignKeyJoin() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, versionedMaterialize2);
+        final KTable<String, String> table3 = table1.join(table2, v -> v, (v1, v2) -> v1 + v2);
+        table3.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    // not recommended to materialize join result as versioned since semantics are not correct,
+    // but this test is included anyway for completeness
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateForeignKeyJoinMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize3 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned3", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, versionedMaterialize2);
+        final KTable<String, String> table3 = table1.join(table2, v -> v, (v1, v2) -> v1 + v2, versionedMaterialize3);
+        table3.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithToStreamAndBack() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.toStream().toTable();
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithToStreamAndBackIfMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.toStream().toTable(versionedMaterialize2);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsOnTableRepartitionMap() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        table1.groupBy(KeyValue::new).count();
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode repartitionMap = getNodeByType(builder.root, TableRepartitionMapNode.class, new HashSet<>());
+        assertNotNull(repartitionMap);
+        verifyVersionedSemantics((TableRepartitionMapNode<?, ?>) repartitionMap, true);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsOnTableRepartitionMapWithIntermediateNodes() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.filter((k, v) -> v != null).mapValues(v -> v + v);
+        table2.groupBy(KeyValue::new).count();
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode repartitionMap = getNodeByType(builder.root, TableRepartitionMapNode.class, new HashSet<>());
+        assertNotNull(repartitionMap);
+        verifyVersionedSemantics((TableRepartitionMapNode<?, ?>) repartitionMap, true);
+    }
+
+    private void verifyVersionedSemantics(final TableFilterNode<?, ?> filterNode, final boolean expectedValue) {
+        final ProcessorSupplier<?, ?, ?, ?> processorSupplier = filterNode.processorParameters().processorSupplier();
+        assertTrue(processorSupplier instanceof KTableFilter);
+        final KTableFilter<?, ?> tableFilter = (KTableFilter<?, ?>) processorSupplier;
+        assertEquals(expectedValue, tableFilter.isUseVersionedSemantics());
+    }
+
+    private void verifyVersionedSemantics(final TableRepartitionMapNode<?, ?> repartitionMapNode, final boolean expectedValue) {
+        final ProcessorSupplier<?, ?, ?, ?> processorSupplier = repartitionMapNode.processorParameters().processorSupplier();
+        assertTrue(processorSupplier instanceof KTableRepartitionMap);
+        final KTableRepartitionMap<?, ?, ?, ?> repartitionMap = (KTableRepartitionMap<?, ?, ?, ?>) processorSupplier;
+        assertEquals(expectedValue, repartitionMap.isUseVersionedSemantics());
+    }
+
     private GraphNode getNodeByType(
         final GraphNode currentNode,
         final Class<? extends GraphNode> clazz,
         final Set<GraphNode> visited) {
 
-        if (currentNode.getClass().isAssignableFrom(clazz)) {
+        if (clazz.isAssignableFrom(currentNode.getClass())) {

Review Comment:
   The old logic was incorrect: the purpose of the method is to find instances of class `clazz`, which holds when `currentNode.getClass()` can be assigned to `clazz`, not the other way around.
   
   I noticed the bug while trying to use the method to find `TableFilterNode`s: it was returning `TableProcessorNode` instances instead, and casting them to `TableFilterNode` threw a `ClassCastException` because not every `TableProcessorNode` is a `TableFilterNode`.
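The direction of `Class.isAssignableFrom` is easy to flip, so here is a small self-contained demo. `ProcessorNode` and `FilterNode` are hypothetical stand-ins for the `TableProcessorNode`/`TableFilterNode` relationship described above (the subclass/superclass shape is all that matters).

```java
// Demo of Class.isAssignableFrom direction with stand-in classes.
public class AssignableFromDemo {
    static class ProcessorNode { }                      // stand-in for TableProcessorNode
    static class FilterNode extends ProcessorNode { }   // stand-in for TableFilterNode

    // Old check: true when the node's class is clazz OR a SUPERtype of clazz.
    static boolean oldMatches(Object node, Class<?> clazz) {
        return node.getClass().isAssignableFrom(clazz);
    }

    // Fixed check: true when the node is an instance of clazz (or a subtype);
    // equivalent to clazz.isInstance(node) for a non-null node.
    static boolean newMatches(Object node, Class<?> clazz) {
        return clazz.isAssignableFrom(node.getClass());
    }

    public static void main(String[] args) {
        Object plain = new ProcessorNode();
        System.out.println(oldMatches(plain, FilterNode.class));             // true (wrong match)
        System.out.println(newMatches(plain, FilterNode.class));             // false
        System.out.println(newMatches(new FilterNode(), FilterNode.class));  // true
        try {
            Object f = (FilterNode) plain;  // the cast the old match led to
        } catch (ClassCastException e) {
            System.out.println("ClassCastException");
        }
    }
}
```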



[GitHub] [kafka] mjsax commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1164891330


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java:
##########
@@ -136,13 +137,16 @@ public SessionWindowedCogroupedKStream<K, VOut> windowedBy(final SessionWindows
     private KTable<K, VOut> doAggregate(final Initializer<VOut> initializer,
                                         final NamedInternal named,
                                         final MaterializedInternal<K, VOut, KeyValueStore<Bytes, byte[]>> materializedInternal) {
+        final boolean isOutputVersioned = materializedInternal != null
+            && materializedInternal.storeSupplier() instanceof VersionedBytesStoreSupplier;
         return aggregateBuilder.build(
             groupPatterns,
             initializer,
             named,
             new KeyValueStoreMaterializer<>(materializedInternal).materialize(),
             materializedInternal.keySerde(),
             materializedInternal.valueSerde(),
-            materializedInternal.queryableStoreName());
+            materializedInternal.queryableStoreName(),
+            isOutputVersioned);

Review Comment:
   I think we can just pass in `materializedInternal.storeSupplier() instanceof VersionedBytesStoreSupplier` directly.
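A hedged illustration of why the inline expression is enough: in Java, `instanceof` evaluates to `false` for a null reference, so a missing store supplier needs no extra guard. The expression only dereferences `materializedInternal` itself, which the surrounding code already does for the serdes. `StoreSupplier`, `VersionedSupplier`, and `Materialization` below are hypothetical stand-ins, not the Kafka types.

```java
// Stand-in types only; not Kafka Streams classes.
public class InstanceofSketch {
    interface StoreSupplier { }
    interface VersionedSupplier extends StoreSupplier { }

    static final class Materialization {
        private final StoreSupplier supplier;  // may be null if no custom supplier was given
        Materialization(StoreSupplier supplier) { this.supplier = supplier; }
        StoreSupplier storeSupplier() { return supplier; }
    }

    // Inline check: a null supplier simply yields false.
    static boolean isOutputVersioned(Materialization m) {
        return m.storeSupplier() instanceof VersionedSupplier;
    }

    public static void main(String[] args) {
        System.out.println(isOutputVersioned(new Materialization(null)));                       // false
        System.out.println(isOutputVersioned(new Materialization(new VersionedSupplier() { }))); // true
        System.out.println(isOutputVersioned(new Materialization(new StoreSupplier() { })));     // false
    }
}
```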



[GitHub] [kafka] mjsax commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1164890769


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java:
##########
@@ -136,13 +137,16 @@ public SessionWindowedCogroupedKStream<K, VOut> windowedBy(final SessionWindows
     private KTable<K, VOut> doAggregate(final Initializer<VOut> initializer,
                                         final NamedInternal named,
                                         final MaterializedInternal<K, VOut, KeyValueStore<Bytes, byte[]>> materializedInternal) {
+        final boolean isOutputVersioned = materializedInternal != null

Review Comment:
   `materializedInternal` cannot be `null` here.



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1170929319


##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java:
##########
@@ -705,12 +711,304 @@ public void shouldNotOptimizeJoinWhenNotInConfig() {
         assertEquals(count.get(), 2);
     }
 
+    @Test
+    public void shouldSetUseVersionedSemanticsOnTableFilter() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("store", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, materializedInternal);
+        table1.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateNode() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.mapValues(v -> v != null ? v + v : null);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithMaterializedIntermediateNode() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> unversionedMaterialize =
+            new MaterializedInternal<>(Materialized.as("unversioned"), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.mapValues(v -> v != null ? v + v : null, unversionedMaterialize);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateNodeMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.mapValues(v -> v != null ? v + v : null, versionedMaterialize2);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithIntermediateAggregation() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, Long> table2 = table1.groupBy(KeyValue::new).count();
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateAggregationMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, Long, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, Long> table2 = table1.groupBy(KeyValue::new).count(versionedMaterialize2);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithIntermediateJoin() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, versionedMaterialize2);
+        final KTable<String, String> table3 = table1.join(table2, (v1, v2) -> v1 + v2);
+        table3.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    // not recommended to materialize join result as versioned since semantics are not correct,
+    // but this test is included anyway for completeness
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateJoinMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize3 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned3", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, versionedMaterialize2);
+        final KTable<String, String> table3 = table1.join(table2, (v1, v2) -> v1 + v2, versionedMaterialize3);
+        table3.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithIntermediateForeignKeyJoin() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, versionedMaterialize2);
+        final KTable<String, String> table3 = table1.join(table2, v -> v, (v1, v2) -> v1 + v2);
+        table3.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    // not recommended to materialize join result as versioned since semantics are not correct,
+    // but this test is included anyway for completeness
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateForeignKeyJoinMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize3 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned3", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, versionedMaterialize2);
+        final KTable<String, String> table3 = table1.join(table2, v -> v, (v1, v2) -> v1 + v2, versionedMaterialize3);
+        table3.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithToStreamAndBack() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.toStream().toTable();
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithToStreamAndBackIfMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.toStream().toTable(versionedMaterialize2);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsOnTableRepartitionMap() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        table1.groupBy(KeyValue::new).count();
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode repartitionMap = getNodeByType(builder.root, TableRepartitionMapNode.class, new HashSet<>());
+        assertNotNull(repartitionMap);
+        verifyVersionedSemantics((TableRepartitionMapNode<?, ?>) repartitionMap, true);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsOnTableRepartitionMapWithIntermediateNodes() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.filter((k, v) -> v != null).mapValues(v -> v + v);
+        table2.groupBy(KeyValue::new).count();
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode repartitionMap = getNodeByType(builder.root, TableRepartitionMapNode.class, new HashSet<>());
+        assertNotNull(repartitionMap);
+        verifyVersionedSemantics((TableRepartitionMapNode<?, ?>) repartitionMap, true);
+    }
+
+    private void verifyVersionedSemantics(final TableFilterNode<?, ?> filterNode, final boolean expectedValue) {
+        final ProcessorSupplier<?, ?, ?, ?> processorSupplier = filterNode.processorParameters().processorSupplier();
+        assertTrue(processorSupplier instanceof KTableFilter);
+        final KTableFilter<?, ?> tableFilter = (KTableFilter<?, ?>) processorSupplier;
+        assertEquals(expectedValue, tableFilter.isUseVersionedSemantics());
+    }
+
+    private void verifyVersionedSemantics(final TableRepartitionMapNode<?, ?> repartitionMapNode, final boolean expectedValue) {
+        final ProcessorSupplier<?, ?, ?, ?> processorSupplier = repartitionMapNode.processorParameters().processorSupplier();
+        assertTrue(processorSupplier instanceof KTableRepartitionMap);
+        final KTableRepartitionMap<?, ?, ?, ?> repartitionMap = (KTableRepartitionMap<?, ?, ?, ?>) processorSupplier;
+        assertEquals(expectedValue, repartitionMap.isUseVersionedSemantics());
+    }
+
     private GraphNode getNodeByType(
         final GraphNode currentNode,
         final Class<? extends GraphNode> clazz,
         final Set<GraphNode> visited) {
 
-        if (currentNode.getClass().isAssignableFrom(clazz)) {
+        if (clazz.isAssignableFrom(currentNode.getClass())) {

Review Comment:
   Previously this method was only used to find instances of `StreamStreamJoinNode`, which only has one concrete superclass, `BaseJoinProcessorNode`. All the test cases that called the method to find instances of `StreamStreamJoinNode` only involved one join node, and therefore never hit the issue I described above where the method was incorrectly finding superclass members. 
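To make the direction of the check concrete, here is a minimal, self-contained sketch. `Node`, `TableProcessorNode`, and `TableFilterNode` are hypothetical stand-ins, not the real Kafka Streams classes. They only mimic the assumption that a filter node subclasses a more general table-processor node. The original expression `currentNode.getClass().isAssignableFrom(clazz)` is true when the node's class is `clazz` or a superclass of `clazz`, so a search for `TableFilterNode` can stop early at a plain `TableProcessorNode`. The corrected `clazz.isAssignableFrom(currentNode.getClass())` matches only `clazz` and its subclasses.

```java
public class AssignableFromDemo {
    // Hypothetical stand-ins for the graph-node hierarchy; NOT the real Kafka Streams classes.
    static class Node { }
    static class TableProcessorNode extends Node { }
    static class TableFilterNode extends TableProcessorNode { }

    // Original check: true when the node's class is clazz OR a SUPERCLASS of clazz.
    static boolean buggyMatches(final Node node, final Class<? extends Node> clazz) {
        return node.getClass().isAssignableFrom(clazz);
    }

    // Corrected check: true when the node is an instance of clazz or one of its SUBCLASSES.
    static boolean fixedMatches(final Node node, final Class<? extends Node> clazz) {
        return clazz.isAssignableFrom(node.getClass());
    }

    public static void main(final String[] args) {
        final Node mapValuesNode = new TableProcessorNode(); // a generic table processor, e.g. mapValues
        final Node filterNode = new TableFilterNode();

        // Searching for TableFilterNode:
        System.out.println(buggyMatches(mapValuesNode, TableFilterNode.class)); // true (wrong match)
        System.out.println(fixedMatches(mapValuesNode, TableFilterNode.class)); // false
        System.out.println(buggyMatches(filterNode, TableFilterNode.class));    // true
        System.out.println(fixedMatches(filterNode, TableFilterNode.class));    // true
    }
}
```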





[GitHub] [kafka] mjsax commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1164920870


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##########
@@ -1281,6 +1293,7 @@ private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> forei
             ),
             resultStore
         );
+        resultNode.setOutputVersioned(materializedInternal.storeSupplier() instanceof VersionedBytesStoreSupplier);

Review Comment:
   Should we make `setOutputVersioned` part of the `TableProcessorNode` constructor?
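Below is a minimal sketch of what passing the flag through the constructor could look like. It uses a hypothetical, simplified `TableNode` class and hypothetical `StoreSupplier`/`VersionedSupplier`/`PlainSupplier` types rather than the real `TableProcessorNode` or `VersionedBytesStoreSupplier`. The flag becomes a `final` field that is derived once, where the node is created, so no call site can forget to set it.

```java
public class ConstructorFlagSketch {
    // Hypothetical marker types standing in for store suppliers; NOT the real Kafka types.
    interface StoreSupplier { }
    static final class VersionedSupplier implements StoreSupplier { }
    static final class PlainSupplier implements StoreSupplier { }

    // Hypothetical, simplified table node; NOT the real TableProcessorNode API.
    static final class TableNode {
        private final String name;
        private final boolean outputVersioned; // fixed at construction time

        TableNode(final String name, final boolean outputVersioned) {
            this.name = name;
            this.outputVersioned = outputVersioned;
        }

        String name() {
            return name;
        }

        boolean isOutputVersioned() {
            return outputVersioned;
        }
    }

    static TableNode newTableNode(final String name, final StoreSupplier supplier) {
        // instanceof is false for a null supplier, so "not materialized" yields an unversioned node.
        return new TableNode(name, supplier instanceof VersionedSupplier);
    }

    public static void main(final String[] args) {
        System.out.println(newTableNode("fk-join-result", new VersionedSupplier()).isOutputVersioned()); // true
        System.out.println(newTableNode("fk-join-result", new PlainSupplier()).isOutputVersioned());     // false
    }
}
```

The trade-off is that every existing constructor call must now pass the flag, whereas a setter only touches the call sites that care about it.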





[GitHub] [kafka] mjsax commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1164913480


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java:
##########
@@ -236,13 +237,16 @@ public SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows) {
     private <T> KTable<K, T> doAggregate(final KStreamAggProcessorSupplier<K, V, K, T> aggregateSupplier,
                                          final String functionName,
                                          final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materializedInternal) {
+        final boolean isOutputVersioned = materializedInternal != null

Review Comment:
   as above





[GitHub] [kafka] mjsax commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1164893058


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -142,6 +145,7 @@ public <K, V> KTable<K, V> table(final String topic,
             .withMaterializedInternal(materialized)
             .withProcessorParameters(processorParameters)
             .build();
+        tableSourceNode.setOutputVersioned(materialized.storeSupplier() instanceof VersionedBytesStoreSupplier);

Review Comment:
   Should we make `setOutputVersioned` a `withOutputVersioned` builder method like all the others?
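For comparison, a builder-style variant could look like the sketch below. `SourceNode` and `SourceNodeBuilder` are hypothetical, simplified types, and the real table-source builder and its existing `with*` methods are not reproduced here. The flag defaults to unversioned and is set fluently alongside the other parameters before `build()`.

```java
public class BuilderFlagSketch {
    // Hypothetical, simplified source node; NOT the real TableSourceNode API.
    static final class SourceNode {
        final String topic;
        final boolean outputVersioned;

        private SourceNode(final String topic, final boolean outputVersioned) {
            this.topic = topic;
            this.outputVersioned = outputVersioned;
        }
    }

    // Hypothetical builder; each with* method returns the builder for chaining.
    static final class SourceNodeBuilder {
        private String topic;
        private boolean outputVersioned = false; // default: unversioned

        SourceNodeBuilder withTopic(final String topic) {
            this.topic = topic;
            return this;
        }

        // Builder-style counterpart to a post-construction setOutputVersioned(...) call.
        SourceNodeBuilder withOutputVersioned(final boolean outputVersioned) {
            this.outputVersioned = outputVersioned;
            return this;
        }

        SourceNode build() {
            return new SourceNode(topic, outputVersioned);
        }
    }

    public static void main(final String[] args) {
        final SourceNode node = new SourceNodeBuilder()
            .withTopic("t1")
            .withOutputVersioned(true)
            .build();
        System.out.println(node.topic + " versioned=" + node.outputVersioned); // t1 versioned=true
    }
}
```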





[GitHub] [kafka] mjsax commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1165991631


##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java:
##########
@@ -705,12 +711,304 @@ public void shouldNotOptimizeJoinWhenNotInConfig() {
         assertEquals(count.get(), 2);
     }
 
+    @Test
+    public void shouldSetUseVersionedSemanticsOnTableFilter() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("store", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, materializedInternal);
+        table1.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateNode() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.mapValues(v -> v != null ? v + v : null);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithMaterializedIntermediateNode() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> unversionedMaterialize =
+            new MaterializedInternal<>(Materialized.as("unversioned"), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.mapValues(v -> v != null ? v + v : null, unversionedMaterialize);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateNodeMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.mapValues(v -> v != null ? v + v : null, versionedMaterialize2);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithIntermediateAggregation() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, Long> table2 = table1.groupBy(KeyValue::new).count();
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateAggregationMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, Long, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, Long> table2 = table1.groupBy(KeyValue::new).count(versionedMaterialize2);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithIntermediateJoin() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, versionedMaterialize2);
+        final KTable<String, String> table3 = table1.join(table2, (v1, v2) -> v1 + v2);
+        table3.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    // not recommended to materialize join result as versioned since semantics are not correct,
+    // but this test is included anyway for completeness
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateJoinMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize3 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned3", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, versionedMaterialize2);
+        final KTable<String, String> table3 = table1.join(table2, (v1, v2) -> v1 + v2, versionedMaterialize3);
+        table3.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithIntermediateForeignKeyJoin() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, versionedMaterialize2);
+        final KTable<String, String> table3 = table1.join(table2, v -> v, (v1, v2) -> v1 + v2);
+        table3.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    // not recommended to materialize join result as versioned since semantics are not correct,
+    // but this test is included anyway for completeness
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateForeignKeyJoinMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize3 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned3", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, versionedMaterialize2);
+        final KTable<String, String> table3 = table1.join(table2, v -> v, (v1, v2) -> v1 + v2, versionedMaterialize3);
+        table3.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithToStreamAndBack() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.toStream().toTable();
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithToStreamAndBackIfMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.toStream().toTable(versionedMaterialize2);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsOnTableRepartitionMap() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        table1.groupBy(KeyValue::new).count();
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode repartitionMap = getNodeByType(builder.root, TableRepartitionMapNode.class, new HashSet<>());
+        assertNotNull(repartitionMap);
+        verifyVersionedSemantics((TableRepartitionMapNode<?, ?>) repartitionMap, true);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsOnTableRepartitionMapWithIntermediateNodes() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.filter((k, v) -> v != null).mapValues(v -> v + v);
+        table2.groupBy(KeyValue::new).count();
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode repartitionMap = getNodeByType(builder.root, TableRepartitionMapNode.class, new HashSet<>());
+        assertNotNull(repartitionMap);
+        verifyVersionedSemantics((TableRepartitionMapNode<?, ?>) repartitionMap, true);
+    }
+
+    private void verifyVersionedSemantics(final TableFilterNode<?, ?> filterNode, final boolean expectedValue) {
+        final ProcessorSupplier<?, ?, ?, ?> processorSupplier = filterNode.processorParameters().processorSupplier();
+        assertTrue(processorSupplier instanceof KTableFilter);
+        final KTableFilter<?, ?> tableFilter = (KTableFilter<?, ?>) processorSupplier;
+        assertEquals(expectedValue, tableFilter.isUseVersionedSemantics());
+    }
+
+    private void verifyVersionedSemantics(final TableRepartitionMapNode<?, ?> repartitionMapNode, final boolean expectedValue) {
+        final ProcessorSupplier<?, ?, ?, ?> processorSupplier = repartitionMapNode.processorParameters().processorSupplier();
+        assertTrue(processorSupplier instanceof KTableRepartitionMap);
+        final KTableRepartitionMap<?, ?, ?, ?> repartitionMap = (KTableRepartitionMap<?, ?, ?, ?>) processorSupplier;
+        assertEquals(expectedValue, repartitionMap.isUseVersionedSemantics());
+    }
+
     private GraphNode getNodeByType(
         final GraphNode currentNode,
         final Class<? extends GraphNode> clazz,
         final Set<GraphNode> visited) {
 
-        if (currentNode.getClass().isAssignableFrom(clazz)) {
+        if (clazz.isAssignableFrom(currentNode.getClass())) {

Review Comment:
   Was this code unused before? Just wondering why the incorrect code was not an issue before.
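   For context on the flipped check, here is a small sketch of the two directions of `Class.isAssignableFrom`. The node classes are hypothetical stand-ins, not the real Kafka Streams graph nodes. `clazz.isAssignableFrom(node.getClass())` asks "is this node a `clazz` (or a subtype)?", while the old form asks the reverse. When the node's class is exactly the requested class, both forms return `true`. That is one plausible reason the old code did not cause failures, assuming earlier callers only searched for exact node classes:

```java
public class AssignableFromDemo {
    // Hypothetical stand-ins for a node hierarchy; not the real Kafka Streams classes.
    static class BaseNode { }
    static class FilterNode extends BaseNode { }
    static class SpecialFilterNode extends FilterNode { }

    // Old check: true only if `clazz` is the same as, or a subtype of, the node's runtime class.
    static boolean oldMatch(BaseNode node, Class<?> clazz) {
        return node.getClass().isAssignableFrom(clazz);
    }

    // New check: true if the node's runtime class is `clazz` or a subtype of it
    // (equivalent to clazz.isInstance(node) for a non-null node).
    static boolean newMatch(BaseNode node, Class<?> clazz) {
        return clazz.isAssignableFrom(node.getClass());
    }

    public static void main(String[] args) {
        BaseNode exact = new FilterNode();
        BaseNode sub = new SpecialFilterNode();
        BaseNode base = new BaseNode();

        // Exact class match: both directions agree, which can hide the bug.
        System.out.println(oldMatch(exact, FilterNode.class) + " " + newMatch(exact, FilterNode.class)); // true true
        // Subclass instance, searching for its parent type: only the new check matches.
        System.out.println(oldMatch(sub, FilterNode.class) + " " + newMatch(sub, FilterNode.class)); // false true
        // Base-class instance, searching for a subtype: only the old check (wrongly) matches.
        System.out.println(oldMatch(base, FilterNode.class) + " " + newMatch(base, FilterNode.class)); // true false
    }
}
```

   In particular, the old form can return a node whose class is a *supertype* of the requested one, so a graph walk may stop at the wrong node.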





[GitHub] [kafka] mjsax commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1164922362


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##########
@@ -1294,4 +1307,11 @@ private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> forei
             builder
         );
     }
+
+    private static void maybeSetOutputVersioned(final GraphNode tableNode,

Review Comment:
   To avoid this helper, would it be worth rewriting this class a little so that `materializedInternal` is never `null`?
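   To illustrate the trade-off being suggested, here is a minimal sketch contrasting a null-guarding helper with normalizing the materialization once so downstream code never sees `null`. All types and methods here (`Materialization`, `TableNode`, `normalize`) are hypothetical simplifications, not the real `MaterializedInternal` or `GraphNode` API, and the real helper's signature is not reproduced:

```java
import java.util.Objects;

public class NonNullMaterializedSketch {
    // Hypothetical, simplified stand-ins; not the real MaterializedInternal / GraphNode API.
    static final class Materialization {
        static final Materialization NONE = new Materialization(false);
        final boolean versioned;
        Materialization(boolean versioned) { this.versioned = versioned; }
    }

    static final class TableNode {
        boolean outputVersioned;
        void setOutputVersioned(boolean v) { outputVersioned = v; }
    }

    // Style 1: a helper that must guard against a null materialization at every call site.
    static void maybeSetOutputVersioned(TableNode node, Materialization m) {
        if (m != null) {
            node.setOutputVersioned(m.versioned);
        }
    }

    // Style 2: normalize once at the API boundary so downstream code never sees null.
    static Materialization normalize(Materialization m) {
        return m == null ? Materialization.NONE : m;
    }

    static void setOutputVersioned(TableNode node, Materialization m) {
        node.setOutputVersioned(Objects.requireNonNull(m).versioned);
    }

    public static void main(String[] args) {
        TableNode a = new TableNode();
        maybeSetOutputVersioned(a, null);        // guarded: leaves the default (false)

        TableNode b = new TableNode();
        setOutputVersioned(b, normalize(null));  // null replaced by NONE, no guard needed

        TableNode c = new TableNode();
        setOutputVersioned(c, normalize(new Materialization(true)));

        System.out.println(a.outputVersioned + " " + b.outputVersioned + " " + c.outputVersioned); // false false true
    }
}
```

   The second style moves the null check to one place, at the cost of touching every code path that currently passes a possibly-null materialization.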





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1171939132


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java:
##########
@@ -73,6 +74,7 @@ <KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? sup
                 stateCreated,
                 storeBuilder,
                 parentProcessor);
+            statefulProcessorNode.setOutputVersioned(isOutputVersioned);

Review Comment:
   Hm, yeah, I'd like to make something like this (and your other related suggestions) work so that this wiring is less brittle, but it's tricky. I think we only get value from the change if we make `isOutputVersioned` a required constructor parameter of several different nodes: StatefulProcessorNode, KTableKTableJoinNode, TableSourceNode, StreamToTableNode, and TableProcessorNode (none of which inherit from each other). Do you think it's worth making `isOutputVersioned` required for all of these? The benefit of forcing future authors to think about `isOutputVersioned` in every one of these cases is high, but the change will also bloat the codebase, so I'm on the fence.
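   A minimal sketch of the two wiring styles under discussion, using hypothetical `SetterNode` and `ConstructorNode` classes rather than the real graph-node classes. It shows why a required constructor parameter is less brittle (omitting the flag is a compile error) and why it is more verbose (every node type and call site must carry the extra argument):

```java
public class ConstructorVsSetterSketch {
    // Setter style (as in the diff): the flag defaults to false, so a call site that
    // forgets setOutputVersioned(...) still compiles and silently reports "unversioned".
    static class SetterNode {
        private boolean outputVersioned = false;
        void setOutputVersioned(boolean outputVersioned) { this.outputVersioned = outputVersioned; }
        boolean isOutputVersioned() { return outputVersioned; }
    }

    // Constructor style (the reviewer's suggestion): every call site must supply the flag,
    // so forgetting it is a compile-time error rather than a silent default.
    static class ConstructorNode {
        private final boolean outputVersioned;
        ConstructorNode(boolean outputVersioned) { this.outputVersioned = outputVersioned; }
        boolean isOutputVersioned() { return outputVersioned; }
    }

    public static void main(String[] args) {
        SetterNode forgotten = new SetterNode();               // compiles; flag silently false
        ConstructorNode explicit = new ConstructorNode(true);  // flag cannot be omitted
        System.out.println(forgotten.isOutputVersioned() + " " + explicit.isOutputVersioned()); // false true
    }
}
```

   With five unrelated node classes, the constructor style means five constructor changes plus updates at every instantiation, which is the bloat being weighed here.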





[GitHub] [kafka] mjsax merged pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax merged PR #13552:
URL: https://github.com/apache/kafka/pull/13552




[GitHub] [kafka] mjsax commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1164915423


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java:
##########
@@ -73,6 +74,7 @@ <KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? sup
                 stateCreated,
                 storeBuilder,
                 parentProcessor);
+            statefulProcessorNode.setOutputVersioned(isOutputVersioned);

Review Comment:
   Wondering if we should pass `isOutputVersioned` via the constructor of `StatefulProcessorNode`?





[GitHub] [kafka] mjsax commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1164930944


##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java:
##########
@@ -705,12 +711,304 @@ public void shouldNotOptimizeJoinWhenNotInConfig() {
         assertEquals(count.get(), 2);
     }
 
+    @Test
+    public void shouldSetUseVersionedSemanticsOnTableFilter() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("store", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, materializedInternal);
+        table1.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateNode() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.mapValues(v -> v != null ? v + v : null);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithMaterializedIntermediateNode() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> unversionedMaterialize =
+            new MaterializedInternal<>(Materialized.as("unversioned"), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.mapValues(v -> v != null ? v + v : null, unversionedMaterialize);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateNodeMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.mapValues(v -> v != null ? v + v : null, versionedMaterialize2);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithIntermediateAggregation() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, Long> table2 = table1.groupBy(KeyValue::new).count();
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateAggregationMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, Long, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, Long> table2 = table1.groupBy(KeyValue::new).count(versionedMaterialize2);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithIntermediateJoin() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, versionedMaterialize2);
+        final KTable<String, String> table3 = table1.join(table2, (v1, v2) -> v1 + v2);
+        table3.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    // not recommended to materialize join result as versioned since semantics are not correct,
+    // but this test is included anyway for completeness
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateJoinMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize3 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned3", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, versionedMaterialize2);
+        final KTable<String, String> table3 = table1.join(table2, (v1, v2) -> v1 + v2, versionedMaterialize3);
+        table3.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithIntermediateForeignKeyJoin() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, versionedMaterialize2);
+        final KTable<String, String> table3 = table1.join(table2, v -> v, (v1, v2) -> v1 + v2);
+        table3.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    // not recommended to materialize join result as versioned since semantics are not correct,
+    // but this test is included anyway for completeness
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateForeignKeyJoinMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize3 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned3", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, versionedMaterialize2);
+        final KTable<String, String> table3 = table1.join(table2, v -> v, (v1, v2) -> v1 + v2, versionedMaterialize3);
+        table3.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithToStreamAndBack() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.toStream().toTable();
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithToStreamAndBackIfMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.toStream().toTable(versionedMaterialize2);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsOnTableRepartitionMap() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        table1.groupBy(KeyValue::new).count();
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode repartitionMap = getNodeByType(builder.root, TableRepartitionMapNode.class, new HashSet<>());
+        assertNotNull(repartitionMap);
+        verifyVersionedSemantics((TableRepartitionMapNode<?, ?>) repartitionMap, true);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsOnTableRepartitionMapWithIntermediateNodes() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.filter((k, v) -> v != null).mapValues(v -> v + v);
+        table2.groupBy(KeyValue::new).count();
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode repartitionMap = getNodeByType(builder.root, TableRepartitionMapNode.class, new HashSet<>());
+        assertNotNull(repartitionMap);
+        verifyVersionedSemantics((TableRepartitionMapNode<?, ?>) repartitionMap, true);
+    }
+
+    private void verifyVersionedSemantics(final TableFilterNode<?, ?> filterNode, final boolean expectedValue) {
+        final ProcessorSupplier<?, ?, ?, ?> processorSupplier = filterNode.processorParameters().processorSupplier();
+        assertTrue(processorSupplier instanceof KTableFilter);
+        final KTableFilter<?, ?> tableFilter = (KTableFilter<?, ?>) processorSupplier;
+        assertEquals(expectedValue, tableFilter.isUseVersionedSemantics());
+    }
+
+    private void verifyVersionedSemantics(final TableRepartitionMapNode<?, ?> repartitionMapNode, final boolean expectedValue) {
+        final ProcessorSupplier<?, ?, ?, ?> processorSupplier = repartitionMapNode.processorParameters().processorSupplier();
+        assertTrue(processorSupplier instanceof KTableRepartitionMap);
+        final KTableRepartitionMap<?, ?, ?, ?> repartitionMap = (KTableRepartitionMap<?, ?, ?, ?>) processorSupplier;
+        assertEquals(expectedValue, repartitionMap.isUseVersionedSemantics());
+    }
+
     private GraphNode getNodeByType(
         final GraphNode currentNode,
         final Class<? extends GraphNode> clazz,
         final Set<GraphNode> visited) {
 
-        if (currentNode.getClass().isAssignableFrom(clazz)) {
+        if (clazz.isAssignableFrom(currentNode.getClass())) {

Review Comment:
   Why is this flipped?
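   As a reading aid for the tests quoted above, here is a minimal sketch that models the expectations they encode. It is a hypothetical, simplified model, not the actual `GraphNode` implementation. A table's output is versioned if it is materialized with a versioned store. If it is not materialized, a pass-through operation such as `filter` or `mapValues` inherits from its parent. Aggregations, joins, and `toStream().toTable()` do not inherit. A downstream filter or repartition map uses versioned semantics exactly when its input table is versioned:

```java
public class VersionedPropagationSketch {
    // Hypothetical, simplified model; not the real Kafka Streams graph classes.
    enum Materialization { NONE, VERSIONED, UNVERSIONED }

    // PASS_THROUGH: e.g. filter/mapValues, which inherit "versioned-ness" when not materialized.
    // RESET: e.g. aggregations, joins, toStream().toTable(), which do not inherit.
    enum Kind { SOURCE, PASS_THROUGH, RESET }

    static final class Node {
        final Kind kind;
        final Materialization materialization;
        final Node parent;

        Node(Kind kind, Materialization materialization, Node parent) {
            this.kind = kind;
            this.materialization = materialization;
            this.parent = parent;
        }
    }

    static boolean isOutputVersioned(Node node) {
        switch (node.materialization) {
            case VERSIONED:
                return true;   // own versioned store always wins
            case UNVERSIONED:
                return false;  // own unversioned store always wins
            default:           // NONE: only pass-through nodes inherit from their parent
                return node.kind == Kind.PASS_THROUGH
                    && node.parent != null
                    && isOutputVersioned(node.parent);
        }
    }

    public static void main(String[] args) {
        Node source = new Node(Kind.SOURCE, Materialization.VERSIONED, null);
        Node mapped = new Node(Kind.PASS_THROUGH, Materialization.NONE, source);
        Node mappedUnversioned = new Node(Kind.PASS_THROUGH, Materialization.UNVERSIONED, source);
        Node counted = new Node(Kind.RESET, Materialization.NONE, source);
        Node countedVersioned = new Node(Kind.RESET, Materialization.VERSIONED, source);

        // Mirrors: intermediate mapValues (true), materialized-unversioned mapValues (false),
        // plain count() (false), count() materialized as versioned (true).
        System.out.println(isOutputVersioned(mapped) + " " + isOutputVersioned(mappedUnversioned)
            + " " + isOutputVersioned(counted) + " " + isOutputVersioned(countedVersioned)); // true false false true
    }
}
```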


