You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Guozhang Wang (JIRA)" <ji...@apache.org> on 2017/02/28 21:33:45 UTC
[jira] [Assigned] (KAFKA-4789) ProcessorTopologyTestDriver does not
forward extracted timestamps to internal topics
[ https://issues.apache.org/jira/browse/KAFKA-4789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang reassigned KAFKA-4789:
------------------------------------
Assignee: Hamidreza Afzali
> ProcessorTopologyTestDriver does not forward extracted timestamps to internal topics
> ------------------------------------------------------------------------------------
>
> Key: KAFKA-4789
> URL: https://issues.apache.org/jira/browse/KAFKA-4789
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.0
> Reporter: Hamidreza Afzali
> Assignee: Hamidreza Afzali
> Labels: unit-test
> Fix For: 0.10.3.0
>
>
> *Problem:*
> When using ProcessorTopologyTestDriver, the extracted timestamp is not forwarded with the produced record to the internal topics.
> *Example:*
> {code}
> object Topology1 {
> def main(args: Array[String]): Unit = {
> val inputTopic = "input"
> val outputTopic = "output"
> val stateStore = "count"
> val inputs = Seq[(String, Integer)](("A@1450000000", 1), ("B@1450000000", 2))
> val props = new Properties
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, classOf[MyTimestampExtractor].getName)
> val windowedStringSerde = Serdes.serdeFrom(new WindowedSerializer(Serdes.String.serializer),
> new WindowedDeserializer(Serdes.String.deserializer))
> val builder = new KStreamBuilder
> builder.stream(Serdes.String, Serdes.Integer, inputTopic)
> .map[String, Integer]((k, v) => new KeyValue(k.split("@")(0), v))
> .groupByKey(Serdes.String, Serdes.Integer)
> .count(TimeWindows.of(1000L), stateStore)
> .to(windowedStringSerde, Serdes.Long, outputTopic)
> val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), builder, stateStore)
> inputs.foreach {
> case (key, value) => {
> driver.process(inputTopic, key, value, Serdes.String.serializer, Serdes.Integer.serializer)
> val record = driver.readOutput(outputTopic, Serdes.String.deserializer, Serdes.Long.deserializer)
> println(record)
> }
> }
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)