You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/06/02 01:36:54 UTC

[kafka] branch 2.5 updated: KAFKA-10030: Allow fetching a key from a single partition (#8706)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new ee62965  KAFKA-10030: Allow fetching a key from a single partition (#8706)
ee62965 is described below

commit ee62965ad760709b43386145631809f41bb6378b
Author: Dima Reznik <di...@fiverr.com>
AuthorDate: Tue Jun 2 04:06:28 2020 +0300

    KAFKA-10030: Allow fetching a key from a single partition (#8706)
    
    Reviewers: Navinder Pal Singh Brar <na...@yahoo.com>, Boyang Chen <bo...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 .../state/internals/QueryableStoreProvider.java    | 14 ++++-
 .../internals/StreamThreadStateStoreProvider.java  |  5 +-
 .../integration/StoreQueryIntegrationTest.java     | 69 ++++++++++++++++++++++
 .../internals/QueryableStoreProviderTest.java      | 42 ++++++++++++-
 .../StreamThreadStateStoreProviderTest.java        | 14 ++---
 .../apache/kafka/test/StateStoreProviderStub.java  | 29 +++++++--
 6 files changed, 151 insertions(+), 22 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
index 8917164..2af5874 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
@@ -58,9 +58,21 @@ public class QueryableStoreProvider {
         }
         final List<T> allStores = new ArrayList<>();
         for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
-            allStores.addAll(storeProvider.stores(storeQueryParameters));
+            final List<T> stores = storeProvider.stores(storeQueryParameters);
+            if (!stores.isEmpty()) {
+                allStores.addAll(stores);
+                if (storeQueryParameters.partition() != null) {
+                    break;
+                }
+            }
         }
         if (allStores.isEmpty()) {
+            if (storeQueryParameters.partition() != null) {
+                throw new InvalidStateStoreException(
+                        String.format("The specified partition %d for store %s does not exist.",
+                                storeQueryParameters.partition(),
+                                storeName));
+            }
             throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance.");
         }
         return queryableStoreType.create(
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
index 206c50b..0b4a83f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
@@ -61,10 +61,7 @@ public class StreamThreadStateStoreProvider {
             if (keyTaskId != null) {
                 final Task task = tasks.get(keyTaskId);
                 if (task == null) {
-                    throw new InvalidStateStoreException(
-                        String.format("The specified partition %d for store %s does not exist.",
-                            storeQueryParams.partition(),
-                            storeName));
+                    return Collections.emptyList();
                 }
                 final T store = validateAndListStores(task.getStore(storeName), queryableStoreType, storeName, keyTaskId);
                 if (store != null) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
index ec820f2..e867691 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
@@ -61,6 +61,7 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertTrue;
 
 
 @Category({IntegrationTest.class})
@@ -302,6 +303,74 @@ public class StoreQueryIntegrationTest {
         assertThat(store4.get(key), is(nullValue()));
     }
 
+    @Test
+    public void shouldQuerySpecificStalePartitionStoresMultiStreamThreads() throws Exception {
+        final int batch1NumMessages = 100;
+        final int key = 1;
+        final Semaphore semaphore = new Semaphore(0);
+        final int numStreamThreads = 2;
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()),
+                Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
+                        .withCachingDisabled())
+                .toStream()
+                .peek((k, v) -> semaphore.release());
+
+        final Properties streamsConfiguration1 = streamsConfiguration();
+        streamsConfiguration1.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numStreamThreads);
+
+        final Properties streamsConfiguration2 = streamsConfiguration();
+        streamsConfiguration2.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numStreamThreads);
+
+        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration1);
+        final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration2);
+        final List<KafkaStreams> kafkaStreamsList = Arrays.asList(kafkaStreams1, kafkaStreams2);
+
+        startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60));
+
+        assertTrue(kafkaStreams1.localThreadsMetadata().size() > 1);
+        assertTrue(kafkaStreams2.localThreadsMetadata().size() > 1);
+
+        produceValueRange(key, 0, batch1NumMessages);
+
+        // Assert that all messages in the first batch were processed in a timely manner
+        assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
+        final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, new IntegerSerializer());
+
+        //key belongs to this partition
+        final int keyPartition = keyQueryMetadata.getPartition();
+
+        //key doesn't belong to this partition
+        final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
+        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = QueryableStoreTypes.keyValueStore();
+
+        // Assert that both active and standby are able to query for a key
+        final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> param = StoreQueryParameters
+                .fromNameAndType(TABLE_NAME, queryableStoreType)
+                .enableStaleStores()
+                .withPartition(keyPartition);
+        TestUtils.waitForCondition(() -> {
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = kafkaStreams1.store(param);
+            return store1.get(key) != null;
+        }, "store1 cannot find results for key");
+        TestUtils.waitForCondition(() -> {
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = kafkaStreams2.store(param);
+            return store2.get(key) != null;
+        }, "store2 cannot find results for key");
+
+        final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> otherParam = StoreQueryParameters
+                .fromNameAndType(TABLE_NAME, queryableStoreType)
+                .enableStaleStores()
+                .withPartition(keyDontBelongPartition);
+        final ReadOnlyKeyValueStore<Integer, Integer> store3 = kafkaStreams1.store(otherParam);
+        final ReadOnlyKeyValueStore<Integer, Integer> store4 = kafkaStreams2.store(otherParam);
+
+        // Assert that the key is not found when querying the partition it does not belong to
+        assertThat(store3.get(key), is(nullValue()));
+        assertThat(store4.get(key), is(nullValue()));
+    }
+
     private KafkaStreams createKafkaStreams(final StreamsBuilder builder, final Properties config) {
         final KafkaStreams streams = new KafkaStreams(builder.build(config), config);
         streamsToCleanup.add(streams);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
index 19a0355..2d04755 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
@@ -30,6 +30,9 @@ import org.junit.Test;
 import java.util.Collections;
 import java.util.HashMap;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertNotNull;
 
 public class QueryableStoreProviderTest {
@@ -38,12 +41,15 @@ public class QueryableStoreProviderTest {
     private final String windowStore = "window-store";
     private QueryableStoreProvider storeProvider;
     private HashMap<String, StateStore> globalStateStores;
+    private final int numStateStorePartitions = 2;
 
     @Before
     public void before() {
         final StateStoreProviderStub theStoreProvider = new StateStoreProviderStub(false);
-        theStoreProvider.addStore(keyValueStore, new NoOpReadOnlyStore<>());
-        theStoreProvider.addStore(windowStore, new NoOpWindowStore());
+        for (int partition = 0; partition < numStateStorePartitions; partition++) {
+            theStoreProvider.addStore(keyValueStore, partition, new NoOpReadOnlyStore<>());
+            theStoreProvider.addStore(windowStore, partition, new NoOpWindowStore());
+        }
         globalStateStores = new HashMap<>();
         storeProvider =
             new QueryableStoreProvider(
@@ -88,5 +94,37 @@ public class QueryableStoreProviderTest {
         assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType("global", QueryableStoreTypes.keyValueStore())));
     }
 
+    @Test
+    public void shouldReturnKVStoreWithPartitionWhenItExists() {
+        assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.keyValueStore()).withPartition(numStateStorePartitions - 1)));
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenKVStoreWithPartitionDoesntExists() {
+        final int partition = numStateStorePartitions + 1;
+        final InvalidStateStoreException thrown = assertThrows(InvalidStateStoreException.class, () ->
+                storeProvider.getStore(
+                        StoreQueryParameters
+                                .fromNameAndType(keyValueStore, QueryableStoreTypes.keyValueStore())
+                                .withPartition(partition))
+        );
+        assertThat(thrown.getMessage(), equalTo(String.format("The specified partition %d for store %s does not exist.", partition, keyValueStore)));
+    }
 
+    @Test
+    public void shouldReturnWindowStoreWithPartitionWhenItExists() {
+        assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType(windowStore, QueryableStoreTypes.windowStore()).withPartition(numStateStorePartitions - 1)));
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenWindowStoreWithPartitionDoesntExists() {
+        final int partition = numStateStorePartitions + 1;
+        final InvalidStateStoreException thrown = assertThrows(InvalidStateStoreException.class, () ->
+                storeProvider.getStore(
+                        StoreQueryParameters
+                                .fromNameAndType(windowStore, QueryableStoreTypes.windowStore())
+                                .withPartition(partition))
+        );
+        assertThat(thrown.getMessage(), equalTo(String.format("The specified partition %d for store %s does not exist.", partition, windowStore)));
+    }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 767875b..60ae95a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -65,9 +65,7 @@ import java.util.Properties;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
-import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThrows;
 
 public class StreamThreadStateStoreProviderTest {
 
@@ -315,16 +313,12 @@ public class StreamThreadStateStoreProviderTest {
     }
 
     @Test
-    public void shouldThrowForInvalidPartitions() {
+    public void shouldReturnEmptyListForInvalidPartitions() {
         mockThread(true);
-        final InvalidStateStoreException thrown = assertThrows(
-            InvalidStateStoreException.class,
-            () -> provider.stores(
-                StoreQueryParameters
-                    .fromNameAndType("kv-store", QueryableStoreTypes.keyValueStore())
-                    .withPartition(2))
+        assertEquals(
+                Collections.emptyList(),
+                provider.stores(StoreQueryParameters.fromNameAndType("kv-store", QueryableStoreTypes.keyValueStore()).withPartition(2))
         );
-        assertThat(thrown.getMessage(), equalTo("The specified partition 2 for store kv-store does not exist."));
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java b/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java
index 13a29e1..bc0e33a 100644
--- a/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java
+++ b/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java
@@ -22,16 +22,22 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
 
+import java.util.AbstractMap.SimpleEntry;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
 
 public class StateStoreProviderStub extends StreamThreadStateStoreProvider {
 
-    private final Map<String, StateStore> stores = new HashMap<>();
+    //<store name : partition> -> state store
+    private final Map<Entry<String, Integer>, StateStore> stores = new HashMap<>();
     private final boolean throwException;
 
+    private final int defaultStorePartition = 0;
+
     public StateStoreProviderStub(final boolean throwException) {
         super(null, null);
         this.throwException = throwException;
@@ -45,15 +51,28 @@ public class StateStoreProviderStub extends StreamThreadStateStoreProvider {
         if (throwException) {
             throw new InvalidStateStoreException("store is unavailable");
         }
-        if (stores.containsKey(storeName) && queryableStoreType.accepts(stores.get(storeName))) {
-            return (List<T>) Collections.singletonList(stores.get(storeName));
+        if (storeQueryParameters.partition() != null) {
+            final Entry<String, Integer> stateStoreKey = new SimpleEntry<>(storeName, storeQueryParameters.partition());
+            if (stores.containsKey(stateStoreKey) && queryableStoreType.accepts(stores.get(stateStoreKey))) {
+                return (List<T>) Collections.singletonList(stores.get(stateStoreKey));
+            }
+            return Collections.emptyList();
         }
-        return Collections.emptyList();
+        return (List<T>) Collections.unmodifiableList(
+                stores.entrySet().stream().
+                        filter(entry -> entry.getKey().getKey().equals(storeName) && queryableStoreType.accepts(entry.getValue())).
+                        map(Entry::getValue).
+                        collect(Collectors.toList()));
     }
 
     public void addStore(final String storeName,
                          final StateStore store) {
-        stores.put(storeName, store);
+        addStore(storeName, defaultStorePartition, store);
     }
 
+    public void addStore(final String storeName,
+                         final int partition,
+                         final StateStore store) {
+        stores.put(new SimpleEntry<>(storeName, partition), store);
+    }
 }