You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Matthias J. Sax (Jira)" <ji...@apache.org> on 2021/02/19 17:22:00 UTC
[jira] [Updated] (KAFKA-12346) punctuate is called at twice the
duration passed as the first argument to Processor.Schedule (with
PunctuationType.WALL_CLOCK_TIME)
[ https://issues.apache.org/jira/browse/KAFKA-12346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax updated KAFKA-12346:
------------------------------------
Description:
A stream transform called with the idiom below causes punctuate to be invoked at twice the interval passed as the first argument to context.schedule (scanFrequency in the snippet):
{code:java}
.transform(new TransformerSupplier[String, TimeStampedString, KeyValue[String, TimeStampedString]]() {
override def get(): Transformer[String, TimeStampedString, KeyValue[String, TimeStampedString]] = new Transformer[String, TimeStampedString, KeyValue[String, TimeStampedString]] { override def init(context: ProcessorContext): Unit = {
val store = context.getStateStore(stateStoreName).asInstanceOf[KeyValueStore[String, ValueAndTimestamp[TimeStampedString]]]
context.schedule(scanFrequency,
PunctuationType.WALL_CLOCK_TIME,
new Punctuator {
override def punctuate(timestamp: Long): Unit = {
logger.info(s"Punctuate invoked with timestamp : ${Instant.ofEpochMilli(timestamp)}")
}
}
)
} override def transform(key: String, value: TimeStampedString): KeyValue[String, TimeStampedString] = {
// no need to return anything here, the Punctuator will emit the records when necessary
null
} override def close(): Unit = {}
}
}, /**
* register that this Transformer needs to be connected to our state store.
*/
stateStoreName
)
{code}
was:
A stream transform called with the idiom below causes punctuate to be called at twice the duration of the argument passed
{code:java}
.transform(new TransformerSupplier[String, TimeStampedString, KeyValue[String, TimeStampedString]]() {
override def get(): Transformer[String, TimeStampedString, KeyValue[String, TimeStampedString]] = new Transformer[String, TimeStampedString, KeyValue[String, TimeStampedString]] { override def init(context: ProcessorContext): Unit = {
val store = context.getStateStore(stateStoreName).asInstanceOf[KeyValueStore[String, ValueAndTimestamp[TimeStampedString]]]
context.schedule(scanFrequency,
PunctuationType.WALL_CLOCK_TIME,
new Punctuator {
override def punctuate(timestamp: Long): Unit = {
logger.info(s"Punctuate invoked with timestamp : ${Instant.ofEpochMilli(timestamp)}")
}
}
)
} override def transform(key: String, value: TimeStampedString): KeyValue[String, TimeStampedString] = {
// no need to return anything here, the Punctuator will emit the records when necessary
null
} override def close(): Unit = {}
}
}, /**
* register that this Transformer needs to be connected to our state store.
*/
stateStoreName
)
{code}
> punctuate is called at twice the duration passed as the first argument to Processor.Schedule (with PunctuationType.WALL_CLOCK_TIME)
> -----------------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-12346
> URL: https://issues.apache.org/jira/browse/KAFKA-12346
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.7.0
> Reporter: Arindam Ray
> Priority: Major
>
> A stream transform called with the idiom below causes punctuate to be invoked at twice the interval passed as the first argument to context.schedule (scanFrequency in the snippet):
> {code:java}
> .transform(new TransformerSupplier[String, TimeStampedString, KeyValue[String, TimeStampedString]]() {
> override def get(): Transformer[String, TimeStampedString, KeyValue[String, TimeStampedString]] = new Transformer[String, TimeStampedString, KeyValue[String, TimeStampedString]] { override def init(context: ProcessorContext): Unit = {
> val store = context.getStateStore(stateStoreName).asInstanceOf[KeyValueStore[String, ValueAndTimestamp[TimeStampedString]]]
> context.schedule(scanFrequency,
> PunctuationType.WALL_CLOCK_TIME,
> new Punctuator {
> override def punctuate(timestamp: Long): Unit = {
> logger.info(s"Punctuate invoked with timestamp : ${Instant.ofEpochMilli(timestamp)}")
> }
> }
> )
> } override def transform(key: String, value: TimeStampedString): KeyValue[String, TimeStampedString] = {
> // no need to return anything here, the Punctuator will emit the records when necessary
> null
> } override def close(): Unit = {}
> }
> }, /**
> * register that this Transformer needs to be connected to our state store.
> */
> stateStoreName
> )
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)