Posted to issues@spark.apache.org by "Maciej Bryński (JIRA)" <ji...@apache.org> on 2016/07/04 21:02:10 UTC
[jira] [Updated] (SPARK-16320) Spark 2.0 slower than 1.6 when querying nested columns
[ https://issues.apache.org/jira/browse/SPARK-16320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Maciej Bryński updated SPARK-16320:
-----------------------------------
Description:
I did some tests on a Parquet file with many nested columns (about 30 GB in
400 partitions), and Spark 2.0 is sometimes slower.
I tested the following queries:
1) {code}select count(*) where id > some_id{code}
For this query performance is similar (about 1 sec).
2) {code}select count(*) where nested_column.id > some_id{code}
Spark 1.6 -> 1.6 min
Spark 2.0 -> 2.1 min
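For reference, a minimal way to take such a timing from PySpark (a sketch, not my exact code; the path, table, and column names below are placeholders for my real data):
{code}
import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext

sc = SparkContext(conf=SparkConf())
sqlctx = HiveContext(sc)

# Placeholder path and names; the real data is the 30 GB / 400-partition
# Parquet table mentioned above.
df = sqlctx.read.parquet('/path/to/data')
df.registerTempTable('events')

start = time.time()
sqlctx.sql("SELECT count(*) FROM events "
           "WHERE nested_column.id > 0").collect()
print("query took {:.1f} s".format(time.time() - start))
{code}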
Should I expect such a drop in performance?
I don't know how to prepare sample data that shows the problem.
Any ideas? Or public data with many nested columns?
UPDATE:
I created a script to generate data and confirm the problem.
{code}
#Initialization
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
from pyspark.sql.functions import struct

conf = SparkConf()
conf.set('spark.cores.max', 15)
conf.set('spark.executor.memory', '30g')
conf.set('spark.driver.memory', '30g')
sc = SparkContext(conf=conf)
sqlctx = HiveContext(sc)

#Data creation
MAX_SIZE = 2**32 - 1
path = '/mnt/mfs/parquet_nested'

def create_sample_data(levels, rows, path):
    def _create_column_data(cols):
        import random
        random.seed()
        return {"column{}".format(i): random.randint(0, MAX_SIZE)
                for i in range(cols)}

    def _create_sample_df(cols, rows):
        rdd = sc.parallelize(range(rows))
        data = rdd.map(lambda r: _create_column_data(cols))
        df = sqlctx.createDataFrame(data)
        return df

    def _create_nested_data(levels, rows):
        # Innermost level: a flat DataFrame of random ints; each outer
        # level wraps the previous DataFrame's columns in structs.
        if len(levels) == 1:
            return _create_sample_df(levels[0], rows).cache()
        else:
            df = _create_nested_data(levels[1:], rows)
            return df.select([struct(df.columns).alias("column{}".format(i))
                              for i in range(levels[0])])

    df = _create_nested_data(levels, rows)
    df.write.mode('overwrite').parquet(path)

#Sample data: 2 top-level structs, each holding 10 structs
#of 200 integer columns, 1M rows
create_sample_data([2, 10, 200], 1000000, path)

#Query
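# (Hypothetical sketch, not part of my original script: the nested-column
# query from the description, run against the generated data. With
# levels [2, 10, 200] every top-level column is a three-level struct,
# e.g. column0.column0.column0; the threshold 0 is arbitrary.)
df = sqlctx.read.parquet(path)
df.registerTempTable('nested')
sqlctx.sql("SELECT count(*) FROM nested "
           "WHERE column0.column0.column0 > 0").collect()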
{code}
> Spark 2.0 slower than 1.6 when querying nested columns
> ------------------------------------------------------
>
> Key: SPARK-16320
> URL: https://issues.apache.org/jira/browse/SPARK-16320
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.0
> Reporter: Maciej Bryński
>