Posted to jira@kafka.apache.org by "vcrfxia (via GitHub)" <gi...@apache.org> on 2023/04/13 04:24:41 UTC

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

vcrfxia commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1164980568


##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java:
##########
@@ -705,12 +711,304 @@ public void shouldNotOptimizeJoinWhenNotInConfig() {
         assertEquals(count.get(), 2);
     }
 
+    @Test
+    public void shouldSetUseVersionedSemanticsOnTableFilter() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("store", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, materializedInternal);
+        table1.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateNode() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.mapValues(v -> v != null ? v + v : null);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithMaterializedIntermediateNode() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> unversionedMaterialize =
+            new MaterializedInternal<>(Materialized.as("unversioned"), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.mapValues(v -> v != null ? v + v : null, unversionedMaterialize);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateNodeMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.mapValues(v -> v != null ? v + v : null, versionedMaterialize2);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithIntermediateAggregation() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, Long> table2 = table1.groupBy(KeyValue::new).count();
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateAggregationMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, Long, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, Long> table2 = table1.groupBy(KeyValue::new).count(versionedMaterialize2);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithIntermediateJoin() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, versionedMaterialize2);
+        final KTable<String, String> table3 = table1.join(table2, (v1, v2) -> v1 + v2);
+        table3.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    // not recommended to materialize join result as versioned since semantics are not correct,
+    // but this test is included anyway for completeness
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateJoinMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize3 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned3", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, versionedMaterialize2);
+        final KTable<String, String> table3 = table1.join(table2, (v1, v2) -> v1 + v2, versionedMaterialize3);
+        table3.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithIntermediateForeignKeyJoin() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, versionedMaterialize2);
+        final KTable<String, String> table3 = table1.join(table2, v -> v, (v1, v2) -> v1 + v2);
+        table3.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    // not recommended to materialize join result as versioned since semantics are not correct,
+    // but this test is included anyway for completeness
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateForeignKeyJoinMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize3 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned3", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = builder.table("t2", consumed, versionedMaterialize2);
+        final KTable<String, String> table3 = table1.join(table2, v -> v, (v1, v2) -> v1 + v2, versionedMaterialize3);
+        table3.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithToStreamAndBack() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.toStream().toTable();
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, false);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithToStreamAndBackIfMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.toStream().toTable(versionedMaterialize2);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode<?, ?>) filter, true);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsOnTableRepartitionMap() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        table1.groupBy(KeyValue::new).count();
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode repartitionMap = getNodeByType(builder.root, TableRepartitionMapNode.class, new HashSet<>());
+        assertNotNull(repartitionMap);
+        verifyVersionedSemantics((TableRepartitionMapNode<?, ?>) repartitionMap, true);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsOnTableRepartitionMapWithIntermediateNodes() {
+        // Given:
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable<String, String> table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable<String, String> table2 = table1.filter((k, v) -> v != null).mapValues(v -> v + v);
+        table2.groupBy(KeyValue::new).count();
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode repartitionMap = getNodeByType(builder.root, TableRepartitionMapNode.class, new HashSet<>());
+        assertNotNull(repartitionMap);
+        verifyVersionedSemantics((TableRepartitionMapNode<?, ?>) repartitionMap, true);
+    }
+
+    private void verifyVersionedSemantics(final TableFilterNode<?, ?> filterNode, final boolean expectedValue) {
+        final ProcessorSupplier<?, ?, ?, ?> processorSupplier = filterNode.processorParameters().processorSupplier();
+        assertTrue(processorSupplier instanceof KTableFilter);
+        final KTableFilter<?, ?> tableFilter = (KTableFilter<?, ?>) processorSupplier;
+        assertEquals(expectedValue, tableFilter.isUseVersionedSemantics());
+    }
+
+    private void verifyVersionedSemantics(final TableRepartitionMapNode<?, ?> repartitionMapNode, final boolean expectedValue) {
+        final ProcessorSupplier<?, ?, ?, ?> processorSupplier = repartitionMapNode.processorParameters().processorSupplier();
+        assertTrue(processorSupplier instanceof KTableRepartitionMap);
+        final KTableRepartitionMap<?, ?, ?, ?> repartitionMap = (KTableRepartitionMap<?, ?, ?, ?>) processorSupplier;
+        assertEquals(expectedValue, repartitionMap.isUseVersionedSemantics());
+    }
+
     private GraphNode getNodeByType(
         final GraphNode currentNode,
         final Class<? extends GraphNode> clazz,
         final Set<GraphNode> visited) {
 
-        if (currentNode.getClass().isAssignableFrom(clazz)) {
+        if (clazz.isAssignableFrom(currentNode.getClass())) {

Review Comment:
   The old logic was incorrect. The purpose of the method is to find nodes that are instances of `clazz`, which holds when `currentNode.getClass()` can be assigned to `clazz`, not the other way around.
   
   I noticed the bug while using the method to fetch `TableFilterNode`s: it was returning `TableProcessorNode` instances, and casting them to `TableFilterNode` threw an exception because not all `TableProcessorNode`s are `TableFilterNode`s.
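
To make the direction of `Class.isAssignableFrom` concrete, here is a minimal, self-contained sketch. The three nested classes are hypothetical stand-ins that only mirror the subclass relationship described above (they are not the real Kafka Streams graph node classes), and `oldMatches` / `newMatches` are illustrative names for the check before and after the one-line change.

```java
public class AssignableDirectionSketch {
    // Hypothetical stand-ins for the node hierarchy; NOT the real Kafka classes.
    static class GraphNode { }
    static class TableProcessorNode extends GraphNode { }
    static class TableFilterNode extends TableProcessorNode { }

    // Old check: true when clazz is the node's runtime class or a subclass of it.
    static boolean oldMatches(final GraphNode node, final Class<? extends GraphNode> clazz) {
        return node.getClass().isAssignableFrom(clazz);
    }

    // Fixed check: true when the node is an instance of clazz.
    static boolean newMatches(final GraphNode node, final Class<? extends GraphNode> clazz) {
        return clazz.isAssignableFrom(node.getClass());
    }

    public static void main(final String[] args) {
        final GraphNode processorNode = new TableProcessorNode();
        final GraphNode filterNode = new TableFilterNode();

        // The old check accepts a plain TableProcessorNode when asked for a TableFilterNode,
        // so a later cast to TableFilterNode would fail.
        System.out.println(oldMatches(processorNode, TableFilterNode.class)); // prints "true"

        // The fixed check rejects it and accepts only real TableFilterNode instances.
        System.out.println(newMatches(processorNode, TableFilterNode.class)); // prints "false"
        System.out.println(newMatches(filterNode, TableFilterNode.class));    // prints "true"
    }
}
```

In this sketch the fixed check is equivalent to `clazz.isInstance(node)`, which may read more directly when the node is known to be non-null.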


