Posted to commits@spark.apache.org by rx...@apache.org on 2016/12/02 06:02:48 UTC

spark git commit: [SPARK-17213][SQL] Disable Parquet filter push-down for string and binary columns due to PARQUET-686

Repository: spark
Updated Branches:
  refs/heads/master c82f16c15 -> ca6391637


[SPARK-17213][SQL] Disable Parquet filter push-down for string and binary columns due to PARQUET-686

This PR targets both master and branch-2.1.

## What changes were proposed in this pull request?

Due to PARQUET-686, Parquet doesn't compare strings correctly when evaluating pushed-down filters on string columns. To work around this issue, this PR disables filter push-down for both string and binary columns. Binary columns are also affected because some Parquet data models (e.g., Hive) may store string columns as plain Parquet `binary` rather than `binary (UTF8)`.
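
For illustration, here is a minimal, self-contained Scala sketch of the root cause (not code from this patch; `signedCompare` is a hypothetical stand-in for Parquet's buggy binary comparator). UTF-8 encodes non-ASCII characters with high bytes, which turn negative when treated as signed values, so the buggy comparator orders them *before* ASCII characters:

```scala
// Illustrative only: a lexicographic comparison that (incorrectly) treats
// bytes as signed, mimicking the comparator behavior behind PARQUET-686.
object SignedCompareDemo {
  def signedCompare(a: Array[Byte], b: Array[Byte]): Int = {
    val len = math.min(a.length, b.length)
    var i = 0
    while (i < len) {
      if (a(i) != b(i)) return a(i) - b(i) // signed byte subtraction
      i += 1
    }
    a.length - b.length
  }

  def main(args: Array[String]): Unit = {
    val a = "a".getBytes("UTF-8") // 0x61
    val e = "é".getBytes("UTF-8") // 0xC3 0xA9: both bytes negative when signed
    println("a".compareTo("é") < 0)  // true: "a" sorts before "é"
    println(signedCompare(e, a) < 0) // also true: the signed comparator claims
                                     // "é" < "a", so a pushed-down filter like
                                     // name > 'a' wrongly drops the "é" row
  }
}
```

Since the comparison happens inside parquet-mr rather than in Spark, disabling push-down for these types is the safe workaround on the Spark side.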

## How was this patch tested?

New test case added in `ParquetFilterSuite`.

Author: Cheng Lian <li...@databricks.com>

Closes #16106 from liancheng/spark-17213-bad-string-ppd.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca639163
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca639163
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca639163

Branch: refs/heads/master
Commit: ca6391637212814b7c0bd14c434a6737da17b258
Parents: c82f16c
Author: Cheng Lian <li...@databricks.com>
Authored: Thu Dec 1 22:02:45 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Dec 1 22:02:45 2016 -0800

----------------------------------------------------------------------
 .../datasources/parquet/ParquetFilters.scala    | 24 ++++++++++++++++++
 .../parquet/ParquetFilterSuite.scala            | 26 +++++++++++++++++---
 2 files changed, 47 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ca639163/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index a6e9788..7730d1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -40,6 +40,9 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+
+    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+    /*
     // Binary.fromString and Binary.fromByteArray don't accept null values
     case StringType =>
       (n: String, v: Any) => FilterApi.eq(
@@ -49,6 +52,7 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+     */
   }
 
   private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -62,6 +66,9 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+
+    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+    /*
     case StringType =>
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
@@ -70,6 +77,7 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+     */
   }
 
   private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -81,6 +89,9 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+
+    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.lt(binaryColumn(n),
@@ -88,6 +99,7 @@ private[parquet] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+     */
   }
 
   private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -99,6 +111,9 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+
+    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.ltEq(binaryColumn(n),
@@ -106,6 +121,7 @@ private[parquet] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+     */
   }
 
   private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -117,6 +133,9 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+
+    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.gt(binaryColumn(n),
@@ -124,6 +143,7 @@ private[parquet] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+     */
   }
 
   private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -135,6 +155,9 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
     case DoubleType =>
       (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+
+    // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.gtEq(binaryColumn(n),
@@ -142,6 +165,7 @@ private[parquet] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+     */
   }
 
   /**

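Note on why commenting out the `StringType` and `BinaryType` cases is sufficient: `ParquetFilters` builds predicates by lifting these partial functions, so a type with no matching case simply yields no Parquet predicate, and Spark evaluates the filter itself after the scan. A minimal sketch of that fall-through behavior (illustrative only, not code from this patch):

```scala
import org.apache.spark.sql.types._

// Illustrative stand-in for one of the predicate builders above.
val makePredicate: PartialFunction[DataType, String] = {
  case IntegerType => "pushed down to Parquet"
  // StringType / BinaryType cases removed, as in this patch.
}

makePredicate.lift(IntegerType) // Some("pushed down to Parquet")
makePredicate.lift(StringType)  // None: Spark filters the rows itself
```
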
http://git-wip-us.apache.org/repos/asf/spark/blob/ca639163/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 4246b54..a0d57d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -47,7 +47,6 @@ import org.apache.spark.util.{AccumulatorContext, LongAccumulator}
  *    data type is nullable.
  */
 class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
-
   private def checkFilterPredicate(
       df: DataFrame,
       predicate: Predicate,
@@ -230,7 +229,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
   }
 
-  test("filter pushdown - string") {
+  // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+  ignore("filter pushdown - string") {
     withParquetDataFrame((1 to 4).map(i => Tuple1(i.toString))) { implicit df =>
       checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
       checkFilterPredicate(
@@ -258,7 +258,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
   }
 
-  test("filter pushdown - binary") {
+  // See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
+  ignore("filter pushdown - binary") {
     implicit class IntToBinary(int: Int) {
       def b: Array[Byte] = int.toString.getBytes(StandardCharsets.UTF_8)
     }
@@ -558,4 +559,23 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       }
     }
   }
+
+  test("SPARK-17213: Broken Parquet filter push-down for string columns") {
+    withTempPath { dir =>
+      import testImplicits._
+
+      val path = dir.getCanonicalPath
+      // scalastyle:off nonascii
+      Seq("a", "é").toDF("name").write.parquet(path)
+      // scalastyle:on nonascii
+
+      assert(spark.read.parquet(path).where("name > 'a'").count() == 1)
+      assert(spark.read.parquet(path).where("name >= 'a'").count() == 2)
+
+      // scalastyle:off nonascii
+      assert(spark.read.parquet(path).where("name < 'é'").count() == 1)
+      assert(spark.read.parquet(path).where("name <= 'é'").count() == 2)
+      // scalastyle:on nonascii
+    }
+  }
 }

