You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2016/04/12 05:59:51 UTC

spark git commit: [SPARK-14132][SPARK-14133][SQL] Alter table partition DDLs

Repository: spark
Updated Branches:
  refs/heads/master e9e1adc03 -> 83fb96403


[SPARK-14132][SPARK-14133][SQL] Alter table partition DDLs

## What changes were proposed in this pull request?

This implements the add, drop and rename variants of the `ALTER TABLE` partition commands on top of the `SessionCatalog`. In particular:
```
ALTER TABLE ... ADD PARTITION ...
ALTER TABLE ... DROP PARTITION ...
ALTER TABLE ... RENAME PARTITION ... TO ...
```
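Note that `ALTER TABLE ... DROP PARTITION ... PURGE` and the `ALTER VIEW` forms of add/drop partition are rejected with an `AnalysisException`. The following operations are also not supported, and an `AnalysisException` with a helpful error message will be thrown if the user tries to use them: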
```
ALTER TABLE ... EXCHANGE PARTITION ...
ALTER TABLE ... ARCHIVE PARTITION ...
ALTER TABLE ... UNARCHIVE PARTITION ...
ALTER TABLE ... TOUCH ...
ALTER TABLE ... COMPACT ...
ALTER TABLE ... CONCATENATE
MSCK REPAIR TABLE ...
```

## How was this patch tested?

`DDLSuite`, `DDLCommandSuite` and `HiveDDLCommandSuite`

Author: Andrew Or <an...@databricks.com>

Closes #12220 from andrewor14/alter-partition-ddl.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83fb9640
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83fb9640
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83fb9640

Branch: refs/heads/master
Commit: 83fb96403bcfb1566e9d765690744824724737ac
Parents: e9e1adc
Author: Andrew Or <an...@databricks.com>
Authored: Mon Apr 11 20:59:45 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Mon Apr 11 20:59:45 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |   6 +-
 .../sql/catalyst/catalog/CatalogTestCases.scala |  18 ++-
 .../catalyst/catalog/SessionCatalogSuite.scala  |  28 +++-
 .../spark/sql/execution/SparkSqlParser.scala    |  56 +++----
 .../spark/sql/execution/command/ddl.scala       | 104 +++++++-----
 .../sql/execution/command/DDLCommandSuite.scala | 118 +++++---------
 .../spark/sql/execution/command/DDLSuite.scala  | 162 ++++++++++++++++++-
 .../hive/execution/HiveCompatibilitySuite.scala |  10 +-
 .../spark/sql/hive/HiveExternalCatalog.scala    |  21 +--
 .../spark/sql/hive/client/HiveClient.scala      |   9 +-
 .../spark/sql/hive/client/HiveClientImpl.scala  |  22 ++-
 .../spark/sql/hive/HiveDDLCommandSuite.scala    |  12 ++
 12 files changed, 360 insertions(+), 206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/83fb9640/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 2f2e060..0e2cd39 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -148,7 +148,7 @@ hiveNativeCommands
     | ROLLBACK WORK?
     | SHOW PARTITIONS tableIdentifier partitionSpec?
     | DFS .*?
-    | (CREATE | ALTER | DROP | SHOW | DESC | DESCRIBE | MSCK | LOAD) .*?
+    | (CREATE | ALTER | DROP | SHOW | DESC | DESCRIBE | LOAD) .*?
     ;
 
 unsupportedHiveNativeCommands
@@ -177,6 +177,7 @@ unsupportedHiveNativeCommands
     | kw1=UNLOCK kw2=DATABASE
     | kw1=CREATE kw2=TEMPORARY kw3=MACRO
     | kw1=DROP kw2=TEMPORARY kw3=MACRO
+    | kw1=MSCK kw2=REPAIR kw3=TABLE
     ;
 
 createTableHeader
@@ -651,7 +652,7 @@ nonReserved
     | AFTER | CASCADE | RESTRICT | BUCKETS | CLUSTERED | SORTED | PURGE | INPUTFORMAT | OUTPUTFORMAT
     | INPUTDRIVER | OUTPUTDRIVER | DBPROPERTIES | DFS | TRUNCATE | METADATA | REPLICATION | COMPUTE
     | STATISTICS | ANALYZE | PARTITIONED | EXTERNAL | DEFINED | RECORDWRITER
-    | REVOKE | GRANT | LOCK | UNLOCK | MSCK | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE
+    | REVOKE | GRANT | LOCK | UNLOCK | MSCK | REPAIR | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE
     | ROLES | COMPACTIONS | PRINCIPALS | TRANSACTIONS | INDEX | INDEXES | LOCKS | OPTION
     ;
 
@@ -867,6 +868,7 @@ GRANT: 'GRANT';
 LOCK: 'LOCK';
 UNLOCK: 'UNLOCK';
 MSCK: 'MSCK';
+REPAIR: 'REPAIR';
 EXPORT: 'EXPORT';
 IMPORT: 'IMPORT';
 LOAD: 'LOAD';

http://git-wip-us.apache.org/repos/asf/spark/blob/83fb9640/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
index 0009438..0d9b085 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
@@ -281,31 +281,37 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
   test("drop partitions") {
     val catalog = newBasicCatalog()
     assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2)))
-    catalog.dropPartitions("db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false)
+    catalog.dropPartitions(
+      "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false)
     assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part2)))
     resetState()
     val catalog2 = newBasicCatalog()
     assert(catalogPartitionsEqual(catalog2, "db2", "tbl2", Seq(part1, part2)))
-    catalog2.dropPartitions("db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false)
+    catalog2.dropPartitions(
+      "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false)
     assert(catalog2.listPartitions("db2", "tbl2").isEmpty)
   }
 
   test("drop partitions when database/table does not exist") {
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
-      catalog.dropPartitions("does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false)
+      catalog.dropPartitions(
+        "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false)
     }
     intercept[AnalysisException] {
-      catalog.dropPartitions("db2", "does_not_exist", Seq(), ignoreIfNotExists = false)
+      catalog.dropPartitions(
+        "db2", "does_not_exist", Seq(), ignoreIfNotExists = false)
     }
   }
 
   test("drop partitions that do not exist") {
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
-      catalog.dropPartitions("db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false)
+      catalog.dropPartitions(
+        "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false)
     }
-    catalog.dropPartitions("db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true)
+    catalog.dropPartitions(
+      "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true)
   }
 
   test("get partition") {

http://git-wip-us.apache.org/repos/asf/spark/blob/83fb9640/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 862fc27..426273e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -496,19 +496,25 @@ class SessionCatalogSuite extends SparkFunSuite {
     val sessionCatalog = new SessionCatalog(externalCatalog)
     assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part1, part2)))
     sessionCatalog.dropPartitions(
-      TableIdentifier("tbl2", Some("db2")), Seq(part1.spec), ignoreIfNotExists = false)
+      TableIdentifier("tbl2", Some("db2")),
+      Seq(part1.spec),
+      ignoreIfNotExists = false)
     assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part2)))
     // Drop partitions without explicitly specifying database
     sessionCatalog.setCurrentDatabase("db2")
     sessionCatalog.dropPartitions(
-      TableIdentifier("tbl2"), Seq(part2.spec), ignoreIfNotExists = false)
+      TableIdentifier("tbl2"),
+      Seq(part2.spec),
+      ignoreIfNotExists = false)
     assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
     // Drop multiple partitions at once
     sessionCatalog.createPartitions(
       TableIdentifier("tbl2", Some("db2")), Seq(part1, part2), ignoreIfExists = false)
     assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part1, part2)))
     sessionCatalog.dropPartitions(
-      TableIdentifier("tbl2", Some("db2")), Seq(part1.spec, part2.spec), ignoreIfNotExists = false)
+      TableIdentifier("tbl2", Some("db2")),
+      Seq(part1.spec, part2.spec),
+      ignoreIfNotExists = false)
     assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
   }
 
@@ -516,11 +522,15 @@ class SessionCatalogSuite extends SparkFunSuite {
     val catalog = new SessionCatalog(newBasicCatalog())
     intercept[AnalysisException] {
       catalog.dropPartitions(
-        TableIdentifier("tbl1", Some("does_not_exist")), Seq(), ignoreIfNotExists = false)
+        TableIdentifier("tbl1", Some("does_not_exist")),
+        Seq(),
+        ignoreIfNotExists = false)
     }
     intercept[AnalysisException] {
       catalog.dropPartitions(
-        TableIdentifier("does_not_exist", Some("db2")), Seq(), ignoreIfNotExists = false)
+        TableIdentifier("does_not_exist", Some("db2")),
+        Seq(),
+        ignoreIfNotExists = false)
     }
   }
 
@@ -528,10 +538,14 @@ class SessionCatalogSuite extends SparkFunSuite {
     val catalog = new SessionCatalog(newBasicCatalog())
     intercept[AnalysisException] {
       catalog.dropPartitions(
-        TableIdentifier("tbl2", Some("db2")), Seq(part3.spec), ignoreIfNotExists = false)
+        TableIdentifier("tbl2", Some("db2")),
+        Seq(part3.spec),
+        ignoreIfNotExists = false)
     }
     catalog.dropPartitions(
-      TableIdentifier("tbl2", Some("db2")), Seq(part3.spec), ignoreIfNotExists = true)
+      TableIdentifier("tbl2", Some("db2")),
+      Seq(part3.spec),
+      ignoreIfNotExists = true)
   }
 
   test("get partition") {

http://git-wip-us.apache.org/repos/asf/spark/blob/83fb9640/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 3da715c..73d9640 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -493,7 +493,9 @@ class SparkSqlAstBuilder extends AstBuilder {
    */
   override def visitAddTablePartition(
       ctx: AddTablePartitionContext): LogicalPlan = withOrigin(ctx) {
-    if (ctx.VIEW != null) throw new ParseException(s"Operation not allowed: partitioned views", ctx)
+    if (ctx.VIEW != null) {
+      throw new AnalysisException(s"Operation not allowed: partitioned views")
+    }
     // Create partition spec to location mapping.
     val specsAndLocs = if (ctx.partitionSpec.isEmpty) {
       ctx.partitionSpecLocation.asScala.map {
@@ -509,8 +511,7 @@ class SparkSqlAstBuilder extends AstBuilder {
     AlterTableAddPartition(
       visitTableIdentifier(ctx.tableIdentifier),
       specsAndLocs,
-      ctx.EXISTS != null)(
-      command(ctx))
+      ctx.EXISTS != null)
   }
 
   /**
@@ -523,11 +524,8 @@ class SparkSqlAstBuilder extends AstBuilder {
    */
   override def visitExchangeTablePartition(
       ctx: ExchangeTablePartitionContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableExchangePartition(
-      visitTableIdentifier(ctx.from),
-      visitTableIdentifier(ctx.to),
-      visitNonOptionalPartitionSpec(ctx.partitionSpec))(
-      command(ctx))
+    throw new AnalysisException(
+      "Operation not allowed: ALTER TABLE ... EXCHANGE PARTITION ...")
   }
 
   /**
@@ -543,8 +541,7 @@ class SparkSqlAstBuilder extends AstBuilder {
     AlterTableRenamePartition(
       visitTableIdentifier(ctx.tableIdentifier),
       visitNonOptionalPartitionSpec(ctx.from),
-      visitNonOptionalPartitionSpec(ctx.to))(
-      command(ctx))
+      visitNonOptionalPartitionSpec(ctx.to))
   }
 
   /**
@@ -561,13 +558,16 @@ class SparkSqlAstBuilder extends AstBuilder {
    */
   override def visitDropTablePartitions(
       ctx: DropTablePartitionsContext): LogicalPlan = withOrigin(ctx) {
-    if (ctx.VIEW != null) throw new ParseException(s"Operation not allowed: partitioned views", ctx)
+    if (ctx.VIEW != null) {
+      throw new AnalysisException(s"Operation not allowed: partitioned views")
+    }
+    if (ctx.PURGE != null) {
+      throw new AnalysisException(s"Operation not allowed: PURGE")
+    }
     AlterTableDropPartition(
       visitTableIdentifier(ctx.tableIdentifier),
       ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
-      ctx.EXISTS != null,
-      ctx.PURGE != null)(
-      command(ctx))
+      ctx.EXISTS != null)
   }
 
   /**
@@ -580,10 +580,8 @@ class SparkSqlAstBuilder extends AstBuilder {
    */
   override def visitArchiveTablePartition(
       ctx: ArchiveTablePartitionContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableArchivePartition(
-      visitTableIdentifier(ctx.tableIdentifier),
-      visitNonOptionalPartitionSpec(ctx.partitionSpec))(
-      command(ctx))
+    throw new AnalysisException(
+      "Operation not allowed: ALTER TABLE ... ARCHIVE PARTITION ...")
   }
 
   /**
@@ -596,10 +594,8 @@ class SparkSqlAstBuilder extends AstBuilder {
    */
   override def visitUnarchiveTablePartition(
       ctx: UnarchiveTablePartitionContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableUnarchivePartition(
-      visitTableIdentifier(ctx.tableIdentifier),
-      visitNonOptionalPartitionSpec(ctx.partitionSpec))(
-      command(ctx))
+    throw new AnalysisException(
+      "Operation not allowed: ALTER TABLE ... UNARCHIVE PARTITION ...")
   }
 
   /**
@@ -658,10 +654,7 @@ class SparkSqlAstBuilder extends AstBuilder {
    * }}}
    */
   override def visitTouchTable(ctx: TouchTableContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableTouch(
-      visitTableIdentifier(ctx.tableIdentifier),
-      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))(
-      command(ctx))
+    throw new AnalysisException("Operation not allowed: ALTER TABLE ... TOUCH ...")
   }
 
   /**
@@ -673,11 +666,7 @@ class SparkSqlAstBuilder extends AstBuilder {
    * }}}
    */
   override def visitCompactTable(ctx: CompactTableContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableCompact(
-      visitTableIdentifier(ctx.tableIdentifier),
-      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
-      string(ctx.STRING))(
-      command(ctx))
+    throw new AnalysisException("Operation not allowed: ALTER TABLE ... COMPACT ...")
   }
 
   /**
@@ -689,10 +678,7 @@ class SparkSqlAstBuilder extends AstBuilder {
    * }}}
    */
   override def visitConcatenateTable(ctx: ConcatenateTableContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableMerge(
-      visitTableIdentifier(ctx.tableIdentifier),
-      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))(
-      command(ctx))
+    throw new AnalysisException("Operation not allowed: ALTER TABLE ... CONCATENATE")
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/83fb9640/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 8a37cf8..c55b1a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -20,7 +20,8 @@ package org.apache.spark.sql.execution.command
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.types._
@@ -348,53 +349,94 @@ case class AlterTableSerDeProperties(
 }
 
 /**
- * Add Partition in ALTER TABLE/VIEW: add the table/view partitions.
+ * Add Partition in ALTER TABLE: add the table partitions.
+ *
  * 'partitionSpecsAndLocs': the syntax of ALTER VIEW is identical to ALTER TABLE,
  * EXCEPT that it is ILLEGAL to specify a LOCATION clause.
  * An error message will be issued if the partition exists, unless 'ifNotExists' is true.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec [LOCATION 'loc1']
+ * }}}
  */
 case class AlterTableAddPartition(
     tableName: TableIdentifier,
     partitionSpecsAndLocs: Seq[(TablePartitionSpec, Option[String])],
-    ifNotExists: Boolean)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
+    ifNotExists: Boolean)
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val catalog = sqlContext.sessionState.catalog
+    val table = catalog.getTableMetadata(tableName)
+    if (DDLUtils.isDatasourceTable(table)) {
+      throw new AnalysisException(
+        "alter table add partition is not allowed for tables defined using the datasource API")
+    }
+    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+      // inherit table storage format (possibly except for location)
+      CatalogTablePartition(spec, table.storage.copy(locationUri = location))
+    }
+    catalog.createPartitions(tableName, parts, ignoreIfExists = ifNotExists)
+    Seq.empty[Row]
+  }
 
+}
+
+/**
+ * Alter a table partition's spec.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table PARTITION spec1 RENAME TO PARTITION spec2;
+ * }}}
+ */
 case class AlterTableRenamePartition(
     tableName: TableIdentifier,
     oldPartition: TablePartitionSpec,
-    newPartition: TablePartitionSpec)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
+    newPartition: TablePartitionSpec)
+  extends RunnableCommand {
 
-case class AlterTableExchangePartition(
-    fromTableName: TableIdentifier,
-    toTableName: TableIdentifier,
-    spec: TablePartitionSpec)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    sqlContext.sessionState.catalog.renamePartitions(
+      tableName, Seq(oldPartition), Seq(newPartition))
+    Seq.empty[Row]
+  }
+
+}
 
 /**
- * Drop Partition in ALTER TABLE/VIEW: to drop a particular partition for a table/view.
+ * Drop Partition in ALTER TABLE: to drop a particular partition for a table.
+ *
  * This removes the data and metadata for this partition.
  * The data is actually moved to the .Trash/Current directory if Trash is configured,
  * unless 'purge' is true, but the metadata is completely lost.
  * An error message will be issued if the partition does not exist, unless 'ifExists' is true.
  * Note: purge is always false when the target is a view.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
+ * }}}
  */
 case class AlterTableDropPartition(
     tableName: TableIdentifier,
     specs: Seq[TablePartitionSpec],
-    ifExists: Boolean,
-    purge: Boolean)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
+    ifExists: Boolean)
+  extends RunnableCommand {
 
-case class AlterTableArchivePartition(
-    tableName: TableIdentifier,
-    spec: TablePartitionSpec)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val catalog = sqlContext.sessionState.catalog
+    val table = catalog.getTableMetadata(tableName)
+    if (DDLUtils.isDatasourceTable(table)) {
+      throw new AnalysisException(
+        "alter table drop partition is not allowed for tables defined using the datasource API")
+    }
+    catalog.dropPartitions(tableName, specs, ignoreIfNotExists = ifExists)
+    Seq.empty[Row]
+  }
 
-case class AlterTableUnarchivePartition(
-    tableName: TableIdentifier,
-    spec: TablePartitionSpec)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
+}
 
 case class AlterTableSetFileFormat(
     tableName: TableIdentifier,
@@ -453,22 +495,6 @@ case class AlterTableSetLocation(
 
 }
 
-case class AlterTableTouch(
-    tableName: TableIdentifier,
-    partitionSpec: Option[TablePartitionSpec])(sql: String)
-  extends NativeDDLCommand(sql) with Logging
-
-case class AlterTableCompact(
-    tableName: TableIdentifier,
-    partitionSpec: Option[TablePartitionSpec],
-    compactType: String)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
-
-case class AlterTableMerge(
-    tableName: TableIdentifier,
-    partitionSpec: Option[TablePartitionSpec])(sql: String)
-  extends NativeDDLCommand(sql) with Logging
-
 case class AlterTableChangeCol(
     tableName: TableIdentifier,
     partitionSpec: Option[TablePartitionSpec],

http://git-wip-us.apache.org/repos/asf/spark/blob/83fb9640/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index ac69518..1c8dd68 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.command
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
@@ -24,9 +25,17 @@ import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.SparkSqlParser
 import org.apache.spark.sql.types._
 
+// TODO: merge this with DDLSuite (SPARK-14441)
 class DDLCommandSuite extends PlanTest {
   private val parser = SparkSqlParser
 
+  private def assertUnsupported(sql: String): Unit = {
+    val e = intercept[AnalysisException] {
+      parser.parsePlan(sql)
+    }
+    assert(e.getMessage.toLowerCase.contains("operation not allowed"))
+  }
+
   test("create database") {
     val sql =
       """
@@ -326,11 +335,11 @@ class DDLCommandSuite extends PlanTest {
       Seq(
         (Map("dt" -> "2008-08-08", "country" -> "us"), Some("location1")),
         (Map("dt" -> "2009-09-09", "country" -> "uk"), None)),
-      ifNotExists = true)(sql1)
+      ifNotExists = true)
     val expected2 = AlterTableAddPartition(
       TableIdentifier("table_name", None),
       Seq((Map("dt" -> "2008-08-08"), Some("loc"))),
-      ifNotExists = false)(sql2)
+      ifNotExists = false)
 
     comparePlans(parsed1, expected1)
     comparePlans(parsed2, expected2)
@@ -369,22 +378,16 @@ class DDLCommandSuite extends PlanTest {
     val expected = AlterTableRenamePartition(
       TableIdentifier("table_name", None),
       Map("dt" -> "2008-08-08", "country" -> "us"),
-      Map("dt" -> "2008-09-09", "country" -> "uk"))(sql)
+      Map("dt" -> "2008-09-09", "country" -> "uk"))
     comparePlans(parsed, expected)
   }
 
-  test("alter table: exchange partition") {
-    val sql =
+  test("alter table: exchange partition (not supported)") {
+    assertUnsupported(
       """
        |ALTER TABLE table_name_1 EXCHANGE PARTITION
        |(dt='2008-08-08', country='us') WITH TABLE table_name_2
-      """.stripMargin
-    val parsed = parser.parsePlan(sql)
-    val expected = AlterTableExchangePartition(
-      TableIdentifier("table_name_1", None),
-      TableIdentifier("table_name_2", None),
-      Map("dt" -> "2008-08-08", "country" -> "us"))(sql)
-    comparePlans(parsed, expected)
+      """.stripMargin)
   }
 
   // ALTER TABLE table_name DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE]
@@ -405,7 +408,10 @@ class DDLCommandSuite extends PlanTest {
     val sql2_view = sql2_table.replace("TABLE", "VIEW").replace("PURGE", "")
 
     val parsed1_table = parser.parsePlan(sql1_table)
-    val parsed2_table = parser.parsePlan(sql2_table)
+    val e = intercept[ParseException] {
+      parser.parsePlan(sql2_table)
+    }
+    assert(e.getMessage.contains("Operation not allowed"))
 
     intercept[ParseException] {
       parser.parsePlan(sql1_view)
@@ -420,36 +426,17 @@ class DDLCommandSuite extends PlanTest {
       Seq(
         Map("dt" -> "2008-08-08", "country" -> "us"),
         Map("dt" -> "2009-09-09", "country" -> "uk")),
-      ifExists = true,
-      purge = false)(sql1_table)
-    val expected2_table = AlterTableDropPartition(
-      tableIdent,
-      Seq(
-        Map("dt" -> "2008-08-08", "country" -> "us"),
-        Map("dt" -> "2009-09-09", "country" -> "uk")),
-      ifExists = false,
-      purge = true)(sql2_table)
+      ifExists = true)
 
     comparePlans(parsed1_table, expected1_table)
-    comparePlans(parsed2_table, expected2_table)
   }
 
-  test("alter table: archive partition") {
-    val sql = "ALTER TABLE table_name ARCHIVE PARTITION (dt='2008-08-08', country='us')"
-    val parsed = parser.parsePlan(sql)
-    val expected = AlterTableArchivePartition(
-      TableIdentifier("table_name", None),
-      Map("dt" -> "2008-08-08", "country" -> "us"))(sql)
-    comparePlans(parsed, expected)
+  test("alter table: archive partition (not supported)") {
+    assertUnsupported("ALTER TABLE table_name ARCHIVE PARTITION (dt='2008-08-08', country='us')")
   }
 
-  test("alter table: unarchive partition") {
-    val sql = "ALTER TABLE table_name UNARCHIVE PARTITION (dt='2008-08-08', country='us')"
-    val parsed = parser.parsePlan(sql)
-    val expected = AlterTableUnarchivePartition(
-      TableIdentifier("table_name", None),
-      Map("dt" -> "2008-08-08", "country" -> "us"))(sql)
-    comparePlans(parsed, expected)
+  test("alter table: unarchive partition (not supported)") {
+    assertUnsupported("ALTER TABLE table_name UNARCHIVE PARTITION (dt='2008-08-08', country='us')")
   }
 
   test("alter table: set file format") {
@@ -505,55 +492,24 @@ class DDLCommandSuite extends PlanTest {
     comparePlans(parsed2, expected2)
   }
 
-  test("alter table: touch") {
-    val sql1 = "ALTER TABLE table_name TOUCH"
-    val sql2 = "ALTER TABLE table_name TOUCH PARTITION (dt='2008-08-08', country='us')"
-    val parsed1 = parser.parsePlan(sql1)
-    val parsed2 = parser.parsePlan(sql2)
-    val tableIdent = TableIdentifier("table_name", None)
-    val expected1 = AlterTableTouch(
-      tableIdent,
-      None)(sql1)
-    val expected2 = AlterTableTouch(
-      tableIdent,
-      Some(Map("dt" -> "2008-08-08", "country" -> "us")))(sql2)
-    comparePlans(parsed1, expected1)
-    comparePlans(parsed2, expected2)
+  test("alter table: touch (not supported)") {
+    assertUnsupported("ALTER TABLE table_name TOUCH")
+    assertUnsupported("ALTER TABLE table_name TOUCH PARTITION (dt='2008-08-08', country='us')")
   }
 
-  test("alter table: compact") {
-    val sql1 = "ALTER TABLE table_name COMPACT 'compaction_type'"
-    val sql2 =
+  test("alter table: compact (not supported)") {
+    assertUnsupported("ALTER TABLE table_name COMPACT 'compaction_type'")
+    assertUnsupported(
       """
-       |ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us')
-       |COMPACT 'MAJOR'
-      """.stripMargin
-    val parsed1 = parser.parsePlan(sql1)
-    val parsed2 = parser.parsePlan(sql2)
-    val tableIdent = TableIdentifier("table_name", None)
-    val expected1 = AlterTableCompact(
-      tableIdent,
-      None,
-      "compaction_type")(sql1)
-    val expected2 = AlterTableCompact(
-      tableIdent,
-      Some(Map("dt" -> "2008-08-08", "country" -> "us")),
-      "MAJOR")(sql2)
-    comparePlans(parsed1, expected1)
-    comparePlans(parsed2, expected2)
+        |ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us')
+        |COMPACT 'MAJOR'
+      """.stripMargin)
   }
 
-  test("alter table: concatenate") {
-    val sql1 = "ALTER TABLE table_name CONCATENATE"
-    val sql2 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') CONCATENATE"
-    val parsed1 = parser.parsePlan(sql1)
-    val parsed2 = parser.parsePlan(sql2)
-    val tableIdent = TableIdentifier("table_name", None)
-    val expected1 = AlterTableMerge(tableIdent, None)(sql1)
-    val expected2 = AlterTableMerge(
-      tableIdent, Some(Map("dt" -> "2008-08-08", "country" -> "us")))(sql2)
-    comparePlans(parsed1, expected1)
-    comparePlans(parsed2, expected2)
+  test("alter table: concatenate (not supported)") {
+    assertUnsupported("ALTER TABLE table_name CONCATENATE")
+    assertUnsupported(
+      "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') CONCATENATE")
   }
 
   test("alter table: change column name/type/position/comment") {

http://git-wip-us.apache.org/repos/asf/spark/blob/83fb9640/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index c6479bf..40a8b0e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -58,6 +58,10 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     assert(e.getMessage.toLowerCase.contains("operation not allowed"))
   }
 
+  private def maybeWrapException[T](expectException: Boolean)(body: => T): Unit = {
+    if (expectException) intercept[AnalysisException] { body } else body
+  }
+
   private def createDatabase(catalog: SessionCatalog, name: String): Unit = {
     catalog.createDatabase(CatalogDatabase(name, "", "", Map()), ignoreIfExists = false)
   }
@@ -320,6 +324,62 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     assertUnsupported("ALTER TABLE dbx.tab1 NOT STORED AS DIRECTORIES")
   }
 
+  test("alter table: add partition") {
+    testAddPartitions(isDatasourceTable = false)
+  }
+
+  test("alter table: add partition (datasource table)") {
+    testAddPartitions(isDatasourceTable = true)
+  }
+
+  test("alter table: add partition is not supported for views") {
+    assertUnsupported("ALTER VIEW dbx.tab1 ADD IF NOT EXISTS PARTITION (b='2')")
+  }
+
+  test("alter table: drop partition") {
+    testDropPartitions(isDatasourceTable = false)
+  }
+
+  test("alter table: drop partition (datasource table)") {
+    testDropPartitions(isDatasourceTable = true)
+  }
+
+  test("alter table: drop partition is not supported for views") {
+    assertUnsupported("ALTER VIEW dbx.tab1 DROP IF EXISTS PARTITION (b='2')")
+  }
+
+  test("alter table: rename partition") {
+    val catalog = sqlContext.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1", Some("dbx"))
+    val part1 = Map("a" -> "1")
+    val part2 = Map("b" -> "2")
+    val part3 = Map("c" -> "3")
+    createDatabase(catalog, "dbx")
+    createTable(catalog, tableIdent)
+    createTablePartition(catalog, part1, tableIdent)
+    createTablePartition(catalog, part2, tableIdent)
+    createTablePartition(catalog, part3, tableIdent)
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+      Set(part1, part2, part3))
+    sql("ALTER TABLE dbx.tab1 PARTITION (a='1') RENAME TO PARTITION (a='100')")
+    sql("ALTER TABLE dbx.tab1 PARTITION (b='2') RENAME TO PARTITION (b='200')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+      Set(Map("a" -> "100"), Map("b" -> "200"), part3))
+    // rename without explicitly specifying database
+    catalog.setCurrentDatabase("dbx")
+    sql("ALTER TABLE tab1 PARTITION (a='100') RENAME TO PARTITION (a='10')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+      Set(Map("a" -> "10"), Map("b" -> "200"), part3))
+    // table to alter does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE does_not_exist PARTITION (c='3') RENAME TO PARTITION (c='333')")
+    }
+    // partition to rename does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE tab1 PARTITION (x='300') RENAME TO PARTITION (x='333')")
+    }
+  }
+
   // TODO: ADD a testcase for Drop Database in Restric when we can create tables in SQLContext
 
   test("show tables") {
@@ -487,10 +547,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         assert(storageFormat.locationUri === Some(expected))
       }
     }
-    // Optionally expect AnalysisException
-    def maybeWrapException[T](expectException: Boolean)(body: => T): Unit = {
-      if (expectException) intercept[AnalysisException] { body } else body
-    }
     // set table location
     sql("ALTER TABLE dbx.tab1 SET LOCATION '/path/to/your/lovely/heart'")
     verifyLocation("/path/to/your/lovely/heart")
@@ -564,4 +620,102 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
+  private def testAddPartitions(isDatasourceTable: Boolean): Unit = {
+    val catalog = sqlContext.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1", Some("dbx"))
+    val part1 = Map("a" -> "1")
+    val part2 = Map("b" -> "2")
+    val part3 = Map("c" -> "3")
+    val part4 = Map("d" -> "4")
+    createDatabase(catalog, "dbx")
+    createTable(catalog, tableIdent)
+    createTablePartition(catalog, part1, tableIdent)
+    if (isDatasourceTable) {
+      convertToDatasourceTable(catalog, tableIdent)
+    }
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
+    maybeWrapException(isDatasourceTable) {
+      sql("ALTER TABLE dbx.tab1 ADD IF NOT EXISTS " +
+        "PARTITION (b='2') LOCATION 'paris' PARTITION (c='3')")
+    }
+    if (!isDatasourceTable) {
+      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
+      assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isEmpty)
+      assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Some("paris"))
+      assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isEmpty)
+    }
+    // add partitions without explicitly specifying database
+    catalog.setCurrentDatabase("dbx")
+    maybeWrapException(isDatasourceTable) {
+      sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (d='4')")
+    }
+    if (!isDatasourceTable) {
+      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+        Set(part1, part2, part3, part4))
+    }
+    // table to alter does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE does_not_exist ADD IF NOT EXISTS PARTITION (d='4')")
+    }
+    // partition to add already exists
+    intercept[AnalysisException] {
+      sql("ALTER TABLE tab1 ADD PARTITION (d='4')")
+    }
+    maybeWrapException(isDatasourceTable) {
+      sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (d='4')")
+    }
+    if (!isDatasourceTable) {
+      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+        Set(part1, part2, part3, part4))
+    }
+  }
+
+  private def testDropPartitions(isDatasourceTable: Boolean): Unit = {
+    val catalog = sqlContext.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1", Some("dbx"))
+    val part1 = Map("a" -> "1")
+    val part2 = Map("b" -> "2")
+    val part3 = Map("c" -> "3")
+    val part4 = Map("d" -> "4")
+    createDatabase(catalog, "dbx")
+    createTable(catalog, tableIdent)
+    createTablePartition(catalog, part1, tableIdent)
+    createTablePartition(catalog, part2, tableIdent)
+    createTablePartition(catalog, part3, tableIdent)
+    createTablePartition(catalog, part4, tableIdent)
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+      Set(part1, part2, part3, part4))
+    if (isDatasourceTable) {
+      convertToDatasourceTable(catalog, tableIdent)
+    }
+    maybeWrapException(isDatasourceTable) {
+      sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (d='4'), PARTITION (c='3')")
+    }
+    if (!isDatasourceTable) {
+      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2))
+    }
+    // drop partitions without explicitly specifying database
+    catalog.setCurrentDatabase("dbx")
+    maybeWrapException(isDatasourceTable) {
+      sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (b='2')")
+    }
+    if (!isDatasourceTable) {
+      assert(catalog.listPartitions(tableIdent).map(_.spec) == Seq(part1))
+    }
+    // table to alter does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE does_not_exist DROP IF EXISTS PARTITION (b='2')")
+    }
+    // partition to drop does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE tab1 DROP PARTITION (x='300')")
+    }
+    maybeWrapException(isDatasourceTable) {
+      sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (x='300')")
+    }
+    if (!isDatasourceTable) {
+      assert(catalog.listPartitions(tableIdent).map(_.spec) == Seq(part1))
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/83fb9640/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 9e3cb18..f0eeda0 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -376,7 +376,13 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
 
     // Create partitioned view is not supported
     "create_like_view",
-    "describe_formatted_view_partitioned"
+    "describe_formatted_view_partitioned",
+
+    // This uses CONCATENATE, which we don't support
+    "alter_merge_2",
+
+    // TOUCH is not supported
+    "touch"
   )
 
   /**
@@ -392,7 +398,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "alter2",
     "alter3",
     "alter5",
-    "alter_merge_2",
     "alter_partition_format_loc",
     "alter_partition_with_whitelist",
     "alter_rename_partition",
@@ -897,7 +902,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "timestamp_comparison",
     "timestamp_lazy",
     "timestamp_null",
-    "touch",
     "transform_ppr1",
     "transform_ppr2",
     "truncate_table",

http://git-wip-us.apache.org/repos/asf/spark/blob/83fb9640/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index a49ce33..482f474 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -219,26 +219,7 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat
       parts: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean): Unit = withClient {
     requireTableExists(db, table)
-    // Note: Unfortunately Hive does not currently support `ignoreIfNotExists` so we
-    // need to implement it here ourselves. This is currently somewhat expensive because
-    // we make multiple synchronous calls to Hive for each partition we want to drop.
-    val partsToDrop =
-      if (ignoreIfNotExists) {
-        parts.filter { spec =>
-          try {
-            getPartition(db, table, spec)
-            true
-          } catch {
-            // Filter out the partitions that do not actually exist
-            case _: AnalysisException => false
-          }
-        }
-      } else {
-        parts
-      }
-    if (partsToDrop.nonEmpty) {
-      client.dropPartitions(db, table, partsToDrop)
-    }
+    client.dropPartitions(db, table, parts, ignoreIfNotExists)
   }
 
   override def renamePartitions(

http://git-wip-us.apache.org/repos/asf/spark/blob/83fb9640/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 94794b1..6f7e7bf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -120,16 +120,13 @@ private[hive] trait HiveClient {
       ignoreIfExists: Boolean): Unit
 
   /**
-   * Drop one or many partitions in the given table.
-   *
-   * Note: Unfortunately, Hive does not currently provide a way to ignore this call if the
-   * partitions do not already exist. The seemingly relevant flag `ifExists` in
-   * [[org.apache.hadoop.hive.metastore.PartitionDropOptions]] is not read anywhere.
+   * Drop one or many partitions in the given table, optionally ignoring those that do not exist.
    */
   def dropPartitions(
       db: String,
       table: String,
-      specs: Seq[ExternalCatalog.TablePartitionSpec]): Unit
+      specs: Seq[ExternalCatalog.TablePartitionSpec],
+      ignoreIfNotExists: Boolean): Unit
 
   /**
    * Rename one or many existing table partitions, assuming they exist.

http://git-wip-us.apache.org/repos/asf/spark/blob/83fb9640/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index a037671..39e26ac 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.cli.CliSessionState
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
+import org.apache.hadoop.hive.metastore.{PartitionDropOptions, TableType => HiveTableType}
 import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Function => HiveFunction, FunctionType, PrincipalType, ResourceType, ResourceUri}
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
@@ -367,9 +367,25 @@ private[hive] class HiveClientImpl(
   override def dropPartitions(
       db: String,
       table: String,
-      specs: Seq[ExternalCatalog.TablePartitionSpec]): Unit = withHiveState {
+      specs: Seq[ExternalCatalog.TablePartitionSpec],
+      ignoreIfNotExists: Boolean): Unit = withHiveState {
     // TODO: figure out how to drop multiple partitions in one call
-    specs.foreach { s => client.dropPartition(db, table, s.values.toList.asJava, true) }
+    val hiveTable = client.getTable(db, table, true /* throw exception */)
+    specs.foreach { s =>
+      // The provided spec here can be a partial spec, i.e. it will match all partitions
+      // whose specs are supersets of this partial spec. E.g. If a table has partitions
+      // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
+      val matchingParts = client.getPartitions(hiveTable, s.asJava).asScala
+      if (matchingParts.isEmpty && !ignoreIfNotExists) {
+        throw new AnalysisException(
+          s"partition to drop '$s' does not exist in table '$table' database '$db'")
+      }
+      matchingParts.foreach { hivePartition =>
+        val dropOptions = new PartitionDropOptions
+        dropOptions.ifExists = ignoreIfNotExists
+        client.dropPartition(db, table, hivePartition.getValues, dropOptions)
+      }
+    }
   }
 
   override def renamePartitions(

http://git-wip-us.apache.org/repos/asf/spark/blob/83fb9640/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index a144da4..e8086ae 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -41,6 +41,13 @@ class HiveDDLCommandSuite extends PlanTest {
     }.head
   }
 
+  private def assertUnsupported(sql: String): Unit = {
+    val e = intercept[ParseException] {
+      parser.parsePlan(sql)
+    }
+    assert(e.getMessage.toLowerCase.contains("unsupported"))
+  }
+
   test("Test CTAS #1") {
     val s1 =
       """CREATE EXTERNAL TABLE IF NOT EXISTS mydb.page_view
@@ -367,4 +374,9 @@ class HiveDDLCommandSuite extends PlanTest {
       parser.parsePlan(v1).isInstanceOf[HiveNativeCommand]
     }
   }
+
+  test("MSCK repair table (not supported)") {
+    assertUnsupported("MSCK REPAIR TABLE tab1")
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org