Posted to user@spark.apache.org by Ramkumar Venkataraman <ra...@gmail.com> on 2017/03/10 10:44:04 UTC

[Spark Streaming][Spark SQL] Design suggestions needed for sessionization

At a high level, I am looking to do sessionization: I want to combine
events based on some key, do some transformations, and emit the data to
HDFS. The catch is that there are time boundaries, say grouping events
into 30-minute windows based on a timestamp field in the event. Typical
event-time windowing plus key-based grouping.
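
For reference, the grouping I am after looks roughly like this on a
static DataFrame in Spark 1.6 (column names are placeholders; assume
"events" has a userId and an eventTime in epoch seconds):

import org.apache.spark.sql.functions._

// assign each event to a 30-minute event-time bucket and group per key
val sessions = events
  .withColumn("windowStart", (floor(col("eventTime") / 1800) * 1800).cast("timestamp"))
  .groupBy(col("userId"), col("windowStart"))
  .agg(count(lit(1)).as("eventCount"),
       min(col("eventTime")).as("firstEvent"),
       max(col("eventTime")).as("lastEvent"))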

I have been trying to figure out ways to do it. 

The approaches that would otherwise be my first choices are ruled out:

1) Use event-time windows with watermarks and the possibility of updating
previous windows when late data arrives. This is possible in Spark 2, but
Structured Streaming is still only in alpha there, and the company I work
for doesn't support Spark 2 yet.
2) Use mapWithState in Spark 1.6, but if I am not mistaken it can't do
event-time windows (a rough sketch of why follows this list).
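
For completeness, the mapWithState shape I mean in #2 is roughly the
following. The timeout (and hence the closing of a session) is driven by
processing time, not by the event timestamps, which is why event-time
windows and late-data handling don't fall out of it naturally. Types,
key names and the 30-minute gap are placeholders.

import org.apache.spark.streaming.{Minutes, State, StateSpec}

case class Event(userId: String, eventTime: Long, payload: String)
case class Session(events: Seq[Event], lastEventTime: Long)

// update per-key session state; a session is emitted only when its state
// times out, and the timeout clock is processing time
val updateSession = (userId: String, event: Option[Event], state: State[Session]) => {
  if (state.isTimingOut()) {
    Some((userId, state.get()))
  } else {
    val current = state.getOption().getOrElse(Session(Seq.empty, 0L))
    event.foreach(e => state.update(Session(current.events :+ e, e.eventTime)))
    None
  }
}

// keyedEvents: DStream[(String, Event)] keyed by userId (placeholder)
val closedSessions = keyedEvents
  .mapWithState(StateSpec.function(updateSession).timeout(Minutes(30)))
  .flatMap(_.toList)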

Other feasible approaches:

3) Use another data store like HBase to hold unfinished sessions. Every
window would have to figure out which session a particular event fits
into by querying HBase. This is my least favored option, since we would
have to operate yet another component.
4) Use HDFS to store the unfinished sessions. At every window we create a
DF from the unfinished sessions, join it with the current DStream, do the
transformations, emit finished and unfinished sessions, and write them
back to HDFS (sketched right after this list).
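
To make #4 concrete, the per-batch loop I have in mind is roughly the
following (paths, column names and the 30-minute gap are placeholders;
unfinished sessions are stored as raw events with the same schema as the
incoming stream):

import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.sql.functions._

case class Event(sessionKey: String, eventTime: Long, payload: String)

// eventStream: DStream[Event] (placeholder)
eventStream.foreachRDD { rdd =>
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  val current = rdd.toDF()                                              // this batch's events
  val open    = sqlContext.read.parquet("hdfs:///sessions/inprogress")  // previously unfinished sessions
  val all     = open.unionAll(current)

  // a session is finished once its key has seen nothing for 30 minutes
  val cutoff   = System.currentTimeMillis() / 1000 - 1800
  val lastSeen = all.groupBy($"sessionKey").agg(max($"eventTime").as("lastEventTime"))

  val finished   = all.join(lastSeen.filter($"lastEventTime" <  cutoff), "sessionKey")
  val unfinished = all.join(lastSeen.filter($"lastEventTime" >= cutoff), "sessionKey")

  finished.write.mode(SaveMode.Append).parquet("hdfs:///sessions/finished")

  // the problematic step: overwriting the very path we just read from
  unfinished.write.mode(SaveMode.Overwrite).parquet("hdfs:///sessions/inprogress")
}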

Option #4 looks elegant, but the catch is the write of the unfinished
sessions. Because we read from and write to the same HDFS location, the
write has to use SaveMode.Overwrite, and I am seeing concurrency problems
where the read DF in the next window doesn't find the files in HDFS
(because they are being overwritten by the write from the previous
window):

Caused by: java.io.FileNotFoundException: File does not exist:
hdfs://horton/tmp/inprogress-events/part-r-00000-a22e7b14-3207-4fb3-8db8-f5423ef0441d.gz.parquet
	at org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1374)

So the questions I have:
1) Is option #4 of reading and writing a DF to the same HDFS location
(the same table, basically) the right approach? If not, are there any
alternatives?
2) I fiddled with writing the DF to a temp location at the end of a
window and moving the files from temp to the expected folder (using HDFS
utils) at the beginning of the next window, but that didn't help much
(roughly what I mean is sketched after this list).
3) Is there a way to make the write at the end of every window block the
processing of the next window? I tried to force the creation of a new
stage by using coalesce, but that didn't help much either.
4) Is there any totally different approach that would work? We know
option #3 works, but we don't want to operate another component.
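
On #2, what I mean by the temp-location shuffle is roughly the following
(paths are placeholders): write the unfinished sessions to a scratch
directory and only swap it into place once the write has completed. It
narrows the race, but the delete plus rename is still not atomic with
respect to the next batch's read.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode

val inProgress = new Path("hdfs:///sessions/inprogress")
val scratch    = new Path("hdfs:///sessions/inprogress-next")

// write the new snapshot somewhere the next read never looks at
unfinished.write.mode(SaveMode.Overwrite).parquet(scratch.toString)

// then swap directories; rename is cheap on HDFS, but delete followed by
// rename is not atomic, so a concurrent read can still land in the gap
val fs = inProgress.getFileSystem(new Configuration())
fs.delete(inProgress, true)
fs.rename(scratch, inProgress)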

Let me know your thoughts or if you need any more information. Any
existing pointers or SO answers on how people do sessionization in Spark
1.6 would also help (I couldn't find anything that worked for me).




