You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/18 10:28:55 UTC
kafka git commit: KAFKA-4222;
QueryableIntegrationTest.queryOnRebalance transient failure
Repository: kafka
Updated Branches:
refs/heads/trunk 14a3d69d9 -> 816578b5c
KAFKA-4222; QueryableIntegrationTest.queryOnRebalance transient failure
Don't produce messages on a separate thread continuosly. Just produce one of each value and stop.
Close the producer once finished.
Author: Damian Guy <da...@gmail.com>
Reviewers: Guozhang Wang <wa...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #3080 from dguy/qs-test
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/816578b5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/816578b5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/816578b5
Branch: refs/heads/trunk
Commit: 816578b5c13d36d73af02d1c11787f789a69be3f
Parents: 14a3d69
Author: Damian Guy <da...@gmail.com>
Authored: Thu May 18 11:28:53 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu May 18 11:28:53 2017 +0100
----------------------------------------------------------------------
.../QueryableStateIntegrationTest.java | 27 ++++++++------------
1 file changed, 11 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/816578b5/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 4b5ae17..f2d0427 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -329,19 +329,17 @@ public class QueryableStateIntegrationTest {
final int numThreads = STREAM_TWO_PARTITIONS;
final StreamRunnable[] streamRunnables = new StreamRunnable[numThreads];
final Thread[] streamThreads = new Thread[numThreads];
- final int numIterations = 500000;
- // create concurrent producer
- final ProducerRunnable producerRunnable = new ProducerRunnable(streamThree, inputValues, numIterations);
- final Thread producerThread = new Thread(producerRunnable);
+ final ProducerRunnable producerRunnable = new ProducerRunnable(streamThree, inputValues, 1);
+ producerRunnable.run();
- // create three stream threads
+
+ // create stream threads
for (int i = 0; i < numThreads; i++) {
streamRunnables[i] = new StreamRunnable(streamThree, outputTopicThree, i);
streamThreads[i] = new Thread(streamRunnables[i]);
streamThreads[i].start();
}
- producerThread.start();
try {
waitUntilAtLeastNumRecordProcessed(outputTopicThree, 1);
@@ -375,9 +373,6 @@ public class QueryableStateIntegrationTest {
streamThreads[i].join();
}
}
- producerRunnable.shutdown();
- producerThread.interrupt();
- producerThread.join();
}
}
@@ -913,15 +908,15 @@ public class QueryableStateIntegrationTest {
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- final KafkaProducer<String, String>
- producer =
- new KafkaProducer<>(producerConfig, new StringSerializer(), new StringSerializer());
+ try (final KafkaProducer<String, String> producer =
+ new KafkaProducer<>(producerConfig, new StringSerializer(), new StringSerializer())) {
- while (getCurrIteration() < numIterations && !shutdown) {
- for (int i = 0; i < inputValues.size(); i++) {
- producer.send(new ProducerRecord<String, String>(topic, inputValues.get(i)));
+ while (getCurrIteration() < numIterations && !shutdown) {
+ for (int i = 0; i < inputValues.size(); i++) {
+ producer.send(new ProducerRecord<String, String>(topic, inputValues.get(i)));
+ }
+ incrementInteration();
}
- incrementInteration();
}
}
}