Posted to jira@kafka.apache.org by "Matthias J. Sax (Jira)" <ji...@apache.org> on 2023/05/23 11:07:00 UTC

[jira] [Commented] (KAFKA-14173) TopologyTestDriver does not use mock wall clock time when sending test records

    [ https://issues.apache.org/jira/browse/KAFKA-14173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725356#comment-17725356 ] 

Matthias J. Sax commented on KAFKA-14173:
-----------------------------------------

Just discovering this ticket.

I guess you would need to use `TestInputTopic#advanceTime` for this case?

Closing the ticket as "not an issue", as the API is there. Feel free to follow up.
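
For reference, a minimal sketch of what using `TestInputTopic#advanceTime` could look like for the test in the description. The topology below is only a stand-in for the reporter's `SimpleLeftJoinTopology` (its exact shape is an assumption), written against the Java DSL so that it needs nothing beyond kafka-streams and kafka-streams-test-utils. The idea is that records piped without a timestamp take the input topic's own record time, which only moves when it is advanced explicitly (or when auto-advance is configured), so stream time never passes the join window otherwise.

```scala
import java.time.Duration

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.{Consumed, JoinWindows, KStream, Produced, StreamJoined, ValueJoiner}
import org.apache.kafka.streams.{StreamsBuilder, TopologyTestDriver}

object LeftJoinAdvanceTimeSketch {
  def main(args: Array[String]): Unit = {
    val strings = Serdes.String()

    // Stand-in topology (assumed shape): bills left-joined with payments within 100 ms.
    val builder = new StreamsBuilder
    val bills: KStream[String, String] = builder.stream("bills", Consumed.`with`(strings, strings))
    val payments: KStream[String, String] = builder.stream("payments", Consumed.`with`(strings, strings))

    // A null payment means no matching record arrived inside the window.
    val joiner: ValueJoiner[String, String, String] = (bill, payment) =>
      if (payment == null) bill else (bill.toInt - payment.toInt).toString

    bills
      .leftJoin(payments, joiner,
        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100)),
        StreamJoined.`with`(strings, strings, strings))
      .to("debt", Produced.`with`(strings, strings))

    val driver = new TopologyTestDriver(builder.build())
    try {
      val billsIn = driver.createInputTopic("bills", strings.serializer, strings.serializer)
      val paymentsIn = driver.createInputTopic("payments", strings.serializer, strings.serializer)
      val debtOut = driver.createOutputTopic("debt", strings.deserializer, strings.deserializer)

      billsIn.pipeInput("fred", "100")
      billsIn.pipeInput("george", "20")
      paymentsIn.pipeInput("fred", "95")

      // Advance the record time of the payments topic well past the 100 ms window,
      // so the next record carries a later timestamp and moves stream time forward.
      paymentsIn.advanceTime(Duration.ofMillis(500))
      paymentsIn.pipeInput("percy", "0")

      // Print whatever the join has emitted so far.
      println(debtOut.readKeyValuesToList())
    } finally {
      driver.close()
    }
  }
}
```

Whether the unmatched `george` record shows up at this point depends on the Kafka Streams version in use, so I would treat the printed output as something to check rather than assume.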

> TopologyTestDriver does not use mock wall clock time when sending test records
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-14173
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14173
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams-test-utils
>    Affects Versions: 2.3.1
>            Reporter: Guido Josquin
>            Priority: Minor
>
> I am trying to test a stream-stream join with `TopologyTestDriver`. My goal is to confirm that my topology performs the following left join correctly.
> {code:java}
> bills
>   .leftJoin(payments)(
>     {
>       case (billValue, null) => billValue
>       case (billValue, paymentValue) => (billValue.toInt - paymentValue.toInt).toString
>     },
>     JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100))
>   )
>   .to("debt")
> {code}
>  
> In other words, if we see a `bill` and a `payment` within 100ms, the payment should be subtracted from the bill. If we do not see a payment, the debt is simply the bill.
> Here is the test code.
> {code:java}
> val simpleLeftJoinTopology = new SimpleLeftJoinTopology
> val driver = new TopologyTestDriver(simpleLeftJoinTopology.topology)
> val serde = Serdes.stringSerde
> val bills = driver.createInputTopic("bills", serde.serializer, serde.serializer)
> val payments = driver.createInputTopic("payments", serde.serializer, serde.serializer)
> val debt = driver.createOutputTopic("debt", serde.deserializer, serde.deserializer)
> bills.pipeInput("fred", "100")
> bills.pipeInput("george", "20")
> payments.pipeInput("fred", "95")
> // When in doubt, sleep twice
> driver.advanceWallClockTime(Duration.ofMillis(500))
> Thread.sleep(500)
> // Send a new record to cause the previous window to be closed
> payments.pipeInput("percy", "0")
> val keyValues = debt.readKeyValuesToList()
> keyValues should contain theSameElementsAs Seq(
>   // This record is present
>   new KeyValue[String, String]("fred", "5"),
>   // This record is missing
>   new KeyValue[String, String]("george", "20")
> )
> {code}
> Full code available at [https://github.com/Oduig/kstreams-left-join-example]
> It seems that advancing the wall clock time, sleeping, or sending an extra record never triggers the join condition when data only arrives on the left side. It is possible to circumvent this by passing an explicit event time with each test record. (See https://stackoverflow.com/questions/73443812/using-kafka-streams-topologytestdriver-how-to-test-left-join-between-two-strea/73540161#73540161)
> However, the behavior deviates from a real Kafka broker. With a real broker, if we do not send an event time, it uses the wall clock time of the broker instead. The behavior under test should be the same: `driver.advanceWallClockTime` should provide the default time to be used for `TestInputTopic.pipeInput`, when no other time is specified.


