You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/02/04 04:32:43 UTC

spark git commit: [SPARK-13079][SQL] Extend and implement InMemoryCatalog

Repository: spark
Updated Branches:
  refs/heads/master a8e2ba776 -> a64831124


[SPARK-13079][SQL] Extend and implement InMemoryCatalog

This is a step towards consolidating `SQLContext` and `HiveContext`.

This patch extends the existing Catalog API added in #10982 to include methods for handling table partitions. In particular, a partition is identified by a `PartitionSpec`, which is just a `Map[String, String]` of partition values indexed by column name. The Catalog is not used by anything yet, but its API is now more or less complete and the `InMemoryCatalog` implementation is fully tested.

About 200 lines are test code.

Author: Andrew Or <an...@databricks.com>

Closes #11069 from andrewor14/catalog.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6483112
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6483112
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6483112

Branch: refs/heads/master
Commit: a64831124c215f56f124747fa241560c70cf0a36
Parents: a8e2ba7
Author: Andrew Or <an...@databricks.com>
Authored: Wed Feb 3 19:32:41 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Feb 3 19:32:41 2016 -0800

----------------------------------------------------------------------
 .../sql/catalyst/catalog/InMemoryCatalog.scala  | 129 +++++++++---
 .../spark/sql/catalyst/catalog/interface.scala  |  40 +++-
 .../sql/catalyst/catalog/CatalogTestCases.scala | 206 ++++++++++++++++++-
 3 files changed, 328 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a6483112/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 9e6dfb7..38be61c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -28,9 +28,10 @@ import org.apache.spark.sql.AnalysisException
  * All public methods should be synchronized for thread-safety.
  */
 class InMemoryCatalog extends Catalog {
+  import Catalog._
 
   private class TableDesc(var table: Table) {
-    val partitions = new mutable.HashMap[String, TablePartition]
+    val partitions = new mutable.HashMap[PartitionSpec, TablePartition]
   }
 
   private class DatabaseDesc(var db: Database) {
@@ -46,13 +47,20 @@ class InMemoryCatalog extends Catalog {
   }
 
   private def existsFunction(db: String, funcName: String): Boolean = {
+    assertDbExists(db)
     catalog(db).functions.contains(funcName)
   }
 
   private def existsTable(db: String, table: String): Boolean = {
+    assertDbExists(db)
     catalog(db).tables.contains(table)
   }
 
+  private def existsPartition(db: String, table: String, spec: PartitionSpec): Boolean = {
+    assertTableExists(db, table)
+    catalog(db).tables(table).partitions.contains(spec)
+  }
+
   private def assertDbExists(db: String): Unit = {
     if (!catalog.contains(db)) {
       throw new AnalysisException(s"Database $db does not exist")
@@ -60,16 +68,20 @@ class InMemoryCatalog extends Catalog {
   }
 
   private def assertFunctionExists(db: String, funcName: String): Unit = {
-    assertDbExists(db)
     if (!existsFunction(db, funcName)) {
-      throw new AnalysisException(s"Function $funcName does not exists in $db database")
+      throw new AnalysisException(s"Function $funcName does not exist in $db database")
     }
   }
 
   private def assertTableExists(db: String, table: String): Unit = {
-    assertDbExists(db)
     if (!existsTable(db, table)) {
-      throw new AnalysisException(s"Table $table does not exists in $db database")
+      throw new AnalysisException(s"Table $table does not exist in $db database")
+    }
+  }
+
+  private def assertPartitionExists(db: String, table: String, spec: PartitionSpec): Unit = {
+    if (!existsPartition(db, table, spec)) {
+      throw new AnalysisException(s"Partition does not exist in database $db table $table: $spec")
     }
   }
 
@@ -77,9 +89,11 @@ class InMemoryCatalog extends Catalog {
   // Databases
   // --------------------------------------------------------------------------
 
-  override def createDatabase(dbDefinition: Database, ifNotExists: Boolean): Unit = synchronized {
+  override def createDatabase(
+      dbDefinition: Database,
+      ignoreIfExists: Boolean): Unit = synchronized {
     if (catalog.contains(dbDefinition.name)) {
-      if (!ifNotExists) {
+      if (!ignoreIfExists) {
         throw new AnalysisException(s"Database ${dbDefinition.name} already exists.")
       }
     } else {
@@ -88,9 +102,9 @@ class InMemoryCatalog extends Catalog {
   }
 
   override def dropDatabase(
-    db: String,
-    ignoreIfNotExists: Boolean,
-    cascade: Boolean): Unit = synchronized {
+      db: String,
+      ignoreIfNotExists: Boolean,
+      cascade: Boolean): Unit = synchronized {
     if (catalog.contains(db)) {
       if (!cascade) {
         // If cascade is false, make sure the database is empty.
@@ -133,11 +147,13 @@ class InMemoryCatalog extends Catalog {
   // Tables
   // --------------------------------------------------------------------------
 
-  override def createTable(db: String, tableDefinition: Table, ifNotExists: Boolean)
-  : Unit = synchronized {
+  override def createTable(
+      db: String,
+      tableDefinition: Table,
+      ignoreIfExists: Boolean): Unit = synchronized {
     assertDbExists(db)
     if (existsTable(db, tableDefinition.name)) {
-      if (!ifNotExists) {
+      if (!ignoreIfExists) {
         throw new AnalysisException(s"Table ${tableDefinition.name} already exists in $db database")
       }
     } else {
@@ -145,8 +161,10 @@ class InMemoryCatalog extends Catalog {
     }
   }
 
-  override def dropTable(db: String, table: String, ignoreIfNotExists: Boolean)
-  : Unit = synchronized {
+  override def dropTable(
+      db: String,
+      table: String,
+      ignoreIfNotExists: Boolean): Unit = synchronized {
     assertDbExists(db)
     if (existsTable(db, table)) {
       catalog(db).tables.remove(table)
@@ -190,14 +208,67 @@ class InMemoryCatalog extends Catalog {
   // Partitions
   // --------------------------------------------------------------------------
 
-  override def alterPartition(db: String, table: String, part: TablePartition)
-  : Unit = synchronized {
-    throw new UnsupportedOperationException
+  override def createPartitions(
+      db: String,
+      table: String,
+      parts: Seq[TablePartition],
+      ignoreIfExists: Boolean): Unit = synchronized {
+    assertTableExists(db, table)
+    val existingParts = catalog(db).tables(table).partitions
+    if (!ignoreIfExists) {
+      val dupSpecs = parts.collect { case p if existingParts.contains(p.spec) => p.spec }
+      if (dupSpecs.nonEmpty) {
+        val dupSpecsStr = dupSpecs.mkString("\n===\n")
+        throw new AnalysisException(
+          s"The following partitions already exist in database $db table $table:\n$dupSpecsStr")
+      }
+    }
+    parts.foreach { p => existingParts.put(p.spec, p) }
+  }
+
+  override def dropPartitions(
+      db: String,
+      table: String,
+      partSpecs: Seq[PartitionSpec],
+      ignoreIfNotExists: Boolean): Unit = synchronized {
+    assertTableExists(db, table)
+    val existingParts = catalog(db).tables(table).partitions
+    if (!ignoreIfNotExists) {
+      val missingSpecs = partSpecs.collect { case s if !existingParts.contains(s) => s }
+      if (missingSpecs.nonEmpty) {
+        val missingSpecsStr = missingSpecs.mkString("\n===\n")
+        throw new AnalysisException(
+          s"The following partitions do not exist in database $db table $table:\n$missingSpecsStr")
+      }
+    }
+    partSpecs.foreach(existingParts.remove)
   }
 
-  override def alterPartitions(db: String, table: String, parts: Seq[TablePartition])
-  : Unit = synchronized {
-    throw new UnsupportedOperationException
+  override def alterPartition(
+      db: String,
+      table: String,
+      spec: Map[String, String],
+      newPart: TablePartition): Unit = synchronized {
+    assertPartitionExists(db, table, spec)
+    val existingParts = catalog(db).tables(table).partitions
+    if (spec != newPart.spec) {
+      // Also a change in specs; remove the old one and add the new one back
+      existingParts.remove(spec)
+    }
+    existingParts.put(newPart.spec, newPart)
+  }
+
+  override def getPartition(
+      db: String,
+      table: String,
+      spec: Map[String, String]): TablePartition = synchronized {
+    assertPartitionExists(db, table, spec)
+    catalog(db).tables(table).partitions(spec)
+  }
+
+  override def listPartitions(db: String, table: String): Seq[TablePartition] = synchronized {
+    assertTableExists(db, table)
+    catalog(db).tables(table).partitions.values.toSeq
   }
 
   // --------------------------------------------------------------------------
@@ -205,11 +276,12 @@ class InMemoryCatalog extends Catalog {
   // --------------------------------------------------------------------------
 
   override def createFunction(
-      db: String, func: Function, ifNotExists: Boolean): Unit = synchronized {
+      db: String,
+      func: Function,
+      ignoreIfExists: Boolean): Unit = synchronized {
     assertDbExists(db)
-
     if (existsFunction(db, func.name)) {
-      if (!ifNotExists) {
+      if (!ignoreIfExists) {
         throw new AnalysisException(s"Function $func already exists in $db database")
       }
     } else {
@@ -222,14 +294,16 @@ class InMemoryCatalog extends Catalog {
     catalog(db).functions.remove(funcName)
   }
 
-  override def alterFunction(db: String, funcName: String, funcDefinition: Function)
-    : Unit = synchronized {
+  override def alterFunction(
+      db: String,
+      funcName: String,
+      funcDefinition: Function): Unit = synchronized {
     assertFunctionExists(db, funcName)
     if (funcName != funcDefinition.name) {
       // Also a rename; remove the old one and add the new one back
       catalog(db).functions.remove(funcName)
     }
-    catalog(db).functions.put(funcName, funcDefinition)
+    catalog(db).functions.put(funcDefinition.name, funcDefinition)
   }
 
   override def getFunction(db: String, funcName: String): Function = synchronized {
@@ -239,7 +313,6 @@ class InMemoryCatalog extends Catalog {
 
   override def listFunctions(db: String, pattern: String): Seq[String] = synchronized {
     assertDbExists(db)
-    val regex = pattern.replaceAll("\\*", ".*").r
     filterPattern(catalog(db).functions.keysIterator.toSeq, pattern)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a6483112/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index a6caf91..b4d7dd2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -29,17 +29,15 @@ import org.apache.spark.sql.AnalysisException
  * Implementations should throw [[AnalysisException]] when table or database don't exist.
  */
 abstract class Catalog {
+  import Catalog._
 
   // --------------------------------------------------------------------------
   // Databases
   // --------------------------------------------------------------------------
 
-  def createDatabase(dbDefinition: Database, ifNotExists: Boolean): Unit
+  def createDatabase(dbDefinition: Database, ignoreIfExists: Boolean): Unit
 
-  def dropDatabase(
-    db: String,
-    ignoreIfNotExists: Boolean,
-    cascade: Boolean): Unit
+  def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
 
   def alterDatabase(db: String, dbDefinition: Database): Unit
 
@@ -71,11 +69,28 @@ abstract class Catalog {
   // Partitions
   // --------------------------------------------------------------------------
 
-  // TODO: need more functions for partitioning.
+  def createPartitions(
+      db: String,
+      table: String,
+      parts: Seq[TablePartition],
+      ignoreIfExists: Boolean): Unit
 
-  def alterPartition(db: String, table: String, part: TablePartition): Unit
+  def dropPartitions(
+      db: String,
+      table: String,
+      parts: Seq[PartitionSpec],
+      ignoreIfNotExists: Boolean): Unit
 
-  def alterPartitions(db: String, table: String, parts: Seq[TablePartition]): Unit
+  def alterPartition(
+      db: String,
+      table: String,
+      spec: PartitionSpec,
+      newPart: TablePartition): Unit
+
+  def getPartition(db: String, table: String, spec: PartitionSpec): TablePartition
+
+  // TODO: support listing by pattern
+  def listPartitions(db: String, table: String): Seq[TablePartition]
 
   // --------------------------------------------------------------------------
   // Functions
@@ -132,11 +147,11 @@ case class Column(
 /**
  * A partition (Hive style) defined in the catalog.
  *
- * @param values values for the partition columns
+ * @param spec partition spec values indexed by column name
  * @param storage storage format of the partition
  */
 case class TablePartition(
-  values: Seq[String],
+  spec: Catalog.PartitionSpec,
   storage: StorageFormat
 )
 
@@ -176,3 +191,8 @@ case class Database(
   locationUri: String,
   properties: Map[String, String]
 )
+
+
+object Catalog {
+  type PartitionSpec = Map[String, String]
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a6483112/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
index ab9d5ac..0d84343 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
@@ -27,6 +27,11 @@ import org.apache.spark.sql.AnalysisException
  * Implementations of the [[Catalog]] interface can create test suites by extending this.
  */
 abstract class CatalogTestCases extends SparkFunSuite {
+  private val storageFormat = StorageFormat("usa", "$", "zzz", "serde", Map.empty[String, String])
+  private val part1 = TablePartition(Map[String, String]("a" -> "1"), storageFormat)
+  private val part2 = TablePartition(Map[String, String]("b" -> "2"), storageFormat)
+  private val part3 = TablePartition(Map[String, String]("c" -> "3"), storageFormat)
+  private val funcClass = "org.apache.spark.myFunc"
 
   protected def newEmptyCatalog(): Catalog
 
@@ -41,16 +46,16 @@ abstract class CatalogTestCases extends SparkFunSuite {
    */
   private def newBasicCatalog(): Catalog = {
     val catalog = newEmptyCatalog()
-    catalog.createDatabase(newDb("db1"), ifNotExists = false)
-    catalog.createDatabase(newDb("db2"), ifNotExists = false)
-
+    catalog.createDatabase(newDb("db1"), ignoreIfExists = false)
+    catalog.createDatabase(newDb("db2"), ignoreIfExists = false)
     catalog.createTable("db2", newTable("tbl1"), ignoreIfExists = false)
     catalog.createTable("db2", newTable("tbl2"), ignoreIfExists = false)
     catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = false)
+    catalog.createPartitions("db2", "tbl2", Seq(part1, part2), ignoreIfExists = false)
     catalog
   }
 
-  private def newFunc(): Function = Function("funcname", "org.apache.spark.MyFunc")
+  private def newFunc(): Function = Function("funcname", funcClass)
 
   private def newDb(name: String = "default"): Database =
     Database(name, name + " description", "uri", Map.empty)
@@ -59,7 +64,7 @@ abstract class CatalogTestCases extends SparkFunSuite {
     Table(name, "", Seq.empty, Seq.empty, Seq.empty, null, 0, Map.empty, "EXTERNAL_TABLE", 0, 0,
       None, None)
 
-  private def newFunc(name: String): Function = Function(name, "class.name")
+  private def newFunc(name: String): Function = Function(name, funcClass)
 
   // --------------------------------------------------------------------------
   // Databases
@@ -67,10 +72,10 @@ abstract class CatalogTestCases extends SparkFunSuite {
 
   test("basic create, drop and list databases") {
     val catalog = newEmptyCatalog()
-    catalog.createDatabase(newDb(), ifNotExists = false)
+    catalog.createDatabase(newDb(), ignoreIfExists = false)
     assert(catalog.listDatabases().toSet == Set("default"))
 
-    catalog.createDatabase(newDb("default2"), ifNotExists = false)
+    catalog.createDatabase(newDb("default2"), ignoreIfExists = false)
     assert(catalog.listDatabases().toSet == Set("default", "default2"))
   }
 
@@ -253,11 +258,194 @@ abstract class CatalogTestCases extends SparkFunSuite {
   // Partitions
   // --------------------------------------------------------------------------
 
-  // TODO: Add tests cases for partitions
+  test("basic create and list partitions") {
+    val catalog = newEmptyCatalog()
+    catalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
+    catalog.createTable("mydb", newTable("mytbl"), ignoreIfExists = false)
+    catalog.createPartitions("mydb", "mytbl", Seq(part1, part2), ignoreIfExists = false)
+    assert(catalog.listPartitions("mydb", "mytbl").toSet == Set(part1, part2))
+  }
+
+  test("create partitions when database / table does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.createPartitions("does_not_exist", "tbl1", Seq(), ignoreIfExists = false)
+    }
+    intercept[AnalysisException] {
+      catalog.createPartitions("db2", "does_not_exist", Seq(), ignoreIfExists = false)
+    }
+  }
+
+  test("create partitions that already exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = false)
+    }
+    catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = true)
+  }
+
+  test("drop partitions") {
+    val catalog = newBasicCatalog()
+    assert(catalog.listPartitions("db2", "tbl2").toSet == Set(part1, part2))
+    catalog.dropPartitions("db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false)
+    assert(catalog.listPartitions("db2", "tbl2").toSet == Set(part2))
+    val catalog2 = newBasicCatalog()
+    assert(catalog2.listPartitions("db2", "tbl2").toSet == Set(part1, part2))
+    catalog2.dropPartitions("db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false)
+    assert(catalog2.listPartitions("db2", "tbl2").isEmpty)
+  }
+
+  test("drop partitions when database / table does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.dropPartitions("does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false)
+    }
+    intercept[AnalysisException] {
+      catalog.dropPartitions("db2", "does_not_exist", Seq(), ignoreIfNotExists = false)
+    }
+  }
+
+  test("drop partitions that do not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.dropPartitions("db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false)
+    }
+    catalog.dropPartitions("db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true)
+  }
+
+  test("get partition") {
+    val catalog = newBasicCatalog()
+    assert(catalog.getPartition("db2", "tbl2", part1.spec) == part1)
+    assert(catalog.getPartition("db2", "tbl2", part2.spec) == part2)
+    intercept[AnalysisException] {
+      catalog.getPartition("db2", "tbl1", part3.spec)
+    }
+  }
+
+  test("get partition when database / table does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.getPartition("does_not_exist", "tbl1", part1.spec)
+    }
+    intercept[AnalysisException] {
+      catalog.getPartition("db2", "does_not_exist", part1.spec)
+    }
+  }
+
+  test("alter partitions") {
+    val catalog = newBasicCatalog()
+    val partSameSpec = part1.copy(storage = storageFormat.copy(serde = "myserde"))
+    val partNewSpec = part1.copy(spec = Map("x" -> "10"))
+    // alter but keep spec the same
+    catalog.alterPartition("db2", "tbl2", part1.spec, partSameSpec)
+    assert(catalog.getPartition("db2", "tbl2", part1.spec) == partSameSpec)
+    // alter and change spec
+    catalog.alterPartition("db2", "tbl2", part1.spec, partNewSpec)
+    intercept[AnalysisException] {
+      catalog.getPartition("db2", "tbl2", part1.spec)
+    }
+    assert(catalog.getPartition("db2", "tbl2", partNewSpec.spec) == partNewSpec)
+  }
+
+  test("alter partition when database / table does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.alterPartition("does_not_exist", "tbl1", part1.spec, part1)
+    }
+    intercept[AnalysisException] {
+      catalog.alterPartition("db2", "does_not_exist", part1.spec, part1)
+    }
+  }
 
   // --------------------------------------------------------------------------
   // Functions
   // --------------------------------------------------------------------------
 
-  // TODO: Add tests cases for functions
+  test("basic create and list functions") {
+    val catalog = newEmptyCatalog()
+    catalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
+    catalog.createFunction("mydb", newFunc("myfunc"), ignoreIfExists = false)
+    assert(catalog.listFunctions("mydb", "*").toSet == Set("myfunc"))
+  }
+
+  test("create function when database does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.createFunction("does_not_exist", newFunc(), ignoreIfExists = false)
+    }
+  }
+
+  test("create function that already exists") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = false)
+    }
+    catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = true)
+  }
+
+  test("drop function") {
+    val catalog = newBasicCatalog()
+    assert(catalog.listFunctions("db2", "*").toSet == Set("func1"))
+    catalog.dropFunction("db2", "func1")
+    assert(catalog.listFunctions("db2", "*").isEmpty)
+  }
+
+  test("drop function when database does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.dropFunction("does_not_exist", "something")
+    }
+  }
+
+  test("drop function that does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.dropFunction("db2", "does_not_exist")
+    }
+  }
+
+  test("get function") {
+    val catalog = newBasicCatalog()
+    assert(catalog.getFunction("db2", "func1") == newFunc("func1"))
+    intercept[AnalysisException] {
+      catalog.getFunction("db2", "does_not_exist")
+    }
+  }
+
+  test("get function when database does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.getFunction("does_not_exist", "func1")
+    }
+  }
+
+  test("alter function") {
+    val catalog = newBasicCatalog()
+    assert(catalog.getFunction("db2", "func1").className == funcClass)
+    // alter func but keep name
+    catalog.alterFunction("db2", "func1", newFunc("func1").copy(className = "muhaha"))
+    assert(catalog.getFunction("db2", "func1").className == "muhaha")
+    // alter func and change name
+    catalog.alterFunction("db2", "func1", newFunc("funcky"))
+    intercept[AnalysisException] {
+      catalog.getFunction("db2", "func1")
+    }
+    assert(catalog.getFunction("db2", "funcky").className == funcClass)
+  }
+
+  test("alter function when database does not exist") {
+    val catalog = newBasicCatalog()
+    intercept[AnalysisException] {
+      catalog.alterFunction("does_not_exist", "func1", newFunc())
+    }
+  }
+
+  test("list functions") {
+    val catalog = newBasicCatalog()
+    catalog.createFunction("db2", newFunc("func2"), ignoreIfExists = false)
+    catalog.createFunction("db2", newFunc("not_me"), ignoreIfExists = false)
+    assert(catalog.listFunctions("db2", "*").toSet == Set("func1", "func2", "not_me"))
+    assert(catalog.listFunctions("db2", "func*").toSet == Set("func1", "func2"))
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org