You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Aakash Basu <aa...@gmail.com> on 2017/02/17 09:37:03 UTC

How to convert RDD to DF for this case -

Hi all,


Without using case class I tried making a DF to work on the join and other
filtration later. But I'm getting an ArrayIndexOutOfBoundException error
while doing a show of the DF.


1)      Importing SQLContext=

import org.apache.spark.sql.SQLContext._

import org.apache.spark.sql.SQLContext



2)      Initializing SQLContext=

val sqlContext = new SQLContext(sc)



3)      Importing implicits package for toDF conversion=

import sqlContext.implicits._



4)      Reading the Station and Storm Files=

val stat = sc.textFile("/user/root/spark_demo/scala/data/Stations.txt")

val stor = sc.textFile("/user/root/spark_demo/scala/data/Storms.txt")





stat.foreach(println)


uihgf   Paris   56   5

asfsds   ***   43   1

fkwsdf   London   45   6

gddg   ABCD   32   2

grgzg   *CSD   35   3

gsrsn   ADR*   22   4


5) Creating row by segregating columns after reading the tab delimited file
before converting into DF=


*val stati = stat.map(x => (x.split("\t")(0), x.split("\t")(1),
x.split("\t")(2),x.split("\t")(3)))*



6)      Converting into DF=

val station = stati.toDF()

*station.show* is giving the below error ->

17/02/17 08:46:35 ERROR Executor: Exception in task 0.0 in stage 9.0 (TID
15)
java.lang.ArrayIndexOutOfBoundsException: 1


Please help!

Thanks,
Aakash.

Re: How to convert RDD to DF for this case -

Posted by "颜发才 (Yan Facai)" <fa...@gmail.com>.
Hi, Basu,
if all columns is separated by delimter "\t", csv parser might be a better
choice.
for example:

```scala
spark.read
         .option("sep", "\t")
         .option("header", fasle)
         .option("inferSchema", true)
         .csv("/user/root/spark_demo/scala/data/Stations.txt")
```
More details in [DataFrameReader API](
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader
)

Then we get two DataFrame,
you can register them and use sql to join.





On Fri, Feb 17, 2017 at 10:33 PM, Aakash Basu <aa...@gmail.com>
wrote:

> Hey Chris,
>
> Thanks for your quick help. Actually the dataset had issues, otherwise the
> logic I implemented was not wrong.
>
> I did this -
>
> 1)      *V.Imp *– Creating row by segregating columns after reading the
> tab delimited file before converting into DF=
>
> val stati = stat.map(x => (x.split("\t")(0), x.split("\t")(1),
> x.split("\t")(2).toInt, x.split("\t")(3).toInt))
>
>
>
> Do a take to see if it throws an error or not (this step is just for
> ensuring if everything is going fine (as it is a lazy execution, that’s
> why)=
>
> stati.take(2)
>
> *Ans:* res8: Array[(String, String, Int, Int)] = Array((uihgf,Pune,56,5),
> (asfsds,***,43,1))
>
> If this comes out, it means it is working fine. We can proceed.
>
> 2)      *V.Imp* - Now converting into DF=
>
> val station = stati.toDF("StationKey","StationName","Temparature","Station
> ID")
>
>
>
> Now doing a show to see how it looks like=
>
> station.show
>
> *Ans:*
>
> * +----------+-----------+-----------+---------+*
>
> *|StationKey|StationName|Temparature|StationID|*
>
> *+----------+-----------+-----------+---------+*
>
> *|     uihgf|       Pune|         56|        5|*
>
> *|    asfsds|        ***|         43|        1|*
>
> *|    fkwsdf|     Mumbai|         45|        6|*
>
> *|      gddg|       ABCD|         32|        2|*
>
> *|     grgzg|     *CSD**|         35|        3|*
>
> *|     gsrsn|     Howrah|         22|        4|*
>
> *|     ffafv|        ***|         34|        7|*
>
> *+----------+-----------+-----------+---------+*
>
>
>
> 3)      Do the same for the other dataset -
>
> i)                 val storr = stor.map(p => (p.split("\t")(0).toInt,
> p.split("\t")(1), p.split("\t")(2).toInt, p.split("\t")(3)))
>
> ii)                storr.take(2)
>
> iii)               val storm = storr.toDF("ID","Name","Temp","Code")
>
> iv)               storm.show
>
>
>
>
>
> 4)      Registering as table=
>
>  val stations2 = station.registerTempTable("Stations")
>
> val storms2 = storm.registerTempTable("Storms")
>
>
>
> 5)      Querying on the joinedDF as per requirements=
>
> val joinedDF = sqlContext.sql("Select Stations.StationName as StationName,
> Stations.StationID as StationID from Stations inner join Storms on
> Storms.Code = Stations.StationKey where Stations.Temparature > 35")
>
>
>
> 6)      joinedDF.show
>
> +-----------+---------+
>
> |StationName|StationID|
>
> +-----------+---------+
>
> |       Pune|        5|
>
> +-----------+---------+
>
> 7)      Saving the file as CSV=
>
> joinedDF.coalesce(1).rdd.map(_.mkString(",")).saveAsTextFile
> ("/user/root/spark_demo/scala/data/output/Question6Soln")
>
>
>
> Thanks,
>
> Aakash.
>
> On Fri, Feb 17, 2017 at 4:17 PM, Christophe Préaud <
> christophe.preaud@kelkoo.com> wrote:
>
>> Hi Aakash,
>>
>> You can try this:
>> import org.apache.spark.sql.Row
>> import org.apache.spark.sql.types.{StringType, StructField, StructType}
>>
>> val header = Array("col1", "col2", "col3", "col4")
>> val schema = StructType(header.map(StructField(_, StringType, true)))
>>
>> val statRow = stat.map(line => Row(line.split("\t"):_*))
>> val df = spark.createDataFrame(statRow, schema)
>>
>> df.show
>> +------+------+----+----+
>> |  col1|  col2|col3|col4|
>> +------+------+----+----+
>> | uihgf| Paris|  56|   5|
>> |asfsds|   ***|  43|   1|
>> |fkwsdf|London|  45|   6|
>> |  gddg|  ABCD|  32|   2|
>> | grgzg|  *CSD|  35|   3|
>> | gsrsn|  ADR*|  22|   4|
>> +------+------+----+----+
>>
>> Please let me know if this works for you.
>>
>> Regards,
>> Christophe.
>>
>>
>> On 17/02/17 10:37, Aakash Basu wrote:
>>
>> Hi all,
>>
>>
>> Without using case class I tried making a DF to work on the join and
>> other filtration later. But I'm getting an ArrayIndexOutOfBoundException
>> error while doing a show of the DF.
>>
>>
>> 1)      Importing SQLContext=
>>
>> import org.apache.spark.sql.SQLContext._
>>
>> import org.apache.spark.sql.SQLContext
>>
>>
>>
>> 2)      Initializing SQLContext=
>>
>> val sqlContext = new SQLContext(sc)
>>
>>
>>
>> 3)      Importing implicits package for toDF conversion=
>>
>> import sqlContext.implicits._
>>
>>
>>
>> 4)      Reading the Station and Storm Files=
>>
>> val stat = sc.textFile("/user/root/spark_demo/scala/data/Stations.txt")
>>
>> val stor = sc.textFile("/user/root/spark_demo/scala/data/Storms.txt")
>>
>>
>>
>>
>>
>> stat.foreach(println)
>>
>>
>> uihgf   Paris   56   5
>>
>> asfsds   ***   43   1
>>
>> fkwsdf   London   45   6
>>
>> gddg   ABCD   32   2
>>
>> grgzg   *CSD   35   3
>>
>> gsrsn   ADR*   22   4
>>
>>
>> 5) Creating row by segregating columns after reading the tab delimited
>> file before converting into DF=
>>
>>
>> *val stati = stat.map(x => (x.split("\t")(0), x.split("\t")(1),
>> x.split("\t")(2),x.split("\t")(3)))*
>>
>>
>>
>> 6)      Converting into DF=
>>
>> val station = stati.toDF()
>>
>> *station.show* is giving the below error ->
>>
>> 17/02/17 08:46:35 ERROR Executor: Exception in task 0.0 in stage 9.0 (TID
>> 15)
>> java.lang.ArrayIndexOutOfBoundsException: 1
>>
>>
>> Please help!
>>
>> Thanks,
>> Aakash.
>>
>>
>>
>> ------------------------------
>> Kelkoo SAS
>> Société par Actions Simplifiée
>> Au capital de € 4.168.964,30
>> Siège social : 158 Ter Rue du Temple 75003 Paris
>> 425 093 069 RCS Paris
>>
>> Ce message et les pièces jointes sont confidentiels et établis à
>> l'attention exclusive de leurs destinataires. Si vous n'êtes pas le
>> destinataire de ce message, merci de le détruire et d'en avertir
>> l'expéditeur.
>>
>
>

Re: How to convert RDD to DF for this case -

Posted by Aakash Basu <aa...@gmail.com>.
Hey Chris,

Thanks for your quick help. Actually the dataset had issues, otherwise the
logic I implemented was not wrong.

I did this -

1)      *V.Imp *– Creating row by segregating columns after reading the tab
delimited file before converting into DF=

val stati = stat.map(x => (x.split("\t")(0), x.split("\t")(1),
x.split("\t")(2).toInt, x.split("\t")(3).toInt))



Do a take to see if it throws an error or not (this step is just for
ensuring if everything is going fine (as it is a lazy execution, that’s
why)=

stati.take(2)

*Ans:* res8: Array[(String, String, Int, Int)] = Array((uihgf,Pune,56,5),
(asfsds,***,43,1))

If this comes out, it means it is working fine. We can proceed.

2)      *V.Imp* - Now converting into DF=

val station =
stati.toDF("StationKey","StationName","Temparature","StationID")



Now doing a show to see how it looks like=

station.show

*Ans:*

* +----------+-----------+-----------+---------+*

*|StationKey|StationName|Temparature|StationID|*

*+----------+-----------+-----------+---------+*

*|     uihgf|       Pune|         56|        5|*

*|    asfsds|        ***|         43|        1|*

*|    fkwsdf|     Mumbai|         45|        6|*

*|      gddg|       ABCD|         32|        2|*

*|     grgzg|     *CSD**|         35|        3|*

*|     gsrsn|     Howrah|         22|        4|*

*|     ffafv|        ***|         34|        7|*

*+----------+-----------+-----------+---------+*



3)      Do the same for the other dataset -

i)                 val storr = stor.map(p => (p.split("\t")(0).toInt,
p.split("\t")(1), p.split("\t")(2).toInt, p.split("\t")(3)))

ii)                storr.take(2)

iii)               val storm = storr.toDF("ID","Name","Temp","Code")

iv)               storm.show





4)      Registering as table=

 val stations2 = station.registerTempTable("Stations")

val storms2 = storm.registerTempTable("Storms")



5)      Querying on the joinedDF as per requirements=

val joinedDF = sqlContext.sql("Select Stations.StationName as StationName,
Stations.StationID as StationID from Stations inner join Storms on
Storms.Code = Stations.StationKey where Stations.Temparature > 35")



6)      joinedDF.show

+-----------+---------+

|StationName|StationID|

+-----------+---------+

|       Pune|        5|

+-----------+---------+

7)      Saving the file as CSV=

joinedDF.coalesce(1).rdd.map(_.mkString(",")).saveAsTextFile("/user/root/spark_demo/scala/data/output/Question6Soln")



Thanks,

Aakash.

On Fri, Feb 17, 2017 at 4:17 PM, Christophe Préaud <
christophe.preaud@kelkoo.com> wrote:

> Hi Aakash,
>
> You can try this:
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types.{StringType, StructField, StructType}
>
> val header = Array("col1", "col2", "col3", "col4")
> val schema = StructType(header.map(StructField(_, StringType, true)))
>
> val statRow = stat.map(line => Row(line.split("\t"):_*))
> val df = spark.createDataFrame(statRow, schema)
>
> df.show
> +------+------+----+----+
> |  col1|  col2|col3|col4|
> +------+------+----+----+
> | uihgf| Paris|  56|   5|
> |asfsds|   ***|  43|   1|
> |fkwsdf|London|  45|   6|
> |  gddg|  ABCD|  32|   2|
> | grgzg|  *CSD|  35|   3|
> | gsrsn|  ADR*|  22|   4|
> +------+------+----+----+
>
> Please let me know if this works for you.
>
> Regards,
> Christophe.
>
>
> On 17/02/17 10:37, Aakash Basu wrote:
>
> Hi all,
>
>
> Without using case class I tried making a DF to work on the join and other
> filtration later. But I'm getting an ArrayIndexOutOfBoundException error
> while doing a show of the DF.
>
>
> 1)      Importing SQLContext=
>
> import org.apache.spark.sql.SQLContext._
>
> import org.apache.spark.sql.SQLContext
>
>
>
> 2)      Initializing SQLContext=
>
> val sqlContext = new SQLContext(sc)
>
>
>
> 3)      Importing implicits package for toDF conversion=
>
> import sqlContext.implicits._
>
>
>
> 4)      Reading the Station and Storm Files=
>
> val stat = sc.textFile("/user/root/spark_demo/scala/data/Stations.txt")
>
> val stor = sc.textFile("/user/root/spark_demo/scala/data/Storms.txt")
>
>
>
>
>
> stat.foreach(println)
>
>
> uihgf   Paris   56   5
>
> asfsds   ***   43   1
>
> fkwsdf   London   45   6
>
> gddg   ABCD   32   2
>
> grgzg   *CSD   35   3
>
> gsrsn   ADR*   22   4
>
>
> 5) Creating row by segregating columns after reading the tab delimited
> file before converting into DF=
>
>
> *val stati = stat.map(x => (x.split("\t")(0), x.split("\t")(1),
> x.split("\t")(2),x.split("\t")(3)))*
>
>
>
> 6)      Converting into DF=
>
> val station = stati.toDF()
>
> *station.show* is giving the below error ->
>
> 17/02/17 08:46:35 ERROR Executor: Exception in task 0.0 in stage 9.0 (TID
> 15)
> java.lang.ArrayIndexOutOfBoundsException: 1
>
>
> Please help!
>
> Thanks,
> Aakash.
>
>
>
> ------------------------------
> Kelkoo SAS
> Société par Actions Simplifiée
> Au capital de € 4.168.964,30
> Siège social : 158 Ter Rue du Temple 75003 Paris
> 425 093 069 RCS Paris
>
> Ce message et les pièces jointes sont confidentiels et établis à
> l'attention exclusive de leurs destinataires. Si vous n'êtes pas le
> destinataire de ce message, merci de le détruire et d'en avertir
> l'expéditeur.
>

Re: How to convert RDD to DF for this case -

Posted by Christophe Préaud <ch...@kelkoo.com>.
Hi Aakash,

You can try this:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val header = Array("col1", "col2", "col3", "col4")
val schema = StructType(header.map(StructField(_, StringType, true)))

val statRow = stat.map(line => Row(line.split("\t"):_*))
val df = spark.createDataFrame(statRow, schema)

df.show
+------+------+----+----+
|  col1|  col2|col3|col4|
+------+------+----+----+
| uihgf| Paris|  56|   5|
|asfsds|   ***|  43|   1|
|fkwsdf|London|  45|   6|
|  gddg|  ABCD|  32|   2|
| grgzg|  *CSD|  35|   3|
| gsrsn|  ADR*|  22|   4|
+------+------+----+----+


Please let me know if this works for you.

Regards,
Christophe.

On 17/02/17 10:37, Aakash Basu wrote:
Hi all,

Without using case class I tried making a DF to work on the join and other filtration later. But I'm getting an ArrayIndexOutOfBoundException error while doing a show of the DF.

1)      Importing SQLContext=
import org.apache.spark.sql.SQLContext._
import org.apache.spark.sql.SQLContext

2)      Initializing SQLContext=
val sqlContext = new SQLContext(sc)

3)      Importing implicits package for toDF conversion=
import sqlContext.implicits._

4)      Reading the Station and Storm Files=
val stat = sc.textFile("/user/root/spark_demo/scala/data/Stations.txt")
val stor = sc.textFile("/user/root/spark_demo/scala/data/Storms.txt")



stat.foreach(println)

uihgf   Paris   56   5
asfsds   ***   43   1
fkwsdf   London   45   6
gddg   ABCD   32   2
grgzg   *CSD   35   3
gsrsn   ADR*   22   4


5) Creating row by segregating columns after reading the tab delimited file before converting into DF=

val stati = stat.map(x => (x.split("\t")(0), x.split("\t")(1), x.split("\t")(2),x.split("\t")(3)))


6)      Converting into DF=
val station = stati.toDF()

station.show is giving the below error ->

17/02/17 08:46:35 ERROR Executor: Exception in task 0.0 in stage 9.0 (TID 15)
java.lang.ArrayIndexOutOfBoundsException: 1


Please help!

Thanks,
Aakash.


________________________________
Kelkoo SAS
Société par Actions Simplifiée
Au capital de € 4.168.964,30
Siège social : 158 Ter Rue du Temple 75003 Paris
425 093 069 RCS Paris

Ce message et les pièces jointes sont confidentiels et établis à l'attention exclusive de leurs destinataires. Si vous n'êtes pas le destinataire de ce message, merci de le détruire et d'en avertir l'expéditeur.