Posted to commits@spark.apache.org by we...@apache.org on 2016/10/25 07:00:45 UTC

spark git commit: [SPARK-18026][SQL] should not always lowercase partition columns of partition spec in parser

Repository: spark
Updated Branches:
  refs/heads/master 78d740a08 -> 6f31833db


[SPARK-18026][SQL] should not always lowercase partition columns of partition spec in parser

## What changes were proposed in this pull request?

Currently we always lowercase the partition columns of a partition spec in the parser, under the assumption that table partition columns are always lowercased.

However, this is not true for data source tables, which are case-preserving. It's safe for now because data source tables don't store their partition specs in the metastore and don't support `ADD PARTITION`, `DROP PARTITION`, or `RENAME PARTITION`, but we should make our code future-proof.
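As a small illustrative probe of the parser-level effect (the table name `t` and the spelling `Month` are made up, not from this PR), parsing an INSERT with a mixed-case static partition key now keeps the key as written instead of lowercasing it:

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable

// visitPartitionSpec used to rewrite the key to lower case; with this patch the
// original spelling is kept and only the analyzer normalizes it later.
val plan = CatalystSqlParser
  .parsePlan("INSERT INTO t PARTITION (Month='Jan') SELECT 1")
  .asInstanceOf[InsertIntoTable]

// After the patch:  Map(Month -> Some(Jan))
// Before the patch: Map(month -> Some(Jan))
println(plan.partition)
```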

This PR makes the partition spec case-preserving in the parser, and improves the `PreprocessTableInsertion` analyzer rule to normalize the partition columns in a partition spec against the table's partition columns.
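As a usage-level sketch mirroring the tests added below (the table `demo` is hypothetical, and a Hive-enabled `SparkSession` named `spark` is assumed, since `ALTER TABLE ... PARTITION` is still rejected for data source tables), a differently-cased key in a partition spec now resolves to the real partition column under the default case-insensitive resolver:

```scala
// Hypothetical Hive table with a lower-case partition column `part`.
spark.sql("CREATE TABLE demo (id BIGINT) PARTITIONED BY (part STRING)")

// The parser keeps `PART` as written; PartitioningUtils.normalizePartitionSpec
// resolves it to `part` under the default case-insensitive resolver.
spark.sql("ALTER TABLE demo ADD PARTITION (PART='x')")

// A key that matches no partition column fails analysis with the new message, e.g.
// "foo is not a valid partition column in table `default`.`demo`."
// spark.sql("ALTER TABLE demo ADD PARTITION (foo='x')")
```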

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <we...@databricks.com>

Closes #15566 from cloud-fan/partition-spec.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f31833d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f31833d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f31833d

Branch: refs/heads/master
Commit: 6f31833dbe0b766dfe4540a240fe92ebb7e14737
Parents: 78d740a
Author: Wenchen Fan <we...@databricks.com>
Authored: Tue Oct 25 15:00:33 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Oct 25 15:00:33 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/parser/AstBuilder.scala  |  6 ++-
 .../plans/logical/basicLogicalOperators.scala   | 20 +---------
 .../spark/sql/execution/command/ddl.scala       | 34 ++++++++++++++--
 .../datasources/PartitioningUtils.scala         | 30 ++++++++++++++
 .../spark/sql/execution/datasources/rules.scala | 41 ++++++++++---------
 .../spark/sql/execution/command/DDLSuite.scala  | 42 ++++++++++++++++++++
 .../spark/sql/hive/client/HiveClientImpl.scala  |  3 ++
 .../sql/hive/InsertIntoHiveTableSuite.scala     | 15 +------
 .../spark/sql/hive/execution/HiveDDLSuite.scala |  5 +--
 9 files changed, 136 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6f31833d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 929c1c4..38e9bb6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -192,11 +192,13 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
   override def visitPartitionSpec(
       ctx: PartitionSpecContext): Map[String, Option[String]] = withOrigin(ctx) {
     val parts = ctx.partitionVal.asScala.map { pVal =>
-      val name = pVal.identifier.getText.toLowerCase
+      val name = pVal.identifier.getText
       val value = Option(pVal.constant).map(visitStringConstant)
       name -> value
     }
-    // Check for duplicate partition columns in one spec.
+    // Before calling `toMap`, we check for duplicated keys to avoid silently ignoring partition
+    // values in a partition spec like PARTITION(a='1', b='2', a='3'). The real semantic check for
+    // partition columns is done in the analyzer.
     checkDuplicateKeys(parts, ctx)
     parts.toMap
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6f31833d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 64a787a..a48974c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -356,26 +356,10 @@ case class InsertIntoTable(
   override def children: Seq[LogicalPlan] = child :: Nil
   override def output: Seq[Attribute] = Seq.empty
 
-  lazy val expectedColumns = {
-    if (table.output.isEmpty) {
-      None
-    } else {
-      // Note: The parser (visitPartitionSpec in AstBuilder) already turns
-      // keys in partition to their lowercase forms.
-      val staticPartCols = partition.filter(_._2.isDefined).keySet
-      Some(table.output.filterNot(a => staticPartCols.contains(a.name)))
-    }
-  }
-
   assert(overwrite || !ifNotExists)
   assert(partition.values.forall(_.nonEmpty) || !ifNotExists)
-  override lazy val resolved: Boolean =
-    childrenResolved && table.resolved && expectedColumns.forall { expected =>
-    child.output.size == expected.size && child.output.zip(expected).forall {
-      case (childAttr, tableAttr) =>
-        DataType.equalsIgnoreCompatibleNullability(childAttr.dataType, tableAttr.dataType)
-    }
-  }
+
+  override lazy val resolved: Boolean = childrenResolved && table.resolved
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/6f31833d/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 45fa293..15656fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -351,8 +351,13 @@ case class AlterTableAddPartitionCommand(
         "ALTER TABLE ADD PARTITION is not allowed for tables defined using the datasource API")
     }
     val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+      val normalizedSpec = PartitioningUtils.normalizePartitionSpec(
+        spec,
+        table.partitionColumnNames,
+        table.identifier.quotedString,
+        sparkSession.sessionState.conf.resolver)
       // inherit table storage format (possibly except for location)
-      CatalogTablePartition(spec, table.storage.copy(locationUri = location))
+      CatalogTablePartition(normalizedSpec, table.storage.copy(locationUri = location))
     }
     catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists)
     Seq.empty[Row]
@@ -382,8 +387,21 @@ case class AlterTableRenamePartitionCommand(
         "ALTER TABLE RENAME PARTITION is not allowed for tables defined using the datasource API")
     }
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
+
+    val normalizedOldPartition = PartitioningUtils.normalizePartitionSpec(
+      oldPartition,
+      table.partitionColumnNames,
+      table.identifier.quotedString,
+      sparkSession.sessionState.conf.resolver)
+
+    val normalizedNewPartition = PartitioningUtils.normalizePartitionSpec(
+      newPartition,
+      table.partitionColumnNames,
+      table.identifier.quotedString,
+      sparkSession.sessionState.conf.resolver)
+
     catalog.renamePartitions(
-      tableName, Seq(oldPartition), Seq(newPartition))
+      tableName, Seq(normalizedOldPartition), Seq(normalizedNewPartition))
     Seq.empty[Row]
   }
 
@@ -418,7 +436,17 @@ case class AlterTableDropPartitionCommand(
       throw new AnalysisException(
         "ALTER TABLE DROP PARTITIONS is not allowed for tables defined using the datasource API")
     }
-    catalog.dropPartitions(table.identifier, specs, ignoreIfNotExists = ifExists, purge = purge)
+
+    val normalizedSpecs = specs.map { spec =>
+      PartitioningUtils.normalizePartitionSpec(
+        spec,
+        table.partitionColumnNames,
+        table.identifier.quotedString,
+        sparkSession.sessionState.conf.resolver)
+    }
+
+    catalog.dropPartitions(
+      table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge)
     Seq.empty[Row]
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6f31833d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 81bdabb..f66e8b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.util.Shell
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
 import org.apache.spark.sql.types._
 
@@ -244,6 +245,35 @@ object PartitioningUtils {
   }
 
   /**
+   * Normalize the column names in partition specification, w.r.t. the real partition column names
+   * and case sensitivity. e.g., if the partition spec has a column named `monTh`, and there is a
+   * partition column named `month`, and it's case insensitive, we will normalize `monTh` to
+   * `month`.
+   */
+  def normalizePartitionSpec[T](
+      partitionSpec: Map[String, T],
+      partColNames: Seq[String],
+      tblName: String,
+      resolver: Resolver): Map[String, T] = {
+    val normalizedPartSpec = partitionSpec.toSeq.map { case (key, value) =>
+      val normalizedKey = partColNames.find(resolver(_, key)).getOrElse {
+        throw new AnalysisException(s"$key is not a valid partition column in table $tblName.")
+      }
+      normalizedKey -> value
+    }
+
+    if (normalizedPartSpec.map(_._1).distinct.length != normalizedPartSpec.length) {
+      val duplicateColumns = normalizedPartSpec.map(_._1).groupBy(identity).collect {
+        case (x, ys) if ys.length > 1 => x
+      }
+      throw new AnalysisException(s"Found duplicated columns in partition specification: " +
+        duplicateColumns.mkString(", "))
+    }
+
+    normalizedPartSpec.toMap
+  }
+
+  /**
    * Resolves possible type conflicts between partitions by up-casting "lower" types.  The up-
    * casting order is:
    * {{{

http://git-wip-us.apache.org/repos/asf/spark/blob/6f31833d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index bd6eb6e..cf501cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -187,8 +187,8 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
       colName: String,
       colType: String): String = {
     val tableCols = schema.map(_.name)
-    val conf = sparkSession.sessionState.conf
-    tableCols.find(conf.resolver(_, colName)).getOrElse {
+    val resolver = sparkSession.sessionState.conf.resolver
+    tableCols.find(resolver(_, colName)).getOrElse {
       failAnalysis(s"$colType column $colName is not defined in table $tableIdent, " +
         s"defined table columns are: ${tableCols.mkString(", ")}")
     }
@@ -209,42 +209,41 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
       tblName: String,
       partColNames: Seq[String]): InsertIntoTable = {
 
-    val expectedColumns = insert.expectedColumns
-    if (expectedColumns.isDefined && expectedColumns.get.length != insert.child.schema.length) {
+    val normalizedPartSpec = PartitioningUtils.normalizePartitionSpec(
+      insert.partition, partColNames, tblName, conf.resolver)
+
+    val expectedColumns = {
+      val staticPartCols = normalizedPartSpec.filter(_._2.isDefined).keySet
+      insert.table.output.filterNot(a => staticPartCols.contains(a.name))
+    }
+
+    if (expectedColumns.length != insert.child.schema.length) {
       throw new AnalysisException(
         s"Cannot insert into table $tblName because the number of columns are different: " +
-          s"need ${expectedColumns.get.length} columns, " +
+          s"need ${expectedColumns.length} columns, " +
           s"but query has ${insert.child.schema.length} columns.")
     }
 
-    if (insert.partition.nonEmpty) {
-      // the query's partitioning must match the table's partitioning
-      // this is set for queries like: insert into ... partition (one = "a", two = <expr>)
-      val samePartitionColumns =
-        if (conf.caseSensitiveAnalysis) {
-          insert.partition.keySet == partColNames.toSet
-        } else {
-          insert.partition.keySet.map(_.toLowerCase) == partColNames.map(_.toLowerCase).toSet
-        }
-      if (!samePartitionColumns) {
+    if (normalizedPartSpec.nonEmpty) {
+      if (normalizedPartSpec.size != partColNames.length) {
         throw new AnalysisException(
           s"""
              |Requested partitioning does not match the table $tblName:
-             |Requested partitions: ${insert.partition.keys.mkString(",")}
+             |Requested partitions: ${normalizedPartSpec.keys.mkString(",")}
              |Table partitions: ${partColNames.mkString(",")}
            """.stripMargin)
       }
-      expectedColumns.map(castAndRenameChildOutput(insert, _)).getOrElse(insert)
+
+      castAndRenameChildOutput(insert.copy(partition = normalizedPartSpec), expectedColumns)
     } else {
-      // All partition columns are dynamic because because the InsertIntoTable command does
+      // All partition columns are dynamic because the InsertIntoTable command does
       // not explicitly specify partitioning columns.
-      expectedColumns.map(castAndRenameChildOutput(insert, _)).getOrElse(insert)
+      castAndRenameChildOutput(insert, expectedColumns)
         .copy(partition = partColNames.map(_ -> None).toMap)
     }
   }
 
-  // TODO: do we really need to rename?
-  def castAndRenameChildOutput(
+  private def castAndRenameChildOutput(
       insert: InsertIntoTable,
       expectedOutput: Seq[Attribute]): InsertIntoTable = {
     val newChildOutput = expectedOutput.zip(insert.child.output).map {

http://git-wip-us.apache.org/repos/asf/spark/blob/6f31833d/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index d593bfb..de326f8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -926,23 +926,33 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
     createPartitionedTable(tableIdent, isDatasourceTable = false)
+
+    // basic rename partition
     sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='q') RENAME TO PARTITION (a='100', b='p')")
     sql("ALTER TABLE dbx.tab1 PARTITION (a='2', b='c') RENAME TO PARTITION (a='20', b='c')")
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
       Set(Map("a" -> "100", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
+
     // rename without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
     sql("ALTER TABLE tab1 PARTITION (a='100', b='p') RENAME TO PARTITION (a='10', b='p')")
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
       Set(Map("a" -> "10", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
+
     // table to alter does not exist
     intercept[NoSuchTableException] {
       sql("ALTER TABLE does_not_exist PARTITION (c='3') RENAME TO PARTITION (c='333')")
     }
+
     // partition to rename does not exist
     intercept[NoSuchPartitionException] {
       sql("ALTER TABLE tab1 PARTITION (a='not_found', b='1') RENAME TO PARTITION (a='1', b='2')")
     }
+
+    // partition spec in RENAME PARTITION should be case insensitive by default
+    sql("ALTER TABLE tab1 PARTITION (A='10', B='p') RENAME TO PARTITION (A='1', B='p')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+      Set(Map("a" -> "1", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
   }
 
   test("alter table: rename partition (datasource table)") {
@@ -1334,6 +1344,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     val part2 = Map("a" -> "2", "b" -> "6")
     val part3 = Map("a" -> "3", "b" -> "7")
     val part4 = Map("a" -> "4", "b" -> "8")
+    val part5 = Map("a" -> "9", "b" -> "9")
     createDatabase(catalog, "dbx")
     createTable(catalog, tableIdent)
     createTablePartition(catalog, part1, tableIdent)
@@ -1341,6 +1352,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       convertToDatasourceTable(catalog, tableIdent)
     }
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
+
+    // basic add partition
     maybeWrapException(isDatasourceTable) {
       sql("ALTER TABLE dbx.tab1 ADD IF NOT EXISTS " +
         "PARTITION (a='2', b='6') LOCATION 'paris' PARTITION (a='3', b='7')")
@@ -1351,6 +1364,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option("paris"))
       assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isEmpty)
     }
+
     // add partitions without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
     maybeWrapException(isDatasourceTable) {
@@ -1360,14 +1374,18 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
         Set(part1, part2, part3, part4))
     }
+
     // table to alter does not exist
     intercept[AnalysisException] {
       sql("ALTER TABLE does_not_exist ADD IF NOT EXISTS PARTITION (a='4', b='9')")
     }
+
     // partition to add already exists
     intercept[AnalysisException] {
       sql("ALTER TABLE tab1 ADD PARTITION (a='4', b='8')")
     }
+
+    // partition to add already exists when using IF NOT EXISTS
     maybeWrapException(isDatasourceTable) {
       sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
     }
@@ -1375,6 +1393,15 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
         Set(part1, part2, part3, part4))
     }
+
+    // partition spec in ADD PARTITION should be case insensitive by default
+    maybeWrapException(isDatasourceTable) {
+      sql("ALTER TABLE tab1 ADD PARTITION (A='9', B='9')")
+    }
+    if (!isDatasourceTable) {
+      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+        Set(part1, part2, part3, part4, part5))
+    }
   }
 
   private def testDropPartitions(isDatasourceTable: Boolean): Unit = {
@@ -1395,12 +1422,15 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     if (isDatasourceTable) {
       convertToDatasourceTable(catalog, tableIdent)
     }
+
+    // basic drop partition
     maybeWrapException(isDatasourceTable) {
       sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (a='4', b='8'), PARTITION (a='3', b='7')")
     }
     if (!isDatasourceTable) {
       assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2))
     }
+
     // drop partitions without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
     maybeWrapException(isDatasourceTable) {
@@ -1409,20 +1439,32 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     if (!isDatasourceTable) {
       assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
     }
+
     // table to alter does not exist
     intercept[AnalysisException] {
       sql("ALTER TABLE does_not_exist DROP IF EXISTS PARTITION (a='2')")
     }
+
     // partition to drop does not exist
     intercept[AnalysisException] {
       sql("ALTER TABLE tab1 DROP PARTITION (a='300')")
     }
+
+    // partition to drop does not exist when using IF EXISTS
     maybeWrapException(isDatasourceTable) {
       sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='300')")
     }
     if (!isDatasourceTable) {
       assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
     }
+
+    // partition spec in DROP PARTITION should be case insensitive by default
+    maybeWrapException(isDatasourceTable) {
+      sql("ALTER TABLE tab1 DROP PARTITION (A='1', B='5')")
+    }
+    if (!isDatasourceTable) {
+      assert(catalog.listPartitions(tableIdent).isEmpty)
+    }
   }
 
   test("drop build-in function") {

http://git-wip-us.apache.org/repos/asf/spark/blob/6f31833d/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index e745a8c..8835b26 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -831,6 +831,9 @@ private[hive] class HiveClientImpl(
     new HivePartition(ht, tpart)
   }
 
+  // TODO (cloud-fan): the column names in partition specification are always lower cased because
+  // Hive metastore is not case preserving. We should normalize them to the actual column names of
+  // the table, once we store partition spec of data source tables.
   private def fromHivePartition(hp: HivePartition): CatalogTablePartition = {
     val apiPartition = hp.getTPartition
     CatalogTablePartition(

http://git-wip-us.apache.org/repos/asf/spark/blob/6f31833d/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index d9ce1c3..e3ddaf7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -370,17 +370,6 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
     assert(cause.getMessage.contains("insertInto() can't be used together with partitionBy()."))
   }
 
-  test("InsertIntoTable#resolved should include dynamic partitions") {
-    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
-      sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")
-      val data = (1 to 10).map(i => (i.toLong, s"data-$i")).toDF("id", "data")
-
-      val logical = InsertIntoTable(spark.table("partitioned").logicalPlan,
-        Map("part" -> None), data.logicalPlan, overwrite = false, ifNotExists = false)
-      assert(!logical.resolved, "Should not resolve: missing partition data")
-    }
-  }
-
   testPartitionedTable(
     "SPARK-16036: better error message when insert into a table with mismatch schema") {
     tableName =>
@@ -409,8 +398,8 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
 
         sql(s"INSERT INTO TABLE $tableName PARTITION (c=11, b=10) SELECT 9, 12")
 
-        // c is defined twice. Parser will complain.
-        intercept[ParseException] {
+        // c is defined twice. Analyzer will complain.
+        intercept[AnalysisException] {
           sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c=15, c=16) SELECT 13")
         }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6f31833d/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 3d1712e..e9268a9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -200,9 +200,8 @@ class HiveDDLSuite
         val message = intercept[AnalysisException] {
           sql(s"ALTER TABLE $externalTab DROP PARTITION (ds='2008-04-09', unknownCol='12')")
         }
-        assert(message.getMessage.contains(
-          "Partition spec is invalid. The spec (ds, unknowncol) must be contained within the " +
-            "partition spec (ds, hr) defined in table '`default`.`exttable_with_partitions`'"))
+        assert(message.getMessage.contains("unknownCol is not a valid partition column in table " +
+          "`default`.`exttable_with_partitions`"))
 
         sql(
           s"""


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org