You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2016/06/07 14:44:21 UTC
[jira] [Resolved] (SPARK-13570) pyspark save with partitionBy is
very slow
[ https://issues.apache.org/jira/browse/SPARK-13570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen resolved SPARK-13570.
-------------------------------
Resolution: Incomplete
> pyspark save with partitionBy is very slow
> ------------------------------------------
>
> Key: SPARK-13570
> URL: https://issues.apache.org/jira/browse/SPARK-13570
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Reporter: Shubhanshu Mishra
> Labels: dataframe, partitioning, pyspark, save
>
> Running the following code to store data from each year and pos in a seperate folder for a very large dataframe is taking a huge amount of time. (>37 hours for 60% of the work)
> {code}
> ## IPYTHON was started using the following command:
> # IPYTHON=1 "$SPARK_HOME/bin/pyspark" --driver-memory 50g
> from pyspark import SparkContext, SparkConf
> from pyspark.sql import SQLContext, Row
> from pyspark.sql.types import *
> conf = SparkConf()
> conf.setMaster("local[30]")
> conf.setAppName("analysis")
> conf.set("spark.local.dir", "./tmp")
> conf.set("spark.executor.memory", "50g")
> conf.set("spark.driver.maxResultSize", "5g")
> sc = SparkContext(conf=conf)
> sqlContext = SQLContext(sc)
> df = sqlContext.read.format("csv").options(header=False, inferschema=True, delimiter="\t").load("out/new_features")
> df = df.selectExpr(*("%s as %s" % (df.columns[i], k) for i,k in enumerate(columns)))
> # year can take values from [1902,2015]
> # pos takes integer values from [-1,0,1,2]
> # df is a dataframe with 20 columns and 1 billion rows
> # Running on Machine with 32 cores and 500 GB RAM
> df.write.save("out/model_input_partitioned", format="csv", partitionBy=["year", "pos"], delimiter="\t")
> {code}
> Currently, the code is at:
> [Stage 12:==============================> (1367 + 30) / 2290]
> And it has already been more than 37 hours. A single sweep on this data for filter by value takes less than 6.5 minutes.
> The spark web interface shows the following lines for the 2 stages of the job:
> Stage Description Submitted Duration Tasks:succeeded/total Input Output Shuffle Read Shuffle Write
> 11 load at NativeMethodAccessorImpl.java:-2 +details 2016/02/27 23:07:04 6.5 min 2290/2290 66.8 GB
> 12 save at NativeMethodAccessorImpl.java:-2 +details 2016/02/27 23:15:59 37.1 h 1370/2290 40.9 GB
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org