Posted to issues@spark.apache.org by "Noam Barcay (JIRA)" <ji...@apache.org> on 2015/05/12 12:32:59 UTC

[jira] [Updated] (SPARK-7564) possible performance bottleneck in SparkSQL's SparkSqlSerializer class

     [ https://issues.apache.org/jira/browse/SPARK-7564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Noam Barcay updated SPARK-7564:
-------------------------------
    Description: 
Running a query over a table that is completely cached in memory, I came across surprisingly slow performance. The query is a simple SELECT over a 10 GB table that sits comfortably in memory (the Storage tab in the Spark UI confirms this). The table is held in 60 partitions to improve parallelism. All operations are in memory; no shuffle takes place (again, verified via the Spark UI).
Looking at periodic thread dumps of the workers, I see that almost all worker threads - 20 on each worker, as the table has 20 partitions per node - are in one of two states, both related to serialization:

1) trying to acquire a Kryo serializer instance from the pool in SparkSqlSerializer, like so:
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
...
java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:361)
com.twitter.chill.ResourcePool.borrow(ResourcePool.java:35)
org.apache.spark.sql.execution.SparkSqlSerializer$.acquireRelease(SparkSqlSerializer.scala:82)
...
org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:279)



2) trying to release one back to the pool:
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
...
java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:298)
com.twitter.chill.ResourcePool.release(ResourcePool.java:50)
org.apache.spark.sql.execution.SparkSqlSerializer$.acquireRelease(SparkSqlSerializer.scala:86)
...
org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:279)



I speculate that under high contention the underlying ArrayBlockingQueue holding the pool of Kryo serializers becomes a bottleneck itself - which, I would argue, it shouldn't.
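
To make the suspected mechanism concrete, here is a minimal, self-contained Scala sketch of the pattern the two stack traces point at (the names are mine; this is not Spark's or chill's actual code): a fixed-size pool of serializer instances behind a single ArrayBlockingQueue, so every use of a serializer performs one blocking dequeue and one blocking enqueue on the same queue.

import java.util.concurrent.ArrayBlockingQueue

// Illustrative model only (not the real SparkSqlSerializer/ResourcePool code).
// A fixed-size pool of instances behind one ArrayBlockingQueue: every use is a
// blocking take() followed by a blocking put() on that queue, which is roughly
// where the parked threads in the dumps above are spending their time.
class BlockingPool[T](size: Int, newInstance: () => T) {
  private val queue = new ArrayBlockingQueue[T](size)
  (1 to size).foreach(_ => queue.put(newInstance()))

  def withResource[O](fn: T => O): O = {
    val resource = queue.take()   // analogous to ResourcePool.borrow
    try fn(resource)
    finally queue.put(resource)   // analogous to ResourcePool.release
  }
}

// Hypothetical usage: a pool of serializer instances shared by all task threads
// on an executor, e.g. pool.withResource(ser => ser.serialize(row)).

With tens of task threads per node each going through this path for every batch of rows, all of them funnel through the same queue and its internal lock, which would explain the thread dumps above.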

The code I'm using (run from the Spark shell):
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("SET spark.sql.shuffle.partitions=60").collect()
// the ORDER BY is added to induce a shuffle, so the table ends up cached in more
// partitions than it occupies in the underlying HDFS storage
hiveContext.sql("CACHE TABLE cached_tbl AS SELECT * FROM tbl1 ORDER BY col1").collect()
// the save itself is very quick; it's the select that takes long
hiveContext.sql("select col1, col2, col3 from cached_tbl").saveAsTextFile("/some-path")



> possible performance bottleneck in SparkSQL's SparkSqlSerializer class
> ----------------------------------------------------------------------
>
>                 Key: SPARK-7564
>                 URL: https://issues.apache.org/jira/browse/SPARK-7564
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.2.1
>         Environment: 3 node cluster, each with 100g RAM and 40 cores
>            Reporter: Noam Barcay
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
