You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by 王晨伊 <ch...@163.com> on 2018/04/10 18:37:00 UTC

[Structured Streaming] why events size is 0 when use mapGroupsWithState

Hi.
 In my opinion, the user-given function(updatePageviewDatawithState) of `mapGroupsWithState` is only called when the groups have values in the batch.
 But in my program, why the group key present with its events size is 0? 


My code is here:
https://gist.github.com/wangchenyi/f9922df2589eab3a59b6dc4a4369116f
def updatePageviewDatawithState(pvid: String,
events: Iterator[CLogData],
state: GroupState[PageViewInfo2]): (String, Int) = {
if (state.hasTimedOut) {
    (pvid, state.get.numEvents)
  } else {
val initInfo = PageViewInfo2(None, 0, 0L, 0L)
val pageViewInfo = PageViewInfo2(None, events.size + state.getOption.getOrElse(initInfo).numEvents, 0L, 0L)
    state.update(pageViewInfo)
    state.setTimeoutDuration("2 minutes")

    (pvid, events.size)
  }
}

val pageviewUpdates = clogs
  .filter(_.isVaild)
  .groupByKey(_.pvid.get)
  .mapGroupsWithState[PageViewInfo2, (String, Int)](GroupStateTimeout.ProcessingTimeTimeout)(updatePageviewDatawithState)

val query = pageviewUpdates
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime(30 * 1000L))
  .outputMode(OutputMode.Update())
  .queryName("result")
  .start()

query.awaitTermination()
case class PageViewInfo2(data: Option[PageViewData],
numEvents: Int,
startTimestampMs: Long,
endTimestampMs: Long) {
def durationMs: Long = endTimestampMs - startTimestampMs
}


The result in console:


+--------------------+---+
|                  _1| _2|
+--------------------+---+
|15226024176171601...|  0|
|15226024256851260...|  0|
|15226024273681541...|  0|
|15226022679491609...|  0|
|15226024286451721...|  0|
|15226024172761117...|  0|
|15226024291321086...|  0|
|15226021352771095...|  0|
|15226023663011551...|  0|
|15226024286441507...|  0|
|15226024236061436...|  0|
|15226023383931282...|  0|
|15226023885631768...|  0|
|15226024213251590...|  0|
|15226024256271297...|  0|
|15226024134121502...|  0|
|15226021611911738...|  0|
|15226024238731395...|  0|
|15226024302221935...|  0|
|15226024272291491...|  0|
+--------------------+---+
-------------------------------------------
Batch: 3
-------------------------------------------


+--------------------+---+
|                  _1| _2|
+--------------------+---+
|15226023658521156...|  0|
|15226024028661926...|  1|
|15226024141331773...|  0|
|15226024286511673...|  0|
|15226024176171601...|  0|
|15226023714921889...|  1|
|15226024299991840...|  0|
|15226024172551278...|  2|
|15226024227151143...|  2|
|15226023843111675...|  5|
|15226023862341073...|  3|
|15226024110721221...| 19|
|15226024095771099...|  9|
|15226024168661388...|  5|
|15226024218091990...|  5|
|15226023939941831...|  0|
|15226024161701658...|  4|
|15226024195941917...|  2|
|15226023436441102...|  0|
|15226024290531396...|  0|
+--------------------+---+


-------------------------------------------
Batch: 4
-------------------------------------------



--
Chenyi Wang

tel:+8615910678909