You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/16 16:59:31 UTC

[GitHub] [kafka] dima5rr commented on a change in pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition

dima5rr commented on a change in pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#discussion_r455934580



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
##########
@@ -20,62 +20,49 @@
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.Task;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 public class StreamThreadStateStoreProvider {
 
     private final StreamThread streamThread;
-    private final InternalTopologyBuilder internalTopologyBuilder;
 
-    public StreamThreadStateStoreProvider(final StreamThread streamThread,
-                                          final InternalTopologyBuilder internalTopologyBuilder) {
+    public StreamThreadStateStoreProvider(final StreamThread streamThread) {
         this.streamThread = streamThread;
-        this.internalTopologyBuilder = internalTopologyBuilder;
     }
 
     @SuppressWarnings("unchecked")
     public <T> List<T> stores(final StoreQueryParameters storeQueryParams) {
         final String storeName = storeQueryParams.storeName();
         final QueryableStoreType<T> queryableStoreType = storeQueryParams.queryableStoreType();
-        final TaskId keyTaskId = createKeyTaskId(storeName, storeQueryParams.partition());
         if (streamThread.state() == StreamThread.State.DEAD) {
             return Collections.emptyList();
         }
         final StreamThread.State state = streamThread.state();
         if (storeQueryParams.staleStoresEnabled() ? state.isAlive() : state == StreamThread.State.RUNNING) {
             final Map<TaskId, ? extends Task> tasks = storeQueryParams.staleStoresEnabled() ? streamThread.allTasks() : streamThread.activeTaskMap();
-            final List<T> stores = new ArrayList<>();
-            if (keyTaskId != null) {
-                final Task task = tasks.get(keyTaskId);
-                if (task == null) {
+            if (storeQueryParams.partition() != null) {
+                final Task streamTask = findStreamTask(tasks, storeName, storeQueryParams.partition());
+                if (streamTask == null) {
                     return Collections.emptyList();
                 }
-                final T store = validateAndListStores(task.getStore(storeName), queryableStoreType, storeName, keyTaskId);
-                if (store != null) {
-                    return Collections.singletonList(store);
-                }
-            } else {
-                for (final Task streamTask : tasks.values()) {
-                    final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id());
-                    if (store != null) {
-                        stores.add(store);
-                    }
-                }
+                final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id());
+                return store != null ? Collections.singletonList(store) : Collections.emptyList();

Review comment:
       sure




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org