Posted to commits@kafka.apache.org by gu...@apache.org on 2020/05/04 01:40:20 UTC

[kafka] branch trunk updated: KAFKA-9798: Send one round synchronously before starting the async producer (#8565)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 34824b7  KAFKA-9798: Send one round synchronously before starting the async producer (#8565)
34824b7 is described below

commit 34824b7bff64ba387a04466d74ac6bbbd10bf37c
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Sun May 3 18:39:43 2020 -0700

    KAFKA-9798: Send one round synchronously before starting the async producer (#8565)
    
    Compared with all other test cases, shouldAllowConcurrentAccesses starts an async producer that sends records throughout the test, rather than synchronously sending and acking a few records before the Streams application is started. Right after the Streams app is started, we check that at least one record has been sent to the output topic (i.e. has completed processing). However, since only this test starts the producer asynchronously and does not wait for it to complete, it is possible that the async producer [...]
    
    To follow what the other tests do, I let this test first send one round of records synchronously before starting the async producer.
    
    I also fixed some new Scala warnings encountered while working on this PR.
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>
---
 .../scala/unit/kafka/server/ServerGenerateClusterIdTest.scala     | 8 +++++---
 .../kafka/streams/integration/QueryableStateIntegrationTest.java  | 8 ++++++--
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
index 66d95fc..acf8eeb 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
@@ -18,19 +18,21 @@ package kafka.server
 
 import java.io.File
 
-import kafka.common.{InconsistentBrokerMetadataException, InconsistentClusterIdException}
 
+import scala.collection.Seq
 import scala.concurrent._
-import ExecutionContext.Implicits._
 import scala.concurrent.duration._
+import ExecutionContext.Implicits._
+
+import kafka.common.{InconsistentBrokerMetadataException, InconsistentClusterIdException}
 import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
+
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import org.scalatest.Assertions.assertThrows
 import org.apache.kafka.test.TestUtils.isValidClusterId
 
-import scala.collection.Seq
 
 class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
   var config1: KafkaConfig = null
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index eb33107..fcbc3de 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -203,11 +203,11 @@ public class QueryableStateIntegrationTest {
 
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         stringComparator = Comparator.comparing((KeyValue<String, String> o) -> o.key).thenComparing(o -> o.value);
         stringLongComparator = Comparator.comparing((KeyValue<String, Long> o) -> o.key).thenComparingLong(o -> o.value);
@@ -628,7 +628,11 @@ public class QueryableStateIntegrationTest {
         final String storeName = "word-count-store";
         final String windowStoreName = "windowed-word-count-store";
 
-        final ProducerRunnable producerRunnable = new ProducerRunnable(streamConcurrent, inputValues, numIterations);
+        // send one round of records first to populate the stores
+        ProducerRunnable producerRunnable = new ProducerRunnable(streamThree, inputValues, 1);
+        producerRunnable.run();
+
+        producerRunnable = new ProducerRunnable(streamConcurrent, inputValues, numIterations - 1);
         final Thread producerThread = new Thread(producerRunnable);
         kafkaStreams = createCountStream(
             streamConcurrent,
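
For readers who want the pattern in isolation, below is a minimal, self-contained sketch of what the commit message describes: send one round of records synchronously on the test thread, then produce the remaining rounds from a background thread. RoundProducer and SyncThenAsyncProduceSketch are hypothetical stand-ins for the test's ProducerRunnable and surrounding test code; they do not touch any Kafka APIs, whereas the real ProducerRunnable sends through a Kafka producer.

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class SyncThenAsyncProduceSketch {

    // Hypothetical stand-in for ProducerRunnable: "sends" `rounds` rounds of values.
    static final class RoundProducer implements Runnable {
        private final List<String> values;
        private final int rounds;
        private final AtomicInteger sent = new AtomicInteger();

        RoundProducer(final List<String> values, final int rounds) {
            this.values = values;
            this.rounds = rounds;
        }

        @Override
        public void run() {
            for (int i = 0; i < rounds; i++) {
                for (final String value : values) {
                    // In the real test each value would be sent (and acked) via a Kafka producer.
                    sent.incrementAndGet();
                }
            }
        }

        int totalSent() {
            return sent.get();
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        final List<String> inputValues = List.of("hello", "world", "kafka", "streams");
        final int numIterations = 100;

        // 1) Send one round synchronously, so at least one full round of input exists
        //    before the application under test is started.
        final RoundProducer firstRound = new RoundProducer(inputValues, 1);
        firstRound.run();  // run(), not start(): executes on the current thread and blocks

        // 2) Produce the remaining rounds asynchronously while the test runs.
        final RoundProducer remainingRounds = new RoundProducer(inputValues, numIterations - 1);
        final Thread producerThread = new Thread(remainingRounds);
        producerThread.start();

        // ... start the application under test and run assertions here ...

        producerThread.join();
        System.out.println("sent " + (firstRound.totalSent() + remainingRounds.totalSent()) + " records");
    }
}

The key detail is calling run() directly for the first round, which blocks the test thread until that round is produced, while the remaining rounds are handed to a Thread via start(), mirroring the change in QueryableStateIntegrationTest above.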