You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2015/10/30 11:18:03 UTC

spark git commit: [SPARK-11103][SQL] Filter applied on Merged Parquet shema with new column fail

Repository: spark
Updated Branches:
  refs/heads/master 86d65265f -> 59db9e9c3


[SPARK-11103][SQL] Filter applied on Merged Parquet shema with new column fail

When enabling mergedSchema and predicate filter, this fails since Parquet does not accept filters pushed down when the columns of the filters do not exist in the schema.
This is related with Parquet issue (https://issues.apache.org/jira/browse/PARQUET-389).

For now, it just simply disables predicate push down when using merged schema in this PR.

Author: hyukjinkwon <gu...@gmail.com>

Closes #9327 from HyukjinKwon/SPARK-11103.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/59db9e9c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/59db9e9c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/59db9e9c

Branch: refs/heads/master
Commit: 59db9e9c382fab40aac0633f2c779bee8cf2025f
Parents: 86d6526
Author: hyukjinkwon <gu...@gmail.com>
Authored: Fri Oct 30 18:17:35 2015 +0800
Committer: Cheng Lian <li...@databricks.com>
Committed: Fri Oct 30 18:17:35 2015 +0800

----------------------------------------------------------------------
 .../datasources/parquet/ParquetRelation.scala   |  6 +++++-
 .../parquet/ParquetFilterSuite.scala            | 20 ++++++++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/59db9e9c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 77d851c..44649a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -292,6 +292,10 @@ private[sql] class ParquetRelation(
     val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
     val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
 
+    // When merging schemas is enabled and the column of the given filter does not exist,
+    // Parquet emits an exception which is an issue of Parquet (PARQUET-389).
+    val safeParquetFilterPushDown = !shouldMergeSchemas && parquetFilterPushDown
+
     // Parquet row group size. We will use this value as the value for
     // mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value
     // of these flags are smaller than the parquet row group size.
@@ -305,7 +309,7 @@ private[sql] class ParquetRelation(
         dataSchema,
         parquetBlockSize,
         useMetadataCache,
-        parquetFilterPushDown,
+        safeParquetFilterPushDown,
         assumeBinaryIsString,
         assumeInt96IsTimestamp) _
 

http://git-wip-us.apache.org/repos/asf/spark/blob/59db9e9c/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 13fdd55..b2101be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -316,4 +316,24 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       }
     }
   }
+
+  test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
+      SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        var pathOne = s"${dir.getCanonicalPath}/table1"
+        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
+        var pathTwo = s"${dir.getCanonicalPath}/table2"
+        (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)
+
+        // If the "c = 1" filter gets pushed down, this query will throw an exception which
+        // Parquet emits. This is a Parquet issue (PARQUET-389).
+        checkAnswer(
+          sqlContext.read.parquet(pathOne, pathTwo).filter("c = 1"),
+          (1 to 1).map(i => Row(i, i.toString, null)))
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org