Posted to user@flink.apache.org by Lu Weizheng <lu...@hotmail.com> on 2020/08/08 08:18:24 UTC
How to write WatermarkStrategy in Scala?
Hi there,
Flink 1.11 comes with the new WatermarkStrategy API for assigning timestamps and watermarks. I could not find any example in Scala. I have a (String, Long) stream; can anyone help me implement a WatermarkStrategy for it? I would be really grateful!
val input: DataStream[(String, Long)] = ...
val watermark = input.assignTimestampsAndWatermarks(
  WatermarkStrategy.forGenerator(...)
)

class MyPeriodicGenerator extends WatermarkGenerator[(String, Long)] {
  final private val maxOutOfOrderness = 60 * 1000
  private var currentMaxTimestamp = 0L

  override def onEvent(event: (String, Long), eventTimestamp: Long, output: WatermarkOutput): Unit = {
    currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp)
  }

  override def onPeriodicEmit(output: WatermarkOutput): Unit = {
    output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness))
  }
}
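To make the arithmetic of the generator above concrete, here is a small Flink-free sketch that replays the same logic on a list of timestamps. The names `WatermarkTrace` and `watermarksFor` and the sample timestamps are made up for illustration, and the sketch assumes a periodic emit happens right after every event, which a real job does not guarantee.

```scala
object WatermarkTrace {
  // Same bound as in MyPeriodicGenerator: 60 seconds, in milliseconds.
  val maxOutOfOrderness: Long = 60 * 1000L

  // For each event, track the largest timestamp seen so far (starting from 0,
  // like currentMaxTimestamp) and subtract the out-of-orderness bound.
  def watermarksFor(eventTimestamps: Seq[Long]): Seq[Long] =
    eventTimestamps
      .scanLeft(0L)((currentMax, ts) => math.max(currentMax, ts))
      .tail
      .map(_ - maxOutOfOrderness)

  def main(args: Array[String]): Unit = {
    // The third event arrives out of order and does not move the watermark.
    val timestamps = Seq(100000L, 160000L, 130000L, 200000L)
    println(watermarksFor(timestamps)) // List(40000, 100000, 100000, 140000)
  }
}
```

The late event (130000) leaves the watermark at 100000, which is the whole point of tracking the maximum rather than the latest timestamp.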
Re: How to write WatermarkStrategy in Scala?
Posted by Lu Weizheng <lu...@hotmail.com>.
Thank you Dawid,
I am using Scala 2.11. I came up with the same solution as your option 1), but it did not seem very Scala-friendly, so I came here to ask. I hope the new API will not change significantly.
Best Regards,
Weizheng
Re: How to write WatermarkStrategy in Scala?
Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi,
Regrettably, I must admit that WatermarkStrategy is not very Scala-friendly :(
1) After a couple of tries, what I'd recommend as the most reliable approach is to pass it through anonymous classes:
.assignTimestampsAndWatermarks(
  WatermarkStrategy.forGenerator[(String, Long)](
    new WatermarkGeneratorSupplier[(String, Long)] {
      override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[(String, Long)] =
        new MyPeriodicGenerator
    }
  )
  .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
    override def extractTimestamp(t: (String, Long), l: Long): Long = t._2
  })
)
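For reference, option 1 can be assembled into a self-contained program roughly as follows. This is only a sketch, assuming Flink 1.11 with the Scala DataStream API on the classpath; the object name `WatermarkStrategyExample`, the sample elements, and the job name are made up for illustration.

```scala
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._

// The generator from the original question, unchanged in behaviour.
class MyPeriodicGenerator extends WatermarkGenerator[(String, Long)] {
  private val maxOutOfOrderness = 60 * 1000L
  private var currentMaxTimestamp = 0L

  override def onEvent(event: (String, Long), eventTimestamp: Long, output: WatermarkOutput): Unit =
    currentMaxTimestamp = math.max(currentMaxTimestamp, eventTimestamp)

  override def onPeriodicEmit(output: WatermarkOutput): Unit =
    output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness))
}

object WatermarkStrategyExample {
  // Both callbacks are passed as explicit anonymous classes (option 1).
  val strategy: WatermarkStrategy[(String, Long)] =
    WatermarkStrategy
      .forGenerator[(String, Long)](new WatermarkGeneratorSupplier[(String, Long)] {
        override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[(String, Long)] =
          new MyPeriodicGenerator
      })
      .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
        // The second tuple field carries the event time in milliseconds.
        override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long = element._2
      })

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Hypothetical sample input; a real job would read from a source.
    val input: DataStream[(String, Long)] =
      env.fromElements(("a", 1000L), ("b", 3000L), ("c", 2000L))

    input.assignTimestampsAndWatermarks(strategy).print()
    env.execute("watermark-strategy-sketch")
  }
}
```

Keeping the strategy in a `val` separates its construction from the pipeline wiring, so the same strategy can be reused on several streams of the same type.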
2) With Scala 2.12 you can try the automatic conversion of Scala lambdas to Java SAM types. Unfortunately, when I tried it, it failed for the timestamp assigner with some problems in the serialization stack. I have not been able to identify the root cause yet, so I cannot fully recommend it.
.assignTimestampsAndWatermarks(
  WatermarkStrategy.forGenerator[(String, Long)](
    _ => new MyPeriodicGenerator
  )
  .withTimestampAssigner((e, _) => e._2)
)
I created a ticket to improve the situation here: https://issues.apache.org/jira/browse/FLINK-18873
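As a related alternative, not one of the two options above: if the custom generator only needs a fixed out-of-orderness bound, the built-in `WatermarkStrategy.forBoundedOutOfOrderness` may be enough and avoids the generator supplier altogether. The sketch below assumes Flink 1.11; the object name `BoundedOutOfOrdernessExample` is made up, and the built-in generator may not emit exactly the same watermark values as `MyPeriodicGenerator`.

```scala
import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

object BoundedOutOfOrdernessExample {
  // Built-in strategy with a 60 second bound, matching maxOutOfOrderness
  // in the original question. The timestamp assigner is still passed as an
  // anonymous class rather than a lambda.
  val strategy: WatermarkStrategy[(String, Long)] =
    WatermarkStrategy
      .forBoundedOutOfOrderness[(String, Long)](Duration.ofSeconds(60))
      .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
        override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long = element._2
      })
}
```

It would be used the same way as any other strategy, for example `input.assignTimestampsAndWatermarks(BoundedOutOfOrdernessExample.strategy)`.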
Best,
Dawid