You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/02/16 20:36:33 UTC
[kafka] branch trunk updated: KAFKA-6424:
QueryableStateIntegrationTest#queryOnRebalance should accept raw text
(#4549)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7303f41 KAFKA-6424: QueryableStateIntegrationTest#queryOnRebalance should accept raw text (#4549)
7303f41 is described below
commit 7303f41dc171e3a8f57c40abc467582602d1cde4
Author: Filipe Agapito <fi...@gmail.com>
AuthorDate: Fri Feb 16 20:36:21 2018 +0000
KAFKA-6424: QueryableStateIntegrationTest#queryOnRebalance should accept raw text (#4549)
Allows input data to be read from a file and removes .toLowerCase in word count stream
Author: Filipe Agapito <fi...@gmail.com>
Reviewers: Ted Yu <yu...@gmail.com>, Matthias J. Sax <ma...@confluent.io>
---
.../integration/QueryableStateIntegrationTest.java | 64 ++++++++++++++++------
1 file changed, 46 insertions(+), 18 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index fc2eacc..8b4e895 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -65,15 +65,20 @@ import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -90,6 +95,8 @@ import static org.junit.Assert.fail;
@Category({IntegrationTest.class})
public class QueryableStateIntegrationTest {
+ private static final Logger log = LoggerFactory.getLogger(QueryableStateIntegrationTest.class);
+
private static final int NUM_BROKERS = 1;
@ClassRule
@@ -133,6 +140,40 @@ public class QueryableStateIntegrationTest {
CLUSTER.createTopics(outputTopic, outputTopicConcurrent, outputTopicConcurrentWindowed, outputTopicThree);
}
+ /**
+ * Try to read inputValues from {@code resources/QueryableStateIntegrationTest/inputValues.txt}, which might be useful
+ * for larger scale testing. In case of exception, for instance if no such file can be read, return a small list
+ * which satisfies all the prerequisites of the tests.
+ */
+ private List<String> getInputValues() {
+ List<String> input = new ArrayList<>();
+ final ClassLoader classLoader = getClass().getClassLoader();
+ final String fileName = "QueryableStateIntegrationTest" + File.separator + "inputValues.txt";
+ try (final BufferedReader reader = new BufferedReader(new FileReader(classLoader.getResource(fileName).getFile()))) {
+ for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+ input.add(line);
+ }
+ } catch (final Exception e) {
+ log.warn("Unable to read '{}{}{}'. Using default inputValues list", "resources", File.separator, fileName);
+ input = Arrays.asList(
+ "hello world",
+ "all streams lead to kafka",
+ "streams",
+ "kafka streams",
+ "the cat in the hat",
+ "green eggs and ham",
+ "that Sam i am",
+ "up the creek without a paddle",
+ "run forest run",
+ "a tank full of gas",
+ "eat sleep rave repeat",
+ "one jolly sailor",
+ "king of the world");
+
+ }
+ return input;
+ }
+
@Before
public void before() throws Exception {
testNo++;
@@ -167,20 +208,7 @@ public class QueryableStateIntegrationTest {
return o1.key.compareTo(o2.key);
}
};
- inputValues = Arrays.asList(
- "hello world",
- "all streams lead to kafka",
- "streams",
- "kafka streams",
- "the cat in the hat",
- "green eggs and ham",
- "that sam i am",
- "up the creek without a paddle",
- "run forest run",
- "a tank full of gas",
- "eat sleep rave repeat",
- "one jolly sailor",
- "king of the world");
+ inputValues = getInputValues();
inputValuesKeys = new HashSet<>();
for (final String sentence : inputValues) {
final String[] words = sentence.split("\\W+");
@@ -215,7 +243,7 @@ public class QueryableStateIntegrationTest {
.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(final String value) {
- return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
+ return Arrays.asList(value.split("\\W+"));
}
})
.groupBy(MockMapper.<String, String>selectValueMapper());
@@ -376,7 +404,7 @@ public class QueryableStateIntegrationTest {
storeName + "-" + streamThree);
verifyAllWindowedKeys(streamRunnables, streamRunnables[i].getStream(), streamRunnables[i].getStateListener(), inputValuesKeys,
windowStoreName + "-" + streamThree, 0L, WINDOW_SIZE);
- assertEquals(streamRunnables[i].getStream().state(), KafkaStreams.State.RUNNING);
+ assertEquals(KafkaStreams.State.RUNNING, streamRunnables[i].getStream().state());
}
// kill N-1 threads
@@ -391,7 +419,7 @@ public class QueryableStateIntegrationTest {
storeName + "-" + streamThree);
verifyAllWindowedKeys(streamRunnables, streamRunnables[0].getStream(), streamRunnables[0].getStateListener(), inputValuesKeys,
windowStoreName + "-" + streamThree, 0L, WINDOW_SIZE);
- assertEquals(streamRunnables[0].getStream().state(), KafkaStreams.State.RUNNING);
+ assertEquals(KafkaStreams.State.RUNNING, streamRunnables[0].getStream().state());
} finally {
for (int i = 0; i < numThreads; i++) {
if (!streamRunnables[i].isClosed()) {
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.