You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/10/08 21:38:10 UTC
[kafka] branch 2.7 updated: KAFKA-10271: Performance regression
while fetching a key from a single partition (#9020)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new ccff722 KAFKA-10271: Performance regression while fetching a key from a single partition (#9020)
ccff722 is described below
commit ccff7227ce11747c758c0903b34ee1d84119b9f9
Author: Dima Reznik <di...@fiverr.com>
AuthorDate: Thu Oct 8 20:12:33 2020 +0300
KAFKA-10271: Performance regression while fetching a key from a single partition (#9020)
StreamThreadStateStoreProvider makes excessive, repeated calls to internalTopologyBuilder.topicGroups(), which is synchronized, causing significant performance degradation for the caller, especially when the store has many partitions.
Reviewers: John Roesler <vv...@apache.org>, Guozhang Wang <wa...@gmail.com>
---
.../org/apache/kafka/streams/KafkaStreams.java | 2 +-
.../state/internals/QueryableStoreProvider.java | 20 -------
.../internals/StreamThreadStateStoreProvider.java | 67 ++++++++--------------
.../state/internals/WrappingStoreProvider.java | 17 +++++-
.../integration/EosBetaUpgradeIntegrationTest.java | 1 +
.../integration/JoinStoreIntegrationTest.java | 2 +-
.../internals/QueryableStoreProviderTest.java | 12 ++--
.../StreamThreadStateStoreProviderTest.java | 2 +-
.../state/internals/WrappingStoreProviderTest.java | 44 +++++++++-----
.../apache/kafka/test/StateStoreProviderStub.java | 2 +-
10 files changed, 79 insertions(+), 90 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index f78023a..83b84dc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -782,7 +782,7 @@ public class KafkaStreams implements AutoCloseable {
delegatingStateRestoreListener,
i + 1);
threadState.put(threads[i].getId(), threads[i].state());
- storeProviders.add(new StreamThreadStateStoreProvider(threads[i], internalTopologyBuilder));
+ storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
}
ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
index 2af5874..8dd1f03 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.StoreQueryParameters;
-import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.QueryableStoreType;
@@ -56,25 +55,6 @@ public class QueryableStoreProvider {
if (!globalStore.isEmpty()) {
return queryableStoreType.create(globalStoreProvider, storeName);
}
- final List<T> allStores = new ArrayList<>();
- for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
- final List<T> stores = storeProvider.stores(storeQueryParameters);
- if (!stores.isEmpty()) {
- allStores.addAll(stores);
- if (storeQueryParameters.partition() != null) {
- break;
- }
- }
- }
- if (allStores.isEmpty()) {
- if (storeQueryParameters.partition() != null) {
- throw new InvalidStateStoreException(
- String.format("The specified partition %d for store %s does not exist.",
- storeQueryParameters.partition(),
- storeName));
- }
- throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance.");
- }
return queryableStoreType.create(
new WrappingStoreProvider(storeProviders, storeQueryParameters),
storeName
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
index 7cc263a..d5a175d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
@@ -20,7 +20,6 @@ import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.state.QueryableStoreType;
@@ -28,54 +27,46 @@ import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
public class StreamThreadStateStoreProvider {
private final StreamThread streamThread;
- private final InternalTopologyBuilder internalTopologyBuilder;
- public StreamThreadStateStoreProvider(final StreamThread streamThread,
- final InternalTopologyBuilder internalTopologyBuilder) {
+ public StreamThreadStateStoreProvider(final StreamThread streamThread) {
this.streamThread = streamThread;
- this.internalTopologyBuilder = internalTopologyBuilder;
}
@SuppressWarnings("unchecked")
public <T> List<T> stores(final StoreQueryParameters storeQueryParams) {
final String storeName = storeQueryParams.storeName();
final QueryableStoreType<T> queryableStoreType = storeQueryParams.queryableStoreType();
- final TaskId keyTaskId = createKeyTaskId(storeName, storeQueryParams.partition());
if (streamThread.state() == StreamThread.State.DEAD) {
return Collections.emptyList();
}
final StreamThread.State state = streamThread.state();
if (storeQueryParams.staleStoresEnabled() ? state.isAlive() : state == StreamThread.State.RUNNING) {
- final Map<TaskId, ? extends Task> tasks = storeQueryParams.staleStoresEnabled() ? streamThread.allTasks() : streamThread.activeTaskMap();
- final List<T> stores = new ArrayList<>();
- if (keyTaskId != null) {
- final Task task = tasks.get(keyTaskId);
- if (task == null) {
- return Collections.emptyList();
- }
- final T store = validateAndListStores(task.getStore(storeName), queryableStoreType, storeName, keyTaskId);
- if (store != null) {
- return Collections.singletonList(store);
- }
+ final Collection<Task> tasks = storeQueryParams.staleStoresEnabled() ?
+ streamThread.allTasks().values() : streamThread.activeTasks();
+
+ if (storeQueryParams.partition() != null) {
+ return findStreamTask(tasks, storeName, storeQueryParams.partition()).
+ map(streamTask ->
+ validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())).
+ map(Collections::singletonList).
+ orElse(Collections.emptyList());
} else {
- for (final Task streamTask : tasks.values()) {
- final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id());
- if (store != null) {
- stores.add(store);
- }
- }
+ return tasks.stream().
+ map(streamTask ->
+ validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())).
+ filter(Objects::nonNull).
+ collect(Collectors.toList());
}
- return stores;
} else {
throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " +
state + ", not RUNNING" +
@@ -104,19 +95,11 @@ public class StreamThreadStateStoreProvider {
}
}
- private TaskId createKeyTaskId(final String storeName, final Integer partition) {
- if (partition == null) {
- return null;
- }
- final List<String> sourceTopics = internalTopologyBuilder.stateStoreNameToSourceTopics().get(storeName);
- final Set<String> sourceTopicsSet = new HashSet<>(sourceTopics);
- final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = internalTopologyBuilder.topicGroups();
- for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> topicGroup : topicGroups.entrySet()) {
- if (topicGroup.getValue().sourceTopics.containsAll(sourceTopicsSet)) {
- return new TaskId(topicGroup.getKey(), partition);
- }
- }
- throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the requested partition " +
- partition + " is not available on this instance");
+ private Optional<Task> findStreamTask(final Collection<Task> tasks, final String storeName, final int partition) {
+ return tasks.stream().
+ filter(streamTask -> streamTask.id().partition == partition &&
+ streamTask.getStore(storeName) != null &&
+ storeName.equals(streamTask.getStore(storeName).name())).
+ findFirst();
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
index 5c9ae1a..26c5db0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
@@ -46,11 +46,22 @@ public class WrappingStoreProvider implements StateStoreProvider {
public <T> List<T> stores(final String storeName,
final QueryableStoreType<T> queryableStoreType) {
final List<T> allStores = new ArrayList<>();
- for (final StreamThreadStateStoreProvider provider : storeProviders) {
- final List<T> stores = provider.stores(storeQueryParameters);
- allStores.addAll(stores);
+ for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
+ final List<T> stores = storeProvider.stores(storeQueryParameters);
+ if (!stores.isEmpty()) {
+ allStores.addAll(stores);
+ if (storeQueryParameters.partition() != null) {
+ break;
+ }
+ }
}
if (allStores.isEmpty()) {
+ if (storeQueryParameters.partition() != null) {
+ throw new InvalidStateStoreException(
+ String.format("The specified partition %d for store %s does not exist.",
+ storeQueryParameters.partition(),
+ storeName));
+ }
throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance.");
}
return allStores;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
index cd57acb..d039d98 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
@@ -1080,6 +1080,7 @@ public class EosBetaUpgradeIntegrationTest {
streams,
StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore())
);
+ waitForCondition(() -> store.get(-1L) == null, MAX_WAIT_TIME_MS, () -> "State store did not ready: " + storeName);
final Set<Long> keys = new HashSet<>();
try (final KeyValueIterator<Long, Long> it = store.all()) {
while (it.hasNext()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
index c519117..e15788a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
@@ -108,7 +108,7 @@ public class JoinStoreIntegrationTest {
kafkaStreams.start();
latch.await();
- assertThrows(InvalidStateStoreException.class, () -> kafkaStreams.store(StoreQueryParameters.fromNameAndType("join-store", QueryableStoreTypes.keyValueStore())));
+ assertThrows(InvalidStateStoreException.class, () -> kafkaStreams.store(StoreQueryParameters.fromNameAndType("join-store", QueryableStoreTypes.keyValueStore())).get(1));
}
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
index 2d04755..f2ca0c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
@@ -60,12 +60,12 @@ public class QueryableStoreProviderTest {
@Test(expected = InvalidStateStoreException.class)
public void shouldThrowExceptionIfKVStoreDoesntExist() {
- storeProvider.getStore(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.keyValueStore()));
+ storeProvider.getStore(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.keyValueStore())).get("1");
}
@Test(expected = InvalidStateStoreException.class)
public void shouldThrowExceptionIfWindowStoreDoesntExist() {
- storeProvider.getStore(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.windowStore()));
+ storeProvider.getStore(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.windowStore())).fetch("1", System.currentTimeMillis());
}
@Test
@@ -80,12 +80,12 @@ public class QueryableStoreProviderTest {
@Test(expected = InvalidStateStoreException.class)
public void shouldThrowExceptionWhenLookingForWindowStoreWithDifferentType() {
- storeProvider.getStore(StoreQueryParameters.fromNameAndType(windowStore, QueryableStoreTypes.keyValueStore()));
+ storeProvider.getStore(StoreQueryParameters.fromNameAndType(windowStore, QueryableStoreTypes.keyValueStore())).get("1");
}
@Test(expected = InvalidStateStoreException.class)
public void shouldThrowExceptionWhenLookingForKVStoreWithDifferentType() {
- storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.windowStore()));
+ storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.windowStore())).fetch("1", System.currentTimeMillis());
}
@Test
@@ -106,7 +106,7 @@ public class QueryableStoreProviderTest {
storeProvider.getStore(
StoreQueryParameters
.fromNameAndType(keyValueStore, QueryableStoreTypes.keyValueStore())
- .withPartition(partition))
+ .withPartition(partition)).get("1")
);
assertThat(thrown.getMessage(), equalTo(String.format("The specified partition %d for store %s does not exist.", partition, keyValueStore)));
}
@@ -123,7 +123,7 @@ public class QueryableStoreProviderTest {
storeProvider.getStore(
StoreQueryParameters
.fromNameAndType(windowStore, QueryableStoreTypes.windowStore())
- .withPartition(partition))
+ .withPartition(partition)).fetch("1", System.currentTimeMillis())
);
assertThat(thrown.getMessage(), equalTo(String.format("The specified partition %d for store %s does not exist.", partition, windowStore)));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 0ac170a..884244d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -170,7 +170,7 @@ public class StreamThreadStateStoreProviderTest {
tasks.put(new TaskId(0, 1), taskTwo);
threadMock = EasyMock.createNiceMock(StreamThread.class);
- provider = new StreamThreadStateStoreProvider(threadMock, internalTopologyBuilder);
+ provider = new StreamThreadStateStoreProvider(threadMock);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
index ceb3f79..1897048 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java
@@ -39,26 +39,24 @@ public class WrappingStoreProviderTest {
private WrappingStoreProvider wrappingStoreProvider;
+ private final int numStateStorePartitions = 2;
+
@Before
public void before() {
final StateStoreProviderStub stubProviderOne = new StateStoreProviderStub(false);
final StateStoreProviderStub stubProviderTwo = new StateStoreProviderStub(false);
-
- stubProviderOne.addStore("kv", Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv"),
- Serdes.serdeFrom(String.class),
- Serdes.serdeFrom(String.class))
- .build());
- stubProviderOne.addStore("window", new NoOpWindowStore());
- stubProviderTwo.addStore("kv", Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv"),
- Serdes.serdeFrom(String.class),
- Serdes.serdeFrom(String.class))
- .build());
- stubProviderTwo.addStore("window", new NoOpWindowStore());
- wrappingStoreProvider = new WrappingStoreProvider(
- Arrays.asList(stubProviderOne, stubProviderTwo),
- StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.keyValueStore())
- );
+ for (int partition = 0; partition < numStateStorePartitions; partition++) {
+ stubProviderOne.addStore("kv", partition, Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv"),
+ Serdes.serdeFrom(String.class),
+ Serdes.serdeFrom(String.class))
+ .build());
+ stubProviderOne.addStore("window", partition, new NoOpWindowStore());
+ wrappingStoreProvider = new WrappingStoreProvider(
+ Arrays.asList(stubProviderOne, stubProviderTwo),
+ StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.keyValueStore())
+ );
+ }
}
@Test
@@ -82,4 +80,20 @@ public class WrappingStoreProviderTest {
wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("doesn't exist", QueryableStoreTypes.<String, String>keyValueStore()));
wrappingStoreProvider.stores("doesn't exist", QueryableStoreTypes.<String, String>keyValueStore());
}
+
+ @Test
+ public void shouldReturnAllStoreWhenQueryWithoutPartition() {
+ wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.<String, String>keyValueStore()));
+ final List<ReadOnlyKeyValueStore<String, String>> results =
+ wrappingStoreProvider.stores("kv", QueryableStoreTypes.<String, String>keyValueStore());
+ assertEquals(numStateStorePartitions, results.size());
+ }
+
+ @Test
+ public void shouldReturnSingleStoreWhenQueryWithPartition() {
+ wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.<String, String>keyValueStore()).withPartition(numStateStorePartitions - 1));
+ final List<ReadOnlyKeyValueStore<String, String>> results =
+ wrappingStoreProvider.stores("kv", QueryableStoreTypes.<String, String>keyValueStore());
+ assertEquals(1, results.size());
+ }
}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java b/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java
index bc0e33a..9d89ae2 100644
--- a/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java
+++ b/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java
@@ -39,7 +39,7 @@ public class StateStoreProviderStub extends StreamThreadStateStoreProvider {
private final int defaultStorePartition = 0;
public StateStoreProviderStub(final boolean throwException) {
- super(null, null);
+ super(null);
this.throwException = throwException;
}