Posted to user@spark.apache.org by Aakash Basu <aa...@gmail.com> on 2017/02/17 09:37:03 UTC
How to convert RDD to DF for this case -
Hi all,
Without using a case class, I tried making a DF to work on the join and other
filtering later. But I'm getting an ArrayIndexOutOfBoundsException while
doing a show of the DF.
1) Importing SQLContext=
import org.apache.spark.sql.SQLContext._
import org.apache.spark.sql.SQLContext
2) Initializing SQLContext=
val sqlContext = new SQLContext(sc)
3) Importing implicits package for toDF conversion=
import sqlContext.implicits._
4) Reading the Station and Storm Files=
val stat = sc.textFile("/user/root/spark_demo/scala/data/Stations.txt")
val stor = sc.textFile("/user/root/spark_demo/scala/data/Storms.txt")
stat.foreach(println)
uihgf Paris 56 5
asfsds *** 43 1
fkwsdf London 45 6
gddg ABCD 32 2
grgzg *CSD 35 3
gsrsn ADR* 22 4
5) Creating rows by segregating columns after reading the tab-delimited file,
before converting into a DF=
val stati = stat.map(x => (x.split("\t")(0), x.split("\t")(1),
x.split("\t")(2), x.split("\t")(3)))
6) Converting into DF=
val station = stati.toDF()
station.show gives the below error ->
17/02/17 08:46:35 ERROR Executor: Exception in task 0.0 in stage 9.0 (TID
15)
java.lang.ArrayIndexOutOfBoundsException: 1
Please help!
Thanks,
Aakash.
Re: How to convert RDD to DF for this case -
Posted by "颜发才 (Yan Facai)" <fa...@gmail.com>.
Hi, Basu,
if all columns are separated by the delimiter "\t", the csv parser might be a
better choice. For example:
```scala
spark.read
  .option("sep", "\t")
  .option("header", false)
  .option("inferSchema", true)
  .csv("/user/root/spark_demo/scala/data/Stations.txt")
```
More details in [DataFrameReader API](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader).
Then we get two DataFrames; you can register them and use SQL to join.
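A minimal sketch of that registration-and-join step (treat the `_c0`-style
names as assumptions: they are the defaults Spark assigns when header is
false, and the join condition mirrors the one used later in this thread):
```scala
// `stations` and `storms` are the two DataFrames produced by spark.read.csv above.
stations.createOrReplaceTempView("Stations")
storms.createOrReplaceTempView("Storms")

// Join the storm code to the station key and filter on temperature.
val joined = spark.sql(
  """SELECT s._c1 AS StationName, s._c3 AS StationID
     FROM Stations s JOIN Storms t ON t._c3 = s._c0
     WHERE s._c2 > 35""")
joined.show()
```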
>
Re: How to convert RDD to DF for this case -
Posted by Aakash Basu <aa...@gmail.com>.
Hey Chris,
Thanks for your quick help. Actually the dataset had issues; otherwise, the
logic I implemented was not wrong.
I did this -
1) V.Imp – Creating rows by segregating columns after reading the
tab-delimited file before converting into a DF=
val stati = stat.map(x => (x.split("\t")(0), x.split("\t")(1),
x.split("\t")(2).toInt, x.split("\t")(3).toInt))
Do a take to see whether it throws an error (since execution is lazy, this
step just confirms everything is going fine)=
stati.take(2)
Ans: res8: Array[(String, String, Int, Int)] = Array((uihgf,Pune,56,5),
(asfsds,***,43,1))
If this comes out cleanly, it is working fine and we can proceed.
2) V.Imp - Now converting into a DF=
val station =
stati.toDF("StationKey","StationName","Temperature","StationID")
Now doing a show to see how it looks=
station.show
Ans:
+----------+-----------+-----------+---------+
|StationKey|StationName|Temperature|StationID|
+----------+-----------+-----------+---------+
|     uihgf|       Pune|         56|        5|
|    asfsds|        ***|         43|        1|
|    fkwsdf|     Mumbai|         45|        6|
|      gddg|       ABCD|         32|        2|
|     grgzg|      *CSD*|         35|        3|
|     gsrsn|     Howrah|         22|        4|
|     ffafv|        ***|         34|        7|
+----------+-----------+-----------+---------+
3) Do the same for the other dataset -
i) val storr = stor.map(p => (p.split("\t")(0).toInt,
p.split("\t")(1), p.split("\t")(2).toInt, p.split("\t")(3)))
ii) storr.take(2)
iii) val storm = storr.toDF("ID","Name","Temp","Code")
iv) storm.show
4) Registering as tables (registerTempTable returns Unit, so there is no
value to assign)=
station.registerTempTable("Stations")
storm.registerTempTable("Storms")
5) Querying as per requirements to build the joinedDF=
val joinedDF = sqlContext.sql("Select Stations.StationName as StationName,
Stations.StationID as StationID from Stations inner join Storms on
Storms.Code = Stations.StationKey where Stations.Temperature > 35")
6) joinedDF.show
+-----------+---------+
|StationName|StationID|
+-----------+---------+
|       Pune|        5|
+-----------+---------+
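(Equivalently, the same join can be written with the DataFrame API instead of
SQL; a sketch using the column names above:)

val joinedDF2 = station
  .join(storm, storm("Code") === station("StationKey"))
  .where(station("Temperature") > 35)
  .select(station("StationName"), station("StationID"))
joinedDF2.show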
7) Saving the file as CSV=
joinedDF.coalesce(1).rdd.map(_.mkString(",")).saveAsTextFile("/user/root/spark_demo/scala/data/output/Question6Soln")
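(On Spark 2.x the DataFrameWriter can produce the CSV directly, without going
through the RDD; a sketch assuming the same output path:)

joinedDF.coalesce(1).write
  .option("header", true)
  .csv("/user/root/spark_demo/scala/data/output/Question6Soln")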
Thanks,
Aakash.
Re: How to convert RDD to DF for this case -
Posted by Christophe Préaud <ch...@kelkoo.com>.
Hi Aakash,
You can try this:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}
val header = Array("col1", "col2", "col3", "col4")
val schema = StructType(header.map(StructField(_, StringType, true)))
val statRow = stat.map(line => Row(line.split("\t"):_*))
val df = spark.createDataFrame(statRow, schema)
df.show
+------+------+----+----+
|  col1|  col2|col3|col4|
+------+------+----+----+
| uihgf| Paris|  56|   5|
|asfsds|   ***|  43|   1|
|fkwsdf|London|  45|   6|
|  gddg|  ABCD|  32|   2|
| grgzg|  *CSD|  35|   3|
| gsrsn|  ADR*|  22|   4|
+------+------+----+----+
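Note that with this schema every column is a StringType; if you need integer
columns for the numeric filters later in the exercise, you can cast after
building the DataFrame; a sketch:

import org.apache.spark.sql.types.IntegerType

val typed = df
  .withColumn("col3", df("col3").cast(IntegerType))
  .withColumn("col4", df("col4").cast(IntegerType))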
Please let me know if this works for you.
Regards,
Christophe.