Posted to user@spark.apache.org by Mohit Jaggi <mo...@gmail.com> on 2014/04/29 23:44:44 UTC
rdd ordering gets scrambled
Hi,
I started with a CSV text file sorted by its first column and parsed it
into Scala objects using a map operation. I then used more maps to add
some extra info to the data and saved the result as a text file.
The final text file is not sorted. What do I need to do to keep the
original input order intact?
My code looks like:
val csvFile = sc.textFile(..) // file is CSV and ordered by first column
val splitRdd = csvFile map { line => line.split(",", -1) }
val parsedRdd = splitRdd map { parts =>
  val key = parts(0) // use first column as key
  val value = new MyObject(parts(0), parts(1)....) // parse into Scala objects
  (key, value)
}
val augmentedRdd = parsedRdd map { x =>
  val key = x._1
  val value = x._2 // placeholder: add extra fields to x._2 here
  (key, value)
}
augmentedRdd.saveAsTextFile(...) // this file is not sorted
Mohit.
Re: rdd ordering gets scrambled
Posted by Michael Malak <mi...@yahoo.com>.
Mohit Jaggi:
A workaround is to use zipWithIndex (due in Spark 1.0; if you're still on 0.9.x you can borrow the code from https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala ), map each pair with (x => (x._2, x._1)) so that the index becomes the key, and then call sortByKey.
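For illustration, the workaround above might look roughly like the sketch below. It assumes Spark 1.0 or later (where zipWithIndex is available on RDDs). The Record case class, the parseInOrder helper, and the in-memory sample lines are hypothetical stand-ins for MyObject and the real CSV file, not anything from the original code.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits; needed on Spark 1.2 and earlier
import org.apache.spark.rdd.RDD

object KeepInputOrder {
  // Hypothetical stand-in for MyObject in the original post.
  case class Record(key: String, payload: String)

  // Tag every line with its input position, transform the values,
  // then sort by that position before handing the data back.
  def parseInOrder(lines: RDD[String]): RDD[Record] =
    lines
      .zipWithIndex()                          // (line, index)
      .map { case (line, idx) => (idx, line) } // swap so the index is the key
      .mapValues { line =>
        val parts = line.split(",", -1)
        Record(parts(0), parts(1))
      }
      .sortByKey()                             // order by original position
      .values                                  // drop the index again

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("keep-input-order").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Assumed sample data in place of sc.textFile(..)
      val lines = sc.parallelize(Seq("a,1", "b,2", "c,3", "d,4"), 2)
      parseInOrder(lines).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Attaching the index immediately after reading the file and sorting only once, just before saving, keeps the extra shuffle to a single sortByKey however many maps sit in between.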
Spark developers:
The lack of ordering guarantee for RDDs should be better documented, and the presence of a method called first() is a bit deceiving, in my opinion, if that same "first" element doesn't survive a map().