You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/23 05:43:28 UTC

[GitHub] [kafka] brary commented on a change in pull request #8706: KAFKA-10030 allow fetching a key from a single partition

brary commented on a change in pull request #8706:
URL: https://github.com/apache/kafka/pull/8706#discussion_r429515106



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##########
@@ -296,6 +297,87 @@ public void shouldQuerySpecificStalePartitionStores() throws Exception {
         assertThat(store4.get(key), is(nullValue()));
     }
 
+    @Test
+    public void shouldQuerySpecificActivePartitionStoresMultiStreamThreads() throws Exception {
+        final int batch1NumMessages = 100;
+        final int key = 1;
+        final Semaphore semaphore = new Semaphore(0);
+        final int numStreamThreads = 2;
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()),
+                Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
+                        .withCachingDisabled())
+                .toStream()
+                .peek((k, v) -> semaphore.release());
+
+        final Properties streamsConfiguration1 = streamsConfiguration();
+        streamsConfiguration1.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numStreamThreads);
+
+        final Properties streamsConfiguration2 = streamsConfiguration();
+        streamsConfiguration2.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numStreamThreads);
+
+        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration1);
+        final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration2);
+        final List<KafkaStreams> kafkaStreamsList = Arrays.asList(kafkaStreams1, kafkaStreams2);
+
+        startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60));
+
+        assertTrue(numStreamThreads > 1);
+        assertTrue(kafkaStreams1.localThreadsMetadata().size() > 1);
+        assertTrue(kafkaStreams2.localThreadsMetadata().size() > 1);
+
+        produceValueRange(key, 0, batch1NumMessages);
+
+        // Assert that all messages in the first batch were processed in a timely manner
+        assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
+        final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, new IntegerSerializer());
+
+        //key belongs to this partition
+        final int keyPartition = keyQueryMetadata.getPartition();
+
+        //key doesn't belongs to this partition
+        final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
+        final boolean kafkaStreams1IsActive = (keyQueryMetadata.getActiveHost().port() % 2) == 1;
+
+        StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam =
+                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore())
+                        .withPartition(keyPartition);
+        ReadOnlyKeyValueStore<Integer, Integer> store1 = null;
+        ReadOnlyKeyValueStore<Integer, Integer> store2 = null;
+        if (kafkaStreams1IsActive) {
+            store1 = IntegrationTestUtils.getStore(kafkaStreams1, storeQueryParam);
+        } else {
+            store2 = IntegrationTestUtils.getStore(kafkaStreams2, storeQueryParam);
+        }
+
+        if (kafkaStreams1IsActive) {
+            assertThat(store1, is(notNullValue()));
+            assertThat(store2, is(nullValue()));
+        } else {
+            assertThat(store2, is(notNullValue()));
+            assertThat(store1, is(nullValue()));
+        }
+
+        // Assert that only active for a specific requested partition serves key if stale stores and not enabled

Review comment:
       Typo in the comment above: should it read "if stale stores are not enabled" ("are" instead of "and")?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org