Posted to user@spark.apache.org by Ji ZHANG <zh...@gmail.com> on 2014/09/20 08:07:42 UTC

Spark Streaming: Calculate PV/UV by Minute and by Day?

Hi,

I'm using Spark Streaming 1.0.

Say I have a website clickstream source, like the following:

('2014-09-19 00:00:00', '192.168.1.1', 'home_page')
('2014-09-19 00:00:01', '192.168.1.2', 'list_page')
...

I want to calculate the page views (PV, the number of logs) and unique
users (UV, identified by IP) for every minute. The result would look like:

('2014-09-19 00:00:00', 'pv', 100)
('2014-09-19 00:00:00', 'uv', 50)
('2014-09-19 00:01:00', 'pv', 120)
('2014-09-19 00:01:00', 'uv', 60)

I also want the running total PV/UV for the day, updated every minute, like:

('2014-09-19 00:00:00', 'total pv', 100)
('2014-09-19 00:00:00', 'total uv', 50)
('2014-09-19 00:01:00', 'total pv', 220) // 100 + 120
('2014-09-19 00:01:00', 'total uv', 80) // 50 + 60 unique users, minus duplicates
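
To make the expected numbers concrete, here is a minimal batch-style sketch in plain Scala (no Spark) of the two outputs described above. The names PvUvBatch, minuteOf, perMinute and runningTotals are made up for illustration, and timestamps are assumed to be strings in the 'yyyy-MM-dd HH:mm:ss' form shown in the sample records. It only illustrates that total UV is the size of a union of IP sets rather than a sum of per-minute UVs, and that the result does not depend on arrival order.

```scala
object PvUvBatch {
  // (timestamp, ip, page): same shape as the sample click records.
  type Click = (String, String, String)

  // Truncate "yyyy-MM-dd HH:mm:ss" to the start of its minute.
  def minuteOf(ts: String): String = ts.substring(0, 16) + ":00"

  // Per-minute rows (minute, pv, uv), sorted by minute.
  def perMinute(clicks: Seq[Click]): Seq[(String, Long, Long)] =
    clicks.groupBy(c => minuteOf(c._1)).toSeq.sortBy(_._1).map {
      case (minute, cs) =>
        (minute, cs.size.toLong, cs.map(_._2).distinct.size.toLong)
    }

  // Running totals (minute, total pv, total uv), sorted by minute.
  // Total UV is the size of the union of all IPs seen so far.
  def runningTotals(clicks: Seq[Click]): Seq[(String, Long, Long)] = {
    val byMinute = clicks.groupBy(c => minuteOf(c._1)).toSeq.sortBy(_._1)
    byMinute.scanLeft(("", 0L, Set.empty[String])) {
      case ((_, pv, ips), (minute, cs)) =>
        (minute, pv + cs.size, ips ++ cs.map(_._2))
    }.tail.map { case (minute, pv, ips) => (minute, pv, ips.size.toLong) }
  }
}
```

Because both functions group by the minute taken from the record itself, feeding them the same records in a different order gives the same rows.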

There are also some requirements:
* The stream may be delayed, so at 12:00 you may still be receiving
messages from 11:55. Put another way, this program should support
both stream and batch processing, i.e. if fed a whole day's data,
it should output the same result as the streaming run.
* The stream is partitioned, so it may not be ordered, e.g. you may
receive 12:00:00, 12:00:05, 12:00:04, 12:00:06, but the time
difference shouldn't be too big.
* The final result will be written to MySQL. The schema is (created
datetime, category varchar(255), data bigint), matching the results
above.

For the 'every minute' numbers, I can use updateStateByKey. Here's an
example that calculates PV (the batch duration is 2 seconds):

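import java.text.SimpleDateFormat
import org.apache.spark.streaming.StreamingContext._ // pair DStream ops in Spark 1.0

// Key each log by the start of its minute; the value 1L counts one page view.
val logsByMin = logs map { log =>
  val date = new SimpleDateFormat("yyyy-MM-dd HH:mm:00").format(log.serverTime)
  date -> 1L
}

// Running PV per minute: add this batch's counts to the previous state.
val numLogsByMin = logsByMin.updateStateByKey((values: Seq[Long], state: Option[Long]) => {
  Some(state.getOrElse(0L) + values.sum)
})

// Write the current counts to MySQL after every batch.
numLogsByMin foreachRDD { rdd =>
  savePv(rdd.collect)
}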

This should meet the requirements, but with one major problem:
outdated keys are never evicted. So I came up with the idea of
expirable data: retain the calculated data for 2 minutes and, within
those 2 minutes, flush it to MySQL after every batch. The code is here:

https://github.com/jizhang/spark-hello/blob/eb138e24b1e72e89bf3fa7e66c6ae7106853e5e8/src/main/scala/com/anjuke/dw/spark_hello/ActionLogProcessor.scala#L80
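
For readers who cannot follow the link, the expiry idea can be sketched roughly as below. This is not the linked code, only a minimal illustration under a few assumptions: the names ExpiringCount, Counter, RetentionMs and update are made up, the 2-minute retention is measured in processing time, and the clock is passed in as a parameter so the logic can be tried without a StreamingContext. It relies on the documented updateStateByKey behaviour that returning None from the update function removes the key from the state.

```scala
object ExpiringCount {
  // Per-key state: the running count and when the key last received data (ms).
  case class Counter(count: Long, lastSeenMs: Long)

  // Assumed retention: drop a key once it has been idle for 2 minutes.
  val RetentionMs: Long = 2 * 60 * 1000L

  // Shaped like an updateStateByKey update function, plus an explicit clock.
  def update(nowMs: Long)(values: Seq[Long], state: Option[Counter]): Option[Counter] =
    if (values.nonEmpty)
      // New data for this key: add it up and refresh the last-seen time.
      Some(Counter(state.map(_.count).getOrElse(0L) + values.sum, nowMs))
    else
      // No new data: keep the state while it is fresh, otherwise return None,
      // which evicts the key.
      state.filter(s => nowMs - s.lastSeenMs < RetentionMs)
}
```

In a job this could be wired in as logsByMin.updateStateByKey((v: Seq[Long], s: Option[ExpiringCount.Counter]) => ExpiringCount.update(System.currentTimeMillis)(v, s)), so that the clock is read on every call. One caveat of this sketch: a record arriving after its key has been evicted starts that minute's count again from zero.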

Maybe there's a better way to achieve this?

As for the total PV/UV, I can set the key to the date (e.g. '2014-09-19'),
but how do I save it to MySQL every minute? UV is the hard part: it
cannot be summed across minutes, so I need to save it every minute, but how?
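
One possible direction, sketched here only to make the question concrete: keep the set of distinct IPs as the per-day state, so that the total UV is simply the size of that set whenever it is written out. The names DailyTotals, Totals, update and rows are hypothetical, and the sketch assumes the set of IPs for one day fits in memory.

```scala
object DailyTotals {
  // Per-day state: total page views and the distinct IPs seen so far.
  case class Totals(pv: Long, ips: Set[String]) {
    def uv: Long = ips.size.toLong
  }

  // Shaped like an updateStateByKey update function; `values` holds the IPs
  // of one batch for one day, one entry per log line.
  def update(values: Seq[String], state: Option[Totals]): Option[Totals] = {
    val prev = state.getOrElse(Totals(0L, Set.empty[String]))
    Some(Totals(prev.pv + values.size, prev.ips ++ values))
  }

  // Rows in the (created, category, data) shape, stamped with the given minute.
  def rows(minute: String, t: Totals): Seq[(String, String, Long)] =
    Seq((minute, "total pv", t.pv), (minute, "total uv", t.uv))
}
```

Since the state stream carries the full state on every batch, each batch could write rows(currentMinute, totals) and overwrite the row for that minute. That stamps rows with the processing-time minute, though, so it would not by itself satisfy the requirement that a batch run reproduce the streaming output.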

Any ideas will be appreciated. Thanks.

Jerry
