You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Matthias J. Sax (Jira)" <ji...@apache.org> on 2021/02/19 17:22:00 UTC

[jira] [Updated] (KAFKA-12346) punctuate is called at twice the duration passed as the first argument to Processor.Schedule (with PunctuationType.WALL_CLOCK_TIME)

     [ https://issues.apache.org/jira/browse/KAFKA-12346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-12346:
------------------------------------
    Description: 
A stream transform called with the idiom below causes punctuate to be called at twice the duration of the argument passed
{code:java}
        .transform(new TransformerSupplier[String, TimeStampedString, KeyValue[String, TimeStampedString]]() {
          override def get(): Transformer[String, TimeStampedString, KeyValue[String, TimeStampedString]] = new Transformer[String, TimeStampedString, KeyValue[String, TimeStampedString]] {            override def init(context: ProcessorContext): Unit = {
              val store = context.getStateStore(stateStoreName).asInstanceOf[KeyValueStore[String, ValueAndTimestamp[TimeStampedString]]]
              context.schedule(scanFrequency,
                PunctuationType.WALL_CLOCK_TIME,
                new Punctuator {
                  override def punctuate(timestamp: Long): Unit = {
                    logger.info(s"Punctuate invoked with timestamp : ${Instant.ofEpochMilli(timestamp)}")
                  }
                }
              )
            }            override def transform(key: String, value: TimeStampedString): KeyValue[String, TimeStampedString] = {
              // no need to return anything here, the Punctuator will emit the records when necessary
              null
            }            override def close(): Unit = {}
          }
        },          /**
           * register that this Transformer needs to be connected to our state store.
           */
          stateStoreName
        )


{code}

  was:
A stream transform called with the idiom below causes punctuate to be called at twice the duration of the argument passed
{code:java}
        .transform(new TransformerSupplier[String, TimeStampedString, KeyValue[String, TimeStampedString]]() {
          override def get(): Transformer[String, TimeStampedString, KeyValue[String, TimeStampedString]] = new Transformer[String, TimeStampedString, KeyValue[String, TimeStampedString]] {            override def init(context: ProcessorContext): Unit = {
              val store = context.getStateStore(stateStoreName).asInstanceOf[KeyValueStore[String, ValueAndTimestamp[TimeStampedString]]]
              context.schedule(scanFrequency,
                PunctuationType.WALL_CLOCK_TIME,
                new Punctuator {
                  override def punctuate(timestamp: Long): Unit = {
                    logger.info(s"Punctuate invoked with timestamp : ${Instant.ofEpochMilli(timestamp)}")
                  }
                }
              )
            }            override def transform(key: String, value: TimeStampedString): KeyValue[String, TimeStampedString] = {
              // no need to return anything here, the Punctuator will emit the records when necessary
              null
            }            override def close(): Unit = {}
          }
        },          /**
           * register that this Transformer needs to be connected to our state store.
           */
          stateStoreName
        )
{code}


> punctuate is called at twice the duration passed as the first argument to Processor.Schedule (with PunctuationType.WALL_CLOCK_TIME)
> -----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12346
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12346
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.7.0
>            Reporter: Arindam Ray
>            Priority: Major
>
> A stream transform called with the idiom below causes punctuate to be called at twice the duration of the argument passed
> {code:java}
>         .transform(new TransformerSupplier[String, TimeStampedString, KeyValue[String, TimeStampedString]]() {
>           override def get(): Transformer[String, TimeStampedString, KeyValue[String, TimeStampedString]] = new Transformer[String, TimeStampedString, KeyValue[String, TimeStampedString]] {            override def init(context: ProcessorContext): Unit = {
>               val store = context.getStateStore(stateStoreName).asInstanceOf[KeyValueStore[String, ValueAndTimestamp[TimeStampedString]]]
>               context.schedule(scanFrequency,
>                 PunctuationType.WALL_CLOCK_TIME,
>                 new Punctuator {
>                   override def punctuate(timestamp: Long): Unit = {
>                     logger.info(s"Punctuate invoked with timestamp : ${Instant.ofEpochMilli(timestamp)}")
>                   }
>                 }
>               )
>             }            override def transform(key: String, value: TimeStampedString): KeyValue[String, TimeStampedString] = {
>               // no need to return anything here, the Punctuator will emit the records when necessary
>               null
>             }            override def close(): Unit = {}
>           }
>         },          /**
>            * register that this Transformer needs to be connected to our state store.
>            */
>           stateStoreName
>         )
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)