You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/11/14 23:59:37 UTC

spark git commit: [SPARK-4391][SQL] Configure parquet filters using SQLConf

Repository: spark
Updated Branches:
  refs/heads/master a0300ea32 -> e47c38763


[SPARK-4391][SQL] Configure parquet filters using SQLConf

This is more uniform with the rest of SQL configuration and allows it to be turned on and off without restarting the SparkContext.  In this PR I also turn off filter pushdown by default due to a number of outstanding issues (in particular SPARK-4258).  When those are fixed we should turn it back on by default.

Author: Michael Armbrust <mi...@databricks.com>

Closes #3258 from marmbrus/parquetFilters and squashes the following commits:

5655bfe [Michael Armbrust] Remove extra line.
15e9a98 [Michael Armbrust] Enable filters for tests
75afd39 [Michael Armbrust] Fix comments
78fa02d [Michael Armbrust] off by default
e7f9e16 [Michael Armbrust] First draft of correctly configuring parquet filter pushdown


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e47c3876
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e47c3876
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e47c3876

Branch: refs/heads/master
Commit: e47c38763914aaf89a7a851c5f41b7549a75615b
Parents: a0300ea
Author: Michael Armbrust <mi...@databricks.com>
Authored: Fri Nov 14 14:59:35 2014 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Fri Nov 14 14:59:35 2014 -0800

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/sql/SQLConf.scala  |  8 +++++++-
 .../apache/spark/sql/execution/SparkStrategies.scala   |  7 +++++--
 .../org/apache/spark/sql/parquet/ParquetFilters.scala  |  2 --
 .../spark/sql/parquet/ParquetTableOperations.scala     | 13 +++++++------
 .../apache/spark/sql/parquet/ParquetQuerySuite.scala   |  2 ++
 5 files changed, 21 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e47c3876/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 279495a..cd7d78e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -22,7 +22,6 @@ import scala.collection.JavaConversions._
 
 import java.util.Properties
 
-
 private[spark] object SQLConf {
   val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed"
   val COLUMN_BATCH_SIZE = "spark.sql.inMemoryColumnarStorage.batchSize"
@@ -32,9 +31,12 @@ private[spark] object SQLConf {
   val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
   val CODEGEN_ENABLED = "spark.sql.codegen"
   val DIALECT = "spark.sql.dialect"
+
   val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
   val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
   val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"
+  val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
+
   val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
 
   // This is only used for the thriftserver
@@ -90,6 +92,10 @@ private[sql] trait SQLConf {
   /** Number of partitions to use for shuffle operators. */
   private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS, "200").toInt
 
+  /** When true predicates will be passed to the parquet record reader when possible. */
+  private[spark] def parquetFilterPushDown =
+    getConf(PARQUET_FILTER_PUSHDOWN_ENABLED, "false").toBoolean
+
   /**
    * When set to true, Spark SQL will use the Scala compiler at runtime to generate custom bytecode
    * that evaluates expressions found in queries.  In general this custom code runs much faster

http://git-wip-us.apache.org/repos/asf/spark/blob/e47c3876/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index cc7e0c0..03cd5bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -208,7 +208,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil
       case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
         val prunePushedDownFilters =
-          if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
+          if (sqlContext.parquetFilterPushDown) {
             (filters: Seq[Expression]) => {
               filters.filter { filter =>
                 // Note: filters cannot be pushed down to Parquet if they contain more complex
@@ -234,7 +234,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           projectList,
           filters,
           prunePushedDownFilters,
-          ParquetTableScan(_, relation, filters)) :: Nil
+          ParquetTableScan(
+            _,
+            relation,
+            if (sqlContext.parquetFilterPushDown) filters else Nil)) :: Nil
 
       case _ => Nil
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/e47c3876/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
index 1e67799..9a3f6d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
@@ -43,8 +43,6 @@ import org.apache.spark.sql.parquet.ParquetColumns._
 
 private[sql] object ParquetFilters {
   val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"
-  // set this to false if pushdown should be disabled
-  val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.hints.parquetFilterPushdown"
 
   def createRecordFilter(filterExpressions: Seq[Expression]): Filter = {
     val filters: Seq[CatalystFilter] = filterExpressions.collect {

http://git-wip-us.apache.org/repos/asf/spark/blob/e47c3876/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 74c43e0..5f93279 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -23,6 +23,8 @@ import java.text.SimpleDateFormat
 import java.util.concurrent.{Callable, TimeUnit}
 import java.util.{ArrayList, Collections, Date, List => JList}
 
+import org.apache.spark.annotation.DeveloperApi
+
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.util.Try
@@ -52,6 +54,7 @@ import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
 import org.apache.spark.{Logging, SerializableWritable, TaskContext}
 
 /**
+ * :: DeveloperApi ::
  * Parquet table scan operator. Imports the file that backs the given
  * [[org.apache.spark.sql.parquet.ParquetRelation]] as a ``RDD[Row]``.
  */
@@ -108,15 +111,11 @@ case class ParquetTableScan(
     // Note 1: the input format ignores all predicates that cannot be expressed
     // as simple column predicate filters in Parquet. Here we just record
     // the whole pruning predicate.
-    // Note 2: you can disable filter predicate pushdown by setting
-    // "spark.sql.hints.parquetFilterPushdown" to false inside SparkConf.
-    if (columnPruningPred.length > 0 &&
-      sc.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
-      
+    if (columnPruningPred.length > 0) {
       // Set this in configuration of ParquetInputFormat, needed for RowGroupFiltering
       val filter: Filter = ParquetFilters.createRecordFilter(columnPruningPred)
       if (filter != null){
-        val filterPredicate = filter.asInstanceOf[FilterPredicateCompat].getFilterPredicate()
+        val filterPredicate = filter.asInstanceOf[FilterPredicateCompat].getFilterPredicate
         ParquetInputFormat.setFilterPredicate(conf, filterPredicate)  
       }
     }
@@ -193,6 +192,7 @@ case class ParquetTableScan(
 }
 
 /**
+ * :: DeveloperApi ::
  * Operator that acts as a sink for queries on RDDs and can be used to
  * store the output inside a directory of Parquet files. This operator
  * is similar to Hive's INSERT INTO TABLE operation in the sense that
@@ -208,6 +208,7 @@ case class ParquetTableScan(
  * cause unpredicted behaviour and therefore results in a RuntimeException
  * (only detected via filename pattern so will not catch all cases).
  */
+@DeveloperApi
 case class InsertIntoParquetTable(
     relation: ParquetRelation,
     child: SparkPlan,

http://git-wip-us.apache.org/repos/asf/spark/blob/e47c3876/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 3cccafe..80a3e0b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -95,6 +95,8 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     testRDD.registerTempTable("testsource")
     parquetFile(ParquetTestData.testFilterDir.toString)
       .registerTempTable("testfiltersource")
+
+    setConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED, "true")
   }
 
   override def afterAll() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org