Posted to issues@spark.apache.org by "Maciej Bryński (JIRA)" <ji...@apache.org> on 2016/07/04 21:16:10 UTC

[jira] [Commented] (SPARK-16320) Spark 2.0 slower than 1.6 when querying nested columns

    [ https://issues.apache.org/jira/browse/SPARK-16320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361781#comment-15361781 ] 

Maciej Bryński commented on SPARK-16320:
----------------------------------------

[~rxin]
I created a benchmark script and added the results.

Spark 2.0 is about 25% slower than 1.6.

> Spark 2.0 slower than 1.6 when querying nested columns
> ------------------------------------------------------
>
>                 Key: SPARK-16320
>                 URL: https://issues.apache.org/jira/browse/SPARK-16320
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0
>            Reporter: Maciej Bryński
>
> I did some tests on a parquet file with many nested columns (about 30 GB in 400 partitions), and Spark 2.0 is sometimes slower.
> I tested the following queries:
> 1) {code}select count(*) where id > some_id{code}
> For this query performance is similar (about 1 sec).
> 2) {code}select count(*) where nested_column.id > some_id{code}
> Spark 1.6 -> 1.6 min
> Spark 2.0 -> 2.1 min
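> The same two filters expressed with the DataFrame API (just a sketch; df, id, nested_column and some_id stand for the placeholder names used in the queries above):
> {code}
> # flat column filter - comparable speed on 1.6 and 2.0
> df.where("id > {}".format(some_id)).count()
> # nested column filter - the slow case on 2.0
> df.where("nested_column.id > {}".format(some_id)).count()
> {code}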
> Should I expect such a drop in performance?
> I don't know how to prepare sample data to show the problem.
> Any ideas? Or public data with many nested columns?
> UPDATE:
> I created a script to generate the data and confirm the problem.
> {code}
> #Initialization
> from pyspark import SparkContext, SparkConf
> from pyspark.sql import HiveContext
> from pyspark.sql.functions import struct  # needed by _create_nested_data below
> conf = SparkConf()
> conf.set('spark.cores.max', 15)
> conf.set('spark.executor.memory', '30g')
> conf.set('spark.driver.memory', '30g')
> sc = SparkContext(conf=conf)
> sqlctx = HiveContext(sc)
> #Data creation
> MAX_SIZE = 2**32 - 1
> path = '/mnt/mfs/parquet_nested'
> def create_sample_data(levels, rows, path):
>     
>     def _create_column_data(cols):
>         import random
>         random.seed()
>         return {"column{}".format(i): random.randint(0, MAX_SIZE) for i in range(cols)}
>     
>     def _create_sample_df(cols, rows):
>         rdd = sc.parallelize(range(rows))           
>         data = rdd.map(lambda r: _create_column_data(cols))
>         df = sqlctx.createDataFrame(data)
>         return df
>     
>     def _create_nested_data(levels, rows):
>         if len(levels) == 1:
>             return _create_sample_df(levels[0], rows).cache()
>         else:
>             df = _create_nested_data(levels[1:], rows)
>             return df.select([struct(df.columns).alias("column{}".format(i)) for i in range(levels[0])])
>     df = _create_nested_data(levels, rows)
>     #return df
>     df.write.mode('overwrite').parquet(path)
>     
> #Sample data
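> # levels [2, 10, 200]: 2 top-level struct columns, each holding 10 structs with 200 random integer leaf columns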
> create_sample_data([2,10,200], 1000000, path)
> #Query
> df = sqlctx.read.parquet(path)
> %%timeit
> df.where("column1.column5.column50 > {}".format(int(MAX_SIZE / 2))).count()
> {code}
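> For reproducing this outside IPython (the %%timeit above is a Jupyter cell magic), the timing can be taken with a plain timer instead, e.g.:
> {code}
> # rough alternative to %%timeit, using the df and MAX_SIZE defined above
> import time
> start = time.time()
> df.where("column1.column5.column50 > {}".format(int(MAX_SIZE / 2))).count()
> print("query took {:.1f}s".format(time.time() - start))
> {code}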
> Results:
> Spark 1.6
> 1 loop, best of 3: *1min 5s* per loop
> Spark 2.0
> 1 loop, best of 3: *1min 21s* per loop


