You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/05/12 17:39:24 UTC
[kafka] branch trunk updated: MINOR: Remove allow concurrent test (#8641)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 035299a MINOR: Remove allow concurrent test (#8641)
035299a is described below
commit 035299a55d5b7a7de3feb628018fcd15b9775dbc
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue May 12 10:38:23 2020 -0700
MINOR: Remove allow concurrent test (#8641)
Reviewers: John Roesler <vv...@apache.org>
---
.../integration/QueryableStateIntegrationTest.java | 46 ----------------------
1 file changed, 46 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 4c97cdd..5c4e0a1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -623,52 +623,6 @@ public class QueryableStateIntegrationTest {
}
@Test
- public void shouldAllowConcurrentAccesses() throws Exception {
- final int numIterations = 500000;
- final String storeName = "word-count-store";
- final String windowStoreName = "windowed-word-count-store";
-
- // send one round of records first to populate the stores
- ProducerRunnable producerRunnable = new ProducerRunnable(streamThree, inputValues, 1);
- producerRunnable.run();
-
- producerRunnable = new ProducerRunnable(streamConcurrent, inputValues, numIterations);
- final Thread producerThread = new Thread(producerRunnable);
- kafkaStreams = createCountStream(
- streamConcurrent,
- outputTopicConcurrent,
- outputTopicConcurrentWindowed,
- storeName,
- windowStoreName,
- streamsConfiguration);
-
- startKafkaStreamsAndWaitForRunningState(kafkaStreams);
-
- producerThread.start();
-
- try {
- waitUntilAtLeastNumRecordProcessed(outputTopicConcurrent, 1);
-
- final ReadOnlyKeyValueStore<String, Long> keyValueStore =
- IntegrationTestUtils.getStore(storeName + "-" + streamConcurrent, kafkaStreams, QueryableStoreTypes.keyValueStore());
-
- final ReadOnlyWindowStore<String, Long> windowStore =
- IntegrationTestUtils.getStore(windowStoreName + "-" + streamConcurrent, kafkaStreams, QueryableStoreTypes.windowStore());
-
- final Map<String, Long> expectedWindowState = new HashMap<>();
- final Map<String, Long> expectedCount = new HashMap<>();
- while (producerRunnable.getCurrIteration() < numIterations) {
- verifyGreaterOrEqual(inputValuesKeys.toArray(new String[0]), expectedWindowState,
- expectedCount, windowStore, keyValueStore);
- }
- } finally {
- producerRunnable.shutdown();
- producerThread.interrupt();
- producerThread.join();
- }
- }
-
- @Test
public void shouldBeAbleToQueryStateWithZeroSizedCache() throws Exception {
verifyCanQueryState(0);
}