Posted to commits@spark.apache.org by we...@apache.org on 2018/01/17 13:53:45 UTC

spark git commit: [SPARK-21783][SQL] Turn on ORC filter push-down by default

Repository: spark
Updated Branches:
  refs/heads/master 1f3d933e0 -> 0f8a28617


[SPARK-21783][SQL] Turn on ORC filter push-down by default

## What changes were proposed in this pull request?

ORC filter push-down has been disabled by default since it was first introduced in [SPARK-2883](https://github.com/apache/spark/commit/aa31e431fc09f0477f1c2351c6275769a31aca90#diff-41ef65b9ef5b518f77e2a03559893f4dR149).

Now that Apache Spark depends on Apache ORC 1.4.1, this PR turns on ORC filter push-down by default for Apache Spark 2.3, matching the Parquet default ([SPARK-9207](https://issues.apache.org/jira/browse/SPARK-9207)), as part of [SPARK-20901](https://issues.apache.org/jira/browse/SPARK-20901), "Feature parity for ORC with Parquet".
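For reference, the new default can still be overridden per session through the same `spark.sql.orc.filterPushdown` key changed in the diff below. A minimal sketch (the application name and ORC path here are illustrative, not part of this patch):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("OrcPushdownExample")   // hypothetical app name
  .master("local[*]")
  .getOrCreate()

// With this change, ORC predicate push-down is enabled by default; no config is needed.
val df = spark.read.orc("/tmp/example_orc")   // hypothetical path
df.filter("id = 42").show()

// To restore the pre-2.3 behavior for the current session, if needed:
spark.conf.set("spark.sql.orc.filterPushdown", "false")
// ...or equivalently via SQL:
spark.sql("SET spark.sql.orc.filterPushdown=false")
```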

## How was this patch tested?

Pass the existing tests.

Author: Dongjoon Hyun <do...@apache.org>

Closes #20265 from dongjoon-hyun/SPARK-21783.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0f8a2861
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0f8a2861
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0f8a2861

Branch: refs/heads/master
Commit: 0f8a28617a0742d5a99debfbae91222c2e3b5cec
Parents: 1f3d933
Author: Dongjoon Hyun <do...@apache.org>
Authored: Wed Jan 17 21:53:36 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Jan 17 21:53:36 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |   2 +-
 .../spark/sql/FilterPushdownBenchmark.scala     | 243 +++++++++++++++++++
 2 files changed, 244 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0f8a2861/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6746fbc..16fbb0c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -410,7 +410,7 @@ object SQLConf {
   val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
     .doc("When true, enable filter pushdown for ORC files.")
     .booleanConf
-    .createWithDefault(false)
+    .createWithDefault(true)
 
   val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath")
     .doc("When true, check all the partition paths under the table\'s root directory " +

http://git-wip-us.apache.org/repos/asf/spark/blob/0f8a2861/sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala
new file mode 100644
index 0000000..c6dd7da
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FilterPushdownBenchmark.scala
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+import scala.util.{Random, Try}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.functions.monotonically_increasing_id
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.{Benchmark, Utils}
+
+
+/**
+ * Benchmark to measure read performance with Filter pushdown.
+ */
+object FilterPushdownBenchmark {
+  val conf = new SparkConf()
+  conf.set("orc.compression", "snappy")
+  conf.set("spark.sql.parquet.compression.codec", "snappy")
+
+  private val spark = SparkSession.builder()
+    .master("local[1]")
+    .appName("FilterPushdownBenchmark")
+    .config(conf)
+    .getOrCreate()
+
+  def withTempPath(f: File => Unit): Unit = {
+    val path = Utils.createTempDir()
+    path.delete()
+    try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+    try f finally tableNames.foreach(spark.catalog.dropTempView)
+  }
+
+  def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+    val (keys, values) = pairs.unzip
+    val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption)
+    (keys, values).zipped.foreach(spark.conf.set)
+    try f finally {
+      keys.zip(currentValues).foreach {
+        case (key, Some(value)) => spark.conf.set(key, value)
+        case (key, None) => spark.conf.unset(key)
+      }
+    }
+  }
+
+  private def prepareTable(dir: File, numRows: Int, width: Int): Unit = {
+    import spark.implicits._
+    val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
+    val df = spark.range(numRows).map(_ => Random.nextLong).selectExpr(selectExpr: _*)
+      .withColumn("id", monotonically_increasing_id())
+
+    val dirORC = dir.getCanonicalPath + "/orc"
+    val dirParquet = dir.getCanonicalPath + "/parquet"
+
+    df.write.mode("overwrite").orc(dirORC)
+    df.write.mode("overwrite").parquet(dirParquet)
+
+    spark.read.orc(dirORC).createOrReplaceTempView("orcTable")
+    spark.read.parquet(dirParquet).createOrReplaceTempView("parquetTable")
+  }
+
+  def filterPushDownBenchmark(
+      values: Int,
+      title: String,
+      whereExpr: String,
+      selectExpr: String = "*"): Unit = {
+    val benchmark = new Benchmark(title, values, minNumIters = 5)
+
+    Seq(false, true).foreach { pushDownEnabled =>
+      val name = s"Parquet Vectorized ${if (pushDownEnabled) s"(Pushdown)" else ""}"
+      benchmark.addCase(name) { _ =>
+        withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> s"$pushDownEnabled") {
+          spark.sql(s"SELECT $selectExpr FROM parquetTable WHERE $whereExpr").collect()
+        }
+      }
+    }
+
+    Seq(false, true).foreach { pushDownEnabled =>
+      val name = s"Native ORC Vectorized ${if (pushDownEnabled) s"(Pushdown)" else ""}"
+      benchmark.addCase(name) { _ =>
+        withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> s"$pushDownEnabled") {
+          spark.sql(s"SELECT $selectExpr FROM orcTable WHERE $whereExpr").collect()
+        }
+      }
+    }
+
+    /*
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
+    Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+    Select 0 row (id IS NULL):              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -----------------------------------------------------------------------------------------------
+    Parquet Vectorized                            7882 / 7957          2.0         501.1       1.0X
+    Parquet Vectorized (Pushdown)                   55 /   60        285.2           3.5     142.9X
+    Native ORC Vectorized                         5592 / 5627          2.8         355.5       1.4X
+    Native ORC Vectorized (Pushdown)                66 /   70        237.2           4.2     118.9X
+
+    Select 0 row (7864320 < id < 7864320):  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -----------------------------------------------------------------------------------------------
+    Parquet Vectorized                            7884 / 7909          2.0         501.2       1.0X
+    Parquet Vectorized (Pushdown)                  739 /  752         21.3          47.0      10.7X
+    Native ORC Vectorized                         5614 / 5646          2.8         356.9       1.4X
+    Native ORC Vectorized (Pushdown)                81 /   83        195.2           5.1      97.8X
+
+    Select 1 row (id = 7864320):            Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -----------------------------------------------------------------------------------------------
+    Parquet Vectorized                            7905 / 8027          2.0         502.6       1.0X
+    Parquet Vectorized (Pushdown)                  740 /  766         21.2          47.1      10.7X
+    Native ORC Vectorized                         5684 / 5738          2.8         361.4       1.4X
+    Native ORC Vectorized (Pushdown)                78 /   81        202.4           4.9     101.7X
+
+    Select 1 row (id <=> 7864320):          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -----------------------------------------------------------------------------------------------
+    Parquet Vectorized                            7928 / 7993          2.0         504.1       1.0X
+    Parquet Vectorized (Pushdown)                  747 /  772         21.0          47.5      10.6X
+    Native ORC Vectorized                         5728 / 5753          2.7         364.2       1.4X
+    Native ORC Vectorized (Pushdown)                76 /   78        207.9           4.8     104.8X
+
+    Select 1 row (7864320 <= id <= 7864320):Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -----------------------------------------------------------------------------------------------
+    Parquet Vectorized                            7939 / 8021          2.0         504.8       1.0X
+    Parquet Vectorized (Pushdown)                  746 /  770         21.1          47.4      10.6X
+    Native ORC Vectorized                         5690 / 5734          2.8         361.7       1.4X
+    Native ORC Vectorized (Pushdown)                76 /   79        206.7           4.8     104.3X
+
+    Select 1 row (7864319 < id < 7864321):  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -----------------------------------------------------------------------------------------------
+    Parquet Vectorized                            7972 / 8019          2.0         506.9       1.0X
+    Parquet Vectorized (Pushdown)                  742 /  764         21.2          47.2      10.7X
+    Native ORC Vectorized                         5704 / 5743          2.8         362.6       1.4X
+    Native ORC Vectorized (Pushdown)                76 /   78        207.9           4.8     105.4X
+
+    Select 10% rows (id < 1572864):         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -----------------------------------------------------------------------------------------------
+    Parquet Vectorized                            8733 / 8808          1.8         555.2       1.0X
+    Parquet Vectorized (Pushdown)                 2213 / 2267          7.1         140.7       3.9X
+    Native ORC Vectorized                         6420 / 6463          2.4         408.2       1.4X
+    Native ORC Vectorized (Pushdown)              1313 / 1331         12.0          83.5       6.7X
+
+    Select 50% rows (id < 7864320):         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -----------------------------------------------------------------------------------------------
+    Parquet Vectorized                          11518 / 11591          1.4         732.3       1.0X
+    Parquet Vectorized (Pushdown)                 7962 / 7991          2.0         506.2       1.4X
+    Native ORC Vectorized                         8927 / 8985          1.8         567.6       1.3X
+    Native ORC Vectorized (Pushdown)              6102 / 6160          2.6         387.9       1.9X
+
+    Select 90% rows (id < 14155776):        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -----------------------------------------------------------------------------------------------
+    Parquet Vectorized                          14255 / 14389          1.1         906.3       1.0X
+    Parquet Vectorized (Pushdown)               13564 / 13594          1.2         862.4       1.1X
+    Native ORC Vectorized                       11442 / 11608          1.4         727.5       1.2X
+    Native ORC Vectorized (Pushdown)            10991 / 11029          1.4         698.8       1.3X
+
+    Select all rows (id IS NOT NULL):       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -----------------------------------------------------------------------------------------------
+    Parquet Vectorized                          14917 / 14938          1.1         948.4       1.0X
+    Parquet Vectorized (Pushdown)               14910 / 14964          1.1         948.0       1.0X
+    Native ORC Vectorized                       11986 / 12069          1.3         762.0       1.2X
+    Native ORC Vectorized (Pushdown)            12037 / 12123          1.3         765.3       1.2X
+
+    Select all rows (id > -1):              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -----------------------------------------------------------------------------------------------
+    Parquet Vectorized                          14951 / 14976          1.1         950.6       1.0X
+    Parquet Vectorized (Pushdown)               14934 / 15016          1.1         949.5       1.0X
+    Native ORC Vectorized                       12000 / 12156          1.3         763.0       1.2X
+    Native ORC Vectorized (Pushdown)            12079 / 12113          1.3         767.9       1.2X
+
+    Select all rows (id != -1):             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -----------------------------------------------------------------------------------------------
+    Parquet Vectorized                          14930 / 14972          1.1         949.3       1.0X
+    Parquet Vectorized (Pushdown)               15015 / 15047          1.0         954.6       1.0X
+    Native ORC Vectorized                       12090 / 12259          1.3         768.7       1.2X
+    Native ORC Vectorized (Pushdown)            12021 / 12096          1.3         764.2       1.2X
+    */
+    benchmark.run()
+  }
+
+  def main(args: Array[String]): Unit = {
+    val numRows = 1024 * 1024 * 15
+    val width = 5
+    val mid = numRows / 2
+
+    withTempPath { dir =>
+      withTempTable("orcTable", "parquetTable") {
+        prepareTable(dir, numRows, width)
+
+        Seq("id IS NULL", s"$mid < id AND id < $mid").foreach { whereExpr =>
+          val title = s"Select 0 row ($whereExpr)".replace("id AND id", "id")
+          filterPushDownBenchmark(numRows, title, whereExpr)
+        }
+
+        Seq(
+          s"id = $mid",
+          s"id <=> $mid",
+          s"$mid <= id AND id <= $mid",
+          s"${mid - 1} < id AND id < ${mid + 1}"
+        ).foreach { whereExpr =>
+          val title = s"Select 1 row ($whereExpr)".replace("id AND id", "id")
+          filterPushDownBenchmark(numRows, title, whereExpr)
+        }
+
+        val selectExpr = (1 to width).map(i => s"MAX(c$i)").mkString("", ",", ", MAX(id)")
+
+        Seq(10, 50, 90).foreach { percent =>
+          filterPushDownBenchmark(
+            numRows,
+            s"Select $percent% rows (id < ${numRows * percent / 100})",
+            s"id < ${numRows * percent / 100}",
+            selectExpr
+          )
+        }
+
+        Seq("id IS NOT NULL", "id > -1", "id != -1").foreach { whereExpr =>
+          filterPushDownBenchmark(
+            numRows,
+            s"Select all rows ($whereExpr)",
+            whereExpr,
+            selectExpr)
+        }
+      }
+    }
+  }
+}

