Posted to user@spark.apache.org by Priya Ch <le...@gmail.com> on 2015/10/20 14:15:02 UTC

Concurrency issue in Streams of data

Hi All,

  When processing streams of data (with a batch interval of 1 second), there
is a possible concurrency issue: two messages M1 and M2 (M2 being an updated
version of M1) with the same key may be processed by two threads in parallel.

To resolve this concurrency issue, I am applying a HashPartitioner to the RDD,
i.e. rdd.partitionBy(new HashPartitioner(numPartitions)). With this, M1 and M2
get processed in a single thread (a single partition).
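For reference, this is roughly how I key and partition the stream (a minimal
sketch; the message key field and the partition count of 10 are just
illustrative):

import org.apache.spark.HashPartitioner

// Key each message by its business key so that M1 and M2 hash to the
// same partition and are therefore processed sequentially.
val keyedRdd = rdd.map(message => (message.key, message))
val partitionedRdd = keyedRdd.partitionBy(new HashPartitioner(10))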

I encounter a different issue now. The RDD has M1 and M2 in a single
partition. For every message, I do a lookup on the DB and apply the business
logic.
The expected result is that, since M2 is an updated version of M1, only a few
fields need to be updated in the DB.

The code looks like this (the lookup query and business rules are elided;
note that the row iterator from the driver needs JavaConverters to become a
Scala collection):

import scala.collection.JavaConverters._
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

val connector = CassandraConnector(rdd.sparkContext.getConf)

val rdd1 = rdd.map { message =>
  val listOfRows = connector.withSessionDo { session =>
    // read from DB for this message's key
    session.execute(/* lookup query */).iterator.asScala.toList
  }
  val filteredList = listOfRows.filter(row => /* business rules */)
  val newList = filteredList.map(details => (details._1, details._2, true))
  newList
}

// saveToCassandra is an action that returns Unit, not a new RDD
rdd1.flatMap(details => details).saveToCassandra("keySpace",
  "table_name", SomeColumns("col1", "col2", "col3"))

In the above case, since the write is performed in one shot at the end, both
lookups happen before either write: the map transformation reads the DB state
before M1's update has been persisted, so it applies the business rules meant
for an initial version to M2 as well (treating it as an initial version
rather than an update). This shouldn't happen.

The only way I could think of is to use rdd.foreach (or foreachPartition, to
reuse one session per partition) and issue individual reads and writes per
message instead of using saveToCassandra.
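
Something along these lines (a rough sketch; the lookup query, the update
statement, and the applyBusinessRules helper are placeholders, not my actual
code):

import com.datastax.spark.connector.cql.CassandraConnector

val connector = CassandraConnector(rdd.sparkContext.getConf)

rdd.foreachPartition { messages =>
  connector.withSessionDo { session =>
    messages.foreach { message =>
      // Read the current state for this message's key before deciding
      // which fields to update (placeholder query).
      val current = session.execute(
        s"SELECT col1, col2, col3 FROM keySpace.table_name " +
        s"WHERE key = '${message.key}'").one()
      // applyBusinessRules is a hypothetical helper standing in for the
      // real business logic.
      val updated = applyBusinessRules(message, current)
      // Write back immediately, so that when M2 is processed after M1
      // its lookup sees M1's update in the DB.
      session.execute(
        s"UPDATE keySpace.table_name SET col2 = '${updated.col2}' " +
        s"WHERE key = '${message.key}'")
    }
  }
}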

Any ideas on how to resolve this?

Thanks in anticipation,
Padma Ch