You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/21 11:07:10 UTC

[GitHub] [kafka] dima5rr opened a new pull request #8706: KAFKA-10030 allow fetching a key from a single partition

dima5rr opened a new pull request #8706:
URL: https://github.com/apache/kafka/pull/8706


   StreamThreadStateStoreProvider#stores throws exception whenever taskId is not found, which is not correct behaviour in multi-threaded env where state store partitions are distributed among several StreamTasks. 
   
   final Task task = tasks.get(keyTaskId);
   if (task == null) {
    throw new InvalidStateStoreException(
    String.format("The specified partition %d for store %s does not exist.",
    storeQueryParams.partition(),
    storeName));
   }
   Reproducible with KStream number of threads more then 1 
   
   StoreQueryIntegrationTest#streamsConfiguration
   
   config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
   
    
   
   Suggested solution is to not throw exception if at least one state store is found, which is always true when using StoreQueryParameters.withPartition


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#issuecomment-632961815


   Retest this please.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dima5rr commented on a change in pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
dima5rr commented on a change in pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#discussion_r432393459



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
##########
@@ -88,5 +91,23 @@ public void shouldFindGlobalStores() {
         assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType("global", QueryableStoreTypes.keyValueStore())));
     }
 
+    @Test
+    public void shouldReturnKVStoreWithPartitionWhenItExists() {
+        assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.keyValueStore()).withPartition(numStateStorePartitions - 1)));

Review comment:
       I think how to validate returned store reference, QueryableStoreProvider always wraps it in CompositeReadOnlyKeyValueStore?
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dima5rr commented on a change in pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
dima5rr commented on a change in pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#discussion_r431687179



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
##########
@@ -58,9 +58,21 @@ public QueryableStoreProvider(final List<StreamThreadStateStoreProvider> storePr
         }
         final List<T> allStores = new ArrayList<>();
         for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
-            allStores.addAll(storeProvider.stores(storeQueryParameters));
+            final List<T> stores = storeProvider.stores(storeQueryParameters);
+            if (stores != null && !stores.isEmpty()) {
+                allStores.addAll(stores);
+                if (storeQueryParameters.partition() != null) {
+                    break;
+                }
+            }
         }
         if (allStores.isEmpty()) {
+            if (storeQueryParameters.partition() != null) {

Review comment:
       I extended StateStoreProviderStub functionality to add store by partition with default partition 0, + relevant tests in QueryableStoreProviderTest




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#issuecomment-634954533


   Retest this please.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#discussion_r432783632



##########
File path: streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java
##########
@@ -22,16 +22,22 @@
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
 
+import java.util.AbstractMap.SimpleEntry;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
 
 public class StateStoreProviderStub extends StreamThreadStateStoreProvider {
 
-    private final Map<String, StateStore> stores = new HashMap<>();
+    //<store name : partition> -> state store

Review comment:
       nit: space




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#issuecomment-634867611


   Retest this please.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#issuecomment-634867491


   Retest this please.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dima5rr commented on a change in pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
dima5rr commented on a change in pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#discussion_r429567784



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##########
@@ -296,6 +297,87 @@ public void shouldQuerySpecificStalePartitionStores() throws Exception {
         assertThat(store4.get(key), is(nullValue()));
     }
 
+    @Test
+    public void shouldQuerySpecificActivePartitionStoresMultiStreamThreads() throws Exception {
+        final int batch1NumMessages = 100;
+        final int key = 1;
+        final Semaphore semaphore = new Semaphore(0);
+        final int numStreamThreads = 2;
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()),
+                Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
+                        .withCachingDisabled())
+                .toStream()
+                .peek((k, v) -> semaphore.release());
+
+        final Properties streamsConfiguration1 = streamsConfiguration();
+        streamsConfiguration1.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numStreamThreads);
+
+        final Properties streamsConfiguration2 = streamsConfiguration();
+        streamsConfiguration2.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numStreamThreads);
+
+        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration1);
+        final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration2);
+        final List<KafkaStreams> kafkaStreamsList = Arrays.asList(kafkaStreams1, kafkaStreams2);
+
+        startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60));
+
+        assertTrue(numStreamThreads > 1);
+        assertTrue(kafkaStreams1.localThreadsMetadata().size() > 1);
+        assertTrue(kafkaStreams2.localThreadsMetadata().size() > 1);
+
+        produceValueRange(key, 0, batch1NumMessages);
+
+        // Assert that all messages in the first batch were processed in a timely manner
+        assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
+        final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, new IntegerSerializer());
+
+        //key belongs to this partition
+        final int keyPartition = keyQueryMetadata.getPartition();
+
+        //key doesn't belongs to this partition
+        final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
+        final boolean kafkaStreams1IsActive = (keyQueryMetadata.getActiveHost().port() % 2) == 1;
+
+        StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam =
+                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore())
+                        .withPartition(keyPartition);
+        ReadOnlyKeyValueStore<Integer, Integer> store1 = null;
+        ReadOnlyKeyValueStore<Integer, Integer> store2 = null;
+        if (kafkaStreams1IsActive) {
+            store1 = IntegrationTestUtils.getStore(kafkaStreams1, storeQueryParam);
+        } else {
+            store2 = IntegrationTestUtils.getStore(kafkaStreams2, storeQueryParam);
+        }
+
+        if (kafkaStreams1IsActive) {
+            assertThat(store1, is(notNullValue()));
+            assertThat(store2, is(nullValue()));
+        } else {
+            assertThat(store2, is(notNullValue()));
+            assertThat(store1, is(nullValue()));
+        }
+
+        // Assert that only active for a specific requested partition serves key if stale stores and not enabled

Review comment:
       correct, changed test to check both store types




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] brary commented on a change in pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
brary commented on a change in pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#discussion_r429515106



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##########
@@ -296,6 +297,87 @@ public void shouldQuerySpecificStalePartitionStores() throws Exception {
         assertThat(store4.get(key), is(nullValue()));
     }
 
+    @Test
+    public void shouldQuerySpecificActivePartitionStoresMultiStreamThreads() throws Exception {
+        final int batch1NumMessages = 100;
+        final int key = 1;
+        final Semaphore semaphore = new Semaphore(0);
+        final int numStreamThreads = 2;
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()),
+                Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
+                        .withCachingDisabled())
+                .toStream()
+                .peek((k, v) -> semaphore.release());
+
+        final Properties streamsConfiguration1 = streamsConfiguration();
+        streamsConfiguration1.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numStreamThreads);
+
+        final Properties streamsConfiguration2 = streamsConfiguration();
+        streamsConfiguration2.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numStreamThreads);
+
+        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration1);
+        final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration2);
+        final List<KafkaStreams> kafkaStreamsList = Arrays.asList(kafkaStreams1, kafkaStreams2);
+
+        startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60));
+
+        assertTrue(numStreamThreads > 1);
+        assertTrue(kafkaStreams1.localThreadsMetadata().size() > 1);
+        assertTrue(kafkaStreams2.localThreadsMetadata().size() > 1);
+
+        produceValueRange(key, 0, batch1NumMessages);
+
+        // Assert that all messages in the first batch were processed in a timely manner
+        assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
+        final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, new IntegerSerializer());
+
+        //key belongs to this partition
+        final int keyPartition = keyQueryMetadata.getPartition();
+
+        //key doesn't belongs to this partition
+        final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
+        final boolean kafkaStreams1IsActive = (keyQueryMetadata.getActiveHost().port() % 2) == 1;
+
+        StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam =
+                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore())
+                        .withPartition(keyPartition);
+        ReadOnlyKeyValueStore<Integer, Integer> store1 = null;
+        ReadOnlyKeyValueStore<Integer, Integer> store2 = null;
+        if (kafkaStreams1IsActive) {
+            store1 = IntegrationTestUtils.getStore(kafkaStreams1, storeQueryParam);
+        } else {
+            store2 = IntegrationTestUtils.getStore(kafkaStreams2, storeQueryParam);
+        }
+
+        if (kafkaStreams1IsActive) {
+            assertThat(store1, is(notNullValue()));
+            assertThat(store2, is(nullValue()));
+        } else {
+            assertThat(store2, is(notNullValue()));
+            assertThat(store1, is(nullValue()));
+        }
+
+        // Assert that only active for a specific requested partition serves key if stale stores and not enabled

Review comment:
       stale stores "are" not enabled?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#discussion_r431361485



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##########
@@ -296,6 +289,75 @@ public void shouldQuerySpecificStalePartitionStores() throws Exception {
         assertThat(store4.get(key), is(nullValue()));
     }
 
+    @Test
+    public void shouldQuerySpecificStalePartitionStoresMultiStreamThreads() throws Exception {
+        final int batch1NumMessages = 100;
+        final int key = 1;
+        final Semaphore semaphore = new Semaphore(0);
+        final int numStreamThreads = 2;
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()),
+                Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
+                        .withCachingDisabled())
+                .toStream()
+                .peek((k, v) -> semaphore.release());
+
+        final Properties streamsConfiguration1 = streamsConfiguration();
+        streamsConfiguration1.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numStreamThreads);
+
+        final Properties streamsConfiguration2 = streamsConfiguration();
+        streamsConfiguration2.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numStreamThreads);
+
+        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration1);
+        final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration2);
+        final List<KafkaStreams> kafkaStreamsList = Arrays.asList(kafkaStreams1, kafkaStreams2);
+
+        startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60));
+
+        assertTrue(numStreamThreads > 1);

Review comment:
       `numStreamThreads` is a `final` variable -> assertion can be removed

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
##########
@@ -58,9 +58,21 @@ public QueryableStoreProvider(final List<StreamThreadStateStoreProvider> storePr
         }
         final List<T> allStores = new ArrayList<>();
         for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
-            allStores.addAll(storeProvider.stores(storeQueryParameters));
+            final List<T> stores = storeProvider.stores(storeQueryParameters);
+            if (stores != null && !stores.isEmpty()) {
+                allStores.addAll(stores);
+                if (storeQueryParameters.partition() != null) {
+                    break;
+                }
+            }
         }
         if (allStores.isEmpty()) {
+            if (storeQueryParameters.partition() != null) {

Review comment:
       We should extend `QueryableStoreProviderTest` for this case -- throwing this exception moves from `StreamThreadStateStoreProvider` to here and we should not just remove the test from `StreamThreadStateStoreProviderTest` but add a new one to `QueryableStoreProviderTest`, too.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#issuecomment-636129274


   Retest this please.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#discussion_r432144610



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
##########
@@ -58,9 +58,21 @@ public QueryableStoreProvider(final List<StreamThreadStateStoreProvider> storePr
         }
         final List<T> allStores = new ArrayList<>();
         for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
-            allStores.addAll(storeProvider.stores(storeQueryParameters));
+            final List<T> stores = storeProvider.stores(storeQueryParameters);
+            if (stores != null && !stores.isEmpty()) {

Review comment:
       Thinking about this one more, `stores` can never be `null`? Can we remove this check?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
##########
@@ -48,7 +48,9 @@ public void setStoreQueryParameters(final StoreQueryParameters storeQueryParamet
         final List<T> allStores = new ArrayList<>();
         for (final StreamThreadStateStoreProvider provider : storeProviders) {
             final List<T> stores = provider.stores(storeQueryParameters);
-            allStores.addAll(stores);
+            if (stores != null && !stores.isEmpty()) {
+                allStores.addAll(stores);
+            }

Review comment:
       As above: `stores` should never be null, and thus we don't need this change? Also the check for `isEmpty` does give us much, we can still call `addAll` even it `stores` is empty?

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
##########
@@ -88,5 +91,23 @@ public void shouldFindGlobalStores() {
         assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType("global", QueryableStoreTypes.keyValueStore())));
     }
 
+    @Test
+    public void shouldReturnKVStoreWithPartitionWhenItExists() {
+        assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.keyValueStore()).withPartition(numStateStorePartitions - 1)));

Review comment:
       It might be better to test if the right store is returned instead of just checking for not-null? For this, in `before()` we need to get a reference on the store we pass into `addStore()`?

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
##########
@@ -88,5 +91,23 @@ public void shouldFindGlobalStores() {
         assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType("global", QueryableStoreTypes.keyValueStore())));
     }
 
+    @Test
+    public void shouldReturnKVStoreWithPartitionWhenItExists() {
+        assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.keyValueStore()).withPartition(numStateStorePartitions - 1)));
+    }
+
+    @Test(expected = InvalidStateStoreException.class)
+    public void shouldThrowExceptionWhenKVStoreWithPartitionDoesntExists() {
+        storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.keyValueStore()).withPartition(numStateStorePartitions + 1));

Review comment:
       Can we split this as follows:
   ```
   final StoreQueryParameters parameters = (StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.keyValueStore()).withPartition(numStateStorePartitions + 1);
   
   final InvalidStateStoreException exception = asserThrows(
     InvalidStateStoreException.class,
     () -> storeProvider.getStore(parameters)
   );
   assertThat(exception.message(), equalTo("..."));
   ```
   
   And remove the `(excpected = ...)` annotation.
   
   (1) We should always limit the code that might throw the exception (eg, if `withPartition` would throw an `InvalidStateStoreException` the test should fail, but would pass in it's current setup) (2) We should always verify the exception cause -- `getStore()` could throw an `InvalidStateStoreException` or multiple reasons and we should make sure it's throwing for the reason under test.
   
   Same below for the windowed case




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#discussion_r432671229



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java
##########
@@ -88,5 +91,23 @@ public void shouldFindGlobalStores() {
         assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType("global", QueryableStoreTypes.keyValueStore())));
     }
 
+    @Test
+    public void shouldReturnKVStoreWithPartitionWhenItExists() {
+        assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.keyValueStore()).withPartition(numStateStorePartitions - 1)));

Review comment:
       Hmmm... Good point. Let leave it as-is. It's also covered in integration tests that the right store is returned.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#issuecomment-636129958


   Jenkins does not cooperator right now. Will try again later.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#issuecomment-632265933


   Retest this please.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#issuecomment-634868120


   Seems Jenkins is too busy... Will try again later


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#issuecomment-635625057


   Retest this please.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#issuecomment-634867834


   Retest this please.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#issuecomment-632961839


   Retest this please.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#issuecomment-636129816


   Retest this please.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax merged pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #8706:
URL: https://github.com/apache/kafka/pull/8706


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#issuecomment-636129086


   Retest this please.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dima5rr commented on a change in pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
dima5rr commented on a change in pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#discussion_r428899383



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##########
@@ -332,6 +332,7 @@ private Properties streamsConfiguration() {
         config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
         config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
         config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);

Review comment:
       Added dedicated test




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#issuecomment-632267029


   @vinothchandar @brary for review


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#issuecomment-635042381


   ```
   15:01:33 [ant:checkstyle] [ERROR] /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java:25: Using the '.*' form of import should be avoided - org.apache.kafka.streams.*. [AvoidStarImport]
   15:01:33 [ant:checkstyle] [ERROR] /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java:56: Using the '.*' form of import should be avoided - org.hamcrest.Matchers.*. [AvoidStarImport]
   ```
   cc @dima5rr 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#issuecomment-635625227


   Retest this please.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#issuecomment-635633908


   Retest this please.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#issuecomment-637215804


   Thanks for the PR @dima5rr.
   
   Merged to `trunk` and cherry-picked to `2.6` and `2.5` branches.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#issuecomment-634954462


   Retest this please.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#issuecomment-636208328


   Test this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #8706: KAFKA-10030 allow fetching a key from a single partition

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#discussion_r428831152



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##########
@@ -332,6 +332,7 @@ private Properties streamsConfiguration() {
         config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
         config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
         config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);

Review comment:
       We should write a proper test case instead of "piggy-backing" it into an existing test.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org