Posted to user@spark.apache.org by Sanjay Subramanian <sa...@yahoo.com.INVALID> on 2014/12/31 09:12:25 UTC
FlatMapValues
hey guys
My dataset is like this
025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10
Intended output is
==================
025126,Chills
025126,Injection site oedema
025126,Injection site reaction
025126,Malaise
025126,Myalgia
My code is as follows but the flatMapValues does not work even after I have created the pair RDD.
************************************************************************
reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  }
  else {
    ""
  }
}).filter(line => line.toString.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
************************************************************************
thanks
sanjay
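For reference, the expansion Sanjay describes can be sketched with plain Scala collections (no Spark needed to see the shape of it); the val names here are illustrative only, and the odd indices 1, 3, 5, 7, 9 are the reaction-name columns in his sample row:

```scala
// Plain-Scala sketch of the intended expansion: pair the ID (field 0)
// with each reaction name (fields 1, 3, 5, 7, 9).
val line = "025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"

val fields = line.split(',')
val expanded = List(1, 3, 5, 7, 9).map(i => fields(0) + "," + fields(i))

expanded.foreach(println)
// 025126,Chills
// 025126,Injection site oedema
// ...
```

The same map-over-indices shape applies element-by-element on an RDD of such lines.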
Re: FlatMapValues
Posted by Raghavendra Pandey <ra...@gmail.com>.
Why don't you push "\n" instead of "\t" in your first transformation [
(fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"
+fields(9)))] and then do saveAsTextFile?
-Raghavendra
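Raghavendra's suggestion works because saveAsTextFile writes each element followed by a newline, so embedded "\n" characters yield one output line per value. One caveat (my reading, not stated above): to get the ID on every output line, it has to be prefixed onto each value before joining. A plain-Scala sketch of the string being built, with illustrative names:

```scala
// Build the record with "\n" separators so each reaction lands on its
// own output line; the ID is prefixed onto every value first.
val fields = "025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10".split(',')

val record = List(1, 3, 5, 7, 9).map(i => fields(0) + "," + fields(i)).mkString("\n")

println(record)
```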
On Wed Dec 31 2014 at 1:42:55 PM Sanjay Subramanian
<sa...@yahoo.com.invalid> wrote:
> hey guys
>
> My dataset is like this
>
> 025126,Chills,8.10,Injection site oedema,8.10,Injection site
> reaction,8.10,Malaise,8.10,Myalgia,8.10
>
> Intended output is
> ==================
> 025126,Chills
> 025126,Injection site oedema
> 025126,Injection site reaction
> 025126,Malaise
> 025126,Myalgia
>
> My code is as follows but the flatMapValues does not work even after I have created the pair RDD.
>
> ************************************************************************
>
> reacRdd.map(line => line.split(',')).map(fields => {
> if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
> (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
> }
> else {
> ""
> }
> }).filter(line => line.toString.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
>
> ************************************************************************
>
>
> thanks
>
> sanjay
>
Re: FlatMapValues
Posted by Sanjay Subramanian <sa...@yahoo.com.INVALID>.
Hey Kapil, Fernando
Thanks for your mail.
[1] Fernando, if I don't use an "if" inside the "map", then lines of input data with fewer fields than I am expecting throw an ArrayIndexOutOfBoundsException. The "if" is to safeguard against that.
[2] Kapil, I am sorry I did not clarify. Yes, my code "DID NOT" compile, saying that flatMapValues is not defined.
In fact, when I used your snippet, the code still does not compile:
Error:(36, 57) value flatMapValues is not a member of org.apache.spark.rdd.RDD[(String, String)]
}).filter(pair => pair._1.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
                                                        ^
My pom.xml looks like this
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.2.0</version>
</dependency>
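One thing worth checking (an assumption on my part, since only the pom is shown, not the imports): in Spark 1.2, flatMapValues and the other pair-RDD operations live in PairRDDFunctions, and the implicit conversion that adds them to an RDD[(K, V)] comes from the SparkContext companion object, so the file needs:

```scala
// Required in Spark 1.2 so that RDD[(K, V)] picks up PairRDDFunctions
// (e.g. flatMapValues) via the rddToPairRDDFunctions implicit conversion.
import org.apache.spark.SparkContext._
```

Without that import, the compiler reports exactly "value flatMapValues is not a member of org.apache.spark.rdd.RDD[(String, String)]" even when the element type is already a pair. (Fragment only; it needs a Spark project to compile, so it is not runnable standalone.)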
[3] To summarize all I want is to convert
SUMMARY
=======
When a dataset looks like the following
1,red,blue,green
2,yellow,violet,pink
I want to output the following and currently not able to
1,red
1,blue
1,green
2,yellow
2,violet
2,pink
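That summary transformation can be sketched with plain Scala collections; the same flatMap applies unchanged to an RDD of lines (names illustrative):

```scala
// Expand "id,a,b,c" into one "id,value" line per value.
val data = List("1,red,blue,green", "2,yellow,violet,pink")

val out = data.flatMap { line =>
  val fields = line.split(',')
  fields.tail.map(v => fields.head + "," + v) // pair the id with each remaining field
}

out.foreach(println)
// 1,red
// 1,blue
// ...
```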
thanks
regards
sanjay
From: Fernando O. <fo...@gmail.com>
To: Kapil Malik <km...@adobe.com>
Cc: Sanjay Subramanian <sa...@yahoo.com>; "user@spark.apache.org" <us...@spark.apache.org>
Sent: Wednesday, December 31, 2014 6:06 AM
Subject: Re: FlatMapValues
Hi Sanjay,
Doing an if inside a map sounds like a bad idea; it seems like you actually want to filter and then apply map
On Wed, Dec 31, 2014 at 9:54 AM, Kapil Malik <km...@adobe.com> wrote:
Hi Sanjay,

I tried running your code on spark shell piece by piece –

// Setup
val line1 = "025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
val line2 = "025127,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
val lines = Array[String](line1, line2)

val r1 = sc.parallelize(lines, 2) // r1 is the original RDD[String] to begin with

val r2 = r1.map(line => line.split(',')) // RDD[Array[String]] – so far, so good
val r3 = r2.map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9))) // Returns a pair (String, String), good
  }
  else {
    "" // Returns a String, bad
  }
}) // RDD[Serializable] – PROBLEM

I was not even able to apply flatMapValues since the filtered RDD passed to it is RDD[Serializable] and not a pair RDD. I am surprised how your code compiled correctly.

The following changes in your snippet make it work as intended -

reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  }
  else {
    ("","")
  }
}).filter(pair => pair._1.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)

Please note that this too saves lines like (025126,Chills), i.e. with opening and closing brackets ( and ). If you want to get rid of them, better do another map operation to map pair to String.

Kapil

From: Sanjay Subramanian [mailto:sanjaysubramanian@yahoo.com.INVALID]
Sent: 31 December 2014 13:42
Cc: user@spark.apache.org
Subject: FlatMapValues

hey guys

My dataset is like this
025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10

Intended output is
==================
025126,Chills
025126,Injection site oedema
025126,Injection site reaction
025126,Malaise
025126,Myalgia

My code is as follows but the flatMapValues does not work even after I have created the pair RDD.
************************************************************************
reacRdd.map(line => line.split(',')).map(fields => {
  if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
    (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
  }
  else {
    ""
  }
}).filter(line => line.toString.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
************************************************************************

thanks
sanjay
Re: FlatMapValues
Posted by "Fernando O." <fo...@gmail.com>.
Hi Sanjay,
Doing an if inside a map sounds like a bad idea; it seems like you actually
want to filter and then apply map
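Fernando's point can be sketched with plain Scala collections: filtering out the header and malformed rows up front removes the need for the if/else inside the map, so every element has the same pair type (sample rows and names are illustrative):

```scala
// Filter first, then map: no mixed return types, so the result is a
// uniform collection of (id, tab-joined reactions) pairs.
val rows = List(
  "VAERS_ID,SYMPTOM1,V1,SYMPTOM2,V2,SYMPTOM3,V3,SYMPTOM4,V4,SYMPTOM5,V5",
  "025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10",
  "bad,row"
)

val pairs = rows
  .map(_.split(','))
  .filter(f => f.length >= 11 && !f(0).contains("VAERS_ID"))      // drop header and short rows
  .map(f => (f(0), List(1, 3, 5, 7, 9).map(f(_)).mkString("\t"))) // (id, reactions joined by tabs)
```

The same filter-then-map chain works on an RDD, and the resulting pair RDD is what flatMapValues expects.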
On Wed, Dec 31, 2014 at 9:54 AM, Kapil Malik <km...@adobe.com> wrote:
> Hi Sanjay,
>
>
>
> I tried running your code on spark shell piece by piece –
>
>
>
> // Setup
>
> val line1 = "025126,Chills,8.10,Injection site oedema,8.10,Injection site
> reaction,8.10,Malaise,8.10,Myalgia,8.10"
>
> val line2 = "025127,Chills,8.10,Injection site oedema,8.10,Injection site
> reaction,8.10,Malaise,8.10,Myalgia,8.10"
>
> val lines = Array[String](line1, line2)
>
>
>
> val r1 = sc.parallelize(lines, 2) // r1 is the original RDD[String] to
> begin with
>
>
>
> val r2 = r1.map(line => line.split(',')) // RDD[Array[String]] – so far,
> so good
>
> val r3 = r2.map(fields => {
>
> if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>
>
> (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
> // Returns a pair (String, String), good
>
> }
>
> else {
>
> "" // Returns a String, bad
>
> }
>
> }) // RDD[Serializable] – PROBLEM
>
>
>
> I was not even able to apply flatMapValues since the filtered RDD passed
> to it is RDD[Serializable] and not a pair RDD. I am surprised how your code
> compiled correctly.
>
>
>
>
>
> The following changes in your snippet make it work as intended -
>
>
>
> reacRdd.map(line => line.split(',')).map(fields => {
>   if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>     (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
>   }
>   else {
>     ("","")
>   }
> }).filter(pair => pair._1.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
>
>
>
> Please note that this too saves lines like (025126,Chills), i.e. with
> opening and closing brackets ( and ). If you want to get rid of them,
> better do another map operation to map pair to String.
>
>
>
> Kapil
>
>
>
> From: Sanjay Subramanian [mailto:sanjaysubramanian@yahoo.com.INVALID]
> Sent: 31 December 2014 13:42
> Cc: user@spark.apache.org
> Subject: FlatMapValues
>
>
>
> hey guys
>
>
>
> My dataset is like this
>
>
>
> 025126,Chills,8.10,Injection site oedema,8.10,Injection site
> reaction,8.10,Malaise,8.10,Myalgia,8.10
>
>
>
> Intended output is
>
> ==================
>
> 025126,Chills
>
> 025126,Injection site oedema
>
> 025126,Injection site reaction
>
> 025126,Malaise
>
> 025126,Myalgia
>
>
>
> My code is as follows but the flatMapValues does not work even after I have created the pair RDD.
>
> ************************************************************************
>
> reacRdd.map(line => line.split(',')).map(fields => {
>   if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>     (fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
>   }
>   else {
>     ""
>   }
> }).filter(line => line.toString.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
>
> ************************************************************************
>
>
>
> thanks
>
>
>
> sanjay
>
RE: FlatMapValues
Posted by Kapil Malik <km...@adobe.com>.
Hi Sanjay,
I tried running your code on spark shell piece by piece –
// Setup
val line1 = "025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
val line2 = "025127,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10"
val lines = Array[String](line1, line2)
val r1 = sc.parallelize(lines, 2) // r1 is the original RDD[String] to begin with
val r2 = r1.map(line => line.split(',')) // RDD[Array[String]] – so far, so good
val r3 = r2.map(fields => {
if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
(fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9))) // Returns a pair (String, String), good
}
else {
"" // Returns a String, bad
}
}) // RDD[Serializable] – PROBLEM
I was not even able to apply flatMapValues since the filtered RDD passed to it is RDD[Serializable] and not a pair RDD. I am surprised how your code compiled correctly.
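The widening Kapil describes can be reproduced with plain Scala collections: when one branch of the if returns (String, String) and the other returns String, the inferred element type is their least upper bound, which conforms to java.io.Serializable rather than a pair type (illustrative sketch):

```scala
// A map whose branches return different types widens to their common
// supertype, so pair-specific operations are no longer available.
val mixed = List("a,b", "x").map { s =>
  val parts = s.split(',')
  if (parts.length >= 2) (parts(0), parts(1)) // (String, String)
  else ""                                     // String
}

// Compiles because the inferred element type conforms to Serializable,
// not to (String, String) - the same widening that produced RDD[Serializable].
val check: List[java.io.Serializable] = mixed
```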
The following changes in your snippet make it work as intended -
reacRdd.map(line => line.split(',')).map(fields => {
if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
(fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
}
else {
("","")
}
}).filter(pair => pair._1.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
Please note that this too saves lines like (025126,Chills), i.e. with opening and closing brackets ( and ). If you want to get rid of them, better do another map operation to map pair to String.
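The extra map Kapil mentions could look like the following plain-Scala sketch (the same map works on the pair RDD just before saveAsTextFile; sample values are illustrative):

```scala
// flatMapValues yields (id, reaction) pairs; a final map renders each
// pair as "id,reaction" instead of the tuple's "(id,reaction)" toString.
val pairs = List(("025126", "Chills"), ("025126", "Malaise"))

val lines = pairs.map { case (id, reaction) => id + "," + reaction }

lines.foreach(println)
// 025126,Chills
// 025126,Malaise
```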
Kapil
From: Sanjay Subramanian [mailto:sanjaysubramanian@yahoo.com.INVALID]
Sent: 31 December 2014 13:42
Cc: user@spark.apache.org
Subject: FlatMapValues
hey guys
My dataset is like this
025126,Chills,8.10,Injection site oedema,8.10,Injection site reaction,8.10,Malaise,8.10,Myalgia,8.10
Intended output is
==================
025126,Chills
025126,Injection site oedema
025126,Injection site reaction
025126,Malaise
025126,Myalgia
My code is as follows but the flatMapValues does not work even after I have created the pair RDD.
************************************************************************
reacRdd.map(line => line.split(',')).map(fields => {
if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
(fields(0),(fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
}
else {
""
}
}).filter(line => line.toString.length() > 0).flatMapValues(skus => skus.split('\t')).saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
************************************************************************
thanks
sanjay