You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/10/16 00:48:02 UTC
[kafka] branch 2.1 updated: MINOR: fix non-deterministic
streams-scala tests (#5792)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 0d6a3d8 MINOR: fix non-deterministic streams-scala tests (#5792)
0d6a3d8 is described below
commit 0d6a3d87bfaf4f9cc1ac902c7ab4adb97fa82451
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Mon Oct 15 19:47:20 2018 -0500
MINOR: fix non-deterministic streams-scala tests (#5792)
Stop using current system time by default, as it introduces non-determinism.
Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../org/apache/kafka/streams/scala/kstream/KStreamTest.scala | 11 +++++++----
.../org/apache/kafka/streams/scala/utils/TestDriver.scala | 3 ++-
2 files changed, 9 insertions(+), 5 deletions(-)
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
index 8626be5..c317de0 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
@@ -102,7 +102,7 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
val sinkTopic = "sink"
var acc = ""
- builder.stream[String, String](sourceTopic).peek((k, v) => acc += v).to(sinkTopic)
+ builder.stream[String, String](sourceTopic).peek((_, v) => acc += v).to(sinkTopic)
val testDriver = createTestDriver(builder)
@@ -147,10 +147,13 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
val stream2 = builder.stream[String, String](sourceTopic2)
stream1.join(stream2)((a, b) => s"$a-$b", JoinWindows.of(ofSeconds(1))).to(sinkTopic)
- val testDriver = createTestDriver(builder)
+ val now = System.currentTimeMillis()
+
+ val testDriver = createTestDriver(builder, now)
+
+ testDriver.pipeRecord(sourceTopic1, ("1", "topic1value1"), now)
+ testDriver.pipeRecord(sourceTopic2, ("1", "topic2value1"), now)
- testDriver.pipeRecord(sourceTopic1, ("1", "topic1value1"))
- testDriver.pipeRecord(sourceTopic2, ("1", "topic2value1"))
testDriver.readRecord[String, String](sinkTopic).value shouldBe "topic1value1-topic2value1"
testDriver.readRecord[String, String](sinkTopic) shouldBe null
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/TestDriver.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/TestDriver.scala
index 1497dd7..17b2570 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/TestDriver.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/TestDriver.scala
@@ -29,7 +29,8 @@ import org.scalatest.Suite
trait TestDriver { this: Suite =>
- def createTestDriver(builder: StreamsBuilder, initialWallClockTimeMs: Long = System.currentTimeMillis()) = {
+ def createTestDriver(builder: StreamsBuilder,
+ initialWallClockTimeMs: Long = System.currentTimeMillis()): TopologyTestDriver = {
val config = new Properties()
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test")
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")