You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Grega Kešpret <gr...@celtra.com> on 2014/09/21 18:04:43 UTC

Shuffle size difference - operations on RDD vs. operations on SchemaRDD

Hi,

I am seeing different shuffle write sizes when using SchemaRDD (versus
normal RDD). I'm doing the following:

case class DomainObj(a: String, b: String, c: String, d: String)

val logs: RDD[String] = sc.textFile(...)
val filtered: RDD[String] = logs.filter(...)
val myDomainObjects: RDD[DomainObj] = filtered.flatMap(...)

------------------------------------------------------------
1. Operations on RDD:
------------------------------------------------------------
val results = requests
    .filter(obj => obj.a == "SomeValue" || obj.a == "SomeOtherValue")
    .mapPartitions(objs => objs.map(obj => (obj, 1)))
    .reduceByKey(_ + _, 200)
    .collect()

------------------------------------------------------------
2. Operations on SchemaRDD:
------------------------------------------------------------
myDomainObjects.registerTempTable("myDomainObjects")

val results = sqlContext.sql("""
    SELECT
        a, b, c, d, COUNT(*) total
    FROM
        myDomainObjects
    WHERE
        a IN ('SomeValue', 'SomeOtherValue')
    GROUP BY
        a, b, c, d
""").collect()

In the first case (RDD), the query returns in 2 minutes and 30 seconds with
the input size 28.4GB, and shuffle write size 525.3MB and shuffle read size
472.5MB.

In the second case (SchemaRDD), the query returns in 2 minutes and 9
seconds with input size 28.4GB, and shuffle write 258.9MB and shuffle read
233.0MB.

Since in the second case, the shuffle size is half of the first case, I'd
like to understand why.

Thanks,
Grega

Re: Shuffle size difference - operations on RDD vs. operations on SchemaRDD

Posted by Michael Armbrust <mi...@databricks.com>.
Spark SQL always uses a custom configuration of Kryo under the hood to
improve shuffle performance:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala

Michael

On Sun, Sep 21, 2014 at 9:04 AM, Grega Kešpret <gr...@celtra.com> wrote:

> Hi,
>
> I am seeing different shuffle write sizes when using SchemaRDD (versus
> normal RDD). I'm doing the following:
>
> case class DomainObj(a: String, b: String, c: String, d: String)
>
> val logs: RDD[String] = sc.textFile(...)
> val filtered: RDD[String] = logs.filter(...)
> val myDomainObjects: RDD[DomainObj] = filtered.flatMap(...)
>
> ------------------------------------------------------------
> 1. Operations on RDD:
> ------------------------------------------------------------
> val results = requests
>     .filter(obj => obj.a == "SomeValue" || obj.a == "SomeOtherValue")
>     .mapPartitions(objs => objs.map(obj => (obj, 1)))
>     .reduceByKey(_ + _, 200)
>     .collect()
>
> ------------------------------------------------------------
> 2. Operations on SchemaRDD:
> ------------------------------------------------------------
> myDomainObjects.registerTempTable("myDomainObjects")
>
> val results = sqlContext.sql("""
>     SELECT
>         a, b, c, d, COUNT(*) total
>     FROM
>         myDomainObjects
>     WHERE
>         a IN ('SomeValue', 'SomeOtherValue')
>     GROUP BY
>         a, b, c, d
> """).collect()
>
> In the first case (RDD), the query returns in 2 minutes and 30 seconds
> with the input size 28.4GB, and shuffle write size 525.3MB and shuffle read
> size 472.5MB.
>
> In the second case (SchemaRDD), the query returns in 2 minutes and 9
> seconds with input size 28.4GB, and shuffle write 258.9MB and shuffle read
> 233.0MB.
>
> Since in the second case, the shuffle size is half of the first case, I'd
> like to understand why.
>
> Thanks,
> Grega
>