You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/01/11 04:57:18 UTC

[spark] branch branch-3.1 updated: [SPARK-33591][SQL][3.1] Recognize `null` in partition spec values

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 2ebeaa1  [SPARK-33591][SQL][3.1] Recognize `null` in partition spec values
2ebeaa1 is described below

commit 2ebeaa15655841a4448fd8fca255dad79038222b
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Mon Jan 11 04:56:39 2021 +0000

    [SPARK-33591][SQL][3.1] Recognize `null` in partition spec values
    
    ### What changes were proposed in this pull request?
    1. Recognize `null` while parsing partition specs, and put `null` instead of `"null"` as partition values.
    2. For V1 catalog: replace `null` by `__HIVE_DEFAULT_PARTITION__`.
    3. For V2 catalogs: pass `null` AS IS, and let catalog implementations to decide how to handle `null`s as partition values in spec.
    
    ### Why are the changes needed?
    Currently, `null` in partition specs is recognized as the `"null"` string which could lead to incorrect results, for example:
    ```sql
    spark-sql> CREATE TABLE tbl5 (col1 INT, p1 STRING) USING PARQUET PARTITIONED BY (p1);
    spark-sql> INSERT INTO TABLE tbl5 PARTITION (p1 = null) SELECT 0;
    spark-sql> SELECT isnull(p1) FROM tbl5;
    false
    ```
    Even we inserted a row to the partition with the `null` value, **the resulted table doesn't contain `null`**.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. After the changes, the example above works as expected:
    ```sql
    spark-sql> SELECT isnull(p1) FROM tbl5;
    true
    ```
    
    ### How was this patch tested?
    By running the affected test suites `SQLQuerySuite`, `AlterTablePartitionV2SQLSuite` and `v1/ShowPartitionsSuite`.
    
    Authored-by: Max Gekk <max.gekkgmail.com>
    Signed-off-by: Wenchen Fan <wenchendatabricks.com>
    (cherry picked from commit 157b72ac9fa0057d5fd6d7ed52a6c4b22ebd1dfc)
    Signed-off-by: Max Gekk <max.gekkgmail.com>
    
    Closes #31094 from MaxGekk/partition-spec-value-null-3.1.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../catalyst/catalog/ExternalCatalogUtils.scala    | 10 +++++++
 .../sql/catalyst/catalog/InMemoryCatalog.scala     |  7 ++++-
 .../sql/catalyst/catalog/SessionCatalog.scala      |  2 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  1 +
 .../spark/sql/execution/datasources/rules.scala    |  3 +-
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala |  9 ++++++
 .../connector/AlterTablePartitionV2SQLSuite.scala  | 15 ++++++++++
 .../spark/sql/execution/command/DDLSuite.scala     |  8 ++++++
 .../execution/command/v1/ShowPartitionsSuite.scala | 12 ++++++++
 .../spark/sql/hive/HiveExternalCatalog.scala       | 33 ++++++++++++----------
 .../sql/hive/execution/InsertIntoHiveTable.scala   |  2 ++
 11 files changed, 84 insertions(+), 18 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
index 00445a1..9d6e0a6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
@@ -161,6 +161,10 @@ object ExternalCatalogUtils {
     }
   }
 
+  private def isNullPartitionValue(value: String): Boolean = {
+    value == null || value == DEFAULT_PARTITION_NAME
+  }
+
   /**
    * Returns true if `spec1` is a partial partition spec w.r.t. `spec2`, e.g. PARTITION (a=1) is a
    * partial partition spec w.r.t. PARTITION (a=1,b=2).
@@ -169,9 +173,15 @@ object ExternalCatalogUtils {
       spec1: TablePartitionSpec,
       spec2: TablePartitionSpec): Boolean = {
     spec1.forall {
+      case (partitionColumn, value) if isNullPartitionValue(value) =>
+        isNullPartitionValue(spec2(partitionColumn))
       case (partitionColumn, value) => spec2(partitionColumn) == value
     }
   }
+
+  def convertNullPartitionValues(spec: TablePartitionSpec): TablePartitionSpec = {
+    spec.mapValues(v => if (v == null) DEFAULT_PARTITION_NAME else v).toMap
+  }
 }
 
 object CatalogUtils {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 31644a5..c059503 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -541,7 +541,12 @@ class InMemoryCatalog(
 
     listPartitions(db, table, partialSpec).map { partition =>
       partitionColumnNames.map { name =>
-        escapePathName(name) + "=" + escapePathName(partition.spec(name))
+        val partValue = if (partition.spec(name) == null) {
+          DEFAULT_PARTITION_NAME
+        } else {
+          escapePathName(partition.spec(name))
+        }
+        escapePathName(name) + "=" + partValue
       }.mkString("/")
     }.sorted
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 466a856..ab886d5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -1185,7 +1185,7 @@ class SessionCatalog(
    */
   private def requireNonEmptyValueInPartitionSpec(specs: Seq[TablePartitionSpec]): Unit = {
     specs.foreach { s =>
-      if (s.values.exists(_.isEmpty)) {
+      if (s.values.exists(v => v != null && v.isEmpty)) {
         val spec = s.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
         throw new AnalysisException(
           s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 9d74ac9..34f56e9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -511,6 +511,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
    */
   protected def visitStringConstant(ctx: ConstantContext): String = withOrigin(ctx) {
     ctx match {
+      case _: NullLiteralContext => null
       case s: StringLiteralContext => createString(s)
       case o => o.getText
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index b9866e4..4fd6684 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -406,7 +406,8 @@ object PreprocessTableInsertion extends Rule[LogicalPlan] {
       catalogTable.get.tracksPartitionsInCatalog
     if (partitionsTrackedByCatalog && normalizedPartSpec.nonEmpty) {
       // empty partition column value
-      if (normalizedPartSpec.filter(_._2.isDefined).exists(_._2.get.isEmpty)) {
+      if (normalizedPartSpec.map(_._2)
+          .filter(_.isDefined).map(_.get).exists(v => v != null && v.isEmpty)) {
         val spec = normalizedPartSpec.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
         throw new AnalysisException(
           s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index a003275..f6706f3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3773,6 +3773,15 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       }
     }
   }
+
+  test("SPARK-33591: null as a partition value") {
+    val t = "part_table"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (col1 INT, p1 STRING) USING PARQUET PARTITIONED BY (p1)")
+      sql(s"INSERT INTO TABLE $t PARTITION (p1 = null) SELECT 0")
+      checkAnswer(sql(s"SELECT * FROM $t"), Row(0, null))
+    }
+  }
 }
 
 case class Foo(bar: Option[String])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
index 45d47c6..9987043 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
@@ -281,4 +281,19 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
       }
     }
   }
+
+  test("SPARK-33591: null as a partition value") {
+    val t = "testpart.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (col1 INT, p1 STRING) USING foo PARTITIONED BY (p1)")
+      sql(s"ALTER TABLE $t ADD PARTITION (p1 = null)")
+
+      val partTable = catalog("testpart").asTableCatalog
+        .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl"))
+        .asPartitionable
+      assert(partTable.partitionExists(InternalRow(null)))
+      sql(s"ALTER TABLE $t DROP PARTITION (p1 = null)")
+      assert(!partTable.partitionExists(InternalRow(null)))
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index cfda1b8..cb25777 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1730,6 +1730,14 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     // use int literal as partition value for int type partition column
     sql("ALTER TABLE tab1 DROP PARTITION (a=9, b=9)")
     assert(catalog.listPartitions(tableIdent).isEmpty)
+
+    // null partition values
+    createTablePartition(catalog, Map("a" -> null, "b" -> null), tableIdent)
+    val nullPartValue = if (isUsingHiveMetastore) "__HIVE_DEFAULT_PARTITION__" else null
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+      Set(Map("a" -> nullPartValue, "b" -> nullPartValue)))
+    sql("ALTER TABLE tab1 DROP PARTITION (a = null, b = null)")
+    assert(catalog.listPartitions(tableIdent).isEmpty)
   }
 
   protected def testRenamePartitions(isDatasourceTable: Boolean): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala
index c752a5f..bbca3e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala
@@ -67,6 +67,18 @@ trait ShowPartitionsSuiteBase extends command.ShowPartitionsSuiteBase {
       assert(errMsg.contains("'SHOW PARTITIONS' expects a table"))
     }
   }
+
+  test("SPARK-33591: null as a partition value") {
+    val t = "part_table"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (col1 INT, p1 STRING) $defaultUsing PARTITIONED BY (p1)")
+      sql(s"INSERT INTO TABLE $t PARTITION (p1 = null) SELECT 0")
+      checkAnswer(sql(s"SHOW PARTITIONS $t"), Row("p1=__HIVE_DEFAULT_PARTITION__"))
+      checkAnswer(
+        sql(s"SHOW PARTITIONS $t PARTITION (p1 = null)"),
+        Row("p1=__HIVE_DEFAULT_PARTITION__"))
+    }
+  }
 }
 
 class ShowPartitionsSuite extends ShowPartitionsSuiteBase with SharedSparkSession {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 54c237f..cc7aa14 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -950,9 +950,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   // Hive metastore is not case preserving and the partition columns are always lower cased. We need
   // to lower case the column names in partition specification before calling partition related Hive
   // APIs, to match this behaviour.
-  private def lowerCasePartitionSpec(spec: TablePartitionSpec): TablePartitionSpec = {
+  private def toMetaStorePartitionSpec(spec: TablePartitionSpec): TablePartitionSpec = {
     // scalastyle:off caselocale
-    spec.map { case (k, v) => k.toLowerCase -> v }
+    val lowNames = spec.map { case (k, v) => k.toLowerCase -> v }
+    ExternalCatalogUtils.convertNullPartitionValues(lowNames)
     // scalastyle:on caselocale
   }
 
@@ -1001,8 +1002,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       }
       p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri)))
     }
-    val lowerCasedParts = partsWithLocation.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
-    client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
+    val metaStoreParts = partsWithLocation
+      .map(p => p.copy(spec = toMetaStorePartitionSpec(p.spec)))
+    client.createPartitions(db, table, metaStoreParts, ignoreIfExists)
   }
 
   override def dropPartitions(
@@ -1014,7 +1016,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       retainData: Boolean): Unit = withClient {
     requireTableExists(db, table)
     client.dropPartitions(
-      db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge, retainData)
+      db, table, parts.map(toMetaStorePartitionSpec), ignoreIfNotExists, purge, retainData)
   }
 
   override def renamePartitions(
@@ -1023,7 +1025,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       specs: Seq[TablePartitionSpec],
       newSpecs: Seq[TablePartitionSpec]): Unit = withClient {
     client.renamePartitions(
-      db, table, specs.map(lowerCasePartitionSpec), newSpecs.map(lowerCasePartitionSpec))
+      db, table, specs.map(toMetaStorePartitionSpec), newSpecs.map(toMetaStorePartitionSpec))
 
     val tableMeta = getTable(db, table)
     val partitionColumnNames = tableMeta.partitionColumnNames
@@ -1039,7 +1041,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       val fs = tablePath.getFileSystem(hadoopConf)
       val newParts = newSpecs.map { spec =>
         val rightPath = renamePartitionDirectory(fs, tablePath, partitionColumnNames, spec)
-        val partition = client.getPartition(db, table, lowerCasePartitionSpec(spec))
+        val partition = client.getPartition(db, table, toMetaStorePartitionSpec(spec))
         partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toUri)))
       }
       alterPartitions(db, table, newParts)
@@ -1149,12 +1151,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       db: String,
       table: String,
       newParts: Seq[CatalogTablePartition]): Unit = withClient {
-    val lowerCasedParts = newParts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
+    val metaStoreParts = newParts.map(p => p.copy(spec = toMetaStorePartitionSpec(p.spec)))
 
     val rawTable = getRawTable(db, table)
 
     // convert partition statistics to properties so that we can persist them through hive api
-    val withStatsProps = lowerCasedParts.map { p =>
+    val withStatsProps = metaStoreParts.map { p =>
       if (p.stats.isDefined) {
         val statsProperties = statsToProperties(p.stats.get)
         p.copy(parameters = p.parameters ++ statsProperties)
@@ -1170,7 +1172,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       db: String,
       table: String,
       spec: TablePartitionSpec): CatalogTablePartition = withClient {
-    val part = client.getPartition(db, table, lowerCasePartitionSpec(spec))
+    val part = client.getPartition(db, table, toMetaStorePartitionSpec(spec))
     restorePartitionMetadata(part, getTable(db, table))
   }
 
@@ -1208,7 +1210,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       db: String,
       table: String,
       spec: TablePartitionSpec): Option[CatalogTablePartition] = withClient {
-    client.getPartitionOption(db, table, lowerCasePartitionSpec(spec)).map { part =>
+    client.getPartitionOption(db, table, toMetaStorePartitionSpec(spec)).map { part =>
       restorePartitionMetadata(part, getTable(db, table))
     }
   }
@@ -1223,7 +1225,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val catalogTable = getTable(db, table)
     val partColNameMap = buildLowerCasePartColNameMap(catalogTable).mapValues(escapePathName)
     val clientPartitionNames =
-      client.getPartitionNames(catalogTable, partialSpec.map(lowerCasePartitionSpec))
+      client.getPartitionNames(catalogTable, partialSpec.map(toMetaStorePartitionSpec))
     clientPartitionNames.map { partitionPath =>
       val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partitionPath)
       partSpec.map { case (partName, partValue) =>
@@ -1242,11 +1244,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       table: String,
       partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = withClient {
     val partColNameMap = buildLowerCasePartColNameMap(getTable(db, table))
-    val res = client.getPartitions(db, table, partialSpec.map(lowerCasePartitionSpec)).map { part =>
-      part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
+    val metaStoreSpec = partialSpec.map(toMetaStorePartitionSpec)
+    val res = client.getPartitions(db, table, metaStoreSpec)
+      .map { part => part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
     }
 
-    partialSpec match {
+    metaStoreSpec match {
       // This might be a bug of Hive: When the partition value inside the partial partition spec
       // contains dot, and we ask Hive to list partitions w.r.t. the partial partition spec, Hive
       // treats dot as matching any single character and may return more partitions than we
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 3c3f31a..61aef24 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -133,6 +133,7 @@ case class InsertIntoHiveTable(
     val numDynamicPartitions = partition.values.count(_.isEmpty)
     val numStaticPartitions = partition.values.count(_.nonEmpty)
     val partitionSpec = partition.map {
+      case (key, Some(null)) => key -> ExternalCatalogUtils.DEFAULT_PARTITION_NAME
       case (key, Some(value)) => key -> value
       case (key, None) => key -> ""
     }
@@ -229,6 +230,7 @@ case class InsertIntoHiveTable(
             val caseInsensitiveDpMap = CaseInsensitiveMap(dpMap)
 
             val updatedPartitionSpec = partition.map {
+              case (key, Some(null)) => key -> ExternalCatalogUtils.DEFAULT_PARTITION_NAME
               case (key, Some(value)) => key -> value
               case (key, None) if caseInsensitiveDpMap.contains(key) =>
                 key -> caseInsensitiveDpMap(key)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org