Posted to dev@kafka.apache.org by "Guozhang Wang (JIRA)" <ji...@apache.org> on 2017/02/28 21:33:45 UTC

[jira] [Assigned] (KAFKA-4789) ProcessorTopologyTestDriver does not forward extracted timestamps to internal topics

     [ https://issues.apache.org/jira/browse/KAFKA-4789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang reassigned KAFKA-4789:
------------------------------------

    Assignee: Hamidreza Afzali

> ProcessorTopologyTestDriver does not forward extracted timestamps to internal topics
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4789
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4789
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Hamidreza Afzali
>            Assignee: Hamidreza Afzali
>              Labels: unit-test
>             Fix For: 0.10.3.0
>
>
> *Problem:*
> When a topology is run with ProcessorTopologyTestDriver, the timestamp returned by the configured timestamp extractor is not forwarded with the records that are produced to internal topics.
> *Example:*
> {code}
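> import java.util.{Properties, UUID}
> import org.apache.kafka.common.serialization.Serdes
> import org.apache.kafka.streams.{KeyValue, StreamsConfig}
> import org.apache.kafka.streams.kstream.{KStreamBuilder, TimeWindows}
> import org.apache.kafka.streams.kstream.internals.{WindowedDeserializer, WindowedSerializer}
> // ProcessorTopologyTestDriver is part of the Kafka Streams test sources.
> import org.apache.kafka.test.ProcessorTopologyTestDriver
>
> object Topology1 {
>   def main(args: Array[String]): Unit = {
>     val inputTopic = "input"
>     val outputTopic = "output"
>     val stateStore = "count"
>     // Input keys have the form "<name>@<number>".
>     val inputs = Seq[(String, Integer)](("A@1450000000", 1), ("B@1450000000", 2))
>     val props = new Properties
>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
>     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
>     // MyTimestampExtractor is a user-defined extractor; its definition is not included in this message.
>     props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, classOf[MyTimestampExtractor].getName)
>     val windowedStringSerde = Serdes.serdeFrom(new WindowedSerializer(Serdes.String.serializer),
>       new WindowedDeserializer(Serdes.String.deserializer))
>     val builder = new KStreamBuilder
>     // Strip the "@..." suffix from the key, then count per key in 1000 ms windows.
>     builder.stream(Serdes.String, Serdes.Integer, inputTopic)
>       .map[String, Integer]((k, v) => new KeyValue(k.split("@")(0), v))
>       .groupByKey(Serdes.String, Serdes.Integer)
>       .count(TimeWindows.of(1000L), stateStore)
>       .to(windowedStringSerde, Serdes.Long, outputTopic)
>     val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), builder, stateStore)
>     // Feed each input through the driver and print the record read from the output topic.
>     inputs.foreach {
>       case (key, value) => {
>         driver.process(inputTopic, key, value, Serdes.String.serializer, Serdes.Integer.serializer)
>         val record = driver.readOutput(outputTopic, Serdes.String.deserializer, Serdes.Long.deserializer)
>         println(record)
>       }
>     }
>   }
> }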
> {code}
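> *Timestamp and window sketch:*
> The following is a minimal sketch, not code from the report, of what "extracted timestamp" could mean for the sample inputs and which count window such a timestamp would select. It assumes that the digits after "@" in each input key are the timestamp that MyTimestampExtractor is meant to return, and that windows of size 1000 are aligned to multiples of their size. The names ExtractedTimestampSketch, timestampFromKey and windowStart are hypothetical, and no Kafka classes are used.
> {code}
> import scala.util.Try
>
> object ExtractedTimestampSketch {
>   // Hypothetical helper: parse the part after '@' in keys such as "A@1450000000".
>   // Returns None when the key has no single '@' separator or the suffix is not a number.
>   def timestampFromKey(key: String): Option[Long] =
>     key.split("@") match {
>       case Array(_, ts) => Try(ts.toLong).toOption
>       case _            => None
>     }
>
>   // Start of the size-aligned tumbling window that contains the timestamp.
>   // Assumes a non-negative timestamp and a positive window size.
>   def windowStart(timestamp: Long, windowSize: Long): Long =
>     timestamp - (timestamp % windowSize)
>
>   def main(args: Array[String]): Unit = {
>     Seq("A@1450000000", "B@1450000000").foreach { key =>
>       val ts = timestampFromKey(key)
>       println(s"$key -> timestamp=$ts, windowStart=${ts.map(windowStart(_, 1000L))}")
>     }
>   }
> }
> {code}
> Under these assumptions, both sample inputs map to the window that starts at 1450000000. A record whose extracted timestamp is not forwarded could be assigned to a different window.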



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)