Posted to issues@flink.apache.org by "Aljoscha Krettek (JIRA)" <ji...@apache.org> on 2016/08/22 10:05:20 UTC

[jira] [Commented] (FLINK-4428) Method map/flatMapWithState may need a eviction policy

    [ https://issues.apache.org/jira/browse/FLINK-4428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15430496#comment-15430496 ] 

Aljoscha Krettek commented on FLINK-4428:
-----------------------------------------

Hi,
yes, that's a good observation. We have also been thinking for a while now that we need a way to specify a TTL (time to live) for regular state.

The idea was to add a TTL setting to the {{StateDescriptor}}. However, this will require timer functionality in the state backend, which would be nice to have anyway.
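Below is a rough, self-contained sketch of how "TTL on the descriptor plus timers in the backend" could behave. It assumes nothing about Flink's internals. `TtlKeyedState`, `update`, `get` and `advanceTime` are hypothetical names, and a priority queue of expiry times stands in for the state-backend timer service.

```scala
import scala.collection.mutable

// Hypothetical sketch, not a Flink API: keyed state whose entries expire
// `ttlMillis` after their last write, with a timer queue driving cleanup.
final class TtlKeyedState[K, V](ttlMillis: Long) {
  // value plus the time at which it was last written
  private val entries = mutable.HashMap.empty[K, (V, Long)]
  // min-heap of (expiry time, key), standing in for the backend's timer service
  private val timers =
    mutable.PriorityQueue.empty[(Long, K)](Ordering.by[(Long, K), Long](_._1).reverse)

  def update(key: K, value: V, now: Long): Unit = {
    entries(key) = (value, now)
    timers.enqueue((now + ttlMillis, key)) // register a cleanup timer
  }

  // Entries that have expired but were not yet cleaned up are treated as absent.
  def get(key: K, now: Long): Option[V] =
    entries.get(key).collect { case (v, ts) if now < ts + ttlMillis => v }

  // Fire every timer due at or before `now`; evict only entries not refreshed since.
  def advanceTime(now: Long): Unit =
    while (timers.nonEmpty && timers.head._1 <= now) {
      val (expiry, key) = timers.dequeue()
      entries.get(key) match {
        case Some((_, ts)) if ts + ttlMillis == expiry => entries.remove(key)
        case _ => () // refreshed later (a newer timer exists) or already gone
      }
    }

  def size: Int = entries.size
}
```

Comparing the entry's last write time with the firing timer's expiry means a stale timer from an earlier write cannot evict a value that was refreshed since. Reads also hide expired values that no timer has cleaned up yet.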

> Method map/flatMapWithState may need a eviction policy
> ------------------------------------------------------
>
>                 Key: FLINK-4428
>                 URL: https://issues.apache.org/jira/browse/FLINK-4428
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>    Affects Versions: 1.1.2
>            Reporter: Renkai Ge
>
> I want to count the number of unique visitors of a website every day.
> If the number changes, I want to get the newest number within 1 second, and
> nothing should be emitted if the number doesn't change. I implemented this
> with a 1-day time window, a trigger that fires every second, and
> flatMapWithState to filter out duplicate results.
> {code}
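>      import org.apache.flink.streaming.api.scala._
>      import org.apache.flink.streaming.api.scala.extensions._
>      import org.apache.flink.streaming.api.windowing.time.Time
>      import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger
>
>      case class Visit(uuid: String, time: Long, platform: Int)
>      case class WindowUv(platform: Int, uv: Long, windowStart: Long, windowEnd: Long)
>
>      val env = StreamExecutionEnvironment.getExecutionEnvironment
>      //  val consumer: FlinkKafkaConsumer08[Visit]  (Kafka source, definition omitted)
>      val stream =
>        env.addSource(consumer)
>          .keyBy(_.platform)
>          .timeWindow(Time.days(1))
>          // emit the running result every second instead of once per day
>          .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
>          // accumulator: (platform, uuid hash codes, windowStart, windowEnd);
>          // the last two slots are placeholders filled in by the window function
>          .applyWith((0, Set.empty[Int], 0L, 0L))(
>            foldFunction = {
>              case ((_, set, _, _), visit) =>
>                (visit.platform, set + visit.uuid.hashCode, 0L, 0L)
>            },
>            windowFunction = {
>              case (_, window, results) =>
>                results.map {
>                  case (platform, set, _, _) =>
>                    (platform, set, window.getStart, window.getEnd)
>                }
>            }
>          )
>          .mapWith {
>            case (platform, set, windowStart, windowEnd) =>
>              WindowUv(platform, set.size.toLong, windowStart, windowEnd)
>          }
>          // one state entry per (platform, day); nothing ever removes it
>          .keyBy(uv => (uv.platform, uv.windowStart))
>          // forward a WindowUv only when its count differs from the last one emitted
>          .flatMapWithState[WindowUv, Long] {
>            case (uv, Some(last)) if last == uv.uv => (Seq.empty, Some(last))
>            case (uv, _)                           => (Seq(uv), Some(uv.uv))
>          }
>      stream.print()
>      env.execute("Boom")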
> {code}
> The problem is that with flatMapWithState, the state for a given day is never
> updated or read again once that day has passed, but it stays in memory
> forever and there is no way to evict it. So I think the state in
> map/flatMapWithState needs an eviction policy based on time or on global
> conditions, rather than only on the last message for a key (it is hard to tell
> whether a message is the last one when it arrives).
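To make the requested time-based policy concrete, here is a plain-Scala sketch with no Flink dependency. It models the de-duplicating flatMapWithState step from the report, but drops each (platform, windowStart) entry once its window has ended plus a grace period. `DedupWithEviction`, `process` and `graceMillis` are hypothetical names. Cleanup here is done lazily on each call rather than by timers.

```scala
import scala.collection.mutable

// Same shape as the case class in the report.
case class WindowUv(platform: Int, uv: Long, windowStart: Long, windowEnd: Long)

// Hypothetical sketch, not a Flink API: de-duplication whose per-key state is
// evicted once the key's window is over (windowEnd + graceMillis <= now).
final class DedupWithEviction(graceMillis: Long) {
  // (platform, windowStart) -> (last emitted UV count, that window's end)
  private val lastEmitted = mutable.HashMap.empty[(Int, Long), (Long, Long)]

  // Emits the record only if its count differs from the last one emitted.
  def process(uv: WindowUv, now: Long): Seq[WindowUv] = {
    evictFinished(now)
    val key = (uv.platform, uv.windowStart)
    lastEmitted.get(key) match {
      case Some((prev, _)) if prev == uv.uv => Seq.empty
      case _ =>
        lastEmitted(key) = (uv.uv, uv.windowEnd)
        Seq(uv)
    }
  }

  // Time-based policy: a finished day's window never fires again, so drop its entry.
  private def evictFinished(now: Long): Unit = {
    val expired = lastEmitted.collect {
      case (k, (_, winEnd)) if winEnd + graceMillis <= now => k
    }.toList
    expired.foreach(k => lastEmitted.remove(k))
  }

  def stateSize: Int = lastEmitted.size
}
```

With a 1-day window, state then holds at most the entries of the current day (plus any within the grace period) per platform, instead of one entry per day forever.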


