You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/10/08 21:38:10 UTC

[kafka] branch 2.7 updated: KAFKA-10271: Performance regression while fetching a key from a single partition (#9020)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new ccff722  KAFKA-10271: Performance regression while fetching a key from a single partition (#9020)
ccff722 is described below

commit ccff7227ce11747c758c0903b34ee1d84119b9f9
Author: Dima Reznik <di...@fiverr.com>
AuthorDate: Thu Oct 8 20:12:33 2020 +0300

    KAFKA-10271: Performance regression while fetching a key from a single partition (#9020)
    
    StreamThreadStateStoreProvider looped excessively over calls to internalTopologyBuilder.topicGroups(), which is synchronized, causing significant performance degradation for callers, especially when the store has many partitions.
    
    Reviewers: John Roesler <vv...@apache.org>, Guozhang Wang <wa...@gmail.com>
---
 .../org/apache/kafka/streams/KafkaStreams.java     |  2 +-
 .../state/internals/QueryableStoreProvider.java    | 20 -------
 .../internals/StreamThreadStateStoreProvider.java  | 67 ++++++++--------------
 .../state/internals/WrappingStoreProvider.java     | 17 +++++-
 .../integration/EosBetaUpgradeIntegrationTest.java |  1 +
 .../integration/JoinStoreIntegrationTest.java      |  2 +-
 .../internals/QueryableStoreProviderTest.java      | 12 ++--
 .../StreamThreadStateStoreProviderTest.java        |  2 +-
 .../state/internals/WrappingStoreProviderTest.java | 44 +++++++++-----
 .../apache/kafka/test/StateStoreProviderStub.java  |  2 +-
 10 files changed, 79 insertions(+), 90 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index f78023a..83b84dc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -782,7 +782,7 @@ public class KafkaStreams implements AutoCloseable {
                 delegatingStateRestoreListener,
                 i + 1);
             threadState.put(threads[i].getId(), threads[i].state());
-            storeProviders.add(new StreamThreadStateStoreProvider(threads[i], internalTopologyBuilder));
+            storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
         }
 
         ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
index 2af5874..8dd1f03 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.streams.StoreQueryParameters;
-import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
 
@@ -56,25 +55,6 @@ public class QueryableStoreProvider {
         if (!globalStore.isEmpty()) {
             return queryableStoreType.create(globalStoreProvider, storeName);
         }
-        final List<T> allStores = new ArrayList<>();
-        for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
-            final List<T> stores = storeProvider.stores(storeQueryParameters);
-            if (!stores.isEmpty()) {
-                allStores.addAll(stores);
-                if (storeQueryParameters.partition() != null) {
-                    break;
-                }
-            }
-        }
-        if (allStores.isEmpty()) {
-            if (storeQueryParameters.partition() != null) {
-                throw new InvalidStateStoreException(
-                        String.format("The specified partition %d for store %s does not exist.",
-                                storeQueryParameters.partition(),
-                                storeName));
-            }
-            throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance.");
-        }
         return queryableStoreType.create(
             new WrappingStoreProvider(storeProviders, storeQueryParameters),
             storeName
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
index 7cc263a..d5a175d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
@@ -20,7 +20,6 @@ import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.Task;
 import org.apache.kafka.streams.state.QueryableStoreType;
@@ -28,54 +27,46 @@ import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
 public class StreamThreadStateStoreProvider {
 
     private final StreamThread streamThread;
-    private final InternalTopologyBuilder internalTopologyBuilder;
 
-    public StreamThreadStateStoreProvider(final StreamThread streamThread,
-                                          final InternalTopologyBuilder internalTopologyBuilder) {
+    public StreamThreadStateStoreProvider(final StreamThread streamThread) {
         this.streamThread = streamThread;
-        this.internalTopologyBuilder = internalTopologyBuilder;
     }
 
     @SuppressWarnings("unchecked")
     public <T> List<T> stores(final StoreQueryParameters storeQueryParams) {
         final String storeName = storeQueryParams.storeName();
         final QueryableStoreType<T> queryableStoreType = storeQueryParams.queryableStoreType();
-        final TaskId keyTaskId = createKeyTaskId(storeName, storeQueryParams.partition());
         if (streamThread.state() == StreamThread.State.DEAD) {
             return Collections.emptyList();
         }
         final StreamThread.State state = streamThread.state();
         if (storeQueryParams.staleStoresEnabled() ? state.isAlive() : state == StreamThread.State.RUNNING) {
-            final Map<TaskId, ? extends Task> tasks = storeQueryParams.staleStoresEnabled() ? streamThread.allTasks() : streamThread.activeTaskMap();
-            final List<T> stores = new ArrayList<>();
-            if (keyTaskId != null) {
-                final Task task = tasks.get(keyTaskId);
-                if (task == null) {
-                    return Collections.emptyList();
-                }
-                final T store = validateAndListStores(task.getStore(storeName), queryableStoreType, storeName, keyTaskId);
-                if (store != null) {
-                    return Collections.singletonList(store);
-                }
+            final Collection<Task> tasks = storeQueryParams.staleStoresEnabled() ?
+                    streamThread.allTasks().values() : streamThread.activeTasks();
+
+            if (storeQueryParams.partition() != null) {
+                return findStreamTask(tasks, storeName, storeQueryParams.partition()).
+                        map(streamTask ->
+                                validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())).
+                        map(Collections::singletonList).
+                        orElse(Collections.emptyList());
             } else {
-                for (final Task streamTask : tasks.values()) {
-                    final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id());
-                    if (store != null) {
-                        stores.add(store);
-                    }
-                }
+                return tasks.stream().
+                        map(streamTask ->
+                                validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())).
+                        filter(Objects::nonNull).
+                        collect(Collectors.toList());
             }
-            return stores;
         } else {
             throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " +
                                                     state + ", not RUNNING" +
@@ -104,19 +95,11 @@ public class StreamThreadStateStoreProvider {
         }
     }
 
-    private TaskId createKeyTaskId(final String storeName, final Integer partition) {
-        if (partition == null) {
-            return null;
-        }
-        final List<String> sourceTopics = internalTopologyBuilder.stateStoreNameToSourceTopics().get(storeName);
-        final Set<String> sourceTopicsSet = new HashSet<>(sourceTopics);
-        final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = internalTopologyBuilder.topicGroups();
-        for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> topicGroup : topicGroups.entrySet()) {
-            if (topicGroup.getValue().sourceTopics.containsAll(sourceTopicsSet)) {
-                return new TaskId(topicGroup.getKey(), partition);
-            }
-        }
-        throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the requested partition " +
-            partition + " is not available on this instance");
+    private Optional<Task> findStreamTask(final Collection<Task> tasks, final String storeName, final int partition) {
+        return tasks.stream().
+                filter(streamTask -> streamTask.id().partition == partition &&
+                        streamTask.getStore(storeName) != null &&
+                        storeName.equals(streamTask.getStore(storeName).name())).
+                findFirst();
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
index 5c9ae1a..26c5db0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
@@ -46,11 +46,22 @@ public class WrappingStoreProvider implements StateStoreProvider {
     public <T> List<T> stores(final String storeName,
                               final QueryableStoreType<T> queryableStoreType) {
         final List<T> allStores = new ArrayList<>();
-        for (final StreamThreadStateStoreProvider provider : storeProviders) {
-            final List<T> stores = provider.stores(storeQueryParameters);
-            allStores.addAll(stores);
+        for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
+            final List<T> stores = storeProvider.stores(storeQueryParameters);
+            if (!stores.isEmpty()) {
+                allStores.addAll(stores);
+                if (storeQueryParameters.partition() != null) {
+                    break;
+                }
+            }
         }
         if (allStores.isEmpty()) {
+            if (storeQueryParameters.partition() != null) {
+                throw new InvalidStateStoreException(
+                        String.format("The specified partition %d for store %s does not exist.",
+                                storeQueryParameters.partition(),
+                                storeName));
+            }
             throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance.");
         }
         return allStores;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
index cd57acb..d039d98 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
@@ -1080,6 +1080,7 @@ public class EosBetaUpgradeIntegrationTest {
             streams,
             StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore())
         );
+        waitForCondition(() -> store.get(-1L) == null, MAX_WAIT_TIME_MS, () -> "State store did not ready: " + storeName);
         final Set<Long> keys = new HashSet<>();
         try (final KeyValueIterator<Long, Long> it = store.all()) {
             while (it.hasNext()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
index c519117..e15788a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
@@ -108,7 +108,7 @@ public class JoinStoreIntegrationTest {
 
             kafkaStreams.start();
             latch.await();
-            assertThrows(InvalidStateStoreException.class, () -> kafkaStreams.store(StoreQueryParameters.fromNameAndType("join-store", QueryableStoreTypes.keyValueStore())));
+            assertThrows(InvalidStateStoreException.class, () -> kafkaStreams.store(StoreQueryParameters.fromNameAndType("join-store", QueryableStoreTypes.keyValueStore())).get(1));
         }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
index 2d04755..f2ca0c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
@@ -60,12 +60,12 @@ public class QueryableStoreProviderTest {
 
     @Test(expected = InvalidStateStoreException.class)
     public void shouldThrowExceptionIfKVStoreDoesntExist() {
-        storeProvider.getStore(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.keyValueStore()));
+        storeProvider.getStore(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.keyValueStore())).get("1");
     }
 
     @Test(expected = InvalidStateStoreException.class)
     public void shouldThrowExceptionIfWindowStoreDoesntExist() {
-        storeProvider.getStore(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.windowStore()));
+        storeProvider.getStore(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.windowStore())).fetch("1", System.currentTimeMillis());
     }
 
     @Test
@@ -80,12 +80,12 @@ public class QueryableStoreProviderTest {
 
     @Test(expected = InvalidStateStoreException.class)
     public void shouldThrowExceptionWhenLookingForWindowStoreWithDifferentType() {
-        storeProvider.getStore(StoreQueryParameters.fromNameAndType(windowStore, QueryableStoreTypes.keyValueStore()));
+        storeProvider.getStore(StoreQueryParameters.fromNameAndType(windowStore, QueryableStoreTypes.keyValueStore())).get("1");
     }
 
     @Test(expected = InvalidStateStoreException.class)
     public void shouldThrowExceptionWhenLookingForKVStoreWithDifferentType() {
-        storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.windowStore()));
+        storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.windowStore())).fetch("1", System.currentTimeMillis());
     }
 
     @Test
@@ -106,7 +106,7 @@ public class QueryableStoreProviderTest {
                 storeProvider.getStore(
                         StoreQueryParameters
                                 .fromNameAndType(keyValueStore, QueryableStoreTypes.keyValueStore())
-                                .withPartition(partition))
+                                .withPartition(partition)).get("1")
         );
         assertThat(thrown.getMessage(), equalTo(String.format("The specified partition %d for store %s does not exist.", partition, keyValueStore)));
     }
@@ -123,7 +123,7 @@ public class QueryableStoreProviderTest {
                 storeProvider.getStore(
                         StoreQueryParameters
                                 .fromNameAndType(windowStore, QueryableStoreTypes.windowStore())
-                                .withPartition(partition))
+                                .withPartition(partition)).fetch("1", System.currentTimeMillis())
         );
         assertThat(thrown.getMessage(), equalTo(String.format("The specified partition %d for store %s does not exist.", partition, windowStore)));
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 0ac170a..884244d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -170,7 +170,7 @@ public class StreamThreadStateStoreProviderTest {
         tasks.put(new TaskId(0, 1), taskTwo);
 
         threadMock = EasyMock.createNiceMock(StreamThread.class);
-        provider = new StreamThreadStateStoreProvider(threadMock, internalTopologyBuilder);
+        provider = new StreamThreadStateStoreProvider(threadMock);
 
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
index ceb3f79..1897048 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
@@ -39,26 +39,24 @@ public class WrappingStoreProviderTest {
 
     private WrappingStoreProvider wrappingStoreProvider;
 
+    private final int numStateStorePartitions = 2;
+
     @Before
     public void before() {
         final StateStoreProviderStub stubProviderOne = new StateStoreProviderStub(false);
         final StateStoreProviderStub stubProviderTwo = new StateStoreProviderStub(false);
 
-
-        stubProviderOne.addStore("kv", Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv"),
-                Serdes.serdeFrom(String.class),
-                Serdes.serdeFrom(String.class))
-                .build());
-        stubProviderOne.addStore("window", new NoOpWindowStore());
-        stubProviderTwo.addStore("kv", Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv"),
-                Serdes.serdeFrom(String.class),
-                Serdes.serdeFrom(String.class))
-                .build());
-        stubProviderTwo.addStore("window", new NoOpWindowStore());
-        wrappingStoreProvider = new WrappingStoreProvider(
-            Arrays.asList(stubProviderOne, stubProviderTwo),
-            StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.keyValueStore())
-        );
+        for (int partition = 0; partition < numStateStorePartitions; partition++) {
+            stubProviderOne.addStore("kv", partition, Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv"),
+                    Serdes.serdeFrom(String.class),
+                    Serdes.serdeFrom(String.class))
+                    .build());
+            stubProviderOne.addStore("window", partition, new NoOpWindowStore());
+            wrappingStoreProvider = new WrappingStoreProvider(
+                    Arrays.asList(stubProviderOne, stubProviderTwo),
+                    StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.keyValueStore())
+            );
+        }
     }
 
     @Test
@@ -82,4 +80,20 @@ public class WrappingStoreProviderTest {
         wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("doesn't exist", QueryableStoreTypes.<String, String>keyValueStore()));
         wrappingStoreProvider.stores("doesn't exist", QueryableStoreTypes.<String, String>keyValueStore());
     }
+
+    @Test
+    public void shouldReturnAllStoreWhenQueryWithoutPartition() {
+        wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.<String, String>keyValueStore()));
+        final List<ReadOnlyKeyValueStore<String, String>> results =
+                wrappingStoreProvider.stores("kv", QueryableStoreTypes.<String, String>keyValueStore());
+        assertEquals(numStateStorePartitions, results.size());
+    }
+
+    @Test
+    public void shouldReturnSingleStoreWhenQueryWithPartition() {
+        wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.<String, String>keyValueStore()).withPartition(numStateStorePartitions - 1));
+        final List<ReadOnlyKeyValueStore<String, String>> results =
+                wrappingStoreProvider.stores("kv", QueryableStoreTypes.<String, String>keyValueStore());
+        assertEquals(1, results.size());
+    }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java b/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java
index bc0e33a..9d89ae2 100644
--- a/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java
+++ b/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java
@@ -39,7 +39,7 @@ public class StateStoreProviderStub extends StreamThreadStateStoreProvider {
     private final int defaultStorePartition = 0;
 
     public StateStoreProviderStub(final boolean throwException) {
-        super(null, null);
+        super(null);
         this.throwException = throwException;
     }