Posted to issues@spark.apache.org by "Cheng Lian (JIRA)" <ji...@apache.org> on 2015/01/29 02:29:34 UTC

[jira] [Closed] (SPARK-5346) Parquet filter pushdown is not enabled when parquet.task.side.metadata is set to true (default value)

     [ https://issues.apache.org/jira/browse/SPARK-5346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cheng Lian closed SPARK-5346.
-----------------------------
    Resolution: Not a Problem

I verified that filter pushdown is actually enabled even when {{parquet.task.side.metadata}} is set to {{true}}.

The actual filtering happens when {{ParquetRecordReader.initialize()}} is called in {{NewHadoopRDD.compute}}. See [here|https://github.com/apache/spark/blob/v1.2.0/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L135] and [here|https://github.com/apache/incubator-parquet-mr/blob/parquet-1.6.0rc3/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java#L157-L158].
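
To illustrate where the pushed-down predicate lives, here is a minimal sketch against the parquet-mr 1.6.0 {{filter2}} API (the column name and value mirror the {{eq(key, 0)}} predicate in the log below; the actual wiring inside Spark SQL is more involved):
{code}
import org.apache.hadoop.conf.Configuration
import parquet.filter2.predicate.FilterApi
import parquet.hadoop.ParquetInputFormat

// Build a predicate equivalent to the pushed-down filter key = 0
val predicate = FilterApi.eq(FilterApi.intColumn("key"), java.lang.Integer.valueOf(0))

// The driver serializes the predicate into the Hadoop Configuration...
val conf = new Configuration()
ParquetInputFormat.setFilterPredicate(conf, predicate)

// ...and each task reads it back in ParquetRecordReader.initialize(), dropping
// row groups whose statistics cannot possibly match the predicate.
{code}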

As for the Spark task input size, it seems that Hadoop {{FileSystem}} adds the size of a whole block to the metrics even if we only touch a fraction of it (for example, when reading only the Parquet metadata).  This behaviour can be verified with the following snippet:
{code}
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sc._
import sqlContext._

case class KeyValue(key: Int, value: String)

// Write a large Parquet file (roughly 200 million rows) so the input size
// metrics are easy to observe in the web UI
parallelize(1 to 1024 * 1024 * 20).
  flatMap(i => Seq.fill(10)(KeyValue(i, i.toString))).
  saveAsParquetFile("large.parquet")

// Read Parquet metadata on the task side and enable filter pushdown
hadoopConfiguration.set("parquet.task.side.metadata", "true")
sql("SET spark.sql.parquet.filterPushdown=true")

parquetFile("large.parquet").where('key === 0).queryExecution.toRdd.mapPartitions { _ =>
  new Iterator[Row] {
    def hasNext = false
    def next() = ???
  }
}.collect()
{code}

Apparently we're reading nothing here (except for the Parquet metadata in the footers), but the web UI still suggests that the total input size over all tasks equals the file size.  In addition, we can find log lines from the Parquet record reader like the following:
{code}
...
15/01/28 16:50:56 INFO FilterCompat: Filtering using predicate: eq(key, 0)
15/01/28 16:50:56 INFO InternalParquetRecordReader: RecordReader initialized will read a total of 0 records.
...
{code}
which suggests row group filtering does work as expected.

So I'll just close this ticket.

> Parquet filter pushdown is not enabled when parquet.task.side.metadata is set to true (default value)
> -----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-5346
>                 URL: https://issues.apache.org/jira/browse/SPARK-5346
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 1.2.0, 1.3.0
>            Reporter: Cheng Lian
>            Priority: Blocker
>
> When computing Parquet splits, reading Parquet metadata on the executor side is more memory-efficient, so Spark SQL [sets {{parquet.task.side.metadata}} to {{true}} by default|https://github.com/apache/spark/blob/v1.2.0/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala#L437]. However, this somehow disables filter pushdown.
> To work around this issue and enable Parquet filter pushdown, users can set {{spark.sql.parquet.filterPushdown}} to {{true}} and {{parquet.task.side.metadata}} to {{false}}. However, for large Parquet files with many part-files and/or columns, reading metadata on the driver side consumes a lot of memory.
> The following Spark shell snippet can be useful to reproduce this issue:
> {code}
> import org.apache.spark.sql.SQLContext
> val sqlContext = new SQLContext(sc)
> import sqlContext._
> case class KeyValue(key: Int, value: String)
> sc.
>   parallelize(1 to 1024).
>   flatMap(i => Seq.fill(1024)(KeyValue(i, i.toString))).
>   saveAsParquetFile("large.parquet")
> parquetFile("large.parquet").registerTempTable("large")
> sql("SET spark.sql.parquet.filterPushdown=true")
> sql("SELECT * FROM large").collect()
> sql("SELECT * FROM large WHERE key < 200").collect()
> {code}
> Users can verify this issue by checking the input size metrics in the web UI. When filter pushdown is enabled, the second query reads less data.
> Notice that {{parquet.task.side.metadata}} must be set in the _Hadoop_ configuration (either via {{core-site.xml}} or {{SparkContext.hadoopConfiguration.set()}}); setting it in {{spark-defaults.conf}} or via {{SparkConf}} does NOT work.
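>
> A minimal sketch of the workaround, reusing {{sc}}, {{sql}} and the {{large}} table from the snippet above:
> {code}
> // Must go into the Hadoop Configuration, e.g. through the SparkContext
> sc.hadoopConfiguration.set("parquet.task.side.metadata", "false")
>
> // Enable Spark SQL's Parquet filter pushdown
> sql("SET spark.sql.parquet.filterPushdown=true")
>
> // The predicate on key can now be pushed down to the Parquet reader
> sql("SELECT * FROM large WHERE key < 200").collect()
> {code}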


