You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/10/16 00:48:02 UTC

[kafka] branch 2.1 updated: MINOR: fix non-deterministic streams-scala tests (#5792)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 0d6a3d8  MINOR: fix non-deterministic streams-scala tests (#5792)
0d6a3d8 is described below

commit 0d6a3d87bfaf4f9cc1ac902c7ab4adb97fa82451
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Mon Oct 15 19:47:20 2018 -0500

    MINOR: fix non-deterministic streams-scala tests (#5792)
    
    Stop using current system time by default, as it introduces non-determinism.
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../org/apache/kafka/streams/scala/kstream/KStreamTest.scala  | 11 +++++++----
 .../org/apache/kafka/streams/scala/utils/TestDriver.scala     |  3 ++-
 2 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
index 8626be5..c317de0 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
@@ -102,7 +102,7 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
     val sinkTopic = "sink"
 
     var acc = ""
-    builder.stream[String, String](sourceTopic).peek((k, v) => acc += v).to(sinkTopic)
+    builder.stream[String, String](sourceTopic).peek((_, v) => acc += v).to(sinkTopic)
 
     val testDriver = createTestDriver(builder)
 
@@ -147,10 +147,13 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
     val stream2 = builder.stream[String, String](sourceTopic2)
     stream1.join(stream2)((a, b) => s"$a-$b", JoinWindows.of(ofSeconds(1))).to(sinkTopic)
 
-    val testDriver = createTestDriver(builder)
+    val now = System.currentTimeMillis()
+
+    val testDriver = createTestDriver(builder, now)
+
+    testDriver.pipeRecord(sourceTopic1, ("1", "topic1value1"), now)
+    testDriver.pipeRecord(sourceTopic2, ("1", "topic2value1"), now)
 
-    testDriver.pipeRecord(sourceTopic1, ("1", "topic1value1"))
-    testDriver.pipeRecord(sourceTopic2, ("1", "topic2value1"))
     testDriver.readRecord[String, String](sinkTopic).value shouldBe "topic1value1-topic2value1"
 
     testDriver.readRecord[String, String](sinkTopic) shouldBe null
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/TestDriver.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/TestDriver.scala
index 1497dd7..17b2570 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/TestDriver.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/TestDriver.scala
@@ -29,7 +29,8 @@ import org.scalatest.Suite
 
 trait TestDriver { this: Suite =>
 
-  def createTestDriver(builder: StreamsBuilder, initialWallClockTimeMs: Long = System.currentTimeMillis()) = {
+  def createTestDriver(builder: StreamsBuilder,
+                       initialWallClockTimeMs: Long = System.currentTimeMillis()): TopologyTestDriver = {
     val config = new Properties()
     config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test")
     config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")