Posted to commits@spark.apache.org by we...@apache.org on 2017/01/17 18:01:35 UTC

spark git commit: [SPARK-19129][SQL] SessionCatalog: Disallow empty part col values in partition spec

Repository: spark
Updated Branches:
  refs/heads/master a83accfcf -> a23debd7b


[SPARK-19129][SQL] SessionCatalog: Disallow empty part col values in partition spec

### What changes were proposed in this pull request?
Empty partition column values are not valid in a partition spec. Before this PR, we accepted them, and the Hive metastore does not detect or disallow them either. As a result, users hit the following surprising behavior.

```Scala
val df = spark.createDataFrame(Seq((0, "a"), (1, "b"))).toDF("partCol1", "name")
df.write.mode("overwrite").partitionBy("partCol1").saveAsTable("partitionedTable")
spark.sql("alter table partitionedTable drop partition(partCol1='')")
spark.table("partitionedTable").show()
```

In the above example, the WHOLE table is DROPPED when the partition spec contains only one partition column and its value is empty.

When the table has more than one partition column, the Hive metastore APIs simply ignore the columns with empty values and treat the spec as a partial spec. This is also unexpected and does not follow actual Hive behavior. This PR disallows such invalid partition specs in the `SessionCatalog` APIs; a sketch of the new behavior follows.
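
As a minimal sketch (adapted from the test added to `HiveDDLSuite` below; it assumes a ScalaTest context where `intercept` and a `spark` session are available, with `partitionedTable` created as in the snippet above):

```Scala
// Before this patch, with partition columns (a, b), a spec such as
// (a='1', b='') was forwarded to the Hive metastore, which ignored the
// empty column and matched the partial spec (a='1').
// With this patch, the empty value is rejected up front:
val e = intercept[AnalysisException] {
  spark.sql("alter table partitionedTable drop partition(partCol1='')")
}.getMessage
assert(e.contains("Partition spec is invalid. The spec ([partCol1=]) contains an empty " +
  "partition column value"))
```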

### How was this patch tested?
Added test cases to `ExternalCatalogSuite`, `SessionCatalogSuite`, and `HiveDDLSuite`.

Author: gatorsmile <ga...@gmail.com>

Closes #16583 from gatorsmile/disallowEmptyPartColValue.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a23debd7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a23debd7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a23debd7

Branch: refs/heads/master
Commit: a23debd7bc8f85ea49c54b8cf3cd112cf0a803ff
Parents: a83accf
Author: gatorsmile <ga...@gmail.com>
Authored: Wed Jan 18 02:01:30 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Jan 18 02:01:30 2017 +0800

----------------------------------------------------------------------
 .../sql/catalyst/catalog/SessionCatalog.scala   | 26 +++++++-
 .../catalyst/catalog/ExternalCatalogSuite.scala |  2 +
 .../catalyst/catalog/SessionCatalogSuite.scala  | 70 ++++++++++++++++++--
 .../spark/sql/hive/client/HiveClientImpl.scala  |  6 +-
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 10 +++
 5 files changed, 106 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a23debd7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 12af9e0..8008fcd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -331,7 +331,7 @@ class SessionCatalog(
   def loadPartition(
       name: TableIdentifier,
       loadPath: String,
-      partition: TablePartitionSpec,
+      spec: TablePartitionSpec,
       isOverwrite: Boolean,
       holdDDLTime: Boolean,
       inheritTableSpecs: Boolean,
@@ -340,8 +340,9 @@ class SessionCatalog(
     val table = formatTableName(name.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Some(db)))
+    requireNonEmptyValueInPartitionSpec(Seq(spec))
     externalCatalog.loadPartition(
-      db, table, loadPath, partition, isOverwrite, holdDDLTime, inheritTableSpecs, isSrcLocal)
+      db, table, loadPath, spec, isOverwrite, holdDDLTime, inheritTableSpecs, isSrcLocal)
   }
 
   def defaultTablePath(tableIdent: TableIdentifier): String = {
@@ -693,6 +694,7 @@ class SessionCatalog(
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
     requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
+    requireNonEmptyValueInPartitionSpec(parts.map(_.spec))
     externalCatalog.createPartitions(db, table, parts, ignoreIfExists)
   }
 
@@ -711,6 +713,7 @@ class SessionCatalog(
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
     requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
+    requireNonEmptyValueInPartitionSpec(specs)
     externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge, retainData)
   }
 
@@ -731,6 +734,8 @@ class SessionCatalog(
     requireTableExists(TableIdentifier(table, Option(db)))
     requireExactMatchedPartitionSpec(specs, tableMetadata)
     requireExactMatchedPartitionSpec(newSpecs, tableMetadata)
+    requireNonEmptyValueInPartitionSpec(specs)
+    requireNonEmptyValueInPartitionSpec(newSpecs)
     externalCatalog.renamePartitions(db, table, specs, newSpecs)
   }
 
@@ -749,6 +754,7 @@ class SessionCatalog(
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
     requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
+    requireNonEmptyValueInPartitionSpec(parts.map(_.spec))
     externalCatalog.alterPartitions(db, table, parts)
   }
 
@@ -762,6 +768,7 @@ class SessionCatalog(
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
     requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+    requireNonEmptyValueInPartitionSpec(Seq(spec))
     externalCatalog.getPartition(db, table, spec)
   }
 
@@ -781,6 +788,7 @@ class SessionCatalog(
     requireTableExists(TableIdentifier(table, Option(db)))
     partialSpec.foreach { spec =>
       requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+      requireNonEmptyValueInPartitionSpec(Seq(spec))
     }
     externalCatalog.listPartitionNames(db, table, partialSpec)
   }
@@ -801,6 +809,7 @@ class SessionCatalog(
     requireTableExists(TableIdentifier(table, Option(db)))
     partialSpec.foreach { spec =>
       requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+      requireNonEmptyValueInPartitionSpec(Seq(spec))
     }
     externalCatalog.listPartitions(db, table, partialSpec)
   }
@@ -820,6 +829,19 @@ class SessionCatalog(
   }
 
   /**
+   * Verify if the input partition spec has any empty value.
+   */
+  private def requireNonEmptyValueInPartitionSpec(specs: Seq[TablePartitionSpec]): Unit = {
+    specs.foreach { s =>
+      if (s.values.exists(_.isEmpty)) {
+        val spec = s.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
+        throw new AnalysisException(
+          s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
+      }
+    }
+  }
+
+  /**
    * Verify if the input partition spec exactly matches the existing defined partition spec
    * The columns must be the same but the orders could be different.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/a23debd7/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 91f464b..acf3bcf 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -848,6 +848,8 @@ abstract class CatalogTestUtils {
     CatalogTablePartition(Map("a" -> "5", "b" -> "6", "c" -> "7"), storageFormat)
   lazy val partWithUnknownColumns =
     CatalogTablePartition(Map("a" -> "5", "unknown" -> "6"), storageFormat)
+  lazy val partWithEmptyValue =
+    CatalogTablePartition(Map("a" -> "3", "b" -> ""), storageFormat)
   lazy val funcClass = "org.apache.spark.myFunc"
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/a23debd7/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index ae93dff..7a7de25 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -625,6 +625,13 @@ class SessionCatalogSuite extends PlanTest {
     }
     assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
       "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.createPartitions(
+        TableIdentifier("tbl2", Some("db2")),
+        Seq(partWithEmptyValue, part1), ignoreIfExists = true)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("drop partitions") {
@@ -722,6 +729,16 @@ class SessionCatalogSuite extends PlanTest {
     assert(e.getMessage.contains(
       "Partition spec is invalid. The spec (a, unknown) must be contained within " +
         "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.dropPartitions(
+        TableIdentifier("tbl2", Some("db2")),
+        Seq(partWithEmptyValue.spec, part1.spec),
+        ignoreIfNotExists = false,
+        purge = false,
+        retainData = false)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("get partition") {
@@ -767,6 +784,11 @@ class SessionCatalogSuite extends PlanTest {
     }
     assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
       "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithEmptyValue.spec)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("rename partitions") {
@@ -834,6 +856,13 @@ class SessionCatalogSuite extends PlanTest {
     }
     assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
       "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.renamePartitions(
+        TableIdentifier("tbl1", Some("db2")),
+        Seq(part1.spec), Seq(partWithEmptyValue.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("alter partitions") {
@@ -893,6 +922,11 @@ class SessionCatalogSuite extends PlanTest {
     }
     assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
       "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithEmptyValue))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("list partition names") {
@@ -914,10 +948,24 @@ class SessionCatalogSuite extends PlanTest {
 
   test("list partition names with invalid partial partition spec") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[AnalysisException] {
+    var e = intercept[AnalysisException] {
+      catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
+        Some(partWithMoreColumns.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must be " +
+      "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
+        Some(partWithUnknownColumns.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must be " +
+      "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
       catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
-        Some(Map("unknown" -> "unknown")))
+        Some(partWithEmptyValue.spec))
     }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("list partitions") {
@@ -937,10 +985,22 @@ class SessionCatalogSuite extends PlanTest {
 
   test("list partitions with invalid partial partition spec") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[AnalysisException] {
-      catalog.listPartitions(
-        TableIdentifier("tbl2", Some("db2")), Some(Map("unknown" -> "unknown")))
+    var e = intercept[AnalysisException] {
+      catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(partWithMoreColumns.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must be " +
+      "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.listPartitions(TableIdentifier("tbl2", Some("db2")),
+        Some(partWithUnknownColumns.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must be " +
+      "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(partWithEmptyValue.spec))
     }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("list partitions when database/table does not exist") {

http://git-wip-us.apache.org/repos/asf/spark/blob/a23debd7/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 5c0e2f6..9a6144c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -471,6 +471,7 @@ private[hive] class HiveClientImpl(
     // do the check at first and collect all the matching partitions
     val matchingParts =
       specs.flatMap { s =>
+        assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
         // The provided spec here can be a partial spec, i.e. it will match all partitions
         // whose specs are supersets of this partial spec. E.g. If a table has partitions
         // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
@@ -545,6 +546,7 @@ private[hive] class HiveClientImpl(
           // -1 for result limit means "no limit/return all"
           client.getPartitionNames(table.database, table.identifier.table, -1)
         case Some(s) =>
+          assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
           client.getPartitionNames(table.database, table.identifier.table, s.asJava, -1)
       }
     hivePartitionNames.asScala.sorted
@@ -568,7 +570,9 @@ private[hive] class HiveClientImpl(
     val hiveTable = toHiveTable(table)
     val parts = spec match {
       case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition)
-      case Some(s) => client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition)
+      case Some(s) =>
+        assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
+        client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition)
     }
     HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
     parts

http://git-wip-us.apache.org/repos/asf/spark/blob/a23debd7/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index e3f1667..ef62be3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -247,6 +247,16 @@ class HiveDDLSuite
     }
   }
 
+  test("SPARK-19129: drop partition with a empty string will drop the whole table") {
+    val df = spark.createDataFrame(Seq((0, "a"), (1, "b"))).toDF("partCol1", "name")
+    df.write.mode("overwrite").partitionBy("partCol1").saveAsTable("partitionedTable")
+    val e = intercept[AnalysisException] {
+      spark.sql("alter table partitionedTable drop partition(partCol1='')")
+    }.getMessage
+    assert(e.contains("Partition spec is invalid. The spec ([partCol1=]) contains an empty " +
+      "partition column value"))
+  }
+
   test("add/drop partitions - external table") {
     val catalog = spark.sessionState.catalog
     withTempDir { tmpDir =>

