Posted to commits@spark.apache.org by li...@apache.org on 2019/01/25 02:25:03 UTC

[spark] branch master updated: [SPARK-26709][SQL] OptimizeMetadataOnlyQuery does not handle empty records correctly

This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f5b9370  [SPARK-26709][SQL] OptimizeMetadataOnlyQuery does not handle empty records correctly
f5b9370 is described below

commit f5b9370da2745a744f8b2f077f1690e0e7035140
Author: Gengliang Wang <ge...@databricks.com>
AuthorDate: Thu Jan 24 18:24:49 2019 -0800

    [SPARK-26709][SQL] OptimizeMetadataOnlyQuery does not handle empty records correctly
    
    ## What changes were proposed in this pull request?
    
    When reading from empty tables, the optimization `OptimizeMetadataOnlyQuery` may return wrong results:
    ```
    sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
    sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
    sql("SELECT MAX(p1) FROM t")
    ```
    The result is supposed to be `null`, since the inserted partition contains no rows. However, with the optimization enabled, the result is `5`.
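
    (For context: `range(start, end)` excludes the upper bound, so `range(1, 1)` produces zero rows, and the `INSERT` registers partition `p1 = 5` in the catalog without writing any data. A quick sanity check, sketched in the same spark-shell style as above:)
    ```
    // range(1, 1) is empty because the upper bound is exclusive, so the
    // INSERT above creates partition p1 = 5 that contains no rows.
    sql("SELECT ID FROM range(1, 1)").count()  // returns 0
    ```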
    
    The rule was originally ported from https://issues.apache.org/jira/browse/HIVE-1003 in #13494. In Hive, the rule was disabled by default in a later release (https://issues.apache.org/jira/browse/HIVE-15397) due to the same problem.
    
    It is hard to completely avoid the correctness issue, because data sources like Parquet can produce metadata-only files, and Spark can't tell whether such a file is empty without actually reading it. This PR disables the optimization by default.
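
    For users who understand the risk and still want the optimization, the flag can be re-enabled explicitly. A minimal sketch (assumes the usual spark-shell session, where `spark` is the active `SparkSession`):
    ```
    // Opt back in to the metadata-only optimization. This may return wrong
    // results when partition metadata exists but the data files are empty.
    spark.conf.set("spark.sql.optimizer.metadataOnly", "true")
    // Equivalent SQL form:
    sql("SET spark.sql.optimizer.metadataOnly = true")
    ```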
    
    ## How was this patch tested?
    
    Unit test
    
    Closes #23635 from gengliangwang/optimizeMetadata.
    
    Lead-authored-by: Gengliang Wang <ge...@databricks.com>
    Co-authored-by: Xiao Li <ga...@gmail.com>
    Signed-off-by: gatorsmile <ga...@gmail.com>
---
 docs/sql-data-sources-parquet.md                   | 12 -------
 .../org/apache/spark/sql/internal/SQLConf.scala    |  6 ++--
 .../sql/execution/OptimizeMetadataOnlyQuery.scala  |  5 +++
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 37 ++++++++++++++++++++++
 .../spark/sql/hive/execution/SQLQuerySuite.scala   | 18 +++++++++++
 5 files changed, 64 insertions(+), 14 deletions(-)

diff --git a/docs/sql-data-sources-parquet.md b/docs/sql-data-sources-parquet.md
index dcd2936..5532bf9 100644
--- a/docs/sql-data-sources-parquet.md
+++ b/docs/sql-data-sources-parquet.md
@@ -296,18 +296,6 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession
   </td>
 </tr>
 <tr>
-  <td><code>spark.sql.optimizer.metadataOnly</code></td>
-  <td>true</td>
-  <td>
-    <p>
-      When true, enable the metadata-only query optimization that use the table's metadata to
-      produce the partition columns instead of table scans. It applies when all the columns scanned
-      are partition columns and the query has an aggregate operator that satisfies distinct
-      semantics.
-    </p>
-  </td>
-</tr>
-<tr>
   <td><code>spark.sql.parquet.writeLegacyFormat</code></td>
   <td>false</td>
   <td>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6b301c3..da595e7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -582,12 +582,14 @@ object SQLConf {
     .createWithDefault(HiveCaseSensitiveInferenceMode.INFER_AND_SAVE.toString)
 
   val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly")
+    .internal()
     .doc("When true, enable the metadata-only query optimization that use the table's metadata " +
       "to produce the partition columns instead of table scans. It applies when all the columns " +
       "scanned are partition columns and the query has an aggregate operator that satisfies " +
-      "distinct semantics.")
+      "distinct semantics. By default the optimization is disabled, since it may return " +
+      "incorrect results when the files are empty.")
     .booleanConf
-    .createWithDefault(true)
+    .createWithDefault(false)
 
   val COLUMN_NAME_OF_CORRUPT_RECORD = buildConf("spark.sql.columnNameOfCorruptRecord")
     .doc("The name of internal column for storing raw/un-parsed JSON and CSV records that fail " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index 3ca03ab..45e5f41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -72,6 +72,11 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
             })
           }
           if (isAllDistinctAgg) {
+            logWarning("Since configuration `spark.sql.optimizer.metadataOnly` is enabled, " +
+              "Spark will scan partition-level metadata without scanning data files. " +
+              "This could result in wrong results when the partition metadata exists but the " +
+              "included data files are empty."
+            )
             a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, rel, filters)))
           } else {
             a
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 806f0b2..b8c4d73 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2966,6 +2966,43 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
+    Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery =>
+      withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) {
+        withTable("t") {
+          sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
+          sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
+          if (enableOptimizeMetadataOnlyQuery) {
+            // The result is wrong if we enable the configuration.
+            checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(5))
+          } else {
+            checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(null))
+          }
+          checkAnswer(sql("SELECT MAX(col1) FROM t"), Row(null))
+        }
+
+        withTempPath { path =>
+          val tabLocation = path.getCanonicalPath
+          val partLocation1 = tabLocation + "/p=3"
+          val partLocation2 = tabLocation + "/p=1"
+          // SPARK-23271 empty RDD when saved should write a metadata only file
+          val df = spark.emptyDataFrame.select(lit(1).as("col"))
+          df.write.parquet(partLocation1)
+          val df2 = spark.range(10).toDF("col")
+          df2.write.parquet(partLocation2)
+          val readDF = spark.read.parquet(tabLocation)
+          if (enableOptimizeMetadataOnlyQuery) {
+            // The result is wrong if we enable the configuration.
+            checkAnswer(readDF.selectExpr("max(p)"), Row(3))
+          } else {
+            checkAnswer(readDF.selectExpr("max(p)"), Row(1))
+          }
+          checkAnswer(readDF.selectExpr("max(col)"), Row(9))
+        }
+      }
+    }
+  }
 }
 
 case class Foo(bar: Option[String])
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 70efad1..d506edc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2330,4 +2330,22 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
+  test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
+    Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery =>
+      withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString) {
+        withTable("t") {
+          sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
+          sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")
+          if (enableOptimizeMetadataOnlyQuery) {
+            // The result is wrong if we enable the configuration.
+            checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(5))
+          } else {
+            checkAnswer(sql("SELECT MAX(p1) FROM t"), Row(null))
+          }
+          checkAnswer(sql("SELECT MAX(col1) FROM t"), Row(null))
+        }
+      }
+    }
+  }
+
 }

