You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/06/02 01:36:54 UTC
[kafka] branch 2.5 updated: KAFKA-10030: Allow fetching a key from a single partition (#8706)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new ee62965 KAFKA-10030: Allow fetching a key from a single partition (#8706)
ee62965 is described below
commit ee62965ad760709b43386145631809f41bb6378b
Author: Dima Reznik <di...@fiverr.com>
AuthorDate: Tue Jun 2 04:06:28 2020 +0300
KAFKA-10030: Allow fetching a key from a single partition (#8706)
Reviewers: Navinder Pal Singh Brar <na...@yahoo.com>, Boyang Chen <bo...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
.../state/internals/QueryableStoreProvider.java | 14 ++++-
.../internals/StreamThreadStateStoreProvider.java | 5 +-
.../integration/StoreQueryIntegrationTest.java | 69 ++++++++++++++++++++++
.../internals/QueryableStoreProviderTest.java | 42 ++++++++++++-
.../StreamThreadStateStoreProviderTest.java | 14 ++---
.../apache/kafka/test/StateStoreProviderStub.java | 29 +++++++--
6 files changed, 151 insertions(+), 22 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
index 8917164..2af5874 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
@@ -58,9 +58,21 @@ public class QueryableStoreProvider {
}
final List<T> allStores = new ArrayList<>();
for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
- allStores.addAll(storeProvider.stores(storeQueryParameters));
+ final List<T> stores = storeProvider.stores(storeQueryParameters);
+ if (!stores.isEmpty()) {
+ allStores.addAll(stores);
+ if (storeQueryParameters.partition() != null) {
+ break;
+ }
+ }
}
if (allStores.isEmpty()) {
+ if (storeQueryParameters.partition() != null) {
+ throw new InvalidStateStoreException(
+ String.format("The specified partition %d for store %s does not exist.",
+ storeQueryParameters.partition(),
+ storeName));
+ }
throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance.");
}
return queryableStoreType.create(
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
index 206c50b..0b4a83f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
@@ -61,10 +61,7 @@ public class StreamThreadStateStoreProvider {
if (keyTaskId != null) {
final Task task = tasks.get(keyTaskId);
if (task == null) {
- throw new InvalidStateStoreException(
- String.format("The specified partition %d for store %s does not exist.",
- storeQueryParams.partition(),
- storeName));
+ return Collections.emptyList();
}
final T store = validateAndListStores(task.getStore(storeName), queryableStoreType, storeName, keyTaskId);
if (store != null) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
index ec820f2..e867691 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
@@ -61,6 +61,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertTrue;
@Category({IntegrationTest.class})
@@ -302,6 +303,74 @@ public class StoreQueryIntegrationTest {
assertThat(store4.get(key), is(nullValue()));
}
+ @Test
+ public void shouldQuerySpecificStalePartitionStoresMultiStreamThreads() throws Exception {
+ final int batch1NumMessages = 100;
+ final int key = 1;
+ final Semaphore semaphore = new Semaphore(0);
+ final int numStreamThreads = 2;
+
+ final StreamsBuilder builder = new StreamsBuilder();
+ builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()),
+ Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
+ .withCachingDisabled())
+ .toStream()
+ .peek((k, v) -> semaphore.release());
+
+ final Properties streamsConfiguration1 = streamsConfiguration();
+ streamsConfiguration1.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numStreamThreads);
+
+ final Properties streamsConfiguration2 = streamsConfiguration();
+ streamsConfiguration2.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numStreamThreads);
+
+ final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration1);
+ final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration2);
+ final List<KafkaStreams> kafkaStreamsList = Arrays.asList(kafkaStreams1, kafkaStreams2);
+
+ startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60));
+
+ assertTrue(kafkaStreams1.localThreadsMetadata().size() > 1);
+ assertTrue(kafkaStreams2.localThreadsMetadata().size() > 1);
+
+ produceValueRange(key, 0, batch1NumMessages);
+
+ // Assert that all messages in the first batch were processed in a timely manner
+ assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
+ final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, new IntegerSerializer());
+
+ //key belongs to this partition
+ final int keyPartition = keyQueryMetadata.getPartition();
+
+ //key doesn't belong to this partition
+ final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
+ final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = QueryableStoreTypes.keyValueStore();
+
+ // Assert that both active and standby are able to query for a key
+ final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> param = StoreQueryParameters
+ .fromNameAndType(TABLE_NAME, queryableStoreType)
+ .enableStaleStores()
+ .withPartition(keyPartition);
+ TestUtils.waitForCondition(() -> {
+ final ReadOnlyKeyValueStore<Integer, Integer> store1 = kafkaStreams1.store(param);
+ return store1.get(key) != null;
+ }, "store1 cannot find results for key");
+ TestUtils.waitForCondition(() -> {
+ final ReadOnlyKeyValueStore<Integer, Integer> store2 = kafkaStreams2.store(param);
+ return store2.get(key) != null;
+ }, "store2 cannot find results for key");
+
+ final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> otherParam = StoreQueryParameters
+ .fromNameAndType(TABLE_NAME, queryableStoreType)
+ .enableStaleStores()
+ .withPartition(keyDontBelongPartition);
+ final ReadOnlyKeyValueStore<Integer, Integer> store3 = kafkaStreams1.store(otherParam);
+ final ReadOnlyKeyValueStore<Integer, Integer> store4 = kafkaStreams2.store(otherParam);
+
+ // Assert that the key is not found in the partition it does not belong to
+ assertThat(store3.get(key), is(nullValue()));
+ assertThat(store4.get(key), is(nullValue()));
+ }
+
private KafkaStreams createKafkaStreams(final StreamsBuilder builder, final Properties config) {
final KafkaStreams streams = new KafkaStreams(builder.build(config), config);
streamsToCleanup.add(streams);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
index 19a0355..2d04755 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
@@ -30,6 +30,9 @@ import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertNotNull;
public class QueryableStoreProviderTest {
@@ -38,12 +41,15 @@ public class QueryableStoreProviderTest {
private final String windowStore = "window-store";
private QueryableStoreProvider storeProvider;
private HashMap<String, StateStore> globalStateStores;
+ private final int numStateStorePartitions = 2;
@Before
public void before() {
final StateStoreProviderStub theStoreProvider = new StateStoreProviderStub(false);
- theStoreProvider.addStore(keyValueStore, new NoOpReadOnlyStore<>());
- theStoreProvider.addStore(windowStore, new NoOpWindowStore());
+ for (int partition = 0; partition < numStateStorePartitions; partition++) {
+ theStoreProvider.addStore(keyValueStore, partition, new NoOpReadOnlyStore<>());
+ theStoreProvider.addStore(windowStore, partition, new NoOpWindowStore());
+ }
globalStateStores = new HashMap<>();
storeProvider =
new QueryableStoreProvider(
@@ -88,5 +94,37 @@ public class QueryableStoreProviderTest {
assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType("global", QueryableStoreTypes.keyValueStore())));
}
+ @Test
+ public void shouldReturnKVStoreWithPartitionWhenItExists() {
+ assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.keyValueStore()).withPartition(numStateStorePartitions - 1)));
+ }
+
+ @Test
+ public void shouldThrowExceptionWhenKVStoreWithPartitionDoesntExists() {
+ final int partition = numStateStorePartitions + 1;
+ final InvalidStateStoreException thrown = assertThrows(InvalidStateStoreException.class, () ->
+ storeProvider.getStore(
+ StoreQueryParameters
+ .fromNameAndType(keyValueStore, QueryableStoreTypes.keyValueStore())
+ .withPartition(partition))
+ );
+ assertThat(thrown.getMessage(), equalTo(String.format("The specified partition %d for store %s does not exist.", partition, keyValueStore)));
+ }
+ @Test
+ public void shouldReturnWindowStoreWithPartitionWhenItExists() {
+ assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType(windowStore, QueryableStoreTypes.windowStore()).withPartition(numStateStorePartitions - 1)));
+ }
+
+ @Test
+ public void shouldThrowExceptionWhenWindowStoreWithPartitionDoesntExists() {
+ final int partition = numStateStorePartitions + 1;
+ final InvalidStateStoreException thrown = assertThrows(InvalidStateStoreException.class, () ->
+ storeProvider.getStore(
+ StoreQueryParameters
+ .fromNameAndType(windowStore, QueryableStoreTypes.windowStore())
+ .withPartition(partition))
+ );
+ assertThat(thrown.getMessage(), equalTo(String.format("The specified partition %d for store %s does not exist.", partition, windowStore)));
+ }
}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 767875b..60ae95a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -65,9 +65,7 @@ import java.util.Properties;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
-import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThrows;
public class StreamThreadStateStoreProviderTest {
@@ -315,16 +313,12 @@ public class StreamThreadStateStoreProviderTest {
}
@Test
- public void shouldThrowForInvalidPartitions() {
+ public void shouldReturnEmptyListForInvalidPartitions() {
mockThread(true);
- final InvalidStateStoreException thrown = assertThrows(
- InvalidStateStoreException.class,
- () -> provider.stores(
- StoreQueryParameters
- .fromNameAndType("kv-store", QueryableStoreTypes.keyValueStore())
- .withPartition(2))
+ assertEquals(
+ Collections.emptyList(),
+ provider.stores(StoreQueryParameters.fromNameAndType("kv-store", QueryableStoreTypes.keyValueStore()).withPartition(2))
);
- assertThat(thrown.getMessage(), equalTo("The specified partition 2 for store kv-store does not exist."));
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java b/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java
index 13a29e1..bc0e33a 100644
--- a/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java
+++ b/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java
@@ -22,16 +22,22 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
+import java.util.AbstractMap.SimpleEntry;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
public class StateStoreProviderStub extends StreamThreadStateStoreProvider {
- private final Map<String, StateStore> stores = new HashMap<>();
+ //<store name : partition> -> state store
+ private final Map<Entry<String, Integer>, StateStore> stores = new HashMap<>();
private final boolean throwException;
+ private final int defaultStorePartition = 0;
+
public StateStoreProviderStub(final boolean throwException) {
super(null, null);
this.throwException = throwException;
@@ -45,15 +51,28 @@ public class StateStoreProviderStub extends StreamThreadStateStoreProvider {
if (throwException) {
throw new InvalidStateStoreException("store is unavailable");
}
- if (stores.containsKey(storeName) && queryableStoreType.accepts(stores.get(storeName))) {
- return (List<T>) Collections.singletonList(stores.get(storeName));
+ if (storeQueryParameters.partition() != null) {
+ final Entry<String, Integer> stateStoreKey = new SimpleEntry<>(storeName, storeQueryParameters.partition());
+ if (stores.containsKey(stateStoreKey) && queryableStoreType.accepts(stores.get(stateStoreKey))) {
+ return (List<T>) Collections.singletonList(stores.get(stateStoreKey));
+ }
+ return Collections.emptyList();
}
- return Collections.emptyList();
+ return (List<T>) Collections.unmodifiableList(
+ stores.entrySet().stream().
+ filter(entry -> entry.getKey().getKey().equals(storeName) && queryableStoreType.accepts(entry.getValue())).
+ map(Entry::getValue).
+ collect(Collectors.toList()));
}
public void addStore(final String storeName,
final StateStore store) {
- stores.put(storeName, store);
+ addStore(storeName, defaultStorePartition, store);
}
+ public void addStore(final String storeName,
+ final int partition,
+ final StateStore store) {
+ stores.put(new SimpleEntry<>(storeName, partition), store);
+ }
}