You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/01/25 00:35:34 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #11609: KAFKA-12648: fixes for query APIs with named topologies

guozhangwang commented on a change in pull request #11609:
URL: https://github.com/apache/kafka/pull/11609#discussion_r791262134



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##########
@@ -635,6 +710,19 @@ public void shouldWaitForMissingInputTopicsToBeCreated() throws Exception {
         }
     }
 
+    /**
+     * @return  true iff the metadata contents of each StreamsMetadata object are equal, ie all fields except
+     *          for the topologyName
+     */
+    private static boolean metadataIsEqual(final StreamsMetadata left, final StreamsMetadata right) {

Review comment:
       Should we change topology name from casting to Impl as well?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -113,7 +113,7 @@ public StreamsMetadata getLocalMetadata() {
             return allMetadata;
         }
 
-        final Collection<String> sourceTopics = topologyMetadata.sourceTopicsForStore(storeName);
+        final Collection<String> sourceTopics = topologyMetadata.sourceTopicsForStore(storeName, null);

Review comment:
       Got it, thanks for the notes.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -319,6 +315,14 @@ private void verifyTopologyStateStore(final String topologyName, final String st
         return streamsMetadataState.getAllMetadataForStore(storeName, topologyName);
     }
 
+    /**
+     * See {@link KafkaStreams#metadataForAllStreamsClients()}
+     */
+    public Collection<StreamsMetadata> allStreamsClientsMetadataForTopology(final String topologyName) {

Review comment:
       Not needed for this PR, but I see there's an inconsistency in the function names: these two, for example, add a `ForTopology` suffix to differentiate them from the original ones in `KafkaStreams`, while others just overload the same function name. When we expose them in the KIP, we need to make the naming consistent.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##########
@@ -319,23 +329,88 @@ public void shouldProcessMultipleIdenticalNamedTopologiesWithInMemoryAndPersiste
     }
 
     @Test
-    public void shouldAddNamedTopologyToUnstartedApplicationWithEmptyInitialTopology() throws Exception {
-        topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
-        streams.addNamedTopology(topology1Builder.build());
-        IntegrationTestUtils.startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(15));
+    public void shouldAddNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology() throws Exception {
+        try {
+            // for this test we have one of the topologies read from an input topic with just one partition so
+            // that there's only one instance of that topology's store and thus should always have exactly one
+            // StreamsMetadata returned by any of the methods that look up all hosts with a specific store and topology
+            CLUSTER.createTopic(SINGLE_PARTITION_INPUT_STREAM, 1, 1);
+            CLUSTER.createTopic(SINGLE_PARTITION_OUTPUT_STREAM, 1, 1);
+            produceToInputTopics(SINGLE_PARTITION_INPUT_STREAM, STANDARD_INPUT_DATA);
+
+            topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
+            topology2Builder.stream(SINGLE_PARTITION_INPUT_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(SINGLE_PARTITION_OUTPUT_STREAM);
+            streams.addNamedTopology(topology1Builder.build());
+            streams.addNamedTopology(topology2Builder.build());
+            IntegrationTestUtils.startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(15));
 
-        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
+            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
+            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SINGLE_PARTITION_OUTPUT_STREAM, 3), equalTo(COUNT_OUTPUT_DATA));
 
-        final ReadOnlyKeyValueStore<String, Long> store =
-            streams.store(NamedTopologyStoreQueryParameters.fromNamedTopologyAndStoreNameAndType(
-                "topology-1",
-                "store",
-                QueryableStoreTypes.keyValueStore())
-            );
-        assertThat(store.get("A"), equalTo(2L));
+            final ReadOnlyKeyValueStore<String, Long> store =
+                streams.store(NamedTopologyStoreQueryParameters.fromNamedTopologyAndStoreNameAndType(
+                    TOPOLOGY_1,
+                    "store",
+                    QueryableStoreTypes.keyValueStore())
+                );
+            assertThat(store.get("A"), equalTo(2L));
+
+            final Collection<StreamsMetadata> streamsMetadata = streams.streamsMetadataForStore("store", TOPOLOGY_1);
+            final Collection<StreamsMetadata> streamsMetadata2 = streams.streamsMetadataForStore("store", TOPOLOGY_2);
+            assertThat(streamsMetadata.size(), equalTo(1));
+            assertThat(streamsMetadata2.size(), equalTo(1));
+            assertThat(metadataIsEqual(streamsMetadata.iterator().next(), streamsMetadata2.iterator().next()), is(true));
+
+            final KeyQueryMetadata keyMetadata = streams.queryMetadataForKey("store", "A", new StringSerializer(), TOPOLOGY_1);
+            final KeyQueryMetadata keyMetadata2 = streams.queryMetadataForKey("store", "A", new StringSerializer(), TOPOLOGY_2);
+
+            assertThat(keyMetadata, not(NOT_AVAILABLE));
+            assertThat(keyMetadata, equalTo(keyMetadata2));
+
+            final Map<String, Map<Integer, LagInfo>> partitionLags1 = streams.allLocalStorePartitionLagsForTopology(TOPOLOGY_1);
+            final Map<String, Map<Integer, LagInfo>> partitionLags2 = streams.allLocalStorePartitionLagsForTopology(TOPOLOGY_2);
+
+            assertThat(partitionLags1.keySet(), equalTo(singleton("store")));
+            assertThat(partitionLags1.get("store").keySet(), equalTo(mkSet(0, 1)));
+            assertThat(partitionLags2.keySet(), equalTo(singleton("store")));
+            assertThat(partitionLags2.get("store").keySet(), equalTo(singleton(0))); // only one copy of the store in topology-2
+
+            // Start up a second node with both topologies
+            setupSecondKafkaStreams();
 
-        final Collection<StreamsMetadata> test1 = streams.streamsMetadataForStore("store");
-        final Collection<StreamsMetadata> streamsMetadata = streams.streamsMetadataForStore("store", "topology-1");
+            topology1Builder2.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
+            topology2Builder2.stream(SINGLE_PARTITION_INPUT_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(SINGLE_PARTITION_OUTPUT_STREAM);
+
+            streams2.start(asList(topology1Builder2.build(), topology2Builder2.build()));
+            waitForApplicationState(asList(streams, streams2), State.RUNNING, Duration.ofSeconds(30));
+
+            final Collection<StreamsMetadata> streamsMetadataForStoreTopology1 = streams.streamsMetadataForStore("store", TOPOLOGY_1);
+            final Collection<StreamsMetadata> streamsMetadataForStoreTopology2 = streams.streamsMetadataForStore("store", TOPOLOGY_2);
+            final Collection<StreamsMetadata> streams2MetadataForStoreTopology1 = streams2.streamsMetadataForStore("store", TOPOLOGY_1);
+            final Collection<StreamsMetadata> streams2MetadataForStoreTopology2 = streams2.streamsMetadataForStore("store", TOPOLOGY_2);
+
+            // we don't know which host was actually assigned the task  containing the state
+            // tore in topology-2, only that there should be exactly one such host

Review comment:
       nit: `store`

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StreamsMetadataImpl.java
##########
@@ -153,12 +166,19 @@ public int hashCode() {
 
     @Override
     public String toString() {
-        return "StreamsMetadata {" +
+        final String str =
+            "StreamsMetadata {" +
                 "hostInfo=" + hostInfo +
                 ", stateStoreNames=" + stateStoreNames +
                 ", topicPartitions=" + topicPartitions +
                 ", standbyStateStoreNames=" + standbyStateStoreNames +
                 ", standbyTopicPartitions=" + standbyTopicPartitions +
-                '}';
+            '}';
+        if (topologyName == null) {
+            return str;
+        } else {
+            return str +
+                "topologyName=" + topologyName;

Review comment:
       nit: add a space so it is "StreamsMetadata {...} topologyName=xyz"

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##########
@@ -319,23 +329,88 @@ public void shouldProcessMultipleIdenticalNamedTopologiesWithInMemoryAndPersiste
     }
 
     @Test
-    public void shouldAddNamedTopologyToUnstartedApplicationWithEmptyInitialTopology() throws Exception {
-        topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
-        streams.addNamedTopology(topology1Builder.build());
-        IntegrationTestUtils.startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(15));
+    public void shouldAddNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology() throws Exception {
+        try {
+            // for this test we have one of the topologies read from an input topic with just one partition so
+            // that there's only one instance of that topology's store and thus should always have exactly one
+            // StreamsMetadata returned by any of the methods that look up all hosts with a specific store and topology
+            CLUSTER.createTopic(SINGLE_PARTITION_INPUT_STREAM, 1, 1);

Review comment:
       Nice coverage with different num.partitions, thanks!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org