Posted to issues@flink.apache.org by "Aljoscha Krettek (JIRA)" <ji...@apache.org> on 2016/08/22 10:05:20 UTC
[jira] [Commented] (FLINK-4428) Method map/flatMapWithState may need a eviction policy
[ https://issues.apache.org/jira/browse/FLINK-4428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15430496#comment-15430496 ]
Aljoscha Krettek commented on FLINK-4428:
-----------------------------------------
Hi,
yes, that's a good observation. We have also been thinking for a while now that we need a way to specify a TTL (time to live) for regular state.
The idea was to add a TTL setting to the {{StateDescriptor}}. This, however, will require timer functionality in the state backend, which would be nice to have anyway.
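The TTL idea (each state entry expires unless it is written again, with expiry driven by timers) can be illustrated outside Flink. What follows is a minimal plain-Scala sketch under that assumption. It is not Flink's API (later Flink releases expose state TTL through `StateTtlConfig`). `TtlValueStore`, `update`, `advanceTime` and `ttlMillis` are hypothetical names, and "timers" are modeled as a priority queue of expiry timestamps that is drained whenever the caller advances the clock.

```scala
import scala.collection.mutable

// Hypothetical sketch (not Flink API): a per-key value store where every
// entry carries a time-to-live. Each write registers an expiry "timer" in a
// priority queue; advancing the clock fires due timers and evicts entries
// that were not written again in the meantime.
final class TtlValueStore[K, V](ttlMillis: Long) {
  // key -> (value, expiry timestamp of the latest write)
  private val values = mutable.HashMap.empty[K, (V, Long)]
  // min-heap on expiry timestamp (PriorityQueue is a max-heap, so reverse)
  private val timers =
    mutable.PriorityQueue.empty[(Long, K)](Ordering.by[(Long, K), Long](_._1).reverse)

  def update(key: K, value: V, now: Long): Unit = {
    val expiry = now + ttlMillis
    values.update(key, (value, expiry))
    timers.enqueue((expiry, key))
  }

  def get(key: K): Option[V] = values.get(key).map(_._1)

  // Fire every timer whose expiry is <= now. A key is evicted only if the
  // timer belongs to its latest write; older timers are stale and skipped.
  def advanceTime(now: Long): Unit =
    while (timers.nonEmpty && timers.head._1 <= now) {
      val (expiry, key) = timers.dequeue()
      values.get(key) match {
        case Some((_, latestExpiry)) if latestExpiry == expiry => values.remove(key)
        case _ => () // entry was refreshed or already removed
      }
    }

  def size: Int = values.size
}
```

Each write re-arms the expiry, so an entry disappears only after `ttlMillis` without writes. Stale timers from earlier writes are detected by comparing timestamps instead of being removed eagerly, which keeps `update` cheap at the cost of some extra queue entries.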
> Method map/flatMapWithState may need a eviction policy
> ------------------------------------------------------
>
> Key: FLINK-4428
> URL: https://issues.apache.org/jira/browse/FLINK-4428
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API
> Affects Versions: 1.1.2
> Reporter: Renkai Ge
>
> I want to count the number of unique visitors to a website every day.
> If the number changes, I want to get the newest number within 1 second, and
> nothing should be emitted if the number doesn't change. I implemented this
> with a 1-day time window, a 1-second trigger, and flatMapWithState to
> filter out duplicate results.
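> {code}
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.scala.extensions._
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger
>
> case class Visit(uuid: String, time: Long, platform: Int)
>
> case class WindowUv(platform: Int, uv: Long, windowStart: Long, windowEnd: Long)
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> // val consumer: FlinkKafkaConsumer08[Visit] (Kafka source, assumed to be set up elsewhere)
> val stream =
>   env.addSource(consumer)
>     .keyBy(_.platform)
>     // 1-day tumbling window that fires every second of processing time
>     .timeWindow(Time.days(1))
>     .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
>     .applyWith((0, Set.empty[Int], 0L, 0L))(
>       // collect the hash codes of the visitor uuids per platform (a Set, so duplicates collapse)
>       foldFunction = {
>         case ((_, set, _, _), visit) =>
>           (visit.platform, set + visit.uuid.hashCode, 0L, 0L)
>       },
>       // attach the window bounds to each accumulated result
>       windowFunction = {
>         case (key, window, results) =>
>           results.map {
>             case (platform, set, _, _) =>
>               (platform, set, window.getStart, window.getEnd)
>           }
>       }
>     )
>     .mapWith {
>       case (key, set, windowStart, windowEnd) =>
>         WindowUv(key, set.size, windowStart, windowEnd)
>     }
>     .keyBy(uv => (uv.platform, uv.windowStart))
>     // emit a WindowUv only when its count differs from the last one emitted for this key
>     .flatMapWithState[WindowUv, Long] {
>       case (WindowUv(key, num, begin, end), curr) =>
>         curr match {
>           case Some(numCurr) if numCurr == num =>
>             (Seq.empty, Some(num))
>           case _ =>
>             (Seq(WindowUv(key, num, begin, end)), Some(num))
>         }
>     }
> stream.print()
> env.execute("Boom")
> {code}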
> The problem is that with flatMapWithState, the state for a given day is
> never updated or read again once that day has passed, yet it stays in
> memory forever and there is no way to evict it. So I think the state in
> map/flatMapWithState may need an eviction policy based on time or on global
> conditions, rather than only on the last message for the key (it is hard to
> tell whether a message is the last one when it arrives).
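The eviction asked for here can be tied to a global condition instead of to the last message. Every deduplication entry belongs to a window with a known end, so the entry can be dropped once the clock passes that end. Below is a minimal plain-Scala simulation of the deduplicating step under that assumption, outside Flink. `DedupWithEviction`, `process`, `evictEndedWindows` and the `now` clock are hypothetical names, and the caller is assumed to drive the clock explicitly.

```scala
import scala.collection.mutable

final case class WindowUv(platform: Int, uv: Long, windowStart: Long, windowEnd: Long)

// Hypothetical sketch: state is keyed by (platform, windowStart), mirroring the
// keyBy in the report, and each entry also remembers its windowEnd so it can be
// dropped once that window is over.
final class DedupWithEviction {
  // (platform, windowStart) -> (last emitted uv count, windowEnd)
  private val lastCounts = mutable.HashMap.empty[(Int, Long), (Long, Long)]

  // Returns the record only if its count differs from the last one emitted.
  def process(rec: WindowUv): Option[WindowUv] = {
    val key = (rec.platform, rec.windowStart)
    lastCounts.get(key) match {
      case Some((last, _)) if last == rec.uv => None
      case _ =>
        lastCounts.update(key, (rec.uv, rec.windowEnd))
        Some(rec)
    }
  }

  // Global condition: once the clock reaches a window's end, its entry is no
  // longer needed, so it is removed instead of staying in memory forever.
  def evictEndedWindows(now: Long): Unit = {
    val ended = lastCounts.collect { case (k, (_, end)) if end <= now => k }.toList
    ended.foreach(k => lastCounts.remove(k))
  }

  def stateSize: Int = lastCounts.size
}
```

The trade-off: a record that reaches this step after its window's entry was evicted (for example, one still in flight when the clock passes `windowEnd`) would be treated as new and emitted again, so in practice one would probably evict with some slack after the window end.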
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)