You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/02/16 20:36:33 UTC

[kafka] branch trunk updated: KAFKA-6424: QueryableStateIntegrationTest#queryOnRebalance should accept raw text (#4549)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7303f41  KAFKA-6424: QueryableStateIntegrationTest#queryOnRebalance should accept raw text (#4549)
7303f41 is described below

commit 7303f41dc171e3a8f57c40abc467582602d1cde4
Author: Filipe Agapito <fi...@gmail.com>
AuthorDate: Fri Feb 16 20:36:21 2018 +0000

    KAFKA-6424: QueryableStateIntegrationTest#queryOnRebalance should accept raw text (#4549)
    
    Allows input data to be read from a file and removes .toLowerCase in word count stream
    
    Author: Filipe Agapito <fi...@gmail.com>
    
    Reviewers: Ted Yu <yu...@gmail.com>, Matthias J. Sax <ma...@confluent.io>
---
 .../integration/QueryableStateIntegrationTest.java | 64 ++++++++++++++++------
 1 file changed, 46 insertions(+), 18 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index fc2eacc..8b4e895 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -65,15 +65,20 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -90,6 +95,8 @@ import static org.junit.Assert.fail;
 
 @Category({IntegrationTest.class})
 public class QueryableStateIntegrationTest {
+    private static final Logger log = LoggerFactory.getLogger(QueryableStateIntegrationTest.class);
+
     private static final int NUM_BROKERS = 1;
 
     @ClassRule
@@ -133,6 +140,40 @@ public class QueryableStateIntegrationTest {
         CLUSTER.createTopics(outputTopic, outputTopicConcurrent, outputTopicConcurrentWindowed, outputTopicThree);
     }
 
+    /**
+     * Try to read inputValues from {@code resources/QueryableStateIntegrationTest/inputValues.txt}, which might be useful
+     * for larger scale testing. In case of exception, for instance if no such file can be read, return a small list
+     * which satisfies all the prerequisites of the tests.
+     */
+    private List<String> getInputValues() {
+        List<String> input = new ArrayList<>();
+        final ClassLoader classLoader = getClass().getClassLoader();
+        final String fileName = "QueryableStateIntegrationTest" + File.separator + "inputValues.txt";
+        try (final BufferedReader reader = new BufferedReader(new FileReader(classLoader.getResource(fileName).getFile()))) {
+            for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+                input.add(line);
+            }
+        } catch (final Exception e) {
+            log.warn("Unable to read '{}{}{}'. Using default inputValues list", "resources", File.separator, fileName);
+            input = Arrays.asList(
+                        "hello world",
+                        "all streams lead to kafka",
+                        "streams",
+                        "kafka streams",
+                        "the cat in the hat",
+                        "green eggs and ham",
+                        "that Sam i am",
+                        "up the creek without a paddle",
+                        "run forest run",
+                        "a tank full of gas",
+                        "eat sleep rave repeat",
+                        "one jolly sailor",
+                        "king of the world");
+
+        }
+        return input;
+    }
+
     @Before
     public void before() throws Exception {
         testNo++;
@@ -167,20 +208,7 @@ public class QueryableStateIntegrationTest {
                 return o1.key.compareTo(o2.key);
             }
         };
-        inputValues = Arrays.asList(
-            "hello world",
-            "all streams lead to kafka",
-            "streams",
-            "kafka streams",
-            "the cat in the hat",
-            "green eggs and ham",
-            "that sam i am",
-            "up the creek without a paddle",
-            "run forest run",
-            "a tank full of gas",
-            "eat sleep rave repeat",
-            "one jolly sailor",
-            "king of the world");
+        inputValues = getInputValues();
         inputValuesKeys = new HashSet<>();
         for (final String sentence : inputValues) {
             final String[] words = sentence.split("\\W+");
@@ -215,7 +243,7 @@ public class QueryableStateIntegrationTest {
             .flatMapValues(new ValueMapper<String, Iterable<String>>() {
                 @Override
                 public Iterable<String> apply(final String value) {
-                    return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
+                    return Arrays.asList(value.split("\\W+"));
                 }
             })
             .groupBy(MockMapper.<String, String>selectValueMapper());
@@ -376,7 +404,7 @@ public class QueryableStateIntegrationTest {
                     storeName + "-" + streamThree);
                 verifyAllWindowedKeys(streamRunnables, streamRunnables[i].getStream(), streamRunnables[i].getStateListener(), inputValuesKeys,
                     windowStoreName + "-" + streamThree, 0L, WINDOW_SIZE);
-                assertEquals(streamRunnables[i].getStream().state(), KafkaStreams.State.RUNNING);
+                assertEquals(KafkaStreams.State.RUNNING, streamRunnables[i].getStream().state());
             }
 
             // kill N-1 threads
@@ -391,7 +419,7 @@ public class QueryableStateIntegrationTest {
                 storeName + "-" + streamThree);
             verifyAllWindowedKeys(streamRunnables, streamRunnables[0].getStream(), streamRunnables[0].getStateListener(), inputValuesKeys,
                 windowStoreName + "-" + streamThree, 0L, WINDOW_SIZE);
-            assertEquals(streamRunnables[0].getStream().state(), KafkaStreams.State.RUNNING);
+            assertEquals(KafkaStreams.State.RUNNING, streamRunnables[0].getStream().state());
         } finally {
             for (int i = 0; i < numThreads; i++) {
                 if (!streamRunnables[i].isClosed()) {

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.