Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/14 15:05:38 UTC

[GitHub] [kafka] dima5rr opened a new pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition

dima5rr opened a new pull request #9020:
URL: https://github.com/apache/kafka/pull/9020


   WrappingStoreProvider does not take the withPartition parameter into account and always returns all existing stores, causing significant performance degradation for the caller when the state store has many partitions.
   
   https://issues.apache.org/jira/browse/KAFKA-10271
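A minimal sketch of the short-circuit this PR is after, assuming a simplified model: `ThreadProvider` is a hypothetical stand-in for StreamThreadStateStoreProvider that maps each partition it owns to a store handle, and a `null` partition means "all partitions", as with `StoreQueryParameters#partition()`. It is not the Kafka code, only the shape of the fix: when a partition is given, stop at the first thread that returns a store instead of collecting every store from every thread (Java 9+ for `Map.of`/`List.of`).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class PartitionLookupSketch {

    // Hypothetical stand-in for StreamThreadStateStoreProvider: partition -> store handle.
    static final class ThreadProvider {
        private final Map<Integer, String> storesByPartition;
        int calls = 0; // how many times this provider was consulted

        ThreadProvider(final Map<Integer, String> storesByPartition) {
            this.storesByPartition = storesByPartition;
        }

        // partition == null means "all partitions"
        List<String> stores(final Integer partition) {
            calls++;
            if (partition == null) {
                return new ArrayList<>(storesByPartition.values());
            } else {
                final String store = storesByPartition.get(partition);
                return store == null ? Collections.emptyList() : Collections.singletonList(store);
            }
        }
    }

    // Collects stores from every thread, but stops at the first thread that returns
    // a store for an explicitly requested partition.
    static List<String> stores(final List<ThreadProvider> providers, final Integer partition) {
        final List<String> allStores = new ArrayList<>();
        for (final ThreadProvider provider : providers) {
            final List<String> stores = provider.stores(partition);
            allStores.addAll(stores);
            if (partition != null && !stores.isEmpty()) {
                break; // assumed: one store-partition is hosted by a single task, so stop here
            }
        }
        return allStores;
    }

    public static void main(final String[] args) {
        final ThreadProvider t1 = new ThreadProvider(Map.of(0, "store-0", 1, "store-1"));
        final ThreadProvider t2 = new ThreadProvider(Map.of(2, "store-2", 3, "store-3"));
        System.out.println(stores(List.of(t1, t2), 1));    // [store-1]; t2 is never asked
        System.out.println(stores(List.of(t1, t2), null)); // all four stores, order unspecified
    }
}
```

With partition `1`, the second provider is never consulted; with `null`, every store is still returned.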


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#discussion_r455904752



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
##########
@@ -20,62 +20,49 @@
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.Task;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 public class StreamThreadStateStoreProvider {
 
     private final StreamThread streamThread;
-    private final InternalTopologyBuilder internalTopologyBuilder;
 
-    public StreamThreadStateStoreProvider(final StreamThread streamThread,
-                                          final InternalTopologyBuilder internalTopologyBuilder) {
+    public StreamThreadStateStoreProvider(final StreamThread streamThread) {
         this.streamThread = streamThread;
-        this.internalTopologyBuilder = internalTopologyBuilder;
     }
 
     @SuppressWarnings("unchecked")
     public <T> List<T> stores(final StoreQueryParameters storeQueryParams) {
         final String storeName = storeQueryParams.storeName();
         final QueryableStoreType<T> queryableStoreType = storeQueryParams.queryableStoreType();
-        final TaskId keyTaskId = createKeyTaskId(storeName, storeQueryParams.partition());
         if (streamThread.state() == StreamThread.State.DEAD) {
             return Collections.emptyList();
         }
         final StreamThread.State state = streamThread.state();
         if (storeQueryParams.staleStoresEnabled() ? state.isAlive() : state == StreamThread.State.RUNNING) {
             final Map<TaskId, ? extends Task> tasks = storeQueryParams.staleStoresEnabled() ? streamThread.allTasks() : streamThread.activeTaskMap();
-            final List<T> stores = new ArrayList<>();
-            if (keyTaskId != null) {
-                final Task task = tasks.get(keyTaskId);
-                if (task == null) {
+            if (storeQueryParams.partition() != null) {
+                final Task streamTask = findStreamTask(tasks, storeName, storeQueryParams.partition());
+                if (streamTask == null) {
                     return Collections.emptyList();
                 }
-                final T store = validateAndListStores(task.getStore(storeName), queryableStoreType, storeName, keyTaskId);
-                if (store != null) {
-                    return Collections.singletonList(store);
-                }
-            } else {
-                for (final Task streamTask : tasks.values()) {
-                    final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id());
-                    if (store != null) {
-                        stores.add(store);
-                    }
-                }
+                final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id());
+                return store != null ? Collections.singletonList(store) : Collections.emptyList();

Review comment:
       The nested early-return pattern is pretty hard to follow. Do you mind rewriting it to use if/else blocks? I know it was previously doing some early returns; it'd be better to migrate to a more maintainable style when we update the code, though.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
##########
@@ -46,11 +46,22 @@ public void setStoreQueryParameters(final StoreQueryParameters storeQueryParamet
     public <T> List<T> stores(final String storeName,
                               final QueryableStoreType<T> queryableStoreType) {
         final List<T> allStores = new ArrayList<>();
-        for (final StreamThreadStateStoreProvider provider : storeProviders) {
-            final List<T> stores = provider.stores(storeQueryParameters);
-            allStores.addAll(stores);
+        for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
+            final List<T> stores = storeProvider.stores(storeQueryParameters);
+            if (!stores.isEmpty()) {
+                allStores.addAll(stores);
+                if (storeQueryParameters.partition() != null) {
+                    break;
+                }
+            }
         }
         if (allStores.isEmpty()) {
+            if (storeQueryParameters.partition() != null) {
+                throw new InvalidStateStoreException(
+                        String.format("The specified partition %d for store %s does not exist.",

Review comment:
       Is this really a different condition than the one on L65? It seems like the failure is still probably that the store "migrated" instead of "doesn't exist", right?





[GitHub] [kafka] guozhangwang commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-705707869


   Tests passed; merged to trunk.
   
   Thanks @dima5rr for your great contribution!



[GitHub] [kafka] vvcephei edited a comment on pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
vvcephei edited a comment on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-658432973


   Hi @dima5rr, thanks for the PR! I was looking into the context of this ticket and noticed that we're essentially duplicating the logic in `org.apache.kafka.streams.state.internals.QueryableStoreProvider#getStore`, which also loops over all stores of all providers as a sanity check before even returning the WrappingStoreProvider, which then loops over all stores of all providers _again_ before returning any results.
   
   Anecdotally, I think that most people would actually just query immediately and then discard their store reference, for example like `value = kafkaStreams.store("storeName", partition=2).get("key")`. In that case, this double-iteration is pretty expensive, and the up-front sanity check doesn't seem to provide any value.
   
   In fact, the only value it could provide is if you _do_ plan to save the store reference and re-use it for multiple queries. But in that case, there could be a rebalance at any time, so checking up front probably doesn't help much no matter what the use case is.
   
   What do you think about removing the loop https://github.com/apache/kafka/blob/f699bd98c14e31f07c5a3f6ba9ce2c4b441e7fdb/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L59-L77 in favor of the one in here?
   
   Of course, the short-circuit you're providing here is valuable in any case.
   
   Thanks!
   -John
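To make the double-iteration cost concrete, here is a rough sketch under simplified assumptions: `Provider` is a hypothetical stand-in that only counts how often it is consulted, and `IllegalStateException` stands in for `InvalidStateStoreException`. For the query-once-and-discard pattern, the check-then-query path visits every provider twice, while the query-only path visits each provider once and still reports an empty result.

```java
import java.util.ArrayList;
import java.util.List;

public class DoubleIterationSketch {

    // Hypothetical provider that just counts visits and always returns one store.
    static final class Provider {
        int visits = 0;

        List<String> stores() {
            visits++;
            return List.of("store");
        }
    }

    // Old shape: an up-front sanity pass over all providers, then the query pass.
    static List<String> checkThenQuery(final List<Provider> providers) {
        final List<String> check = new ArrayList<>();
        for (final Provider p : providers) {
            check.addAll(p.stores());
        }
        if (check.isEmpty()) {
            throw new IllegalStateException("store may have migrated");
        }
        return queryOnly(providers);
    }

    // Proposed shape: a single pass at query time; an empty result is reported here instead.
    static List<String> queryOnly(final List<Provider> providers) {
        final List<String> result = new ArrayList<>();
        for (final Provider p : providers) {
            result.addAll(p.stores());
        }
        if (result.isEmpty()) {
            throw new IllegalStateException("store may have migrated");
        }
        return result;
    }

    static int totalVisits(final List<Provider> providers) {
        return providers.stream().mapToInt(p -> p.visits).sum();
    }

    public static void main(final String[] args) {
        final List<Provider> a = List.of(new Provider(), new Provider(), new Provider());
        checkThenQuery(a);
        final List<Provider> b = List.of(new Provider(), new Provider(), new Provider());
        queryOnly(b);
        System.out.println(totalVisits(a) + " vs " + totalVisits(b)); // 6 vs 3
    }
}
```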



[GitHub] [kafka] dima5rr removed a comment on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
dima5rr removed a comment on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-705675283


   test this please



[GitHub] [kafka] dima5rr commented on a change in pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
dima5rr commented on a change in pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#discussion_r498055567



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
##########
@@ -46,11 +46,22 @@ public void setStoreQueryParameters(final StoreQueryParameters storeQueryParamet
     public <T> List<T> stores(final String storeName,
                               final QueryableStoreType<T> queryableStoreType) {
         final List<T> allStores = new ArrayList<>();
-        for (final StreamThreadStateStoreProvider provider : storeProviders) {
-            final List<T> stores = provider.stores(storeQueryParameters);
-            allStores.addAll(stores);
+        for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
+            final List<T> stores = storeProvider.stores(storeQueryParameters);
+            if (!stores.isEmpty()) {
+                allStores.addAll(stores);
+                if (storeQueryParameters.partition() != null) {
+                    break;
+                }
+            }
         }
         if (allStores.isEmpty()) {
+            if (storeQueryParameters.partition() != null) {
+                throw new InvalidStateStoreException(
+                        String.format("The specified partition %d for store %s does not exist.",

Review comment:
       Hey @guozhangwang, you're right: this check is ambiguous. It is more of a parameter sanity check for when the user explicitly specifies a single partition.





[GitHub] [kafka] guozhangwang commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-703380689


   test this



[GitHub] [kafka] guozhangwang commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-704460681


   test this please



[GitHub] [kafka] guozhangwang commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-703387252


   @dima5rr I tried to compile your branch but got a few compilation errors like the following:
   
   ```
   /Users/guozhang/Workspace/github/guozhangwang/kafka-work/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java:68: error: cannot find symbol
           final KStream<String, Integer> left = builder.stream(INPUT_TOPIC_LEFT, Consumed.with(Serdes.String(), Serdes.Integer()));
                                                                                                ^
     symbol:   variable Serdes
     location: class StreamStreamJoinIntegrationTest
   /Users/guozhang/Workspace/github/guozhangwang/kafka-work/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java:68: error: cannot find symbol
           final KStream<String, Integer> left = builder.stream(INPUT_TOPIC_LEFT, Consumed.with(Serdes.String(), Serdes.Integer()));
   ```



[GitHub] [kafka] vvcephei commented on a change in pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#discussion_r455948704



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
##########
@@ -51,18 +51,22 @@ public StreamThreadStateStoreProvider(final StreamThread streamThread) {
         final StreamThread.State state = streamThread.state();
         if (storeQueryParams.staleStoresEnabled() ? state.isAlive() : state == StreamThread.State.RUNNING) {
             final Map<TaskId, ? extends Task> tasks = storeQueryParams.staleStoresEnabled() ? streamThread.allTasks() : streamThread.activeTaskMap();
+            final List<T> stores = new ArrayList<>();
             if (storeQueryParams.partition() != null) {
                 final Task streamTask = findStreamTask(tasks, storeName, storeQueryParams.partition());
-                if (streamTask == null) {
-                    return Collections.emptyList();
+                if (streamTask != null) {
+                    final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id());
+                    if (store != null) {
+                        stores.add(store);
+                    }
                 }
-                final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id());
-                return store != null ? Collections.singletonList(store) : Collections.emptyList();
+            } else {
+                tasks.values().stream().
+                        map(streamTask -> validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())).
+                        filter(Objects::nonNull).
+                        forEach(stores::add);
             }
-            return tasks.values().stream().
-                    map(streamTask -> validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())).
-                    filter(Objects::nonNull).
-                    collect(Collectors.toList());
+            return Collections.unmodifiableList(stores);

Review comment:
       Ah, sorry, I can see that my prior comment was ambiguous. This is what I meant:
   ```java
               if (storeQueryParams.partition() == null) {
                   return tasks.values().stream().
                           map(streamTask -> validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())).
                           filter(Objects::nonNull).
                           collect(Collectors.toList());
               } else {
                   final Task streamTask = findStreamTask(tasks, storeName, storeQueryParams.partition());
                   if (streamTask == null) {
                       return Collections.emptyList();
                   } else {
                       final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id());
                       return store == null ? Collections.emptyList() : Collections.singletonList(store);
                   }
               }
   ```
   
   The reason this is better for maintenance is that you only have to trace a path through the nested conditionals into a single inner block to understand what gets returned. I.e., code comprehension complexity is only the depth of the conditional tree.
   
   In contrast, if we do early returns, you have to fully read all the conditional blocks that lead up to the one you're interested in (depth-first traversal), so code comprehension is linear instead of logarithmic. If we mutate the collection, you actually have to read _all_ the conditionals to understand what is going to happen, so code comprehension is also linear instead of logarithmic.
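A self-contained rendering of the suggested shape, assuming stores are modeled as a hypothetical partition-to-store map and `validate` is a hypothetical stand-in for `validateAndListStores` that returns `null` when a store is not queryable. Every path ends in exactly one `return` inside its own branch and nothing is mutated, so any branch's result can be read without tracing the others.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class BranchShapeSketch {

    // Hypothetical stand-in for validateAndListStores: a blank store counts as "not queryable".
    static String validate(final String store) {
        return store == null || store.isEmpty() ? null : store;
    }

    static List<String> stores(final Map<Integer, String> storesByPartition, final Integer partition) {
        if (partition == null) {
            // all partitions: validate each store and drop the ones that are not queryable
            return storesByPartition.values().stream()
                    .map(BranchShapeSketch::validate)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());
        } else {
            final String store = storesByPartition.get(partition);
            if (store == null) {
                return Collections.emptyList();
            } else {
                final String validated = validate(store);
                return validated == null ? Collections.emptyList() : Collections.singletonList(validated);
            }
        }
    }

    public static void main(final String[] args) {
        final Map<Integer, String> stores = Map.of(0, "store-0", 1, "");
        System.out.println(stores(stores, 0));    // [store-0]
        System.out.println(stores(stores, 1));    // [] (present but not queryable)
        System.out.println(stores(stores, null)); // [store-0]
    }
}
```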





[GitHub] [kafka] dima5rr edited a comment on pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
dima5rr edited a comment on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-658831006


   Indeed, StreamThreadStateStoreProvider does not need InternalTopologyBuilder to find the stream task; it has all the required data in StreamThread.
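A sketch of what such a lookup can look like, assuming hypothetical `FakeTaskId`/`FakeTask` types in place of Kafka's `TaskId` and `Task`: the task's own id already carries the partition, so matching on partition and store name needs nothing from InternalTopologyBuilder. The `Optional` return mirrors the `Optional<Task> findStreamTask(...)` signature that appears in the diff, but the body here is illustrative only.

```java
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class FindTaskSketch {

    // Hypothetical stand-in for TaskId: topic group plus partition.
    static final class FakeTaskId {
        final int topicGroupId;
        final int partition;

        FakeTaskId(final int topicGroupId, final int partition) {
            this.topicGroupId = topicGroupId;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topicGroupId + "_" + partition; // "group_partition" form
        }
    }

    // Hypothetical stand-in for Task: its id and the names of the stores it owns.
    static final class FakeTask {
        final FakeTaskId id;
        final Set<String> storeNames;

        FakeTask(final FakeTaskId id, final Set<String> storeNames) {
            this.id = id;
            this.storeNames = storeNames;
        }

        boolean hasStore(final String storeName) {
            return storeNames.contains(storeName);
        }
    }

    // Pick the task whose id has the requested partition and that owns the store.
    // Assumption: a store lives in one sub-topology, so at most one task should match.
    static Optional<FakeTask> findStreamTask(final Collection<FakeTask> tasks,
                                             final String storeName,
                                             final int partition) {
        return tasks.stream()
                .filter(task -> task.id.partition == partition)
                .filter(task -> task.hasStore(storeName))
                .findFirst();
    }

    public static void main(final String[] args) {
        final List<FakeTask> tasks = List.of(
                new FakeTask(new FakeTaskId(0, 0), Set.of("counts")),
                new FakeTask(new FakeTaskId(1, 1), Set.of("other")),
                new FakeTask(new FakeTaskId(0, 1), Set.of("counts")));
        System.out.println(findStreamTask(tasks, "counts", 1).map(t -> t.id.toString()).orElse("none")); // 0_1
        System.out.println(findStreamTask(tasks, "counts", 5).map(t -> t.id.toString()).orElse("none")); // none
    }
}
```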



[GitHub] [kafka] dima5rr commented on a change in pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
dima5rr commented on a change in pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#discussion_r498627951



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
##########
@@ -46,11 +46,22 @@ public void setStoreQueryParameters(final StoreQueryParameters storeQueryParamet
     public <T> List<T> stores(final String storeName,
                               final QueryableStoreType<T> queryableStoreType) {
         final List<T> allStores = new ArrayList<>();
-        for (final StreamThreadStateStoreProvider provider : storeProviders) {
-            final List<T> stores = provider.stores(storeQueryParameters);
-            allStores.addAll(stores);
+        for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
+            final List<T> stores = storeProvider.stores(storeQueryParameters);
+            if (!stores.isEmpty()) {
+                allStores.addAll(stores);
+                if (storeQueryParameters.partition() != null) {
+                    break;
+                }
+            }
         }
         if (allStores.isEmpty()) {
+            if (storeQueryParameters.partition() != null) {
+                throw new InvalidStateStoreException(
+                        String.format("The specified partition %d for store %s does not exist.",

Review comment:
       Hey @guozhangwang, my only concern is that when the partition is null, the error message should stay the one referenced in the official FAQ:
   
   https://docs.confluent.io/current/streams/faq.html#handling-invalidstatestoreexception-the-state-store-may-have-migrated-to-another-instance
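A sketch of the distinction being preserved here, under stated assumptions: the "migrated" wording is modeled on the FAQ entry's title rather than copied from Kafka's source, `emptyResultMessage`/`requireNonEmpty` are hypothetical helpers, and `IllegalStateException` stands in for `InvalidStateStoreException`. The `null`-partition case keeps the familiar message, and only an explicitly requested partition gets the partition-specific one.

```java
import java.util.List;

public class ErrorMessageSketch {

    // Keep the FAQ-style message when no partition was requested; use the partition-specific
    // message only when the caller explicitly asked for a single partition.
    static String emptyResultMessage(final String storeName, final Integer partition) {
        if (partition == null) {
            // wording modeled on the FAQ entry title (assumption, not copied from Kafka)
            return "The state store, " + storeName + ", may have migrated to another instance.";
        } else {
            return String.format("The specified partition %d for store %s does not exist.", partition, storeName);
        }
    }

    static <T> List<T> requireNonEmpty(final List<T> stores, final String storeName, final Integer partition) {
        if (stores.isEmpty()) {
            throw new IllegalStateException(emptyResultMessage(storeName, partition));
        } else {
            return stores;
        }
    }

    public static void main(final String[] args) {
        System.out.println(emptyResultMessage("counts", null));
        System.out.println(emptyResultMessage("counts", 3));
    }
}
```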





[GitHub] [kafka] guozhangwang commented on a change in pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#discussion_r494747319



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
##########
@@ -60,6 +60,34 @@ public void prepareTopology() throws InterruptedException {
         rightStream = builder.stream(INPUT_TOPIC_RIGHT);
     }
 
+    @Test
+    public void shouldNotAccessJoinStoresWhenGivingName() throws InterruptedException {

Review comment:
       A good coverage improvement! Thanks.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
##########
@@ -104,19 +95,11 @@ public StreamThreadStateStoreProvider(final StreamThread streamThread,
         }
     }
 
-    private TaskId createKeyTaskId(final String storeName, final Integer partition) {
-        if (partition == null) {
-            return null;
-        }
-        final List<String> sourceTopics = internalTopologyBuilder.stateStoreNameToSourceTopics().get(storeName);
-        final Set<String> sourceTopicsSet = new HashSet<>(sourceTopics);
-        final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = internalTopologyBuilder.topicGroups();
-        for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> topicGroup : topicGroups.entrySet()) {
-            if (topicGroup.getValue().sourceTopics.containsAll(sourceTopicsSet)) {
-                return new TaskId(topicGroup.getKey(), partition);
-            }
-        }
-        throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the requested partition " +
-            partition + " is not available on this instance");
+    private Optional<Task> findStreamTask(final Collection<Task> tasks, final String storeName, final int partition) {

Review comment:
       This is a great find, thanks!

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
##########
@@ -56,25 +55,6 @@ public QueryableStoreProvider(final List<StreamThreadStateStoreProvider> storePr
         if (!globalStore.isEmpty()) {
             return queryableStoreType.create(globalStoreProvider, storeName);
         }
-        final List<T> allStores = new ArrayList<>();

Review comment:
       LGTM.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
##########
@@ -46,11 +46,22 @@ public void setStoreQueryParameters(final StoreQueryParameters storeQueryParamet
     public <T> List<T> stores(final String storeName,
                               final QueryableStoreType<T> queryableStoreType) {
         final List<T> allStores = new ArrayList<>();
-        for (final StreamThreadStateStoreProvider provider : storeProviders) {
-            final List<T> stores = provider.stores(storeQueryParameters);
-            allStores.addAll(stores);
+        for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
+            final List<T> stores = storeProvider.stores(storeQueryParameters);
+            if (!stores.isEmpty()) {
+                allStores.addAll(stores);
+                if (storeQueryParameters.partition() != null) {
+                    break;
+                }
+            }
         }
         if (allStores.isEmpty()) {
+            if (storeQueryParameters.partition() != null) {
+                throw new InvalidStateStoreException(
+                        String.format("The specified partition %d for store %s does not exist.",

Review comment:
       Could you elaborate a bit more on this? If `allStores` is empty, it is always possible that the specified store-partition, or just the store with a "null" partition, does not exist on this client. Why are they different failure cases?





[GitHub] [kafka] dima5rr removed a comment on pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
dima5rr removed a comment on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-659004636


   In addition, all the synchronized methods in StreamsMetadataState.getKeyQueryMetadataForKey also have performance implications under load.
   
   https://github.com/apache/kafka/blob/598a0d16fa779cba9bb9ad0354236f269b2ff13d/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java#L213



[GitHub] [kafka] guozhangwang closed pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
guozhangwang closed pull request #9020:
URL: https://github.com/apache/kafka/pull/9020


   



[GitHub] [kafka] dima5rr removed a comment on pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
dima5rr removed a comment on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-660038989


   Retest this please.




[GitHub] [kafka] guozhangwang commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-703043609


   test this please



[GitHub] [kafka] guozhangwang commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-703043650


   test this



[GitHub] [kafka] dima5rr commented on pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
dima5rr commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-659004636


   In addition, all the synchronized methods in StreamsMetadataState.getKeyQueryMetadataForKey also have performance implications under load.
   
   https://github.com/apache/kafka/blob/598a0d16fa779cba9bb9ad0354236f269b2ff13d/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java#L213



[GitHub] [kafka] guozhangwang commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-704462953


   test this



[GitHub] [kafka] dima5rr commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
dima5rr commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-705675283


   test this please



[GitHub] [kafka] guozhangwang commented on a change in pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#discussion_r498533922



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
##########
@@ -46,11 +46,22 @@ public void setStoreQueryParameters(final StoreQueryParameters storeQueryParamet
     public <T> List<T> stores(final String storeName,
                               final QueryableStoreType<T> queryableStoreType) {
         final List<T> allStores = new ArrayList<>();
-        for (final StreamThreadStateStoreProvider provider : storeProviders) {
-            final List<T> stores = provider.stores(storeQueryParameters);
-            allStores.addAll(stores);
+        for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
+            final List<T> stores = storeProvider.stores(storeQueryParameters);
+            if (!stores.isEmpty()) {
+                allStores.addAll(stores);
+                if (storeQueryParameters.partition() != null) {
+                    break;
+                }
+            }
         }
         if (allStores.isEmpty()) {
+            if (storeQueryParameters.partition() != null) {
+                throw new InvalidStateStoreException(
+                        String.format("The specified partition %d for store %s does not exist.",

Review comment:
       Got it. In that case, how about we just encode the partition in the thrown exception's message, so that when it is thrown, people can still check whether the partition is `null` when debugging?
   
   Otherwise, this PR all LGTM :)
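A minimal sketch of Guozhang's suggestion, assuming a single "store not found" path whose message records the requested partition, which may be `null` when the caller did not ask for a specific one. `InvalidStoreException`, `missingStoreMessage` and `throwMissing` are hypothetical names for illustration, not Kafka Streams API. Using `%s` rather than `%d` keeps the format safe for a `null` `Integer`.

```java
import java.util.Objects;

final class PartitionMessageSketch {

    // Hypothetical stand-in for Kafka's InvalidStateStoreException.
    static final class InvalidStoreException extends RuntimeException {
        InvalidStoreException(final String message) {
            super(message);
        }
    }

    // One message for every "no store found" case. %s prints a null Integer as "null",
    // so whoever reads the stack trace can still tell whether a partition was requested.
    static String missingStoreMessage(final String storeName, final Integer partition) {
        Objects.requireNonNull(storeName, "storeName");
        return String.format("Cannot find state store %s for partition %s on this instance.", storeName, partition);
    }

    static void throwMissing(final String storeName, final Integer partition) {
        throw new InvalidStoreException(missingStoreMessage(storeName, partition));
    }

    public static void main(final String[] args) {
        System.out.println(missingStoreMessage("counts", 2));
        System.out.println(missingStoreMessage("counts", null));
        try {
            throwMissing("counts", null);
        } catch (final InvalidStoreException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

With this shape there is only one failure branch, and the debugging information Guozhang asks for lives in the message itself.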





[GitHub] [kafka] dima5rr commented on a change in pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
dima5rr commented on a change in pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#discussion_r498055567



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
##########
@@ -46,11 +46,22 @@ public void setStoreQueryParameters(final StoreQueryParameters storeQueryParamet
     public <T> List<T> stores(final String storeName,
                               final QueryableStoreType<T> queryableStoreType) {
         final List<T> allStores = new ArrayList<>();
-        for (final StreamThreadStateStoreProvider provider : storeProviders) {
-            final List<T> stores = provider.stores(storeQueryParameters);
-            allStores.addAll(stores);
+        for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
+            final List<T> stores = storeProvider.stores(storeQueryParameters);
+            if (!stores.isEmpty()) {
+                allStores.addAll(stores);
+                if (storeQueryParameters.partition() != null) {
+                    break;
+                }
+            }
         }
         if (allStores.isEmpty()) {
+            if (storeQueryParameters.partition() != null) {
+                throw new InvalidStateStoreException(
+                        String.format("The specified partition %d for store %s does not exist.",

Review comment:
       Hey @guozhangwang, you're right, this check is ambiguous; it is more of a sanity check on the parameter for users who explicitly specify a single partition.





[GitHub] [kafka] dima5rr commented on pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
dima5rr commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-660038989


   Retest this please.



[GitHub] [kafka] guozhangwang commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-704460234


   test this please



[GitHub] [kafka] vvcephei commented on a change in pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#discussion_r496796429



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
##########
@@ -46,11 +46,22 @@ public void setStoreQueryParameters(final StoreQueryParameters storeQueryParamet
     public <T> List<T> stores(final String storeName,
                               final QueryableStoreType<T> queryableStoreType) {
         final List<T> allStores = new ArrayList<>();
-        for (final StreamThreadStateStoreProvider provider : storeProviders) {
-            final List<T> stores = provider.stores(storeQueryParameters);
-            allStores.addAll(stores);
+        for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
+            final List<T> stores = storeProvider.stores(storeQueryParameters);
+            if (!stores.isEmpty()) {
+                allStores.addAll(stores);
+                if (storeQueryParameters.partition() != null) {
+                    break;
+                }
+            }
         }
         if (allStores.isEmpty()) {
+            if (storeQueryParameters.partition() != null) {
+                throw new InvalidStateStoreException(
+                        String.format("The specified partition %d for store %s does not exist.",

Review comment:
       Hey @dima5rr, I think Guozhang's question was hidden because the conversation had already been marked "resolved". Would you mind answering this concern?





[GitHub] [kafka] guozhangwang commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-705707869







[GitHub] [kafka] guozhangwang merged pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
guozhangwang merged pull request #9020:
URL: https://github.com/apache/kafka/pull/9020


   



[GitHub] [kafka] dima5rr commented on a change in pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
dima5rr commented on a change in pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#discussion_r455974476



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
##########
@@ -51,18 +51,22 @@ public StreamThreadStateStoreProvider(final StreamThread streamThread) {
         final StreamThread.State state = streamThread.state();
         if (storeQueryParams.staleStoresEnabled() ? state.isAlive() : state == StreamThread.State.RUNNING) {
             final Map<TaskId, ? extends Task> tasks = storeQueryParams.staleStoresEnabled() ? streamThread.allTasks() : streamThread.activeTaskMap();
+            final List<T> stores = new ArrayList<>();
             if (storeQueryParams.partition() != null) {
                 final Task streamTask = findStreamTask(tasks, storeName, storeQueryParams.partition());
-                if (streamTask == null) {
-                    return Collections.emptyList();
+                if (streamTask != null) {
+                    final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id());
+                    if (store != null) {
+                        stores.add(store);
+                    }
                 }
-                final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id());
-                return store != null ? Collections.singletonList(store) : Collections.emptyList();
+            } else {
+                tasks.values().stream().
+                        map(streamTask -> validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())).
+                        filter(Objects::nonNull).
+                        forEach(stores::add);
             }
-            return tasks.values().stream().
-                    map(streamTask -> validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())).
-                    filter(Objects::nonNull).
-                    collect(Collectors.toList());
+            return Collections.unmodifiableList(stores);

Review comment:
       Will make it more concise, in a functional style.
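A minimal sketch of what a more functional version of this branch could look like, assuming a hypothetical `SimpleTask` (a partition plus a map of store names to stores) in place of the real `Task` and `validateAndListStores`. With a partition, at most one task that actually hosts the store contributes; without one, every task is scanned. Filtering on the store name as well as the partition matters because tasks from different sub-topologies can share a partition number.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FunctionalStoresSketch {

    // Hypothetical task: its partition and the stores it hosts, keyed by store name.
    static final class SimpleTask {
        final int partition;
        final Map<String, String> stores;

        SimpleTask(final int partition, final Map<String, String> stores) {
            this.partition = partition;
            this.stores = stores;
        }
    }

    // With a partition: only the first task that has that partition and the store (limit(1)).
    // Without a partition: every task. Either way the result is an unmodifiable list.
    static List<String> stores(final Collection<SimpleTask> tasks,
                               final String storeName,
                               final Integer partition) {
        final Stream<SimpleTask> candidates = partition != null
            ? tasks.stream()
                   .filter(t -> t.partition == partition && t.stores.containsKey(storeName))
                   .limit(1)
            : tasks.stream();
        return candidates
            .map(t -> t.stores.get(storeName)) // null when a task lacks the store
            .filter(Objects::nonNull)
            .collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList));
    }

    public static void main(final String[] args) {
        final List<SimpleTask> tasks = Arrays.asList(
            new SimpleTask(0, Collections.singletonMap("other", "other-0")),
            new SimpleTask(0, Collections.singletonMap("counts", "counts-0")),
            new SimpleTask(1, Collections.singletonMap("counts", "counts-1")));
        System.out.println(stores(tasks, "counts", 0));    // [counts-0]
        System.out.println(stores(tasks, "counts", null)); // [counts-0, counts-1]
    }
}
```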





[GitHub] [kafka] guozhangwang commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-703020271


   test this please



[GitHub] [kafka] guozhangwang commented on a change in pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#discussion_r499101848



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
##########
@@ -46,11 +46,22 @@ public void setStoreQueryParameters(final StoreQueryParameters storeQueryParamet
     public <T> List<T> stores(final String storeName,
                               final QueryableStoreType<T> queryableStoreType) {
         final List<T> allStores = new ArrayList<>();
-        for (final StreamThreadStateStoreProvider provider : storeProviders) {
-            final List<T> stores = provider.stores(storeQueryParameters);
-            allStores.addAll(stores);
+        for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
+            final List<T> stores = storeProvider.stores(storeQueryParameters);
+            if (!stores.isEmpty()) {
+                allStores.addAll(stores);
+                if (storeQueryParameters.partition() != null) {
+                    break;
+                }
+            }
         }
         if (allStores.isEmpty()) {
+            if (storeQueryParameters.partition() != null) {
+                throw new InvalidStateStoreException(
+                        String.format("The specified partition %d for store %s does not exist.",

Review comment:
       That's a fair point, let's just merge it as is then.





[GitHub] [kafka] dima5rr commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
dima5rr commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-703838454


   Hi @guozhangwang, can you trigger a new build? It looks like the failures are flaky tests.



[GitHub] [kafka] guozhangwang commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-705723421


   Cherry-picked to 2.6 as well.



[GitHub] [kafka] guozhangwang commented on a change in pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#discussion_r494747319



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
##########
@@ -60,6 +60,34 @@ public void prepareTopology() throws InterruptedException {
         rightStream = builder.stream(INPUT_TOPIC_RIGHT);
     }
 
+    @Test
+    public void shouldNotAccessJoinStoresWhenGivingName() throws InterruptedException {

Review comment:
       A good coverage improvement! Thanks.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
##########
@@ -104,19 +95,11 @@ public StreamThreadStateStoreProvider(final StreamThread streamThread,
         }
     }
 
-    private TaskId createKeyTaskId(final String storeName, final Integer partition) {
-        if (partition == null) {
-            return null;
-        }
-        final List<String> sourceTopics = internalTopologyBuilder.stateStoreNameToSourceTopics().get(storeName);
-        final Set<String> sourceTopicsSet = new HashSet<>(sourceTopics);
-        final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = internalTopologyBuilder.topicGroups();
-        for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> topicGroup : topicGroups.entrySet()) {
-            if (topicGroup.getValue().sourceTopics.containsAll(sourceTopicsSet)) {
-                return new TaskId(topicGroup.getKey(), partition);
-            }
-        }
-        throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the requested partition " +
-            partition + " is not available on this instance");
+    private Optional<Task> findStreamTask(final Collection<Task> tasks, final String storeName, final int partition) {

Review comment:
       This is a great find, thanks!

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
##########
@@ -56,25 +55,6 @@ public QueryableStoreProvider(final List<StreamThreadStateStoreProvider> storePr
         if (!globalStore.isEmpty()) {
             return queryableStoreType.create(globalStoreProvider, storeName);
         }
-        final List<T> allStores = new ArrayList<>();

Review comment:
       LGTM.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
##########
@@ -46,11 +46,22 @@ public void setStoreQueryParameters(final StoreQueryParameters storeQueryParamet
     public <T> List<T> stores(final String storeName,
                               final QueryableStoreType<T> queryableStoreType) {
         final List<T> allStores = new ArrayList<>();
-        for (final StreamThreadStateStoreProvider provider : storeProviders) {
-            final List<T> stores = provider.stores(storeQueryParameters);
-            allStores.addAll(stores);
+        for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
+            final List<T> stores = storeProvider.stores(storeQueryParameters);
+            if (!stores.isEmpty()) {
+                allStores.addAll(stores);
+                if (storeQueryParameters.partition() != null) {
+                    break;
+                }
+            }
         }
         if (allStores.isEmpty()) {
+            if (storeQueryParameters.partition() != null) {
+                throw new InvalidStateStoreException(
+                        String.format("The specified partition %d for store %s does not exist.",

Review comment:
       Could you elaborate a bit more on this? If `allStores` is empty, it is always possible that either the specified store partition or the store with a `null` partition does not exist in this client. Why are they different failure cases?





[GitHub] [kafka] dima5rr removed a comment on pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
dima5rr removed a comment on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-658859876


   > Hi @dima5rr , thanks for the PR! I was looking into the context of this ticket and noticed that we're essentially just duplicating here the logic in `org.apache.kafka.streams.state.internals.QueryableStoreProvider#getStore`, which also loops over all stores of all providers as a sanity check before even returning the WrappingStoreProvider, which will loop over all stores of all providers _again_ before returning any results.
   > 
   > Anecdotally, I think that most people would actually just query immediately and then discard their store reference, for example like `value = kafkaStreams.store("storeName", partition=2).get("key")`. In that case, this double-iteration is pretty expensive, and the up-front sanity check doesn't seem to provide any value.
   > 
   > In fact, the only value it could provide is if you _do_ plan to save the store reference and re-use it for multiple queries. But in that case, there could be a rebalance at any time, so checking up front probably doesn't help much no matter what the use case is.
   > 
   > What do you think about removing the loop
   > 
   > https://github.com/apache/kafka/blob/f699bd98c14e31f07c5a3f6ba9ce2c4b441e7fdb/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L59-L77
   > 
   > in favor of the one in here?
   > Of course, the short-circuit you're providing here is valuable in any case.
   > 
   > Thanks!
   > -John
   
   Agree, it can be removed



[GitHub] [kafka] guozhangwang closed pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
guozhangwang closed pull request #9020:
URL: https://github.com/apache/kafka/pull/9020


   



[GitHub] [kafka] vvcephei commented on pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-658432973


   Hi @dima5rr, thanks for the PR! I was looking into the context of this ticket and noticed that we're essentially just duplicating here the logic in `org.apache.kafka.streams.state.internals.QueryableStoreProvider#getStore`, which also loops over all stores of all providers as a sanity check before even returning the WrappingStoreProvider, which will loop over all stores of all providers _again_ before returning any results.
   
   Anecdotally, I think that most people would actually just query immediately and then discard their store reference, for example like `value = kafkaStreams.store("storeName", partition=2).get("key")`. In that case, this double-iteration is pretty expensive, and the up-front sanity check doesn't seem to provide any value.
   
   What do you think about removing the loop https://github.com/apache/kafka/blob/f699bd98c14e31f07c5a3f6ba9ce2c4b441e7fdb/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L59-L77 in favor of the one in here?
   
   Of course, the short-circuit you're providing here is valuable in any case.
   
   Thanks!
   -John
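To make the cost John describes concrete, here is a minimal sketch that only counts provider calls, assuming three hypothetical `Provider` objects of which only the first hosts the requested partition. It is not the real `QueryableStoreProvider#getStore` or `WrappingStoreProvider` code. The eager shape runs an up-front sanity loop and then a second full loop; the lazy shape drops the sanity loop and, for a single partition, stops at the first provider that returns a store.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class DoubleIterationSketch {

    // Hypothetical per-thread provider that counts how often it is asked for stores.
    static final class Provider {
        private final List<String> stores;
        int calls = 0;

        Provider(final List<String> stores) {
            this.stores = stores;
        }

        List<String> stores() {
            calls++;
            return stores;
        }
    }

    // Three providers; only the first one hosts the requested partition.
    static List<Provider> sample() {
        final List<Provider> providers = new ArrayList<>();
        providers.add(new Provider(Collections.singletonList("store-p0")));
        providers.add(new Provider(Collections.<String>emptyList()));
        providers.add(new Provider(Collections.<String>emptyList()));
        return providers;
    }

    // Old shape: a sanity loop over every provider, then a second full loop to collect.
    static List<String> eager(final List<Provider> providers) {
        boolean found = false;
        for (final Provider p : providers) {
            found |= !p.stores().isEmpty();
        }
        if (!found) {
            throw new IllegalStateException("no store found");
        }
        return collect(providers, false);
    }

    // New shape: one loop, stopping early when a single partition is requested.
    static List<String> collect(final List<Provider> providers, final boolean singlePartition) {
        final List<String> all = new ArrayList<>();
        for (final Provider p : providers) {
            final List<String> found = p.stores();
            if (!found.isEmpty()) {
                all.addAll(found);
                if (singlePartition) {
                    break;
                }
            }
        }
        return all;
    }

    static int totalCalls(final List<Provider> providers) {
        int sum = 0;
        for (final Provider p : providers) {
            sum += p.calls;
        }
        return sum;
    }

    public static void main(final String[] args) {
        final List<Provider> eagerProviders = sample();
        eager(eagerProviders);
        final List<Provider> lazyProviders = sample();
        collect(lazyProviders, true);
        System.out.println("eager calls = " + totalCalls(eagerProviders) + ", lazy calls = " + totalCalls(lazyProviders));
    }
}
```

Running `main` prints `eager calls = 6, lazy calls = 1`: with N threads, the eager shape always pays 2N provider calls, while the short-circuited single-partition query can stop after the first hit.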



[GitHub] [kafka] dima5rr commented on a change in pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
dima5rr commented on a change in pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#discussion_r455939491



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
##########
@@ -46,11 +46,22 @@ public void setStoreQueryParameters(final StoreQueryParameters storeQueryParamet
     public <T> List<T> stores(final String storeName,
                               final QueryableStoreType<T> queryableStoreType) {
         final List<T> allStores = new ArrayList<>();
-        for (final StreamThreadStateStoreProvider provider : storeProviders) {
-            final List<T> stores = provider.stores(storeQueryParameters);
-            allStores.addAll(stores);
+        for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
+            final List<T> stores = storeProvider.stores(storeQueryParameters);
+            if (!stores.isEmpty()) {
+                allStores.addAll(stores);
+                if (storeQueryParameters.partition() != null) {
+                    break;
+                }
+            }
         }
         if (allStores.isEmpty()) {
+            if (storeQueryParameters.partition() != null) {
+                throw new InvalidStateStoreException(
+                        String.format("The specified partition %d for store %s does not exist.",

Review comment:
       L65 catches the rebalancing case, while L60 validates the parameter when an incorrect partition is specified.
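A minimal sketch of the two failure paths distinguished here, assuming hypothetical providers modeled as `Function<Integer, List<String>>` (requested partition, or `null` for all, mapped to the stores a thread hosts) and a plain `IllegalStateException` in place of `InvalidStateStoreException`. The first message mirrors the one in the diff; the second is illustrative only, since the hunk above is cut off before it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

final class WrappingLookupSketch {

    // Hypothetical providers: thread 0 hosts partition 1, thread 1 hosts partition 2.
    static List<Function<Integer, List<String>>> sampleProviders() {
        final List<Function<Integer, List<String>>> providers = new ArrayList<>();
        providers.add(p -> p == null || p == 1 ? Collections.singletonList("thread0-p1") : Collections.<String>emptyList());
        providers.add(p -> p == null || p == 2 ? Collections.singletonList("thread1-p2") : Collections.<String>emptyList());
        return providers;
    }

    static List<String> stores(final List<Function<Integer, List<String>>> providers,
                               final String storeName,
                               final Integer partition) {
        final List<String> all = new ArrayList<>();
        for (final Function<Integer, List<String>> provider : providers) {
            final List<String> found = provider.apply(partition);
            if (!found.isEmpty()) {
                all.addAll(found);
                if (partition != null) {
                    break; // short-circuit: stop at the first provider that has the partition
                }
            }
        }
        if (all.isEmpty()) {
            if (partition != null) {
                // Case 1: parameter validation, the requested partition is not hosted here.
                throw new IllegalStateException(String.format(
                    "The specified partition %d for store %s does not exist.", partition, storeName));
            }
            // Case 2: nothing found without a partition, e.g. during a rebalance (illustrative message).
            throw new IllegalStateException("Store " + storeName + " is currently unavailable.");
        }
        return Collections.unmodifiableList(all);
    }

    public static void main(final String[] args) {
        final List<Function<Integer, List<String>>> providers = sampleProviders();
        System.out.println(stores(providers, "counts", 2));    // [thread1-p2]
        System.out.println(stores(providers, "counts", null)); // [thread0-p1, thread1-p2]
        try {
            stores(providers, "counts", 7);
        } catch (final IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```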





[GitHub] [kafka] dima5rr commented on pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
dima5rr commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-658831006


   Indeed, StreamThreadStateStoreProvider does not need InternalTopologyBuilder in order to find the stream task; StreamThread already has all the required data.
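A minimal sketch of locating the task for a partition using only the tasks a thread already owns, which is the idea behind replacing `createKeyTaskId` with `findStreamTask` in the diff. `OwnedTask` is a hypothetical stand-in for Kafka's `Task`. The filter also requires that the task hosts the named store, because tasks from different sub-topologies can share a partition number. No topology-wide, synchronized metadata such as `topicGroups()` is consulted.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

final class FindTaskSketch {

    // Hypothetical stand-in for a task: sub-topology id, partition, and hosted store names.
    static final class OwnedTask {
        final int subtopology;
        final int partition;
        final Set<String> storeNames;

        OwnedTask(final int subtopology, final int partition, final String... storeNames) {
            this.subtopology = subtopology;
            this.partition = partition;
            this.storeNames = new HashSet<>(Arrays.asList(storeNames));
        }

        @Override
        public String toString() {
            return subtopology + "_" + partition;
        }
    }

    // Scans only the thread's own tasks; returns the first one with the partition and the store.
    static Optional<OwnedTask> findStreamTask(final Collection<OwnedTask> tasks,
                                              final String storeName,
                                              final int partition) {
        return tasks.stream()
            .filter(task -> task.partition == partition && task.storeNames.contains(storeName))
            .findFirst();
    }

    public static void main(final String[] args) {
        final Collection<OwnedTask> tasks = Arrays.asList(
            new OwnedTask(0, 2, "counts"),
            new OwnedTask(1, 2, "join-store"));
        System.out.println(findStreamTask(tasks, "join-store", 2)); // Optional[1_2]
    }
}
```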



[GitHub] [kafka] dima5rr commented on pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
dima5rr commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-658859876


   > Hi @dima5rr , thanks for the PR! I was looking into the context of this ticket and noticed that we're essentially just duplicating here the logic in `org.apache.kafka.streams.state.internals.QueryableStoreProvider#getStore`, which also loops over all stores of all providers as a sanity check before even returning the WrappingStoreProvider, which will loop over all stores of all providers _again_ before returning any results.
   > 
   > Anecdotally, I think that most people would actually just query immediately and then discard their store reference, for example like `value = kafkaStreams.store("storeName", partition=2).get("key")`. In that case, this double-iteration is pretty expensive, and the up-front sanity check doesn't seem to provide any value.
   > 
   > In fact, the only value it could provide is if you _do_ plan to save the store reference and re-use it for multiple queries. But in that case, there could be a rebalance at any time, so checking up front probably doesn't help much no matter what the use case is.
   > 
   > What do you think about removing the loop
   > 
   > https://github.com/apache/kafka/blob/f699bd98c14e31f07c5a3f6ba9ce2c4b441e7fdb/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L59-L77
   > 
   > in favor of the one in here?
   > Of course, the short-circuit you're providing here is valuable in any case.
   > 
   > Thanks!
   > -John
   
   Agree, it can be removed



[GitHub] [kafka] dima5rr commented on pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
dima5rr commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-658604900


   Hi @vvcephei, thank you for the input.
   
   After profiling under load, it looks like the problem is excessive looping over calls to internalTopologyBuilder.topicGroups(),
   which is synchronized.
   Short-circuiting WrappingStoreProvider can help, but it will not solve the root cause.
   
   WDYT?
   
   https://github.com/apache/kafka/blob/f699bd98c14e31f07c5a3f6ba9ce2c4b441e7fdb/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java#L113
   
   https://github.com/apache/kafka/blob/f699bd98c14e31f07c5a3f6ba9ce2c4b441e7fdb/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java#L1030
   
   ![9020](https://user-images.githubusercontent.com/18258318/87517410-79870d80-c687-11ea-8ba1-746af4c932fe.png)
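The profile points at lock contention on the query path rather than at iteration alone. Below is a rough, hypothetical model of that hot path. `SharedTopologyMetadata` and `PerQueryLookupModel` are illustrative stand-ins loosely based on the synchronized `topicGroups()` call linked above, not Kafka Streams APIs. With T stream threads and Q single-key queries, the shared monitor is entered T x Q times, and concurrent callers queue on it.

```java
import java.util.Map;

// Hypothetical stand-in for topology metadata guarded by a single
// monitor, loosely modeled on the synchronized topicGroups() call
// linked above. Names and shapes are illustrative only.
final class SharedTopologyMetadata {
    private final Map<String, Integer> storeToTopicGroup;
    private long monitorEntries = 0;

    SharedTopologyMetadata(final Map<String, Integer> storeToTopicGroup) {
        this.storeToTopicGroup = storeToTopicGroup;
    }

    // Every querying thread has to enter this monitor, so concurrent
    // interactive queries serialize here.
    synchronized Integer topicGroupOf(final String storeName) {
        monitorEntries++;
        return storeToTopicGroup.get(storeName);
    }

    synchronized long monitorEntries() {
        return monitorEntries;
    }
}

final class PerQueryLookupModel {
    // One single-key query is fanned out to the provider of every
    // StreamThread; in this model each provider resolves the key's task
    // through the shared metadata, so a single query enters the monitor
    // once per thread, even though only one thread owns the key.
    static Integer query(final SharedTopologyMetadata metadata,
                         final int streamThreads,
                         final String storeName) {
        Integer topicGroup = null;
        for (int thread = 0; thread < streamThreads; thread++) {
            topicGroup = metadata.topicGroupOf(storeName);
        }
        return topicGroup;
    }
}
```

The model only counts monitor entries; whatever work `topicGroups()` does on each call comes on top of the contention, which is why short-circuiting the wrapper alone would not remove the cost.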
   
   


----------------------------------------------------------------



[GitHub] [kafka] dima5rr commented on a change in pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
dima5rr commented on a change in pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#discussion_r455934580



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
##########
@@ -20,62 +20,49 @@
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.Task;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 public class StreamThreadStateStoreProvider {
 
     private final StreamThread streamThread;
-    private final InternalTopologyBuilder internalTopologyBuilder;
 
-    public StreamThreadStateStoreProvider(final StreamThread streamThread,
-                                          final InternalTopologyBuilder internalTopologyBuilder) {
+    public StreamThreadStateStoreProvider(final StreamThread streamThread) {
         this.streamThread = streamThread;
-        this.internalTopologyBuilder = internalTopologyBuilder;
     }
 
     @SuppressWarnings("unchecked")
     public <T> List<T> stores(final StoreQueryParameters storeQueryParams) {
         final String storeName = storeQueryParams.storeName();
         final QueryableStoreType<T> queryableStoreType = storeQueryParams.queryableStoreType();
-        final TaskId keyTaskId = createKeyTaskId(storeName, storeQueryParams.partition());
         if (streamThread.state() == StreamThread.State.DEAD) {
             return Collections.emptyList();
         }
         final StreamThread.State state = streamThread.state();
         if (storeQueryParams.staleStoresEnabled() ? state.isAlive() : state == StreamThread.State.RUNNING) {
             final Map<TaskId, ? extends Task> tasks = storeQueryParams.staleStoresEnabled() ? streamThread.allTasks() : streamThread.activeTaskMap();
-            final List<T> stores = new ArrayList<>();
-            if (keyTaskId != null) {
-                final Task task = tasks.get(keyTaskId);
-                if (task == null) {
+            if (storeQueryParams.partition() != null) {
+                final Task streamTask = findStreamTask(tasks, storeName, storeQueryParams.partition());
+                if (streamTask == null) {
                     return Collections.emptyList();
                 }
-                final T store = validateAndListStores(task.getStore(storeName), queryableStoreType, storeName, keyTaskId);
-                if (store != null) {
-                    return Collections.singletonList(store);
-                }
-            } else {
-                for (final Task streamTask : tasks.values()) {
-                    final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id());
-                    if (store != null) {
-                        stores.add(store);
-                    }
-                }
+                final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id());
+                return store != null ? Collections.singletonList(store) : Collections.emptyList();

Review comment:
       Sure.
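The diff calls a `findStreamTask(tasks, storeName, partition)` helper whose body is not part of this excerpt. A minimal sketch of what such a lookup could look like follows, assuming it only needs the calling thread's task map. `SimpleTaskId`, `SimpleTask`, and `TaskLookup` are hypothetical stand-ins for the Kafka Streams types, not the code in the PR.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a task id: a task is identified by its
// topic group (sub-topology) and its partition.
final class SimpleTaskId {
    final int topicGroupId;
    final int partition;

    SimpleTaskId(final int topicGroupId, final int partition) {
        this.topicGroupId = topicGroupId;
        this.partition = partition;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (!(o instanceof SimpleTaskId)) return false;
        final SimpleTaskId other = (SimpleTaskId) o;
        return topicGroupId == other.topicGroupId && partition == other.partition;
    }

    @Override
    public int hashCode() {
        return Objects.hash(topicGroupId, partition);
    }
}

// Hypothetical stand-in for a task that may or may not host a given store.
final class SimpleTask {
    private final SimpleTaskId id;
    private final Map<String, ?> stores;

    SimpleTask(final SimpleTaskId id, final Map<String, ?> stores) {
        this.id = id;
        this.stores = stores;
    }

    SimpleTaskId id() { return id; }

    // Returns null when this task does not host the store.
    Object getStore(final String storeName) { return stores.get(storeName); }
}

final class TaskLookup {
    // Scan only the calling thread's own tasks. Several sub-topologies can
    // each have a task for the same partition, so match on the partition
    // and on whether the task actually hosts the requested store.
    static SimpleTask findStreamTask(final Map<SimpleTaskId, SimpleTask> tasks,
                                     final String storeName,
                                     final int partition) {
        for (final SimpleTask task : tasks.values()) {
            if (task.id().partition == partition && task.getStore(storeName) != null) {
                return task;
            }
        }
        return null;
    }
}
```

Since a lookup of this shape reads only the thread's own task map, it needs no topology metadata, which is consistent with the diff dropping the `InternalTopologyBuilder` constructor parameter.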




----------------------------------------------------------------



[GitHub] [kafka] guozhangwang commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-704474872


   test this


----------------------------------------------------------------



[GitHub] [kafka] vvcephei commented on pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-658439040


   Alternatively, as you observed, if the parameters contain a single partition, then there should be exactly one specific store that matches. Instead of always returning a `new WrappingStoreProvider`, maybe the QueryableStoreProvider should have a high-level branch: if the partition is specified, find the store and return a `new SingleStoreProvider`; otherwise, return a `new WrappingStoreProvider`.
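The proposed branch could look roughly like the following. `SingleStoreProvider` is only a name floated in this comment, so the sketch models both branches with hypothetical stand-in types (`PerThreadProvider`, `StoreView`, `StoreViews`) rather than real Kafka Streams classes. `IllegalStateException` again stands in for `InvalidStateStoreException`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a per-StreamThread provider: returns the stores
// named storeName, restricted to one partition unless partition is null.
interface PerThreadProvider {
    List<String> stores(String storeName, Integer partition);
}

// What the caller gets back: a view it can ask for the store(s) to query.
interface StoreView {
    List<String> stores();
}

final class StoreViews {
    static StoreView forQuery(final List<PerThreadProvider> threads,
                              final String storeName,
                              final Integer partition) {
        if (partition != null) {
            // Single-store branch: stop at the first thread hosting the
            // partition and return a view over just that one store.
            for (final PerThreadProvider thread : threads) {
                final List<String> found = thread.stores(storeName, partition);
                if (!found.isEmpty()) {
                    final List<String> single = Collections.singletonList(found.get(0));
                    return () -> single;
                }
            }
            // IllegalStateException stands in for Kafka's InvalidStateStoreException.
            throw new IllegalStateException(
                "No local store " + storeName + " for partition " + partition);
        }
        // Wrapping branch: fan out across all threads when the view is queried.
        return () -> {
            final List<String> all = new ArrayList<>();
            for (final PerThreadProvider thread : threads) {
                all.addAll(thread.stores(storeName, null));
            }
            return all;
        };
    }
}
```

With a partition, threads are consulted only until the first match and the caller holds exactly one store; without one, the behavior is the existing fan-out over every thread.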


----------------------------------------------------------------