Posted to issues@spark.apache.org by "Cheng Lian (JIRA)" <ji...@apache.org> on 2015/01/21 09:40:35 UTC
[jira] [Updated] (SPARK-5346) Parquet filter pushdown is not
enabled when parquet.task.side.metadata is set to true (default value)
[ https://issues.apache.org/jira/browse/SPARK-5346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Cheng Lian updated SPARK-5346:
------------------------------
Description:
When computing Parquet splits, reading Parquet metadata on the executor side is more memory-efficient, so Spark SQL [sets {{parquet.task.side.metadata}} to {{true}} by default|https://github.com/apache/spark/blob/v1.2.0/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala#L437]. However, for reasons not yet understood, this also disables filter pushdown.
To work around this issue and enable Parquet filter pushdown, users can set {{spark.sql.parquet.filterPushdown}} to {{true}} and {{parquet.task.side.metadata}} to {{false}}. However, for large Parquet files with a large number of part-files and/or columns, reading metadata on the driver side consumes a lot of memory.
The following Spark shell snippet can be used to reproduce this issue:
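{code}
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
// Brings sql(), parquetFile() and the implicit RDD-to-SchemaRDD conversion into scope
import sqlContext._
case class KeyValue(key: Int, value: String)
// Write 1024 * 1024 rows; each key in 1..1024 appears 1024 times
sc.
  parallelize(1 to 1024).
  flatMap(i => Seq.fill(1024)(KeyValue(i, i.toString))).
  saveAsParquetFile("large.parquet")
parquetFile("large.parquet").registerTempTable("large")
sql("SET spark.sql.parquet.filterPushdown=true")
// Full scan, used as the baseline for input size
sql("SELECT * FROM large").collect()
// With pushdown working, this query should read less input than the full scan
sql("SELECT * FROM large WHERE key < 200").collect()
{code}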
Users can verify this issue by checking the input size metrics in the web UI. When filter pushdown is enabled, the second query reads less data.
Note that {{parquet.task.side.metadata}} must be set in _Hadoop_ configuration files (e.g. core-site.xml); setting it in {{spark-defaults.conf}} or via {{SparkConf}} does NOT work.
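For illustration, a minimal sketch of such a Hadoop configuration entry is shown below. It assumes the standard Hadoop {{<configuration>}}/{{<property>}} file layout and that the edited core-site.xml is the one actually picked up by the Spark application. The value {{false}} is the workaround setting described above, not a recommended default.

{code:xml}
<!-- core-site.xml (assumed to be on the classpath of the Spark application) -->
<configuration>
  <property>
    <!-- Workaround: read Parquet metadata on the driver side so that filter pushdown takes effect -->
    <name>parquet.task.side.metadata</name>
    <value>false</value>
  </property>
</configuration>
{code}

With this entry in place, {{spark.sql.parquet.filterPushdown}} still has to be set to {{true}}, and the driver-side memory cost mentioned in the description applies.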
was:When computing Parquet splits, reading Parquet metadata from executor side is more memory efficient, thus Spark SQL [sets {{parquet.task.side.metadata}} to {{true}} by default|https://github.com/apache/spark/blob/v1.2.0/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala#L437]. However, somehow this disables filter pushdown. Now Parquet filter pushdown is only enabled when {{spark.sql.parquet.filterPushdown}} set to {{true}} while {{parquet.task.side.metadata}} set to {{false}}. For large Parquet files with thousands of part-files and/or thousands of columns, reading metadata from driver side eats lots of memory.
> Parquet filter pushdown is not enabled when parquet.task.side.metadata is set to true (default value)
> -----------------------------------------------------------------------------------------------------
>
> Key: SPARK-5346
> URL: https://issues.apache.org/jira/browse/SPARK-5346
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.2.0, 1.3.0
> Reporter: Cheng Lian
> Priority: Critical