Posted to user@spark.apache.org by Grega Kešpret <gr...@celtra.com> on 2014/09/21 18:04:43 UTC
Shuffle size difference - operations on RDD vs. operations on SchemaRDD
Hi,
I am seeing different shuffle write sizes when using a SchemaRDD versus a
plain RDD for the same computation. I'm doing the following:
case class DomainObj(a: String, b: String, c: String, d: String)

val logs: RDD[String] = sc.textFile(...)                     // raw log lines
val filtered: RDD[String] = logs.filter(...)                 // keep only the relevant lines
val myDomainObjects: RDD[DomainObj] = filtered.flatMap(...)  // parse each line into DomainObj records
------------------------------------------------------------
1. Operations on RDD:
------------------------------------------------------------
val results = myDomainObjects
  .filter(obj => obj.a == "SomeValue" || obj.a == "SomeOtherValue")
  .mapPartitions(objs => objs.map(obj => (obj, 1))) // the whole DomainObj becomes the key
  .reduceByKey(_ + _, 200)
  .collect()
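For reference, the mapPartitions step above is equivalent to a plain map; a
restatement (just a sketch, same logic) makes it explicit that the entire
DomainObj instance is the shuffle key, serialized with Spark's default Java
serialization unless spark.serializer is changed:

// Same job restated as a sketch: mapPartitions(objs => objs.map(...)) is
// just a per-element map, and the whole case class is the shuffle key.
val resultsDirect = myDomainObjects
  .filter(obj => obj.a == "SomeValue" || obj.a == "SomeOtherValue")
  .map(obj => (obj, 1))
  .reduceByKey(_ + _, 200)
  .collect()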
------------------------------------------------------------
2. Operations on SchemaRDD:
------------------------------------------------------------
import sqlContext.createSchemaRDD // implicit conversion RDD[DomainObj] -> SchemaRDD
myDomainObjects.registerTempTable("myDomainObjects")
val results = sqlContext.sql("""
  SELECT a, b, c, d, COUNT(*) AS total
  FROM myDomainObjects
  WHERE a IN ('SomeValue', 'SomeOtherValue')
  GROUP BY a, b, c, d
""").collect()
In the first case (RDD), the job finishes in 2 minutes 30 seconds with an
input size of 28.4 GB, a shuffle write of 525.3 MB, and a shuffle read of
472.5 MB.
In the second case (SchemaRDD), the job finishes in 2 minutes 9 seconds with
the same 28.4 GB input, a shuffle write of 258.9 MB, and a shuffle read of
233.0 MB.
Since the shuffle size in the second case is roughly half that of the first,
I'd like to understand why.
Thanks,
Grega
Re: Shuffle size difference - operations on RDD vs. operations on SchemaRDD
Posted by Michael Armbrust <mi...@databricks.com>.
Spark SQL always uses a custom configuration of Kryo under the hood to
improve shuffle performance:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
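If you want the plain-RDD version of the job to behave the same way, you can
switch the shuffle serializer to Kryo yourself. A minimal sketch, assuming
the Spark 1.x configuration keys and the DomainObj case class from the
original message (the registrator class name is made up for illustration):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical registrator: registering the class lets Kryo write a small
// numeric tag instead of the full class name for every record.
class DomainObjRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[DomainObj])
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", classOf[DomainObjRegistrator].getName)
// Create the SparkContext from this conf before building the RDDs.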
Michael
On Sun, Sep 21, 2014 at 9:04 AM, Grega Kešpret <gr...@celtra.com> wrote:
> [...] Since the shuffle size in the second case is roughly half that of
> the first, I'd like to understand why.