Posted to issues@spark.apache.org by "Cheng Lian (JIRA)" <ji...@apache.org> on 2015/01/21 09:40:35 UTC

[jira] [Updated] (SPARK-5346) Parquet filter pushdown is not enabled when parquet.task.side.metadata is set to true (default value)

     [ https://issues.apache.org/jira/browse/SPARK-5346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cheng Lian updated SPARK-5346:
------------------------------
    Description: 
When computing Parquet splits, reading Parquet metadata on the executor side is more memory efficient, so Spark SQL [sets {{parquet.task.side.metadata}} to {{true}} by default|https://github.com/apache/spark/blob/v1.2.0/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala#L437]. However, for reasons not yet identified, this disables filter pushdown.

To work around this issue and enable Parquet filter pushdown, users can set {{spark.sql.parquet.filterPushdown}} to {{true}} and {{parquet.task.side.metadata}} to {{false}}. However, for large Parquet files with a large number of part-files and/or columns, reading metadata on the driver side consumes a lot of memory.

The following Spark shell snippet reproduces this issue:
{code}
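import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
// Brings sql, parquetFile, and the implicit RDD-to-SchemaRDD conversion into scope
import sqlContext._

case class KeyValue(key: Int, value: String)

// Write 1024 * 1024 rows: each key from 1 to 1024 is repeated 1024 times
sc.
  parallelize(1 to 1024).
  flatMap(i => Seq.fill(1024)(KeyValue(i, i.toString))).
  saveAsParquetFile("large.parquet")

parquetFile("large.parquet").registerTempTable("large")

sql("SET spark.sql.parquet.filterPushdown=true")
// Full scan, as a baseline for the input size metric
sql("SELECT * FROM large").collect()
// Filtered scan: should read less data than the baseline when pushdown takes effect
sql("SELECT * FROM large WHERE key < 200").collect()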
{code}
Users can verify this issue by checking the input size metrics in the web UI. When filter pushdown is enabled, the second query reads less data.

Note that {{parquet.task.side.metadata}} must be set in _Hadoop_ configuration files (e.g. core-site.xml); setting it in {{spark-defaults.conf}} or via {{SparkConf}} does NOT work.
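
For illustration, the workaround setting might be written as a property in a Hadoop configuration file roughly as follows. This is only a minimal sketch, assuming the {{core-site.xml}} being edited is the one on the classpath that Spark loads; only the property name and value come from this issue, and the comment text is illustrative.
{code:xml}
<configuration>
  <!-- Workaround for SPARK-5346: read Parquet metadata on the driver side
       so that filter pushdown takes effect.
       Assumption: this file is the core-site.xml that Spark picks up. -->
  <property>
    <name>parquet.task.side.metadata</name>
    <value>false</value>
  </property>
</configuration>
{code}
{{spark.sql.parquet.filterPushdown}} still has to be set to {{true}} separately, for example with the {{SET}} statement in the snippet above. The memory cost of reading metadata on the driver side applies while this property is {{false}}.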

  was:When computing Parquet splits, reading Parquet metadata from executor side is more memory efficient, thus Spark SQL [sets {{parquet.task.side.metadata}} to {{true}} by default|https://github.com/apache/spark/blob/v1.2.0/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala#L437]. However, somehow this disables filter pushdown. Now Parquet filter pushdown is only enabled when {{spark.sql.parquet.filterPushdown}} set to {{true}} while {{parquet.task.side.metadata}} set to {{false}}. For large Parquet files with thousands of part-files and/or thousands of columns, reading metadata from driver side eats lots of memory.


> Parquet filter pushdown is not enabled when parquet.task.side.metadata is set to true (default value)
> -----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-5346
>                 URL: https://issues.apache.org/jira/browse/SPARK-5346
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.2.0, 1.3.0
>            Reporter: Cheng Lian
>            Priority: Critical



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
