Posted to user@spark.apache.org by qinwei <we...@dewmobile.net> on 2014/09/28 08:36:19 UTC

problem with partitioning






Hi, everyone

    I have come across a problem with changing the number of partitions of an RDD. My code is as below:

    val rdd1 = sc.textFile(path1)
    val rdd2 = sc.textFile(path2)
    val rdd3 = sc.textFile(path3)

    val imeiList = parseParam(job.jobParams)
    val broadcastVar = sc.broadcast(imeiList)

    val structuredRDD1 = rdd1.map(line => {
      val trunks = line.split("\t")
      if (trunks.length == 35) {
        (trunks(6).trim, trunks(7).trim, trunks(3).trim, trunks(5).trim, trunks(12).trim, trunks(13).trim.toLong)
      }
    })

    val structuredRDD2 = rdd2.map(line => {
      val trunks = line.split("\t")
      if (trunks.length == 33) {
        (trunks(6).trim, trunks(8).trim, trunks(9).trim, trunks(14).trim, trunks(12).trim, trunks(3).trim.toLong)
      }
    })

    val structuredRDD3 = rdd3.map(line => {
      val trunks = line.split("\t")
      if (trunks.length == 33) {
        (trunks(6).trim, trunks(8).trim, trunks(9).trim, trunks(14).trim, trunks(12).trim, trunks(3).trim.toLong)
      }
    })

    val unionedRDD = structuredRDD1.union(structuredRDD2).union(structuredRDD3)

    val resRDD = unionedRDD.filter(arg => arg != null && arg != ())
      .map(arg => arg.asInstanceOf[(String, String, String, String, String, Long)])
      .filter(arg => imeiFilter(arg._1, broadcastVar.value, 0) || imeiFilter(arg._2, broadcastVar.value, 0))

    val jsonStrRDD = resRDD.map(arg => "{\"f_imei\" : \"" + arg._1 + "\", \"t_imei\" : \"" + arg._2 + "\", \"dgst\" : \"" + arg._3 + "\", \"n\" : \"" + arg._4 + "\", \"s\" : " + arg._5.toString() + ", \"ts\" : " + arg._6.toString() + "}")

    val jsonArray = jsonStrRDD.collect

    I noticed that there are 3834 tasks by default, and 3834 is the total number of files in path1, path2 and path3. I want to change the number of partitions with the code below:

    val rdd1 = sc.textFile(path1, 1920)
    val rdd2 = sc.textFile(path2, 1920)
    val rdd3 = sc.textFile(path3, 1920)

    By doing this, I expected 1920 tasks in total, but I found that the number of tasks became 8920. Any idea what's going on here?

    Thanks!



qinwei


Re: Re: problem with partitioning

Posted by qinwei <we...@dewmobile.net>.





Thank you for your reply. Your tips on code refactoring are helpful; after a second look at the code, the casts and null checks really are unnecessary.


qinwei
From: Sean Owen
Date: 2014-09-28 15:03
To: qinwei
CC: user
Subject: Re: problem with patitioning

> (Most of this code is not relevant to the question and can be refactored too. The casts and null checks look unnecessary.)
>
> You are unioning RDDs so you have a result with the sum of their partitions. The number of partitions is really a hint to Hadoop only so it is not even necessarily 3 x 1920.
>
> Try not specifying the partitions at the source, and instead trying repartition after union to reduce the number of partitions.



Re: problem with partitioning

Posted by Sean Owen <so...@cloudera.com>.
(Most of this code is not relevant to the question and can be refactored
too. The casts and null checks look unnecessary.)
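
As an illustration of that refactoring (a sketch only, not code from this thread): an `if` without an `else` makes each `map` produce a mix of tuples and `()`, which is why the original needs a filter and an `asInstanceOf` cast afterwards. If each line is instead parsed into an `Option`, malformed lines can be dropped with `flatMap` and the result is typed from the start. The names `ParseSketch`, `Record` and `parseLine35` are hypothetical; the field positions are copied from the first `map` in the question.

```scala
// Hypothetical helper, not from the original post.
object ParseSketch {
  type Record = (String, String, String, String, String, Long)

  // Returns Some(record) for a 35-column line and None otherwise.
  // As in the original code, toLong throws if column 13 is not numeric.
  def parseLine35(line: String): Option[Record] = {
    val trunks = line.split("\t")
    if (trunks.length == 35)
      Some((trunks(6).trim, trunks(7).trim, trunks(3).trim,
            trunks(5).trim, trunks(12).trim, trunks(13).trim.toLong))
    else
      None
  }

  def main(args: Array[String]): Unit = {
    // A synthetic 35-column line and a malformed one, for illustration only.
    val good = (0 until 35).map(_.toString).mkString("\t")
    val lines = Seq(good, "too\tshort")
    // flatMap keeps only the lines that parsed; no null check or cast is needed.
    val records: Seq[Record] = lines.flatMap(l => parseLine35(l))
    println(s"kept ${records.size} of ${lines.size} lines")
  }
}
```

On an RDD the same shape would presumably be `rdd1.flatMap(line => ParseSketch.parseLine35(line))`, which yields an RDD of typed tuples directly.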

You are unioning RDDs so you have a result with the sum of their
partitions. The number of partitions is really a hint to Hadoop only so it
is not even necessarily 3 x 1920.

Try not specifying the partitions at the source, and instead try
repartition after the union to reduce the number of partitions.
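
A minimal sketch of that suggestion, under stated assumptions: it runs against a local master, and `sc.parallelize` stands in for `sc.textFile` so that no input files are needed (unlike the `minPartitions` hint of `textFile`, the slice count given to `parallelize` is exact). The object and method names are hypothetical. It shows that the union has the sum of the input partition counts, and that `coalesce` (or `repartition`, which is `coalesce` with a shuffle) applied after the union sets the final count.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical names; a sketch, not code from the original post.
object UnionPartitionsSketch {

  // Returns (partitions after union, partitions after coalesce).
  def partitionCounts(sc: SparkContext): (Int, Int) = {
    val a = sc.parallelize(1 to 100, 4)
    val b = sc.parallelize(101 to 200, 5)
    val c = sc.parallelize(201 to 300, 6)

    // Union concatenates the partitions of its inputs: 4 + 5 + 6 here.
    val unioned = a.union(b).union(c)

    // Reduce the partition count once, after the union.
    // coalesce avoids a shuffle; repartition(3) would shuffle the data.
    val fewer = unioned.coalesce(3)

    (unioned.partitions.length, fewer.partitions.length)
  }

  def main(args: Array[String]): Unit = {
    // Local master and app name are assumptions for a self-contained run.
    val conf = new SparkConf().setAppName("union-partitions-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val (afterUnion, afterCoalesce) = partitionCounts(sc)
      println(s"after union: $afterUnion, after coalesce: $afterCoalesce")
    } finally {
      sc.stop()
    }
  }
}
```

Applied to the code in the question, this would mean reading the three paths without a partition argument and calling `coalesce(1920)` or `repartition(1920)` on `unionedRDD`.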