Posted to user@spark.apache.org by 王晨伊 <ch...@163.com> on 2018/04/10 18:37:00 UTC
[Structured Streaming] Why is the events size 0 when using mapGroupsWithState?
Hi.
As I understand it, the user-supplied function (updatePageviewDatawithState) passed to `mapGroupsWithState` is called only for groups that have values in the batch.
But in my program, some group keys show up in the output with an events size of 0. Why?
My code is here:
https://gist.github.com/wangchenyi/f9922df2589eab3a59b6dc4a4369116f
def updatePageviewDatawithState(pvid: String,
                                events: Iterator[CLogData],
                                state: GroupState[PageViewInfo2]): (String, Int) = {
  if (state.hasTimedOut) {
    (pvid, state.get.numEvents)
  } else {
    val initInfo = PageViewInfo2(None, 0, 0L, 0L)
    val pageViewInfo = PageViewInfo2(None, events.size + state.getOption.getOrElse(initInfo).numEvents, 0L, 0L)
    state.update(pageViewInfo)
    state.setTimeoutDuration("2 minutes")
    (pvid, events.size)
  }
}

val pageviewUpdates = clogs
  .filter(_.isVaild)
  .groupByKey(_.pvid.get)
  .mapGroupsWithState[PageViewInfo2, (String, Int)](GroupStateTimeout.ProcessingTimeTimeout)(updatePageviewDatawithState)

val query = pageviewUpdates
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime(30 * 1000L))
  .outputMode(OutputMode.Update())
  .queryName("result")
  .start()

query.awaitTermination()

case class PageViewInfo2(data: Option[PageViewData],
                         numEvents: Int,
                         startTimestampMs: Long,
                         endTimestampMs: Long) {
  def durationMs: Long = endTimestampMs - startTimestampMs
}

The result in console:
+--------------------+---+
|                  _1| _2|
+--------------------+---+
|15226024176171601...|  0|
|15226024256851260...|  0|
|15226024273681541...|  0|
|15226022679491609...|  0|
|15226024286451721...|  0|
|15226024172761117...|  0|
|15226024291321086...|  0|
|15226021352771095...|  0|
|15226023663011551...|  0|
|15226024286441507...|  0|
|15226024236061436...|  0|
|15226023383931282...|  0|
|15226023885631768...|  0|
|15226024213251590...|  0|
|15226024256271297...|  0|
|15226024134121502...|  0|
|15226021611911738...|  0|
|15226024238731395...|  0|
|15226024302221935...|  0|
|15226024272291491...|  0|
+--------------------+---+
-------------------------------------------
Batch: 3
-------------------------------------------
+--------------------+---+
|                  _1| _2|
+--------------------+---+
|15226023658521156...|  0|
|15226024028661926...|  1|
|15226024141331773...|  0|
|15226024286511673...|  0|
|15226024176171601...|  0|
|15226023714921889...|  1|
|15226024299991840...|  0|
|15226024172551278...|  2|
|15226024227151143...|  2|
|15226023843111675...|  5|
|15226023862341073...|  3|
|15226024110721221...| 19|
|15226024095771099...|  9|
|15226024168661388...|  5|
|15226024218091990...|  5|
|15226023939941831...|  0|
|15226024161701658...|  4|
|15226024195941917...|  2|
|15226023436441102...|  0|
|15226024290531396...|  0|
+--------------------+---+
-------------------------------------------
Batch: 4
-------------------------------------------
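
One possible explanation, offered as a guess rather than a confirmed diagnosis: `events` is a Scala `Iterator`, which can generally be traversed only once. In the non-timeout branch above, `events.size` is called first to build `pageViewInfo` and then again for the returned tuple. If the first call exhausts the iterator, the second would see nothing and return 0. That would also fit the non-zero rows in Batch 3, which may come from the timeout branch returning `state.get.numEvents`. The standalone sketch below uses no Spark at all; `IteratorSizeDemo`, `sizeTwice` and `sizeOnce` are hypothetical names chosen for illustration, and a `List` iterator stands in for the one Spark passes to the function.

```scala
object IteratorSizeDemo {
  // Mirrors the non-timeout branch: size is called twice on the same Iterator.
  def sizeTwice[A](events: Iterator[A]): (Int, Int) = {
    val first = events.size  // walks the iterator to count, exhausting it
    val second = events.size // nothing left to count
    (first, second)
  }

  // Count once, keep the result in a val, and reuse it.
  def sizeOnce[A](events: Iterator[A]): (Int, Int) = {
    val n = events.size
    (n, n)
  }

  def main(args: Array[String]): Unit = {
    // A List-backed iterator is single-pass, like the assumed Spark one.
    println(sizeTwice(List("a", "b", "c").iterator)) // prints (3,0)
    println(sizeOnce(List("a", "b", "c").iterator))  // prints (3,3)
  }
}
```

If this is indeed the cause, storing `events.size` in a `val` at the top of the else branch and using that value in both places should make the second column report the per-batch event count.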
--
Chenyi Wang
tel:+8615910678909