You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2016/04/05 23:54:10 UTC

spark git commit: [SPARK-14129][SPARK-14128][SQL] Alter table DDL commands

Repository: spark
Updated Branches:
  refs/heads/master c59abad05 -> 45d8cdee3


[SPARK-14129][SPARK-14128][SQL] Alter table DDL commands

## What changes were proposed in this pull request?

In Spark 2.0, we want to handle the most common `ALTER TABLE` commands ourselves instead of passing the entire query text to Hive. This is done using the new `SessionCatalog` API introduced recently.

The commands supported in this patch include:
```
ALTER TABLE ... RENAME TO ...
ALTER TABLE ... SET TBLPROPERTIES ...
ALTER TABLE ... UNSET TBLPROPERTIES ...
ALTER TABLE ... SET LOCATION ...
ALTER TABLE ... SET SERDE ...
ALTER TABLE ... SET SERDEPROPERTIES ...
```
The commands we explicitly do not support are:
```
ALTER TABLE ... CLUSTERED BY ...
ALTER TABLE ... SKEWED BY ...
ALTER TABLE ... NOT CLUSTERED
ALTER TABLE ... NOT SORTED
ALTER TABLE ... NOT SKEWED
ALTER TABLE ... NOT STORED AS DIRECTORIES
ALTER TABLE ... SET SKEWED LOCATION ...
```
For these, the parser throws an `AnalysisException` with the message "Operation not allowed: ..." instead of producing a command.

## How was this patch tested?

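New test cases in `DDLSuite`; the existing parser tests in `DDLCommandSuite` were updated for the new command signatures, and the tests for the now-unsupported bucketing/skewing commands were removed.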

Author: Andrew Or <an...@databricks.com>

Closes #12121 from andrewor14/alter-table-ddl.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/45d8cdee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/45d8cdee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/45d8cdee

Branch: refs/heads/master
Commit: 45d8cdee3945bf94d0f1bd93a12e4cb0d416468e
Parents: c59abad
Author: Andrew Or <an...@databricks.com>
Authored: Tue Apr 5 14:54:07 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Tue Apr 5 14:54:07 2016 -0700

----------------------------------------------------------------------
 .../catalyst/analysis/FunctionRegistry.scala    |  12 +
 .../sql/catalyst/catalog/SessionCatalog.scala   |  42 +++
 .../sql/catalyst/util/StringKeyHashMap.scala    |   2 +
 .../spark/sql/execution/SparkSqlParser.scala    | 121 ++-----
 .../spark/sql/execution/command/ddl.scala       | 216 +++++++++---
 .../sql/execution/command/DDLCommandSuite.scala | 127 +------
 .../spark/sql/execution/command/DDLSuite.scala  | 330 +++++++++++++++++--
 .../hive/execution/HiveCompatibilitySuite.scala |  10 +-
 .../spark/sql/hive/HiveSessionCatalog.scala     |   2 +-
 9 files changed, 562 insertions(+), 300 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/45d8cdee/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 1ebdf49..f239b33 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -54,6 +54,10 @@ trait FunctionRegistry {
 
   /** Checks if a function with a given name exists. */
   def functionExists(name: String): Boolean = lookupFunction(name).isDefined
+
+  /** Clear all registered functions. */
+  def clear(): Unit
+
 }
 
 class SimpleFunctionRegistry extends FunctionRegistry {
@@ -93,6 +97,10 @@ class SimpleFunctionRegistry extends FunctionRegistry {
     functionBuilders.remove(name).isDefined
   }
 
+  override def clear(): Unit = {
+    functionBuilders.clear()
+  }
+
   def copy(): SimpleFunctionRegistry = synchronized {
     val registry = new SimpleFunctionRegistry
     functionBuilders.iterator.foreach { case (name, (info, builder)) =>
@@ -132,6 +140,10 @@ object EmptyFunctionRegistry extends FunctionRegistry {
     throw new UnsupportedOperationException
   }
 
+  override def clear(): Unit = {
+    throw new UnsupportedOperationException
+  }
+
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/45d8cdee/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index c08ffbb..62a3b1c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -304,12 +304,19 @@ class SessionCatalog(
     dbTables ++ _tempTables
   }
 
+  // TODO: It's strange that we have both refresh and invalidate here.
+
   /**
    * Refresh the cache entry for a metastore table, if any.
    */
   def refreshTable(name: TableIdentifier): Unit = { /* no-op */ }
 
   /**
+   * Invalidate the cache entry for a metastore table, if any.
+   */
+  def invalidateTable(name: TableIdentifier): Unit = { /* no-op */ }
+
+  /**
    * Drop all existing temporary tables.
    * For testing only.
    */
@@ -596,6 +603,11 @@ class SessionCatalog(
   }
 
   /**
+   * List all functions in the specified database, including temporary functions.
+   */
+  def listFunctions(db: String): Seq[FunctionIdentifier] = listFunctions(db, "*")
+
+  /**
    * List all matching functions in the specified database, including temporary functions.
    */
   def listFunctions(db: String, pattern: String): Seq[FunctionIdentifier] = {
@@ -609,4 +621,34 @@ class SessionCatalog(
     // So, the returned list may have two entries for the same function.
     dbFunctions ++ loadedFunctions
   }
+
+
+  // -----------------
+  // | Other methods |
+  // -----------------
+
+  /**
+   * Drop all existing databases (except "default") along with all associated tables,
+   * partitions and functions, and set the current database to "default".
+   *
+   * This is mainly used for tests.
+   */
+  private[sql] def reset(): Unit = {
+    val default = "default"
+    listDatabases().filter(_ != default).foreach { db =>
+      dropDatabase(db, ignoreIfNotExists = false, cascade = true)
+    }
+    tempTables.clear()
+    functionRegistry.clear()
+    // restore built-in functions
+    FunctionRegistry.builtin.listFunction().foreach { f =>
+      val expressionInfo = FunctionRegistry.builtin.lookupFunction(f)
+      val functionBuilder = FunctionRegistry.builtin.lookupFunctionBuilder(f)
+      require(expressionInfo.isDefined, s"built-in function '$f' is missing expression info")
+      require(functionBuilder.isDefined, s"built-in function '$f' is missing function builder")
+      functionRegistry.registerFunction(f, expressionInfo.get, functionBuilder.get)
+    }
+    setCurrentDatabase(default)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/45d8cdee/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringKeyHashMap.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringKeyHashMap.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringKeyHashMap.scala
index 191d5e6..d5d151a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringKeyHashMap.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringKeyHashMap.scala
@@ -41,4 +41,6 @@ class StringKeyHashMap[T](normalizer: (String) => String) {
   def remove(key: String): Option[T] = base.remove(normalizer(key))
 
   def iterator: Iterator[(String, T)] = base.toIterator
+
+  def clear(): Unit = base.clear()
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/45d8cdee/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 382cc61..d3086fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, AstBuilder, ParseException}
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
@@ -378,8 +378,7 @@ class SparkSqlAstBuilder extends AstBuilder {
   override def visitRenameTable(ctx: RenameTableContext): LogicalPlan = withOrigin(ctx) {
     AlterTableRename(
       visitTableIdentifier(ctx.from),
-      visitTableIdentifier(ctx.to))(
-      command(ctx))
+      visitTableIdentifier(ctx.to))
   }
 
   /**
@@ -395,8 +394,7 @@ class SparkSqlAstBuilder extends AstBuilder {
       ctx: SetTablePropertiesContext): LogicalPlan = withOrigin(ctx) {
     AlterTableSetProperties(
       visitTableIdentifier(ctx.tableIdentifier),
-      visitTablePropertyList(ctx.tablePropertyList))(
-      command(ctx))
+      visitTablePropertyList(ctx.tablePropertyList))
   }
 
   /**
@@ -404,17 +402,16 @@ class SparkSqlAstBuilder extends AstBuilder {
    *
    * For example:
    * {{{
-   *   ALTER TABLE table UNSET TBLPROPERTIES IF EXISTS ('comment', 'key');
-   *   ALTER VIEW view UNSET TBLPROPERTIES IF EXISTS ('comment', 'key');
+   *   ALTER TABLE table UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key');
+   *   ALTER VIEW view UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key');
    * }}}
    */
   override def visitUnsetTableProperties(
       ctx: UnsetTablePropertiesContext): LogicalPlan = withOrigin(ctx) {
     AlterTableUnsetProperties(
       visitTableIdentifier(ctx.tableIdentifier),
-      visitTablePropertyList(ctx.tablePropertyList),
-      ctx.EXISTS != null)(
-      command(ctx))
+      visitTablePropertyList(ctx.tablePropertyList).keys.toSeq,
+      ctx.EXISTS != null)
   }
 
   /**
@@ -432,116 +429,41 @@ class SparkSqlAstBuilder extends AstBuilder {
       Option(ctx.STRING).map(string),
       Option(ctx.tablePropertyList).map(visitTablePropertyList),
       // TODO a partition spec is allowed to have optional values. This is currently violated.
-      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))(
-      command(ctx))
+      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
   }
 
-  /**
-   * Create an [[AlterTableStorageProperties]] command.
-   *
-   * For example:
-   * {{{
-   *   ALTER TABLE table CLUSTERED BY (col, ...) [SORTED BY (col, ...)] INTO n BUCKETS;
-   * }}}
-   */
+  // TODO: don't even bother parsing alter table commands related to bucketing and skewing
+
   override def visitBucketTable(ctx: BucketTableContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableStorageProperties(
-      visitTableIdentifier(ctx.tableIdentifier),
-      visitBucketSpec(ctx.bucketSpec))(
-      command(ctx))
+    throw new AnalysisException(
+      "Operation not allowed: ALTER TABLE ... CLUSTERED BY ... INTO N BUCKETS")
   }
 
-  /**
-   * Create an [[AlterTableNotClustered]] command.
-   *
-   * For example:
-   * {{{
-   *   ALTER TABLE table NOT CLUSTERED;
-   * }}}
-   */
   override def visitUnclusterTable(ctx: UnclusterTableContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableNotClustered(visitTableIdentifier(ctx.tableIdentifier))(command(ctx))
+    throw new AnalysisException("Operation not allowed: ALTER TABLE ... NOT CLUSTERED")
   }
 
-  /**
-   * Create an [[AlterTableNotSorted]] command.
-   *
-   * For example:
-   * {{{
-   *   ALTER TABLE table NOT SORTED;
-   * }}}
-   */
   override def visitUnsortTable(ctx: UnsortTableContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableNotSorted(visitTableIdentifier(ctx.tableIdentifier))(command(ctx))
+    throw new AnalysisException("Operation not allowed: ALTER TABLE ... NOT SORTED")
   }
 
-  /**
-   * Create an [[AlterTableSkewed]] command.
-   *
-   * For example:
-   * {{{
-   *   ALTER TABLE table SKEWED BY (col1, col2)
-   *   ON ((col1_value, col2_value) [, (col1_value, col2_value), ...])
-   *   [STORED AS DIRECTORIES];
-   * }}}
-   */
   override def visitSkewTable(ctx: SkewTableContext): LogicalPlan = withOrigin(ctx) {
-    val table = visitTableIdentifier(ctx.tableIdentifier)
-    val (cols, values, storedAsDirs) = visitSkewSpec(ctx.skewSpec)
-    AlterTableSkewed(table, cols, values, storedAsDirs)(command(ctx))
+    throw new AnalysisException("Operation not allowed: ALTER TABLE ... SKEWED BY ...")
   }
 
-  /**
-   * Create an [[AlterTableNotSorted]] command.
-   *
-   * For example:
-   * {{{
-   *   ALTER TABLE table NOT SKEWED;
-   * }}}
-   */
   override def visitUnskewTable(ctx: UnskewTableContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableNotSkewed(visitTableIdentifier(ctx.tableIdentifier))(command(ctx))
+    throw new AnalysisException("Operation not allowed: ALTER TABLE ... NOT SKEWED")
   }
 
-  /**
-   * Create an [[AlterTableNotStoredAsDirs]] command.
-   *
-   * For example:
-   * {{{
-   *   ALTER TABLE table NOT STORED AS DIRECTORIES
-   * }}}
-   */
   override def visitUnstoreTable(ctx: UnstoreTableContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableNotStoredAsDirs(visitTableIdentifier(ctx.tableIdentifier))(command(ctx))
+    throw new AnalysisException(
+      "Operation not allowed: ALTER TABLE ... NOT STORED AS DIRECTORIES")
   }
 
-  /**
-   * Create an [[AlterTableSkewedLocation]] command.
-   *
-   * For example:
-   * {{{
-   *   ALTER TABLE table SET SKEWED LOCATION (col1="loc1" [, (col2, col3)="loc2", ...] );
-   * }}}
-   */
   override def visitSetTableSkewLocations(
       ctx: SetTableSkewLocationsContext): LogicalPlan = withOrigin(ctx) {
-    val skewedMap = ctx.skewedLocationList.skewedLocation.asScala.flatMap {
-      slCtx =>
-        val location = string(slCtx.STRING)
-        if (slCtx.constant != null) {
-          Seq(visitStringConstant(slCtx.constant) -> location)
-        } else {
-          // TODO this is similar to what was in the original implementation. However this does not
-          // make to much sense to me since we should be storing a tuple of values (not column
-          // names) for which we want a dedicated storage location.
-          visitConstantList(slCtx.constantList).map(_ -> location)
-        }
-    }.toMap
-
-    AlterTableSkewedLocation(
-      visitTableIdentifier(ctx.tableIdentifier),
-      skewedMap)(
-      command(ctx))
+    throw new AnalysisException(
+      "Operation not allowed: ALTER TABLE ... SET SKEWED LOCATION ...")
   }
 
   /**
@@ -703,8 +625,7 @@ class SparkSqlAstBuilder extends AstBuilder {
     AlterTableSetLocation(
       visitTableIdentifier(ctx.tableIdentifier),
       Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
-      visitLocationSpec(ctx.locationSpec))(
-      command(ctx))
+      visitLocationSpec(ctx.locationSpec))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/45d8cdee/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 6896881..0d38c41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -18,12 +18,11 @@
 package org.apache.spark.sql.execution.command
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
+import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.types._
 
 
@@ -175,67 +174,133 @@ case class DescribeDatabase(
   }
 }
 
-/** Rename in ALTER TABLE/VIEW: change the name of a table/view to a different name. */
+/**
+ * A command that renames a table/view.
+ *
+ * The syntax of this command is:
+ * {{{
+ *    ALTER TABLE table1 RENAME TO table2;
+ *    ALTER VIEW view1 RENAME TO view2;
+ * }}}
+ */
 case class AlterTableRename(
     oldName: TableIdentifier,
-    newName: TableIdentifier)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
+    newName: TableIdentifier)
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val catalog = sqlContext.sessionState.catalog
+    catalog.invalidateTable(oldName)
+    catalog.renameTable(oldName, newName)
+    Seq.empty[Row]
+  }
 
-/** Set Properties in ALTER TABLE/VIEW: add metadata to a table/view. */
+}
+
+/**
+ * A command that sets table/view properties.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table1 SET TBLPROPERTIES ('key1' = 'val1', 'key2' = 'val2', ...);
+ *   ALTER VIEW view1 SET TBLPROPERTIES ('key1' = 'val1', 'key2' = 'val2', ...);
+ * }}}
+ */
 case class AlterTableSetProperties(
     tableName: TableIdentifier,
-    properties: Map[String, String])(sql: String)
-  extends NativeDDLCommand(sql) with Logging
+    properties: Map[String, String])
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val catalog = sqlContext.sessionState.catalog
+    val table = catalog.getTable(tableName)
+    val newProperties = table.properties ++ properties
+    if (DDLUtils.isDatasourceTable(newProperties)) {
+      throw new AnalysisException(
+        "alter table properties is not supported for tables defined using the datasource API")
+    }
+    val newTable = table.copy(properties = newProperties)
+    catalog.alterTable(newTable)
+    Seq.empty[Row]
+  }
+
+}
 
-/** Unset Properties in ALTER TABLE/VIEW: remove metadata from a table/view. */
+/**
+ * A command that unsets table/view properties.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table1 UNSET TBLPROPERTIES [IF EXISTS] ('key1', 'key2', ...);
+ *   ALTER VIEW view1 UNSET TBLPROPERTIES [IF EXISTS] ('key1', 'key2', ...);
+ * }}}
+ */
 case class AlterTableUnsetProperties(
     tableName: TableIdentifier,
-    properties: Map[String, String],
-    ifExists: Boolean)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
+    propKeys: Seq[String],
+    ifExists: Boolean)
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val catalog = sqlContext.sessionState.catalog
+    val table = catalog.getTable(tableName)
+    if (DDLUtils.isDatasourceTable(table)) {
+      throw new AnalysisException(
+        "alter table properties is not supported for datasource tables")
+    }
+    if (!ifExists) {
+      propKeys.foreach { k =>
+        if (!table.properties.contains(k)) {
+          throw new AnalysisException(
+            s"attempted to unset non-existent property '$k' in table '$tableName'")
+        }
+      }
+    }
+    val newProperties = table.properties.filter { case (k, _) => !propKeys.contains(k) }
+    val newTable = table.copy(properties = newProperties)
+    catalog.alterTable(newTable)
+    Seq.empty[Row]
+  }
+
+}
 
+/**
+ * A command that sets the serde class and/or serde properties of a table/view.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table [PARTITION spec] SET SERDE serde_name [WITH SERDEPROPERTIES props];
+ *   ALTER TABLE table [PARTITION spec] SET SERDEPROPERTIES serde_properties;
+ * }}}
+ */
 case class AlterTableSerDeProperties(
     tableName: TableIdentifier,
     serdeClassName: Option[String],
     serdeProperties: Option[Map[String, String]],
-    partition: Option[Map[String, String]])(sql: String)
-  extends NativeDDLCommand(sql) with Logging
-
-case class AlterTableStorageProperties(
-    tableName: TableIdentifier,
-    buckets: BucketSpec)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
+    partition: Option[Map[String, String]])
+  extends RunnableCommand {
 
-case class AlterTableNotClustered(
-    tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
+  // should never happen if we parsed things correctly
+  require(serdeClassName.isDefined || serdeProperties.isDefined,
+    "alter table attempted to set neither serde class name nor serde properties")
 
-case class AlterTableNotSorted(
-    tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val catalog = sqlContext.sessionState.catalog
+    val table = catalog.getTable(tableName)
+    // Do not support setting serde for datasource tables
+    if (serdeClassName.isDefined && DDLUtils.isDatasourceTable(table)) {
+      throw new AnalysisException(
+        "alter table serde is not supported for datasource tables")
+    }
+    val newTable = table.withNewStorage(
+      serde = serdeClassName.orElse(table.storage.serde),
+      serdeProperties = table.storage.serdeProperties ++ serdeProperties.getOrElse(Map()))
+    catalog.alterTable(newTable)
+    Seq.empty[Row]
+  }
 
-case class AlterTableSkewed(
-    tableName: TableIdentifier,
-    // e.g. (dt, country)
-    skewedCols: Seq[String],
-    // e.g. ('2008-08-08', 'us), ('2009-09-09', 'uk')
-    skewedValues: Seq[Seq[String]],
-    storedAsDirs: Boolean)(sql: String)
-  extends NativeDDLCommand(sql) with Logging {
-
-  require(skewedValues.forall(_.size == skewedCols.size),
-    "number of columns in skewed values do not match number of skewed columns provided")
 }
 
-case class AlterTableNotSkewed(
-    tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
-
-case class AlterTableNotStoredAsDirs(
-    tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
-
-case class AlterTableSkewedLocation(
-    tableName: TableIdentifier,
-    skewedMap: Map[String, String])(sql: String)
-  extends NativeDDLCommand(sql) with Logging
-
 /**
  * Add Partition in ALTER TABLE/VIEW: add the table/view partitions.
  * 'partitionSpecsAndLocs': the syntax of ALTER VIEW is identical to ALTER TABLE,
@@ -292,11 +357,53 @@ case class AlterTableSetFileFormat(
     genericFormat: Option[String])(sql: String)
   extends NativeDDLCommand(sql) with Logging
 
+/**
+ * A command that sets the location of a table or a partition.
+ *
+ * For normal tables, this just sets the location URI in the table/partition's storage format.
+ * For datasource tables, this sets a "path" parameter in the table/partition's serde properties.
+ *
+ * The syntax of this command is:
+ * {{{
+ *    ALTER TABLE table_name [PARTITION partition_spec] SET LOCATION "loc";
+ * }}}
+ */
 case class AlterTableSetLocation(
     tableName: TableIdentifier,
     partitionSpec: Option[TablePartitionSpec],
-    location: String)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
+    location: String)
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val catalog = sqlContext.sessionState.catalog
+    val table = catalog.getTable(tableName)
+    partitionSpec match {
+      case Some(spec) =>
+        // Partition spec is specified, so we set the location only for this partition
+        val part = catalog.getPartition(tableName, spec)
+        val newPart =
+          if (DDLUtils.isDatasourceTable(table)) {
+            part.copy(storage = part.storage.copy(
+              serdeProperties = part.storage.serdeProperties ++ Map("path" -> location)))
+          } else {
+            part.copy(storage = part.storage.copy(locationUri = Some(location)))
+          }
+        catalog.alterPartitions(tableName, Seq(newPart))
+      case None =>
+        // No partition spec is specified, so we set the location for the table itself
+        val newTable =
+          if (DDLUtils.isDatasourceTable(table)) {
+            table.withNewStorage(
+              serdeProperties = table.storage.serdeProperties ++ Map("path" -> location))
+          } else {
+            table.withNewStorage(locationUri = Some(location))
+          }
+        catalog.alterTable(newTable)
+    }
+    Seq.empty[Row]
+  }
+
+}
 
 case class AlterTableTouch(
     tableName: TableIdentifier,
@@ -341,3 +448,16 @@ case class AlterTableReplaceCol(
     restrict: Boolean,
     cascade: Boolean)(sql: String)
   extends NativeDDLCommand(sql) with Logging
+
+
+private object DDLUtils {
+
+  def isDatasourceTable(props: Map[String, String]): Boolean = {
+    props.contains("spark.sql.sources.provider")
+  }
+
+  def isDatasourceTable(table: CatalogTable): Boolean = {
+    isDatasourceTable(table.properties)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/45d8cdee/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index c42e8e7..618c9a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -205,10 +205,10 @@ class DDLCommandSuite extends PlanTest {
     val parsed_view = parser.parsePlan(sql_view)
     val expected_table = AlterTableRename(
       TableIdentifier("table_name", None),
-      TableIdentifier("new_table_name", None))(sql_table)
+      TableIdentifier("new_table_name", None))
     val expected_view = AlterTableRename(
       TableIdentifier("table_name", None),
-      TableIdentifier("new_table_name", None))(sql_view)
+      TableIdentifier("new_table_name", None))
     comparePlans(parsed_table, expected_table)
     comparePlans(parsed_view, expected_view)
   }
@@ -235,14 +235,14 @@ class DDLCommandSuite extends PlanTest {
 
     val tableIdent = TableIdentifier("table_name", None)
     val expected1_table = AlterTableSetProperties(
-      tableIdent, Map("test" -> "test", "comment" -> "new_comment"))(sql1_table)
+      tableIdent, Map("test" -> "test", "comment" -> "new_comment"))
     val expected2_table = AlterTableUnsetProperties(
-      tableIdent, Map("comment" -> null, "test" -> null), ifExists = false)(sql2_table)
+      tableIdent, Seq("comment", "test"), ifExists = false)
     val expected3_table = AlterTableUnsetProperties(
-      tableIdent, Map("comment" -> null, "test" -> null), ifExists = true)(sql3_table)
-    val expected1_view = expected1_table.copy()(sql = sql1_view)
-    val expected2_view = expected2_table.copy()(sql = sql2_view)
-    val expected3_view = expected3_table.copy()(sql = sql3_view)
+      tableIdent, Seq("comment", "test"), ifExists = true)
+    val expected1_view = expected1_table
+    val expected2_view = expected2_table
+    val expected3_view = expected3_table
 
     comparePlans(parsed1_table, expected1_table)
     comparePlans(parsed2_table, expected2_table)
@@ -282,97 +282,24 @@ class DDLCommandSuite extends PlanTest {
     val parsed5 = parser.parsePlan(sql5)
     val tableIdent = TableIdentifier("table_name", None)
     val expected1 = AlterTableSerDeProperties(
-      tableIdent, Some("org.apache.class"), None, None)(sql1)
+      tableIdent, Some("org.apache.class"), None, None)
     val expected2 = AlterTableSerDeProperties(
       tableIdent,
       Some("org.apache.class"),
       Some(Map("columns" -> "foo,bar", "field.delim" -> ",")),
-      None)(sql2)
+      None)
     val expected3 = AlterTableSerDeProperties(
-      tableIdent, None, Some(Map("columns" -> "foo,bar", "field.delim" -> ",")), None)(sql3)
+      tableIdent, None, Some(Map("columns" -> "foo,bar", "field.delim" -> ",")), None)
     val expected4 = AlterTableSerDeProperties(
       tableIdent,
       Some("org.apache.class"),
       Some(Map("columns" -> "foo,bar", "field.delim" -> ",")),
-      Some(Map("test" -> null, "dt" -> "2008-08-08", "country" -> "us")))(sql4)
+      Some(Map("test" -> null, "dt" -> "2008-08-08", "country" -> "us")))
     val expected5 = AlterTableSerDeProperties(
       tableIdent,
       None,
       Some(Map("columns" -> "foo,bar", "field.delim" -> ",")),
-      Some(Map("test" -> null, "dt" -> "2008-08-08", "country" -> "us")))(sql5)
-    comparePlans(parsed1, expected1)
-    comparePlans(parsed2, expected2)
-    comparePlans(parsed3, expected3)
-    comparePlans(parsed4, expected4)
-    comparePlans(parsed5, expected5)
-  }
-
-  test("alter table: storage properties") {
-    val sql1 = "ALTER TABLE table_name CLUSTERED BY (dt, country) INTO 10 BUCKETS"
-    val sql2 = "ALTER TABLE table_name CLUSTERED BY (dt, country) SORTED BY " +
-      "(dt, country DESC) INTO 10 BUCKETS"
-    val sql3 = "ALTER TABLE table_name NOT CLUSTERED"
-    val sql4 = "ALTER TABLE table_name NOT SORTED"
-    val parsed1 = parser.parsePlan(sql1)
-    val parsed2 = parser.parsePlan(sql2)
-    val parsed3 = parser.parsePlan(sql3)
-    val parsed4 = parser.parsePlan(sql4)
-    val tableIdent = TableIdentifier("table_name", None)
-    val cols = List("dt", "country")
-    // TODO: also test the sort directions once we keep track of that
-    val expected1 = AlterTableStorageProperties(
-      tableIdent, BucketSpec(10, cols, Nil))(sql1)
-    val expected2 = AlterTableStorageProperties(
-      tableIdent, BucketSpec(10, cols, cols))(sql2)
-    val expected3 = AlterTableNotClustered(tableIdent)(sql3)
-    val expected4 = AlterTableNotSorted(tableIdent)(sql4)
-    comparePlans(parsed1, expected1)
-    comparePlans(parsed2, expected2)
-    comparePlans(parsed3, expected3)
-    comparePlans(parsed4, expected4)
-  }
-
-  test("alter table: skewed") {
-    val sql1 =
-      """
-       |ALTER TABLE table_name SKEWED BY (dt, country) ON
-       |(('2008-08-08', 'us'), ('2009-09-09', 'uk'), ('2010-10-10', 'cn')) STORED AS DIRECTORIES
-      """.stripMargin
-    val sql2 =
-      """
-       |ALTER TABLE table_name SKEWED BY (dt, country) ON
-       |('2008-08-08', 'us') STORED AS DIRECTORIES
-      """.stripMargin
-    val sql3 =
-      """
-       |ALTER TABLE table_name SKEWED BY (dt, country) ON
-       |(('2008-08-08', 'us'), ('2009-09-09', 'uk'))
-      """.stripMargin
-    val sql4 = "ALTER TABLE table_name NOT SKEWED"
-    val sql5 = "ALTER TABLE table_name NOT STORED AS DIRECTORIES"
-    val parsed1 = parser.parsePlan(sql1)
-    val parsed2 = parser.parsePlan(sql2)
-    val parsed3 = parser.parsePlan(sql3)
-    val parsed4 = parser.parsePlan(sql4)
-    val parsed5 = parser.parsePlan(sql5)
-    val tableIdent = TableIdentifier("table_name", None)
-    val expected1 = AlterTableSkewed(
-      tableIdent,
-      Seq("dt", "country"),
-      Seq(List("2008-08-08", "us"), List("2009-09-09", "uk"), List("2010-10-10", "cn")),
-      storedAsDirs = true)(sql1)
-    val expected2 = AlterTableSkewed(
-      tableIdent,
-      Seq("dt", "country"),
-      Seq(List("2008-08-08", "us")),
-      storedAsDirs = true)(sql2)
-    val expected3 = AlterTableSkewed(
-      tableIdent,
-      Seq("dt", "country"),
-      Seq(List("2008-08-08", "us"), List("2009-09-09", "uk")),
-      storedAsDirs = false)(sql3)
-    val expected4 = AlterTableNotSkewed(tableIdent)(sql4)
-    val expected5 = AlterTableNotStoredAsDirs(tableIdent)(sql5)
+      Some(Map("test" -> null, "dt" -> "2008-08-08", "country" -> "us")))
     comparePlans(parsed1, expected1)
     comparePlans(parsed2, expected2)
     comparePlans(parsed3, expected3)
@@ -380,30 +307,6 @@ class DDLCommandSuite extends PlanTest {
     comparePlans(parsed5, expected5)
   }
 
-  test("alter table: skewed location") {
-    val sql1 =
-      """
-       |ALTER TABLE table_name SET SKEWED LOCATION
-       |('123'='location1', 'test'='location2')
-      """.stripMargin
-    val sql2 =
-      """
-       |ALTER TABLE table_name SET SKEWED LOCATION
-       |(('2008-08-08', 'us')='location1', 'test'='location2')
-      """.stripMargin
-    val parsed1 = parser.parsePlan(sql1)
-    val parsed2 = parser.parsePlan(sql2)
-    val tableIdent = TableIdentifier("table_name", None)
-    val expected1 = AlterTableSkewedLocation(
-      tableIdent,
-      Map("123" -> "location1", "test" -> "location2"))(sql1)
-    val expected2 = AlterTableSkewedLocation(
-      tableIdent,
-      Map("2008-08-08" -> "location1", "us" -> "location1", "test" -> "location2"))(sql2)
-    comparePlans(parsed1, expected1)
-    comparePlans(parsed2, expected2)
-  }
-
   // ALTER TABLE table_name ADD [IF NOT EXISTS] PARTITION partition_spec
   // [LOCATION 'location1'] partition_spec [LOCATION 'location2'] ...;
   test("alter table: add partition") {
@@ -615,11 +518,11 @@ class DDLCommandSuite extends PlanTest {
     val expected1 = AlterTableSetLocation(
       tableIdent,
       None,
-      "new location")(sql1)
+      "new location")
     val expected2 = AlterTableSetLocation(
       tableIdent,
       Some(Map("dt" -> "2008-08-08", "country" -> "us")),
-      "new location")(sql2)
+      "new location")
     comparePlans(parsed1, expected1)
     comparePlans(parsed2, expected2)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/45d8cdee/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 885a04a..d8e2c94 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -19,34 +19,63 @@ package org.apache.spark.sql.execution.command
 
 import java.io.File
 
+import org.scalatest.BeforeAndAfterEach
+
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
-import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
 import org.apache.spark.sql.test.SharedSQLContext
 
-class DDLSuite extends QueryTest with SharedSQLContext {
-
+class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   private val escapedIdentifier = "`(.+)`".r
 
+  override def afterEach(): Unit = {
+    try {
+      // drop all databases, tables and functions after each test
+      sqlContext.sessionState.catalog.reset()
+    } finally {
+      super.afterEach()
+    }
+  }
+
   /**
    * Strip backticks, if any, from the string.
    */
-  def cleanIdentifier(ident: String): String = {
+  private def cleanIdentifier(ident: String): String = {
     ident match {
       case escapedIdentifier(i) => i
       case plainIdent => plainIdent
     }
   }
 
-  /**
-   * Drops database `databaseName` after calling `f`.
-   */
-  private def withDatabase(dbNames: String*)(f: => Unit): Unit = {
-    try f finally {
-      dbNames.foreach { name =>
-        sqlContext.sql(s"DROP DATABASE IF EXISTS $name CASCADE")
-      }
-      sqlContext.sessionState.catalog.setCurrentDatabase("default")
+  private def assertUnsupported(query: String): Unit = {
+    val e = intercept[AnalysisException] {
+      sql(query)
     }
+    assert(e.getMessage.toLowerCase.contains("operation not allowed"))
+  }
+
+  private def createDatabase(catalog: SessionCatalog, name: String): Unit = {
+    catalog.createDatabase(CatalogDatabase(name, "", "", Map()), ignoreIfExists = false)
+  }
+
+  private def createTable(catalog: SessionCatalog, name: TableIdentifier): Unit = {
+    catalog.createTable(CatalogTable(
+      identifier = name,
+      tableType = CatalogTableType.EXTERNAL_TABLE,
+      storage = CatalogStorageFormat(None, None, None, None, Map()),
+      schema = Seq()), ignoreIfExists = false)
+  }
+
+  private def createTablePartition(
+      catalog: SessionCatalog,
+      spec: TablePartitionSpec,
+      tableName: TableIdentifier): Unit = {
+    val part = CatalogTablePartition(spec, CatalogStorageFormat(None, None, None, None, Map()))
+    catalog.createPartitions(tableName, Seq(part), ignoreIfExists = false)
   }
 
   test("Create/Drop Database") {
@@ -55,7 +84,7 @@ class DDLSuite extends QueryTest with SharedSQLContext {
     val databaseNames = Seq("db1", "`database`")
 
     databaseNames.foreach { dbName =>
-      withDatabase(dbName) {
+      try {
         val dbNameWithoutBackTicks = cleanIdentifier(dbName)
 
         sql(s"CREATE DATABASE $dbName")
@@ -67,6 +96,8 @@ class DDLSuite extends QueryTest with SharedSQLContext {
           Map.empty))
         sql(s"DROP DATABASE $dbName CASCADE")
         assert(!catalog.databaseExists(dbNameWithoutBackTicks))
+      } finally {
+        catalog.reset()
       }
     }
   }
@@ -76,8 +107,8 @@ class DDLSuite extends QueryTest with SharedSQLContext {
     val databaseNames = Seq("db1", "`database`")
 
     databaseNames.foreach { dbName =>
-      val dbNameWithoutBackTicks = cleanIdentifier(dbName)
-      withDatabase(dbName) {
+      try {
+        val dbNameWithoutBackTicks = cleanIdentifier(dbName)
         sql(s"CREATE DATABASE $dbName")
         val db1 = catalog.getDatabase(dbNameWithoutBackTicks)
         assert(db1 == CatalogDatabase(
@@ -90,6 +121,8 @@ class DDLSuite extends QueryTest with SharedSQLContext {
           sql(s"CREATE DATABASE $dbName")
         }.getMessage
         assert(message.contains(s"Database '$dbNameWithoutBackTicks' already exists."))
+      } finally {
+        catalog.reset()
       }
     }
   }
@@ -99,7 +132,7 @@ class DDLSuite extends QueryTest with SharedSQLContext {
     val databaseNames = Seq("db1", "`database`")
 
     databaseNames.foreach { dbName =>
-      withDatabase(dbName) {
+      try {
         val dbNameWithoutBackTicks = cleanIdentifier(dbName)
         val location =
           System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db"
@@ -129,6 +162,8 @@ class DDLSuite extends QueryTest with SharedSQLContext {
             Row("Description", "") ::
             Row("Location", location) ::
             Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil)
+      } finally {
+        catalog.reset()
       }
     }
   }
@@ -159,6 +194,131 @@ class DDLSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  // TODO: test drop database in restrict mode
+
+  test("alter table: rename") {
+    val catalog = sqlContext.sessionState.catalog
+    val tableIdent1 = TableIdentifier("tab1", Some("dbx"))
+    val tableIdent2 = TableIdentifier("tab2", Some("dbx"))
+    createDatabase(catalog, "dbx")
+    createDatabase(catalog, "dby")
+    createTable(catalog, tableIdent1)
+    assert(catalog.listTables("dbx") == Seq(tableIdent1))
+    sql("ALTER TABLE dbx.tab1 RENAME TO dbx.tab2")
+    assert(catalog.listTables("dbx") == Seq(tableIdent2))
+    catalog.setCurrentDatabase("dbx")
+    // rename without explicitly specifying database
+    sql("ALTER TABLE tab2 RENAME TO tab1")
+    assert(catalog.listTables("dbx") == Seq(tableIdent1))
+    // table to rename does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE dbx.does_not_exist RENAME TO dbx.tab2")
+    }
+    // destination database is different
+    intercept[AnalysisException] {
+      sql("ALTER TABLE dbx.tab1 RENAME TO dby.tab2")
+    }
+  }
+
+  test("alter table: set location") {
+    testSetLocation(isDatasourceTable = false)
+  }
+
+  test("alter table: set location (datasource table)") {
+    testSetLocation(isDatasourceTable = true)
+  }
+
+  test("alter table: set properties") {
+    val catalog = sqlContext.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1", Some("dbx"))
+    createDatabase(catalog, "dbx")
+    createTable(catalog, tableIdent)
+    assert(catalog.getTable(tableIdent).properties.isEmpty)
+    // set table properties
+    sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('andrew' = 'or14', 'kor' = 'bel')")
+    assert(catalog.getTable(tableIdent).properties == Map("andrew" -> "or14", "kor" -> "bel"))
+    // set table properties without explicitly specifying database
+    catalog.setCurrentDatabase("dbx")
+    sql("ALTER TABLE tab1 SET TBLPROPERTIES ('kor' = 'belle', 'kar' = 'bol')")
+    assert(catalog.getTable(tableIdent).properties ==
+      Map("andrew" -> "or14", "kor" -> "belle", "kar" -> "bol"))
+    // table to alter does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE does_not_exist SET TBLPROPERTIES ('winner' = 'loser')")
+    }
+    // throw exception for datasource tables
+    convertToDatasourceTable(catalog, tableIdent)
+    val e = intercept[AnalysisException] {
+      sql("ALTER TABLE tab1 SET TBLPROPERTIES ('sora' = 'bol')")
+    }
+    assert(e.getMessage.contains("datasource"))
+  }
+
+  test("alter table: unset properties") {
+    val catalog = sqlContext.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1", Some("dbx"))
+    createDatabase(catalog, "dbx")
+    createTable(catalog, tableIdent)
+    // unset table properties
+    sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('j' = 'am', 'p' = 'an', 'c' = 'lan')")
+    sql("ALTER TABLE dbx.tab1 UNSET TBLPROPERTIES ('j')")
+    assert(catalog.getTable(tableIdent).properties == Map("p" -> "an", "c" -> "lan"))
+    // unset table properties without explicitly specifying database
+    catalog.setCurrentDatabase("dbx")
+    sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('p')")
+    assert(catalog.getTable(tableIdent).properties == Map("c" -> "lan"))
+    // table to alter does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE does_not_exist UNSET TBLPROPERTIES ('c' = 'lan')")
+    }
+    // property to unset does not exist
+    val e = intercept[AnalysisException] {
+      sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('c', 'xyz')")
+    }
+    assert(e.getMessage.contains("xyz"))
+    // property to unset does not exist, but "IF EXISTS" is specified
+    sql("ALTER TABLE tab1 UNSET TBLPROPERTIES IF EXISTS ('c', 'xyz')")
+    assert(catalog.getTable(tableIdent).properties.isEmpty)
+    // throw exception for datasource tables
+    convertToDatasourceTable(catalog, tableIdent)
+    val e1 = intercept[AnalysisException] {
+      sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('sora')")
+    }
+    assert(e1.getMessage.contains("datasource"))
+  }
+
+  test("alter table: set serde") {
+    testSetSerde(isDatasourceTable = false)
+  }
+
+  test("alter table: set serde (datasource table)") {
+    testSetSerde(isDatasourceTable = true)
+  }
+
+  test("alter table: bucketing is not supported") {
+    val catalog = sqlContext.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1", Some("dbx"))
+    createDatabase(catalog, "dbx")
+    createTable(catalog, tableIdent)
+    assertUnsupported("ALTER TABLE dbx.tab1 CLUSTERED BY (blood, lemon, grape) INTO 11 BUCKETS")
+    assertUnsupported("ALTER TABLE dbx.tab1 CLUSTERED BY (fuji) SORTED BY (grape) INTO 5 BUCKETS")
+    assertUnsupported("ALTER TABLE dbx.tab1 NOT CLUSTERED")
+    assertUnsupported("ALTER TABLE dbx.tab1 NOT SORTED")
+  }
+
+  test("alter table: skew is not supported") {
+    val catalog = sqlContext.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1", Some("dbx"))
+    createDatabase(catalog, "dbx")
+    createTable(catalog, tableIdent)
+    assertUnsupported("ALTER TABLE dbx.tab1 SKEWED BY (dt, country) ON " +
+      "(('2008-08-08', 'us'), ('2009-09-09', 'uk'), ('2010-10-10', 'cn'))")
+    assertUnsupported("ALTER TABLE dbx.tab1 SKEWED BY (dt, country) ON " +
+      "(('2008-08-08', 'us'), ('2009-09-09', 'uk')) STORED AS DIRECTORIES")
+    assertUnsupported("ALTER TABLE dbx.tab1 NOT SKEWED")
+    assertUnsupported("ALTER TABLE dbx.tab1 NOT STORED AS DIRECTORIES")
+  }
+
   // TODO: ADD a testcase for Drop Database in Restric when we can create tables in SQLContext
 
   test("show tables") {
@@ -206,29 +366,129 @@ class DDLSuite extends QueryTest with SharedSQLContext {
   }
 
   test("show databases") {
-    withDatabase("showdb1A", "showdb2B") {
-      sql("CREATE DATABASE showdb1A")
-      sql("CREATE DATABASE showdb2B")
+    sql("CREATE DATABASE showdb1A")
+    sql("CREATE DATABASE showdb2B")
 
-      assert(
-        sql("SHOW DATABASES").count() >= 2)
+    assert(
+      sql("SHOW DATABASES").count() >= 2)
 
-      checkAnswer(
-        sql("SHOW DATABASES LIKE '*db1A'"),
-        Row("showdb1A") :: Nil)
+    checkAnswer(
+      sql("SHOW DATABASES LIKE '*db1A'"),
+      Row("showdb1A") :: Nil)
 
-      checkAnswer(
-        sql("SHOW DATABASES LIKE 'showdb1A'"),
-        Row("showdb1A") :: Nil)
+    checkAnswer(
+      sql("SHOW DATABASES LIKE 'showdb1A'"),
+      Row("showdb1A") :: Nil)
 
-      checkAnswer(
-        sql("SHOW DATABASES LIKE '*db1A|*db2B'"),
-        Row("showdb1A") ::
-          Row("showdb2B") :: Nil)
+    checkAnswer(
+      sql("SHOW DATABASES LIKE '*db1A|*db2B'"),
+      Row("showdb1A") ::
+        Row("showdb2B") :: Nil)
 
-      checkAnswer(
-        sql("SHOW DATABASES LIKE 'non-existentdb'"),
-        Nil)
+    checkAnswer(
+      sql("SHOW DATABASES LIKE 'non-existentdb'"),
+      Nil)
+  }
+
+  private def convertToDatasourceTable(
+      catalog: SessionCatalog,
+      tableIdent: TableIdentifier): Unit = {
+    catalog.alterTable(catalog.getTable(tableIdent).copy(
+      properties = Map("spark.sql.sources.provider" -> "csv")))
+  }
+
+  private def testSetLocation(isDatasourceTable: Boolean): Unit = {
+    val catalog = sqlContext.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1", Some("dbx"))
+    val partSpec = Map("a" -> "1")
+    createDatabase(catalog, "dbx")
+    createTable(catalog, tableIdent)
+    createTablePartition(catalog, partSpec, tableIdent)
+    if (isDatasourceTable) {
+      convertToDatasourceTable(catalog, tableIdent)
+    }
+    assert(catalog.getTable(tableIdent).storage.locationUri.isEmpty)
+    assert(catalog.getTable(tableIdent).storage.serdeProperties.isEmpty)
+    assert(catalog.getPartition(tableIdent, partSpec).storage.locationUri.isEmpty)
+    assert(catalog.getPartition(tableIdent, partSpec).storage.serdeProperties.isEmpty)
+    // Verify that the location is set to the expected string
+    def verifyLocation(expected: String, spec: Option[TablePartitionSpec] = None): Unit = {
+      val storageFormat = spec
+        .map { s => catalog.getPartition(tableIdent, s).storage }
+        .getOrElse { catalog.getTable(tableIdent).storage }
+      if (isDatasourceTable) {
+        assert(storageFormat.serdeProperties.get("path") === Some(expected))
+      } else {
+        assert(storageFormat.locationUri === Some(expected))
+      }
+    }
+    // set table location
+    sql("ALTER TABLE dbx.tab1 SET LOCATION '/path/to/your/lovely/heart'")
+    verifyLocation("/path/to/your/lovely/heart")
+    // set table partition location
+    sql("ALTER TABLE dbx.tab1 PARTITION (a='1') SET LOCATION '/path/to/part/ways'")
+    verifyLocation("/path/to/part/ways", Some(partSpec))
+    // set table location without explicitly specifying database
+    catalog.setCurrentDatabase("dbx")
+    sql("ALTER TABLE tab1 SET LOCATION '/swanky/steak/place'")
+    verifyLocation("/swanky/steak/place")
+    // set table partition location without explicitly specifying database
+    sql("ALTER TABLE tab1 PARTITION (a='1') SET LOCATION 'vienna'")
+    verifyLocation("vienna", Some(partSpec))
+    // table to alter does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE dbx.does_not_exist SET LOCATION '/mister/spark'")
+    }
+    // partition to alter does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE dbx.tab1 PARTITION (b='2') SET LOCATION '/mister/spark'")
     }
   }
+
+  private def testSetSerde(isDatasourceTable: Boolean): Unit = {
+    val catalog = sqlContext.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1", Some("dbx"))
+    createDatabase(catalog, "dbx")
+    createTable(catalog, tableIdent)
+    if (isDatasourceTable) {
+      convertToDatasourceTable(catalog, tableIdent)
+    }
+    assert(catalog.getTable(tableIdent).storage.serde.isEmpty)
+    assert(catalog.getTable(tableIdent).storage.serdeProperties.isEmpty)
+    // set table serde and/or properties (should fail on datasource tables)
+    if (isDatasourceTable) {
+      val e1 = intercept[AnalysisException] {
+        sql("ALTER TABLE dbx.tab1 SET SERDE 'whatever'")
+      }
+      val e2 = intercept[AnalysisException] {
+        sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.madoop' " +
+          "WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
+      }
+      assert(e1.getMessage.contains("datasource"))
+      assert(e2.getMessage.contains("datasource"))
+    } else {
+      sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.jadoop'")
+      assert(catalog.getTable(tableIdent).storage.serde == Some("org.apache.jadoop"))
+      assert(catalog.getTable(tableIdent).storage.serdeProperties.isEmpty)
+      sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.madoop' " +
+        "WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
+      assert(catalog.getTable(tableIdent).storage.serde == Some("org.apache.madoop"))
+      assert(catalog.getTable(tableIdent).storage.serdeProperties ==
+        Map("k" -> "v", "kay" -> "vee"))
+    }
+    // set serde properties only
+    sql("ALTER TABLE dbx.tab1 SET SERDEPROPERTIES ('k' = 'vvv', 'kay' = 'vee')")
+    assert(catalog.getTable(tableIdent).storage.serdeProperties ==
+      Map("k" -> "vvv", "kay" -> "vee"))
+    // set things without explicitly specifying database
+    catalog.setCurrentDatabase("dbx")
+    sql("ALTER TABLE tab1 SET SERDEPROPERTIES ('kay' = 'veee')")
+    assert(catalog.getTable(tableIdent).storage.serdeProperties ==
+      Map("k" -> "vvv", "kay" -> "veee"))
+    // table to alter does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE does_not_exist SET SERDEPROPERTIES ('x' = 'y')")
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/45d8cdee/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 4b4f88e..b01f556 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -360,6 +360,12 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "show_create_table_serde",
     "show_create_table_view",
 
+    // These tests try to change how a table is bucketed, which we don't support
+    "alter4",
+    "sort_merge_join_desc_5",
+    "sort_merge_join_desc_6",
+    "sort_merge_join_desc_7",
+
     // Index commands are not supported
     "drop_index",
     "drop_index_removes_partition_dirs",
@@ -381,7 +387,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "alias_casted_column",
     "alter2",
     "alter3",
-    "alter4",
     "alter5",
     "alter_merge_2",
     "alter_partition_format_loc",
@@ -880,9 +885,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "sort_merge_join_desc_2",
     "sort_merge_join_desc_3",
     "sort_merge_join_desc_4",
-    "sort_merge_join_desc_5",
-    "sort_merge_join_desc_6",
-    "sort_merge_join_desc_7",
     "stats0",
     "stats_aggregator_error_1",
     "stats_empty_partition",

http://git-wip-us.apache.org/repos/asf/spark/blob/45d8cdee/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index d315f39..0cccc22 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -94,7 +94,7 @@ private[sql] class HiveSessionCatalog(
     metastoreCatalog.refreshTable(name)
   }
 
-  def invalidateTable(name: TableIdentifier): Unit = {
+  override def invalidateTable(name: TableIdentifier): Unit = {
     metastoreCatalog.invalidateTable(name)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org