You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/25 05:01:30 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

guozhangwang commented on a change in pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#discussion_r494747319



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
##########
@@ -60,6 +60,34 @@ public void prepareTopology() throws InterruptedException {
         rightStream = builder.stream(INPUT_TOPIC_RIGHT);
     }
 
+    @Test
+    public void shouldNotAccessJoinStoresWhenGivingName() throws InterruptedException {

Review comment:
       A good coverage improvement! Thanks.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
##########
@@ -104,19 +95,11 @@ public StreamThreadStateStoreProvider(final StreamThread streamThread,
         }
     }
 
-    private TaskId createKeyTaskId(final String storeName, final Integer partition) {
-        if (partition == null) {
-            return null;
-        }
-        final List<String> sourceTopics = internalTopologyBuilder.stateStoreNameToSourceTopics().get(storeName);
-        final Set<String> sourceTopicsSet = new HashSet<>(sourceTopics);
-        final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = internalTopologyBuilder.topicGroups();
-        for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> topicGroup : topicGroups.entrySet()) {
-            if (topicGroup.getValue().sourceTopics.containsAll(sourceTopicsSet)) {
-                return new TaskId(topicGroup.getKey(), partition);
-            }
-        }
-        throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the requested partition " +
-            partition + " is not available on this instance");
+    private Optional<Task> findStreamTask(final Collection<Task> tasks, final String storeName, final int partition) {

Review comment:
       This is a great find, thanks!

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
##########
@@ -56,25 +55,6 @@ public QueryableStoreProvider(final List<StreamThreadStateStoreProvider> storePr
         if (!globalStore.isEmpty()) {
             return queryableStoreType.create(globalStoreProvider, storeName);
         }
-        final List<T> allStores = new ArrayList<>();

Review comment:
       LGTM.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
##########
@@ -46,11 +46,22 @@ public void setStoreQueryParameters(final StoreQueryParameters storeQueryParamet
     public <T> List<T> stores(final String storeName,
                               final QueryableStoreType<T> queryableStoreType) {
         final List<T> allStores = new ArrayList<>();
-        for (final StreamThreadStateStoreProvider provider : storeProviders) {
-            final List<T> stores = provider.stores(storeQueryParameters);
-            allStores.addAll(stores);
+        for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
+            final List<T> stores = storeProvider.stores(storeQueryParameters);
+            if (!stores.isEmpty()) {
+                allStores.addAll(stores);
+                if (storeQueryParameters.partition() != null) {
+                    break;
+                }
+            }
         }
         if (allStores.isEmpty()) {
+            if (storeQueryParameters.partition() != null) {
+                throw new InvalidStateStoreException(
+                        String.format("The specified partition %d for store %s does not exist.",

Review comment:
       Could you elaborate a bit more about this? If `allStores.isEmpty()` is empty, it is always possible that the specified store-partition or just store-"null" does not exist in this client. Why they are different failure cases?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org