You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/18 10:28:55 UTC

kafka git commit: KAFKA-4222; QueryableIntegrationTest.queryOnRebalance transient failure

Repository: kafka
Updated Branches:
  refs/heads/trunk 14a3d69d9 -> 816578b5c


KAFKA-4222; QueryableIntegrationTest.queryOnRebalance transient failure

Don't produce messages on a separate thread continuosly. Just produce one of each value and stop.
Close the producer once finished.

Author: Damian Guy <da...@gmail.com>

Reviewers: Guozhang Wang <wa...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #3080 from dguy/qs-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/816578b5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/816578b5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/816578b5

Branch: refs/heads/trunk
Commit: 816578b5c13d36d73af02d1c11787f789a69be3f
Parents: 14a3d69
Author: Damian Guy <da...@gmail.com>
Authored: Thu May 18 11:28:53 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu May 18 11:28:53 2017 +0100

----------------------------------------------------------------------
 .../QueryableStateIntegrationTest.java          | 27 ++++++++------------
 1 file changed, 11 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/816578b5/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 4b5ae17..f2d0427 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -329,19 +329,17 @@ public class QueryableStateIntegrationTest {
         final int numThreads = STREAM_TWO_PARTITIONS;
         final StreamRunnable[] streamRunnables = new StreamRunnable[numThreads];
         final Thread[] streamThreads = new Thread[numThreads];
-        final int numIterations = 500000;
 
-        // create concurrent producer
-        final ProducerRunnable producerRunnable = new ProducerRunnable(streamThree, inputValues, numIterations);
-        final Thread producerThread = new Thread(producerRunnable);
+        final ProducerRunnable producerRunnable = new ProducerRunnable(streamThree, inputValues, 1);
+        producerRunnable.run();
 
-        // create three stream threads
+
+        // create stream threads
         for (int i = 0; i < numThreads; i++) {
             streamRunnables[i] = new StreamRunnable(streamThree, outputTopicThree, i);
             streamThreads[i] = new Thread(streamRunnables[i]);
             streamThreads[i].start();
         }
-        producerThread.start();
 
         try {
             waitUntilAtLeastNumRecordProcessed(outputTopicThree, 1);
@@ -375,9 +373,6 @@ public class QueryableStateIntegrationTest {
                     streamThreads[i].join();
                 }
             }
-            producerRunnable.shutdown();
-            producerThread.interrupt();
-            producerThread.join();
         }
     }
 
@@ -913,15 +908,15 @@ public class QueryableStateIntegrationTest {
             producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
             producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 
-            final KafkaProducer<String, String>
-                producer =
-                new KafkaProducer<>(producerConfig, new StringSerializer(), new StringSerializer());
+            try (final KafkaProducer<String, String> producer =
+                         new KafkaProducer<>(producerConfig, new StringSerializer(), new StringSerializer())) {
 
-            while (getCurrIteration() < numIterations && !shutdown) {
-                for (int i = 0; i < inputValues.size(); i++) {
-                    producer.send(new ProducerRecord<String, String>(topic, inputValues.get(i)));
+                while (getCurrIteration() < numIterations && !shutdown) {
+                    for (int i = 0; i < inputValues.size(); i++) {
+                        producer.send(new ProducerRecord<String, String>(topic, inputValues.get(i)));
+                    }
+                    incrementInteration();
                 }
-                incrementInteration();
             }
         }
     }