You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2016/03/17 02:02:47 UTC

[1/2] spark git commit: [SPARK-13923][SQL] Implement SessionCatalog

Repository: spark
Updated Branches:
  refs/heads/master 92b70576e -> ca9ef86c8


http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
new file mode 100644
index 0000000..e1973ee
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -0,0 +1,864 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias}
+
+
+/**
+ * Tests for [[SessionCatalog]] that assume that [[InMemoryCatalog]] is correctly implemented.
+ *
+ * Note: many of the methods here are very similar to the ones in [[CatalogTestCases]].
+ * This is because [[SessionCatalog]] and [[ExternalCatalog]] share many similar method
+ * signatures but do not extend a common parent. This is largely by design but
+ * unfortunately leads to very similar test code in two places.
+ */
+class SessionCatalogSuite extends SparkFunSuite {
+  private val utils = new CatalogTestUtils {  // fixtures backed by an InMemoryCatalog
+    override def newEmptyCatalog(): ExternalCatalog = new InMemoryCatalog
+    override val tableOutputFormat: String = "com.fruit.eyephone.CameraOutputFormat"
+    override val tableInputFormat: String = "com.fruit.eyephone.CameraInputFormat"
+  }
+
+  import utils._
+
+  // --------------------------------------------------------------------------
+  // Databases
+  // --------------------------------------------------------------------------
+
+  test("basic create and list databases") {
+    val session = new SessionCatalog(newEmptyCatalog())  // starts from an empty catalog
+    session.createDatabase(newDb("default"), ignoreIfExists = true)
+    assert(session.databaseExists("default"))
+    assert(!session.databaseExists("testing2"))
+    assert(!session.databaseExists("testing"))
+    session.createDatabase(newDb("testing"), ignoreIfExists = false)
+    assert(session.listDatabases().toSet == Set("default", "testing"))
+    assert(session.databaseExists("testing"))
+    session.createDatabase(newDb("testing2"), ignoreIfExists = false)
+    assert(session.databaseExists("testing2"))
+    assert(session.listDatabases().toSet == Set("default", "testing", "testing2"))
+    assert(!session.databaseExists("does_not_exist"))
+  }
+
+  test("get database when a database exists") {
+    val session = new SessionCatalog(newBasicCatalog())
+    val found = session.getDatabase("db1")  // db1 is expected in the basic catalog
+    assert(found.name == "db1")
+    assert(found.description.contains("db1"))
+  }
+
+  test("get database should throw exception when the database does not exist") {
+    val session = new SessionCatalog(newBasicCatalog())
+    // The lookup itself is expected to be rejected
+    val missingDb = "db_that_does_not_exist"
+    intercept[AnalysisException] { session.getDatabase(missingDb) }
+  }
+
+  test("list databases without pattern") {
+    val listed = new SessionCatalog(newBasicCatalog()).listDatabases().toSet
+    assert(listed == Set("default", "db1", "db2"))
+  }
+
+  test("list databases with pattern") {
+    val session = new SessionCatalog(newBasicCatalog())
+    assert(session.listDatabases("db").isEmpty)  // no wildcard, no database named "db"
+    assert(session.listDatabases("db*").toSet == Set("db1", "db2"))
+    assert(session.listDatabases("*1").toSet == Set("db1"))
+    assert(session.listDatabases("db2").toSet == Set("db2"))
+  }
+
+  test("drop database") {
+    val session = new SessionCatalog(newBasicCatalog())
+    session.dropDatabase("db1", ignoreIfNotExists = false, cascade = false)
+    assert(session.listDatabases().toSet == Set("default", "db2"))  // db1 is gone
+  }
+
+  test("drop database when the database is not empty") {
+    // Non-cascading drop must fail while functions remain (the tables are removed first)
+    val externalCatalog1 = newBasicCatalog()
+    val sessionCatalog1 = new SessionCatalog(externalCatalog1)
+    externalCatalog1.dropTable("db2", "tbl1", ignoreIfNotExists = false)
+    externalCatalog1.dropTable("db2", "tbl2", ignoreIfNotExists = false)
+    intercept[AnalysisException] {
+      sessionCatalog1.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
+    }
+
+    // Non-cascading drop must fail while tables remain (the function is removed first)
+    val externalCatalog2 = newBasicCatalog()
+    val sessionCatalog2 = new SessionCatalog(externalCatalog2)
+    externalCatalog2.dropFunction("db2", "func1")
+    intercept[AnalysisException] {
+      sessionCatalog2.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
+    }
+
+    // Cascading drop issued through the SessionCatalog under test removes the non-empty db2
+    val externalCatalog3 = newBasicCatalog()
+    val sessionCatalog3 = new SessionCatalog(externalCatalog3)
+    sessionCatalog3.dropDatabase("db2", ignoreIfNotExists = false, cascade = true)
+    assert(sessionCatalog3.listDatabases().toSet == Set("default", "db1"))
+  }
+
+  test("drop database when the database does not exist") {
+    val session = new SessionCatalog(newBasicCatalog())
+    intercept[AnalysisException] {  // strict drop of a missing database is rejected
+      session.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = false, cascade = false)
+    }
+    session.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = true, cascade = false)
+  }
+
+  test("alter database") {
+    val session = new SessionCatalog(newBasicCatalog())
+    val before = session.getDatabase("db1")
+    // Only properties are altered here because Hive does not support altering other fields
+    session.alterDatabase(before.copy(properties = Map("k" -> "v3", "good" -> "true")))
+    val after = session.getDatabase("db1")
+    assert(before.properties.isEmpty)
+    assert(after.properties.size == 2)
+    assert(after.properties.get("k") == Some("v3"))
+    assert(after.properties.get("good") == Some("true"))
+  }
+
+  test("alter database should throw exception when the database does not exist") {
+    val session = new SessionCatalog(newBasicCatalog())
+    // Altering targets a database by name; "does_not_exist" was never created here,
+    // so the call is expected to be rejected
+    intercept[AnalysisException] { session.alterDatabase(newDb("does_not_exist")) }
+  }
+
+  test("get/set current database") {
+    val session = new SessionCatalog(newBasicCatalog())
+    assert(session.getCurrentDatabase == "default")
+    session.setCurrentDatabase("db2")
+    assert(session.getCurrentDatabase == "db2")
+    // Switching to a database that has not been created yet is rejected ...
+    intercept[AnalysisException] { session.setCurrentDatabase("deebo") }
+    // ... but succeeds once that database exists
+    session.createDatabase(newDb("deebo"), ignoreIfExists = false)
+    session.setCurrentDatabase("deebo")
+    assert(session.getCurrentDatabase == "deebo")
+  }
+
+  // --------------------------------------------------------------------------
+  // Tables
+  // --------------------------------------------------------------------------
+
+  test("create table") {
+    val external = newBasicCatalog()
+    val session = new SessionCatalog(external)
+    assert(external.listTables("db1").isEmpty)
+    assert(external.listTables("db2").toSet == Set("tbl1", "tbl2"))
+    session.createTable(newTable("tbl3", "db1"), ignoreIfExists = false)
+    session.createTable(newTable("tbl3", "db2"), ignoreIfExists = false)
+    assert(external.listTables("db1").toSet == Set("tbl3"))
+    assert(external.listTables("db2").toSet == Set("tbl1", "tbl2", "tbl3"))
+    // With no database in the table name, the current database (db1) is used
+    session.setCurrentDatabase("db1")
+    session.createTable(newTable("tbl4"), ignoreIfExists = false)
+    assert(external.listTables("db1").toSet == Set("tbl3", "tbl4"))
+    assert(external.listTables("db2").toSet == Set("tbl1", "tbl2", "tbl3"))
+  }
+
+  test("create table when database does not exist") {
+    val session = new SessionCatalog(newBasicCatalog())
+    // A missing database is an error regardless of ignoreIfExists
+    intercept[AnalysisException] {
+      session.createTable(newTable("tbl1", "does_not_exist"), ignoreIfExists = false)
+    }
+    intercept[AnalysisException] {
+      session.createTable(newTable("tbl1", "does_not_exist"), ignoreIfExists = true)
+    }
+    // db2.tbl1 already exists: strict create fails, lenient create does not throw
+    intercept[AnalysisException] {
+      session.createTable(newTable("tbl1", "db2"), ignoreIfExists = false)
+    }
+    session.createTable(newTable("tbl1", "db2"), ignoreIfExists = true)
+  }
+
+  test("create temp table") {
+    val session = new SessionCatalog(newBasicCatalog())
+    val rangeA = Range(1, 10, 1, 10, Nil)
+    val rangeB = Range(1, 20, 2, 10, Nil)
+    session.createTempTable("tbl1", rangeA, ignoreIfExists = false)
+    session.createTempTable("tbl2", rangeB, ignoreIfExists = false)
+    assert(session.getTempTable("tbl1") == Some(rangeA))
+    assert(session.getTempTable("tbl2") == Some(rangeB))
+    assert(session.getTempTable("tbl3").isEmpty)
+    // Re-registering an existing name without ignoreIfExists is an error
+    intercept[AnalysisException] {
+      session.createTempTable("tbl1", rangeA, ignoreIfExists = false)
+    }
+    // With ignoreIfExists = true the existing entry is replaced by the new plan
+    session.createTempTable("tbl1", rangeB, ignoreIfExists = true)
+    assert(session.getTempTable("tbl1") == Some(rangeB))
+  }
+
+  test("drop table") {
+    val external = newBasicCatalog()
+    val session = new SessionCatalog(external)
+    assert(external.listTables("db2").toSet == Set("tbl1", "tbl2"))
+    session.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false)
+    assert(external.listTables("db2").toSet == Set("tbl2"))
+    // An unqualified identifier is resolved against the current database
+    session.setCurrentDatabase("db2")
+    session.dropTable(TableIdentifier("tbl2"), ignoreIfNotExists = false)
+    assert(external.listTables("db2").isEmpty)
+  }
+
+  test("drop table when database/table does not exist") {
+    val session = new SessionCatalog(newBasicCatalog())
+    val inUnknownDb = TableIdentifier("tbl1", Some("unknown_db"))
+    val unknownTable = TableIdentifier("unknown_table", Some("db2"))
+    // A missing database is an error regardless of ignoreIfNotExists
+    intercept[AnalysisException] {
+      session.dropTable(inUnknownDb, ignoreIfNotExists = false)
+    }
+    intercept[AnalysisException] {
+      session.dropTable(inUnknownDb, ignoreIfNotExists = true)
+    }
+    // A missing table is an error only when ignoreIfNotExists is false
+    intercept[AnalysisException] { session.dropTable(unknownTable, ignoreIfNotExists = false) }
+    session.dropTable(unknownTable, ignoreIfNotExists = true)
+  }
+
+  test("drop temp table") {
+    val external = newBasicCatalog()
+    val session = new SessionCatalog(external)
+    val tempPlan = Range(1, 10, 2, 10, Nil)
+    session.createTempTable("tbl1", tempPlan, ignoreIfExists = false)
+    session.setCurrentDatabase("db2")
+    assert(session.getTempTable("tbl1") == Some(tempPlan))
+    assert(external.listTables("db2").toSet == Set("tbl1", "tbl2"))
+    // An unqualified drop removes the temp table first and leaves db2.tbl1 alone
+    session.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false)
+    assert(session.getTempTable("tbl1").isEmpty)
+    assert(external.listTables("db2").toSet == Set("tbl1", "tbl2"))
+    // With no temp table left, the same drop falls through to the current database
+    session.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false)
+    assert(external.listTables("db2").toSet == Set("tbl2"))
+    // A database-qualified drop never touches temp tables
+    session.createTempTable("tbl1", tempPlan, ignoreIfExists = false)
+    session.createTable(newTable("tbl1", "db2"), ignoreIfExists = false)
+    session.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false)
+    assert(session.getTempTable("tbl1") == Some(tempPlan))
+    assert(external.listTables("db2").toSet == Set("tbl2"))
+  }
+
+  test("rename table") {
+    val external = newBasicCatalog()
+    val session = new SessionCatalog(external)
+    def inDb2(table: String): TableIdentifier = TableIdentifier(table, Some("db2"))
+    assert(external.listTables("db2").toSet == Set("tbl1", "tbl2"))
+    // Fully qualified renames within db2
+    session.renameTable(inDb2("tbl1"), inDb2("tblone"))
+    assert(external.listTables("db2").toSet == Set("tblone", "tbl2"))
+    session.renameTable(inDb2("tbl2"), inDb2("tbltwo"))
+    assert(external.listTables("db2").toSet == Set("tblone", "tbltwo"))
+    // Unqualified identifiers are resolved against the current database
+    session.setCurrentDatabase("db2")
+    session.renameTable(TableIdentifier("tbltwo"), TableIdentifier("table_two"))
+    assert(external.listTables("db2").toSet == Set("tblone", "table_two"))
+    // Renaming "db2.tblone" to "db1.tblones" must fail because the databases differ
+    intercept[AnalysisException] {
+      session.renameTable(
+        inDb2("tblone"), TableIdentifier("tblones", Some("db1")))
+    }
+  }
+
+  test("rename table when database/table does not exist") {
+    val session = new SessionCatalog(newBasicCatalog())
+    def unknownDb(table: String): TableIdentifier = TableIdentifier(table, Some("unknown_db"))
+    val target = TableIdentifier("tbl2", Some("db2"))
+    // Source table lives in a database that does not exist
+    intercept[AnalysisException] { session.renameTable(unknownDb("tbl1"), unknownDb("tbl2")) }
+    // Database exists but the source table does not
+    intercept[AnalysisException] {
+      session.renameTable(TableIdentifier("unknown_table", Some("db2")), target)
+    }
+  }
+
+  test("rename temp table") {
+    val external = newBasicCatalog()
+    val session = new SessionCatalog(external)
+    val tempPlan = Range(1, 10, 2, 10, Nil)
+    session.createTempTable("tbl1", tempPlan, ignoreIfExists = false)
+    session.setCurrentDatabase("db2")
+    assert(session.getTempTable("tbl1") == Some(tempPlan))
+    assert(external.listTables("db2").toSet == Set("tbl1", "tbl2"))
+    // An unqualified rename applies to the temp table; db2 is left untouched
+    session.renameTable(TableIdentifier("tbl1"), TableIdentifier("tbl3"))
+    assert(session.getTempTable("tbl1").isEmpty)
+    assert(session.getTempTable("tbl3") == Some(tempPlan))
+    assert(external.listTables("db2").toSet == Set("tbl1", "tbl2"))
+    // A database-qualified rename never touches temp tables
+    session.renameTable(
+      TableIdentifier("tbl2", Some("db2")), TableIdentifier("tbl4", Some("db2")))
+    assert(session.getTempTable("tbl3") == Some(tempPlan))
+    assert(session.getTempTable("tbl4").isEmpty)
+    assert(external.listTables("db2").toSet == Set("tbl1", "tbl4"))
+  }
+
+  test("alter table") {
+    val external = newBasicCatalog()
+    val session = new SessionCatalog(external)
+    val original = external.getTable("db2", "tbl1")
+    session.alterTable(original.copy(properties = Map("toh" -> "frem")))
+    val altered = external.getTable("db2", "tbl1")
+    assert(!original.properties.contains("toh"))
+    assert(altered.properties.size == original.properties.size + 1)
+    assert(altered.properties.get("toh") == Some("frem"))
+    // Alter again with an unqualified name; the original metadata is expected back
+    session.setCurrentDatabase("db2")
+    session.alterTable(original.copy(name = TableIdentifier("tbl1")))
+    val restored = external.getTable("db2", "tbl1")
+    assert(restored == original)
+  }
+
+  test("alter table when database/table does not exist") {
+    val session = new SessionCatalog(newBasicCatalog())
+    // Unknown database
+    intercept[AnalysisException] {
+      session.alterTable(newTable("tbl1", "unknown_db"))
+    }
+    // Known database, unknown table
+    intercept[AnalysisException] { session.alterTable(newTable("unknown_table", "db2")) }
+  }
+
+  test("get table") {
+    val external = newBasicCatalog()
+    val session = new SessionCatalog(external)
+    val qualified = session.getTable(TableIdentifier("tbl1", Some("db2")))
+    assert(qualified == external.getTable("db2", "tbl1"))
+    // An unqualified identifier is resolved against the current database
+    session.setCurrentDatabase("db2")
+    val unqualified = session.getTable(TableIdentifier("tbl1"))
+    assert(unqualified == external.getTable("db2", "tbl1"))
+  }
+
+  test("get table when database/table does not exist") {
+    val session = new SessionCatalog(newBasicCatalog())
+    // First identifier: unknown database; second: unknown table in an existing database
+    val missing = Seq(
+      TableIdentifier("tbl1", Some("unknown_db")),
+      TableIdentifier("unknown_table", Some("db2")))
+    // Each lookup is expected to be rejected
+    missing.foreach { ident => intercept[AnalysisException] { session.getTable(ident) } }
+  }
+
+  test("lookup table relation") {
+    val external = newBasicCatalog()
+    val session = new SessionCatalog(external)
+    val tempPlan = Range(1, 10, 1, 10, Nil)
+    val metastoreTable = external.getTable("db2", "tbl1")
+    session.createTempTable("tbl1", tempPlan, ignoreIfExists = false)
+    session.setCurrentDatabase("db2")
+    // A database-qualified name is always resolved in that database
+    assert(session.lookupRelation(TableIdentifier("tbl1", Some("db2")))
+      == SubqueryAlias("tbl1", CatalogRelation("db2", metastoreTable)))
+    // An unqualified name resolves to the temporary table of that name first
+    assert(session.lookupRelation(TableIdentifier("tbl1"))
+      == SubqueryAlias("tbl1", tempPlan))
+    // Once the temporary table is dropped, the current database is consulted instead
+    session.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false)
+    assert(session.lookupRelation(TableIdentifier("tbl1"))
+      == SubqueryAlias("tbl1", CatalogRelation("db2", metastoreTable)))
+  }
+
+  test("lookup table relation with alias") {
+    val session = new SessionCatalog(newBasicCatalog())
+    val aliasName = "monster"
+    val ident = TableIdentifier("tbl1", Some("db2"))
+    val metadata = session.getTable(ident)
+    // Expected plan when no alias is requested
+    val plain = SubqueryAlias("tbl1", CatalogRelation("db2", metadata))
+    // Expected plan with an alias: the aliased relation wrapped in a second SubqueryAlias
+    val aliased =
+      SubqueryAlias(aliasName,
+        SubqueryAlias("tbl1", CatalogRelation("db2", metadata, Some(aliasName))))
+    assert(session.lookupRelation(ident, alias = None) == plain)
+    assert(session.lookupRelation(ident, alias = Some(aliasName)) == aliased)
+  }
+
+  test("list tables without pattern") {
+    val session = new SessionCatalog(newBasicCatalog())
+    val tempPlan = Range(1, 10, 2, 10, Nil)
+    session.createTempTable("tbl1", tempPlan, ignoreIfExists = false)
+    session.createTempTable("tbl4", tempPlan, ignoreIfExists = false)
+    // Temp tables are listed (without a database) for db1 and db2 alike
+    val tempOnly = Set(TableIdentifier("tbl1"), TableIdentifier("tbl4"))
+    val inDb2 = Set(
+      TableIdentifier("tbl1", Some("db2")),
+      TableIdentifier("tbl2", Some("db2")))
+    assert(session.listTables("db1").toSet == tempOnly)
+    assert(session.listTables("db2").toSet == tempOnly ++ inDb2)
+    intercept[AnalysisException] {
+      session.listTables("unknown_db")
+    }
+  }
+
+  test("list tables with pattern") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    val tempTable = Range(1, 10, 2, 10, Seq())
+    catalog.createTempTable("tbl1", tempTable, ignoreIfExists = false)
+    catalog.createTempTable("tbl4", tempTable, ignoreIfExists = false)
+    assert(catalog.listTables("db1", "*").toSet == catalog.listTables("db1").toSet)
+    assert(catalog.listTables("db2", "*").toSet == catalog.listTables("db2").toSet)
+    assert(catalog.listTables("db2", "tbl*").toSet ==
+      Set(TableIdentifier("tbl1"), TableIdentifier("tbl4"),
+        TableIdentifier("tbl1", Some("db2")),
+        TableIdentifier("tbl2", Some("db2"))))
+    assert(catalog.listTables("db2", "*1").toSet ==
+      Set(TableIdentifier("tbl1"), TableIdentifier("tbl1", Some("db2"))))
+    // Exercise the pattern overload here; the one-argument form has its own test
+    intercept[AnalysisException] {
+      catalog.listTables("unknown_db", "*")
+    }
+  }
+
+  // --------------------------------------------------------------------------
+  // Partitions
+  // --------------------------------------------------------------------------
+
+  test("basic create and list partitions") {
+    val external = newEmptyCatalog()
+    val session = new SessionCatalog(external)
+    session.createDatabase(newDb("mydb"), ignoreIfExists = false)
+    session.createTable(newTable("tbl", "mydb"), ignoreIfExists = false)
+    session.createPartitions(
+      TableIdentifier("tbl", Some("mydb")), Seq(part1, part2), ignoreIfExists = false)
+    assert(catalogPartitionsEqual(external, "mydb", "tbl", Seq(part1, part2)))
+    // An unqualified table name is resolved against the current database
+    session.setCurrentDatabase("mydb")
+    session.createPartitions(TableIdentifier("tbl"), Seq(part3), ignoreIfExists = false)
+    assert(catalogPartitionsEqual(external, "mydb", "tbl", Seq(part1, part2, part3)))
+  }
+
+  test("create partitions when database/table does not exist") {
+    val session = new SessionCatalog(newBasicCatalog())
+    // Identifiers that cannot be resolved: the first names an unknown database,
+    // the second an unknown table inside the existing database db2
+    val missing = Seq(
+      TableIdentifier("tbl1", Some("does_not_exist")),
+      TableIdentifier("does_not_exist", Some("db2")))
+    missing.foreach { id =>
+      intercept[AnalysisException] { session.createPartitions(id, Nil, ignoreIfExists = false) }
+    }
+  }
+
+  test("create partitions that already exist") {
+    val session = new SessionCatalog(newBasicCatalog())
+    val tbl2 = TableIdentifier("tbl2", Some("db2"))
+    // Strict creation of part1 is expected to be rejected; lenient creation must not throw
+    intercept[AnalysisException] {
+      session.createPartitions(tbl2, Seq(part1), ignoreIfExists = false)
+    }
+    session.createPartitions(tbl2, Seq(part1), ignoreIfExists = true)
+  }
+
+  test("drop partitions") {
+    val external = newBasicCatalog()
+    val session = new SessionCatalog(external)
+    val tbl2 = TableIdentifier("tbl2", Some("db2"))
+    // db2.tbl2 is expected to start out with part1 and part2
+    assert(catalogPartitionsEqual(external, "db2", "tbl2", Seq(part1, part2)))
+    // Drop a single partition through a database-qualified table name
+    session.dropPartitions(tbl2, Seq(part1.spec), ignoreIfNotExists = false)
+    assert(catalogPartitionsEqual(external, "db2", "tbl2", Seq(part2)))
+    // An unqualified table name is resolved against the current database
+    session.setCurrentDatabase("db2")
+    session.dropPartitions(
+      TableIdentifier("tbl2"), Seq(part2.spec), ignoreIfNotExists = false)
+    assert(external.listPartitions("db2", "tbl2").isEmpty)
+    // Re-create both partitions, then drop them in a single call
+    session.createPartitions(tbl2, Seq(part1, part2), ignoreIfExists = false)
+    assert(catalogPartitionsEqual(external, "db2", "tbl2", Seq(part1, part2)))
+    session.dropPartitions(tbl2, Seq(part1.spec, part2.spec), ignoreIfNotExists = false)
+    assert(external.listPartitions("db2", "tbl2").isEmpty)
+  }
+
+  test("drop partitions when database/table does not exist") {
+    val session = new SessionCatalog(newBasicCatalog())
+    // Identifiers that cannot be resolved: the first names an unknown database,
+    // the second an unknown table inside the existing database db2
+    val missing = Seq(
+      TableIdentifier("tbl1", Some("does_not_exist")),
+      TableIdentifier("does_not_exist", Some("db2")))
+    missing.foreach { id =>
+      intercept[AnalysisException] { session.dropPartitions(id, Nil, ignoreIfNotExists = false) }
+    }
+  }
+
+  test("drop partitions that do not exist") {
+    val session = new SessionCatalog(newBasicCatalog())
+    val tbl2 = TableIdentifier("tbl2", Some("db2"))
+    // Strict drop of part3 is expected to be rejected; lenient drop must not throw
+    intercept[AnalysisException] {
+      session.dropPartitions(tbl2, Seq(part3.spec), ignoreIfNotExists = false)
+    }
+    session.dropPartitions(tbl2, Seq(part3.spec), ignoreIfNotExists = true)
+  }
+
+  test("get partition") {
+    val session = new SessionCatalog(newBasicCatalog())
+    val qualified = TableIdentifier("tbl2", Some("db2"))
+    assert(session.getPartition(qualified, part1.spec).spec == part1.spec)
+    assert(session.getPartition(qualified, part2.spec).spec == part2.spec)
+    // An unqualified table name is resolved against the current database
+    session.setCurrentDatabase("db2")
+    val unqualified = TableIdentifier("tbl2")
+    assert(session.getPartition(unqualified, part1.spec).spec == part1.spec)
+    assert(session.getPartition(unqualified, part2.spec).spec == part2.spec)
+    // Looking up part3 is expected to be rejected
+    intercept[AnalysisException] {
+      session.getPartition(unqualified, part3.spec)
+    }
+  }
+
+  test("get partition when database/table does not exist") {
+    val session = new SessionCatalog(newBasicCatalog())
+    val missing = Seq(
+      TableIdentifier("tbl1", Some("does_not_exist")),  // unknown database
+      TableIdentifier("does_not_exist", Some("db2")))  // unknown table
+    missing.foreach { id =>
+      intercept[AnalysisException] { session.getPartition(id, part1.spec) }
+    }
+  }
+
+  test("rename partitions") {
+    val session = new SessionCatalog(newBasicCatalog())
+    val renamed1 = part1.copy(spec = Map("a" -> "100", "b" -> "101"))
+    val renamed2 = part2.copy(spec = Map("a" -> "200", "b" -> "201"))
+    val newSpecs = Seq(renamed1.spec, renamed2.spec)
+    val qualified = TableIdentifier("tbl2", Some("db2"))
+    session.renamePartitions(qualified, Seq(part1.spec, part2.spec), newSpecs)
+    assert(session.getPartition(qualified, renamed1.spec).spec === renamed1.spec)
+    assert(session.getPartition(qualified, renamed2.spec).spec === renamed2.spec)
+    // The old specs must no longer resolve
+    intercept[AnalysisException] {
+      session.getPartition(qualified, part1.spec)
+    }
+    intercept[AnalysisException] {
+      session.getPartition(qualified, part2.spec)
+    }
+    // Rename back through an unqualified table name (resolved in the current database)
+    session.setCurrentDatabase("db2")
+    session.renamePartitions(TableIdentifier("tbl2"), newSpecs, Seq(part1.spec, part2.spec))
+    assert(session.getPartition(TableIdentifier("tbl2"), part1.spec).spec === part1.spec)
+    assert(session.getPartition(TableIdentifier("tbl2"), part2.spec).spec === part2.spec)
+    // ... after which the intermediate specs must no longer resolve
+    intercept[AnalysisException] {
+      session.getPartition(TableIdentifier("tbl2"), renamed1.spec)
+    }
+    intercept[AnalysisException] {
+      session.getPartition(TableIdentifier("tbl2"), renamed2.spec)
+    }
+  }
+
+  test("rename partitions when database/table does not exist") {
+    val session = new SessionCatalog(newBasicCatalog())
+    val missing = Seq(
+      TableIdentifier("tbl1", Some("does_not_exist")),  // unknown database
+      TableIdentifier("does_not_exist", Some("db2")))  // unknown table
+    missing.foreach { id =>
+      intercept[AnalysisException] {
+        session.renamePartitions(id, Seq(part1.spec), Seq(part2.spec))
+      }
+    }
+  }
+
+  test("alter partitions") {
+    val session = new SessionCatalog(newBasicCatalog())
+    val movedTo = newUriForDatabase()
+    val qualified = TableIdentifier("tbl2", Some("db2"))
+    // Change only the storage location; the partition specs stay the same
+    val before1 = session.getPartition(qualified, part1.spec)
+    val before2 = session.getPartition(qualified, part2.spec)
+    session.alterPartitions(qualified, Seq(
+      before1.copy(storage = storageFormat.copy(locationUri = Some(movedTo))),
+      before2.copy(storage = storageFormat.copy(locationUri = Some(movedTo)))))
+    val after1 = session.getPartition(qualified, part1.spec)
+    val after2 = session.getPartition(qualified, part2.spec)
+    assert(after1.storage.locationUri == Some(movedTo))
+    assert(after2.storage.locationUri == Some(movedTo))
+    assert(before1.storage.locationUri != Some(movedTo))
+    assert(before2.storage.locationUri != Some(movedTo))
+    // Write the original partitions back through an unqualified table name
+    session.setCurrentDatabase("db2")
+    val unqualified = TableIdentifier("tbl2")
+    session.alterPartitions(unqualified, Seq(before1, before2))
+    val restored1 = session.getPartition(unqualified, part1.spec)
+    val restored2 = session.getPartition(unqualified, part2.spec)
+    assert(before1.storage.locationUri == restored1.storage.locationUri)
+    assert(before2.storage.locationUri == restored2.storage.locationUri)
+    // Altering with changed specs should fail because those specs do not exist yet
+    val unknown1 = part1.copy(spec = Map("a" -> "v1", "b" -> "v2"))
+    val unknown2 = part2.copy(spec = Map("a" -> "v3", "b" -> "v4"))
+    intercept[AnalysisException] { session.alterPartitions(qualified, Seq(unknown1, unknown2)) }
+  }
+
+  test("alter partitions when database/table does not exist") {
+    val session = new SessionCatalog(newBasicCatalog())
+    val missing = Seq(
+      TableIdentifier("tbl1", Some("does_not_exist")),  // unknown database
+      TableIdentifier("does_not_exist", Some("db2")))  // unknown table
+    missing.foreach { id =>
+      intercept[AnalysisException] { session.alterPartitions(id, Seq(part1)) }
+    }
+  }
+
+  test("list partitions") {
+    val session = new SessionCatalog(newBasicCatalog())
+    assert(session.listPartitions(TableIdentifier("tbl2", Some("db2"))).toSet == Set(part1, part2))
+    // An unqualified table name is resolved against the current database
+    session.setCurrentDatabase("db2")
+    assert(session.listPartitions(TableIdentifier("tbl2")).toSet == Set(part1, part2))
+  }
+
+  // --------------------------------------------------------------------------
+  // Functions
+  // --------------------------------------------------------------------------
+
+  test("basic create and list functions") {
+    val external = newEmptyCatalog()
+    val session = new SessionCatalog(external)
+    session.createDatabase(newDb("mydb"), ignoreIfExists = false)
+    session.createFunction(newFunc("myfunc", Some("mydb")))
+    assert(external.listFunctions("mydb", "*").toSet == Set("myfunc"))
+    // A function created without a database is expected in the current database
+    session.setCurrentDatabase("mydb")
+    session.createFunction(newFunc("myfunc2"))
+    assert(external.listFunctions("mydb", "*").toSet == Set("myfunc", "myfunc2"))
+  }
+
+  test("create function when database does not exist") {
+    val session = new SessionCatalog(newBasicCatalog())
+    intercept[AnalysisException] {  // the target database is missing
+      session.createFunction(newFunc("func5", Some("does_not_exist")))
+    }
+  }
+
+  test("create function that already exists") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    intercept[AnalysisException] {
+      catalog.createFunction(newFunc("func1", Some("db2")))
+    }
+  }
+
+  test("create temp function") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    val tempFunc1 = newFunc("temp1")
+    val tempFunc2 = newFunc("temp2")
+    catalog.createTempFunction(tempFunc1, ignoreIfExists = false)
+    catalog.createTempFunction(tempFunc2, ignoreIfExists = false)
+    assert(catalog.getTempFunction("temp1") == Some(tempFunc1))
+    assert(catalog.getTempFunction("temp2") == Some(tempFunc2))
+    assert(catalog.getTempFunction("temp3") == None)
+    // Temporary function already exists
+    intercept[AnalysisException] {
+      catalog.createTempFunction(tempFunc1, ignoreIfExists = false)
+    }
+    // Temporary function is overridden
+    val tempFunc3 = tempFunc1.copy(className = "something else")
+    catalog.createTempFunction(tempFunc3, ignoreIfExists = true)
+    assert(catalog.getTempFunction("temp1") == Some(tempFunc3))
+  }
+
+  test("drop function") {
+    val externalCatalog = newBasicCatalog()
+    val sessionCatalog = new SessionCatalog(externalCatalog)
+    assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1"))
+    sessionCatalog.dropFunction(FunctionIdentifier("func1", Some("db2")))
+    assert(externalCatalog.listFunctions("db2", "*").isEmpty)
+    // Drop function without explicitly specifying database
+    sessionCatalog.setCurrentDatabase("db2")
+    sessionCatalog.createFunction(newFunc("func2", Some("db2")))
+    assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func2"))
+    sessionCatalog.dropFunction(FunctionIdentifier("func2"))
+    assert(externalCatalog.listFunctions("db2", "*").isEmpty)
+  }
+
+  test("drop function when database/function does not exist") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    intercept[AnalysisException] {
+      catalog.dropFunction(FunctionIdentifier("something", Some("does_not_exist")))
+    }
+    intercept[AnalysisException] {
+      catalog.dropFunction(FunctionIdentifier("does_not_exist"))
+    }
+  }
+
+  test("drop temp function") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    val tempFunc = newFunc("func1")
+    catalog.createTempFunction(tempFunc, ignoreIfExists = false)
+    assert(catalog.getTempFunction("func1") == Some(tempFunc))
+    catalog.dropTempFunction("func1", ignoreIfNotExists = false)
+    assert(catalog.getTempFunction("func1") == None)
+    intercept[AnalysisException] {
+      catalog.dropTempFunction("func1", ignoreIfNotExists = false)
+    }
+    catalog.dropTempFunction("func1", ignoreIfNotExists = true)
+  }
+
+  test("get function") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    val expected = CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass)
+    assert(catalog.getFunction(FunctionIdentifier("func1", Some("db2"))) == expected)
+    // Get function without explicitly specifying database
+    catalog.setCurrentDatabase("db2")
+    assert(catalog.getFunction(FunctionIdentifier("func1")) == expected)
+  }
+
+  test("get function when database/function does not exist") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    intercept[AnalysisException] {
+      catalog.getFunction(FunctionIdentifier("func1", Some("does_not_exist")))
+    }
+    intercept[AnalysisException] {
+      catalog.getFunction(FunctionIdentifier("does_not_exist", Some("db2")))
+    }
+  }
+
+  test("get temp function") {
+    val externalCatalog = newBasicCatalog()
+    val sessionCatalog = new SessionCatalog(externalCatalog)
+    val metastoreFunc = externalCatalog.getFunction("db2", "func1")
+    val tempFunc = newFunc("func1").copy(className = "something weird")
+    sessionCatalog.createTempFunction(tempFunc, ignoreIfExists = false)
+    sessionCatalog.setCurrentDatabase("db2")
+    // If a database is specified, we'll always return the function in that database
+    assert(sessionCatalog.getFunction(FunctionIdentifier("func1", Some("db2"))) == metastoreFunc)
+    // If no database is specified, we'll first return temporary functions
+    assert(sessionCatalog.getFunction(FunctionIdentifier("func1")) == tempFunc)
+    // Then, if no such temporary function exists, check the current database
+    sessionCatalog.dropTempFunction("func1", ignoreIfNotExists = false)
+    assert(sessionCatalog.getFunction(FunctionIdentifier("func1")) == metastoreFunc)
+  }
+
+  test("rename function") {
+    val externalCatalog = newBasicCatalog()
+    val sessionCatalog = new SessionCatalog(externalCatalog)
+    val newName = "funcky"
+    assert(sessionCatalog.getFunction(
+      FunctionIdentifier("func1", Some("db2"))) == newFunc("func1", Some("db2")))
+    assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1"))
+    sessionCatalog.renameFunction(
+      FunctionIdentifier("func1", Some("db2")), FunctionIdentifier(newName, Some("db2")))
+    assert(sessionCatalog.getFunction(
+      FunctionIdentifier(newName, Some("db2"))) == newFunc(newName, Some("db2")))
+    assert(externalCatalog.listFunctions("db2", "*").toSet == Set(newName))
+    // Rename function without explicitly specifying database
+    sessionCatalog.setCurrentDatabase("db2")
+    sessionCatalog.renameFunction(FunctionIdentifier(newName), FunctionIdentifier("func1"))
+    assert(sessionCatalog.getFunction(
+      FunctionIdentifier("func1")) == newFunc("func1", Some("db2")))
+    assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1"))
+    // Renaming "db2.func1" to "db1.func2" should fail because databases don't match
+    intercept[AnalysisException] {
+      sessionCatalog.renameFunction(
+        FunctionIdentifier("func1", Some("db2")), FunctionIdentifier("func2", Some("db1")))
+    }
+  }
+
+  test("rename function when database/function does not exist") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    intercept[AnalysisException] {
+      catalog.renameFunction(
+        FunctionIdentifier("func1", Some("does_not_exist")),
+        FunctionIdentifier("func5", Some("does_not_exist")))
+    }
+    intercept[AnalysisException] {
+      catalog.renameFunction(
+        FunctionIdentifier("does_not_exist", Some("db2")),
+        FunctionIdentifier("x", Some("db2")))
+    }
+  }
+
+  test("rename temp function") {
+    val externalCatalog = newBasicCatalog()
+    val sessionCatalog = new SessionCatalog(externalCatalog)
+    val tempFunc = newFunc("func1").copy(className = "something weird")
+    sessionCatalog.createTempFunction(tempFunc, ignoreIfExists = false)
+    sessionCatalog.setCurrentDatabase("db2")
+    // If a database is specified, we'll always rename the function in that database
+    sessionCatalog.renameFunction(
+      FunctionIdentifier("func1", Some("db2")), FunctionIdentifier("func3", Some("db2")))
+    assert(sessionCatalog.getTempFunction("func1") == Some(tempFunc))
+    assert(sessionCatalog.getTempFunction("func3") == None)
+    assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func3"))
+    // If no database is specified, we'll first rename temporary functions
+    sessionCatalog.createFunction(newFunc("func1", Some("db2")))
+    sessionCatalog.renameFunction(FunctionIdentifier("func1"), FunctionIdentifier("func4"))
+    assert(sessionCatalog.getTempFunction("func4") ==
+      Some(tempFunc.copy(name = FunctionIdentifier("func4"))))
+    assert(sessionCatalog.getTempFunction("func1") == None)
+    assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1", "func3"))
+    // Then, if no such temporary function exists, rename the function in the current database
+    sessionCatalog.renameFunction(FunctionIdentifier("func1"), FunctionIdentifier("func5"))
+    assert(sessionCatalog.getTempFunction("func5") == None)
+    assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func3", "func5"))
+  }
+
+  test("alter function") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    assert(catalog.getFunction(FunctionIdentifier("func1", Some("db2"))).className == funcClass)
+    catalog.alterFunction(newFunc("func1", Some("db2")).copy(className = "muhaha"))
+    assert(catalog.getFunction(FunctionIdentifier("func1", Some("db2"))).className == "muhaha")
+    // Alter function without explicitly specifying database
+    catalog.setCurrentDatabase("db2")
+    catalog.alterFunction(newFunc("func1").copy(className = "derpy"))
+    assert(catalog.getFunction(FunctionIdentifier("func1")).className == "derpy")
+  }
+
+  test("alter function when database/function does not exist") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    intercept[AnalysisException] {
+      catalog.alterFunction(newFunc("func5", Some("does_not_exist")))
+    }
+    intercept[AnalysisException] {
+      catalog.alterFunction(newFunc("funcky", Some("db2")))
+    }
+  }
+
+  test("list functions") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    val tempFunc1 = newFunc("func1").copy(className = "march")
+    val tempFunc2 = newFunc("yes_me").copy(className = "april")
+    catalog.createFunction(newFunc("func2", Some("db2")))
+    catalog.createFunction(newFunc("not_me", Some("db2")))
+    catalog.createTempFunction(tempFunc1, ignoreIfExists = false)
+    catalog.createTempFunction(tempFunc2, ignoreIfExists = false)
+    assert(catalog.listFunctions("db1", "*").toSet ==
+      Set(FunctionIdentifier("func1"),
+        FunctionIdentifier("yes_me")))
+    assert(catalog.listFunctions("db2", "*").toSet ==
+      Set(FunctionIdentifier("func1"),
+        FunctionIdentifier("yes_me"),
+        FunctionIdentifier("func1", Some("db2")),
+        FunctionIdentifier("func2", Some("db2")),
+        FunctionIdentifier("not_me", Some("db2"))))
+    assert(catalog.listFunctions("db2", "func*").toSet ==
+      Set(FunctionIdentifier("func1"),
+        FunctionIdentifier("func1", Some("db2")),
+        FunctionIdentifier("func2", Some("db2"))))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
index 5185e9a..439501f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala
@@ -24,6 +24,7 @@ import org.apache.thrift.TException
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchItemException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.hive.client.HiveClient
@@ -73,10 +74,10 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit
   }
 
   private def requireDbMatches(db: String, table: CatalogTable): Unit = {
-    if (table.specifiedDatabase != Some(db)) {
+    if (table.name.database != Some(db)) {
       throw new AnalysisException(
         s"Provided database $db does not much the one specified in the " +
-        s"table definition (${table.specifiedDatabase.getOrElse("n/a")})")
+        s"table definition (${table.name.database.getOrElse("n/a")})")
     }
   }
 
@@ -160,7 +161,7 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit
   }
 
   override def renameTable(db: String, oldName: String, newName: String): Unit = withClient {
-    val newTable = client.getTable(db, oldName).copy(name = newName)
+    val newTable = client.getTable(db, oldName).copy(name = TableIdentifier(newName, Some(db)))
     client.alterTable(oldName, newTable)
   }
 
@@ -173,7 +174,7 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit
    */
   override def alterTable(db: String, tableDefinition: CatalogTable): Unit = withClient {
     requireDbMatches(db, tableDefinition)
-    requireTableExists(db, tableDefinition.name)
+    requireTableExists(db, tableDefinition.name.table)
     client.alterTable(tableDefinition)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c70510b..b6c7869 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -118,8 +118,8 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
 
   private def getQualifiedTableName(t: CatalogTable): QualifiedTableName = {
     QualifiedTableName(
-      t.specifiedDatabase.getOrElse(client.currentDatabase).toLowerCase,
-      t.name.toLowerCase)
+      t.name.database.getOrElse(client.currentDatabase).toLowerCase,
+      t.name.table.toLowerCase)
   }
 
   /** A cache of Spark SQL data source tables that have been accessed. */
@@ -293,8 +293,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
 
     def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
       CatalogTable(
-        specifiedDatabase = Option(dbName),
-        name = tblName,
+        name = TableIdentifier(tblName, Option(dbName)),
         tableType = tableType,
         schema = Nil,
         storage = CatalogStorageFormat(
@@ -314,8 +313,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
       assert(relation.partitionSchema.isEmpty)
 
       CatalogTable(
-        specifiedDatabase = Option(dbName),
-        name = tblName,
+        name = TableIdentifier(tblName, Option(dbName)),
         tableType = tableType,
         storage = CatalogStorageFormat(
           locationUri = Some(relation.location.paths.map(_.toUri.toString).head),
@@ -432,7 +430,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
       alias match {
         // because hive use things like `_c0` to build the expanded text
         // currently we cannot support view from "create view v1(c1) as ..."
-        case None => SubqueryAlias(table.name, hive.parseSql(viewText))
+        case None => SubqueryAlias(table.name.table, hive.parseSql(viewText))
         case Some(aliasText) => SubqueryAlias(aliasText, hive.parseSql(viewText))
       }
     } else {
@@ -618,9 +616,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
         val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
 
         execution.CreateViewAsSelect(
-          table.copy(
-            specifiedDatabase = Some(dbName),
-            name = tblName),
+          table.copy(name = TableIdentifier(tblName, Some(dbName))),
           child,
           allowExisting,
           replace)
@@ -642,7 +638,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
         if (hive.convertCTAS && table.storage.serde.isEmpty) {
           // Do the conversion when spark.sql.hive.convertCTAS is true and the query
           // does not specify any storage format (file format and storage handler).
-          if (table.specifiedDatabase.isDefined) {
+          if (table.name.database.isDefined) {
             throw new AnalysisException(
               "Cannot specify database name in a CTAS statement " +
                 "when spark.sql.hive.convertCTAS is set to true.")
@@ -650,7 +646,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
 
           val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
           CreateTableUsingAsSelect(
-            TableIdentifier(desc.name),
+            TableIdentifier(desc.name.table),
             conf.defaultDataSourceName,
             temporary = false,
             Array.empty[String],
@@ -671,9 +667,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
           val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
 
           execution.CreateTableAsSelect(
-            desc.copy(
-              specifiedDatabase = Some(dbName),
-              name = tblName),
+            desc.copy(name = TableIdentifier(tblName, Some(dbName))),
             child,
             allowExisting)
         }
@@ -824,7 +818,7 @@ private[hive] case class MetastoreRelation(
     // We start by constructing an API table as Hive performs several important transformations
     // internally when converting an API table to a QL table.
     val tTable = new org.apache.hadoop.hive.metastore.api.Table()
-    tTable.setTableName(table.name)
+    tTable.setTableName(table.name.table)
     tTable.setDbName(table.database)
 
     val tableParameters = new java.util.HashMap[String, String]()

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 739fbaf..00fc8af 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -60,7 +60,7 @@ private[hive] case class CreateTableAsSelect(
 
   override def output: Seq[Attribute] = Seq.empty[Attribute]
   override lazy val resolved: Boolean =
-    tableDesc.specifiedDatabase.isDefined &&
+    tableDesc.name.database.isDefined &&
     tableDesc.schema.nonEmpty &&
     tableDesc.storage.serde.isDefined &&
     tableDesc.storage.inputFormat.isDefined &&
@@ -185,13 +185,10 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
       properties: Map[String, String],
       allowExist: Boolean,
       replace: Boolean): CreateViewAsSelect = {
-    val TableIdentifier(viewName, dbName) = extractTableIdent(viewNameParts)
-
+    val tableIdentifier = extractTableIdent(viewNameParts)
     val originalText = query.source
-
     val tableDesc = CatalogTable(
-      specifiedDatabase = dbName,
-      name = viewName,
+      name = tableIdentifier,
       tableType = CatalogTableType.VIRTUAL_VIEW,
       schema = schema,
       storage = CatalogStorageFormat(
@@ -356,12 +353,11 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging
               "TOK_TABLELOCATION",
               "TOK_TABLEPROPERTIES"),
             children)
-        val TableIdentifier(tblName, dbName) = extractTableIdent(tableNameParts)
+        val tableIdentifier = extractTableIdent(tableNameParts)
 
         // TODO add bucket support
         var tableDesc: CatalogTable = CatalogTable(
-          specifiedDatabase = dbName,
-          name = tblName,
+          name = tableIdentifier,
           tableType =
             if (externalTable.isDefined) {
               CatalogTableType.EXTERNAL_TABLE

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index b32aff2..d214e52 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -91,7 +91,7 @@ private[hive] trait HiveClient {
   def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit
 
   /** Alter a table whose name matches the one specified in `table`, assuming it exists. */
-  final def alterTable(table: CatalogTable): Unit = alterTable(table.name, table)
+  final def alterTable(table: CatalogTable): Unit = alterTable(table.name.table, table)
 
   /** Updates the given table with new metadata, optionally renaming the table. */
   def alterTable(tableName: String, table: CatalogTable): Unit

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index c108750..3040ec9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.{Logging, SparkConf, SparkException}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Expression
@@ -298,8 +299,7 @@ private[hive] class HiveClientImpl(
     logDebug(s"Looking up $dbName.$tableName")
     Option(client.getTable(dbName, tableName, false)).map { h =>
       CatalogTable(
-        specifiedDatabase = Option(h.getDbName),
-        name = h.getTableName,
+        name = TableIdentifier(h.getTableName, Option(h.getDbName)),
         tableType = h.getTableType match {
           case HiveTableType.EXTERNAL_TABLE => CatalogTableType.EXTERNAL_TABLE
           case HiveTableType.MANAGED_TABLE => CatalogTableType.MANAGED_TABLE
@@ -545,13 +545,13 @@ private[hive] class HiveClientImpl(
   }
 
   override def renameFunction(db: String, oldName: String, newName: String): Unit = withHiveState {
-    val catalogFunc = getFunction(db, oldName).copy(name = newName)
+    val catalogFunc = getFunction(db, oldName).copy(name = FunctionIdentifier(newName, Some(db)))
     val hiveFunc = toHiveFunction(catalogFunc, db)
     client.alterFunction(db, oldName, hiveFunc)
   }
 
   override def alterFunction(db: String, func: CatalogFunction): Unit = withHiveState {
-    client.alterFunction(db, func.name, toHiveFunction(func, db))
+    client.alterFunction(db, func.name.funcName, toHiveFunction(func, db))
   }
 
   override def getFunctionOption(
@@ -612,7 +612,7 @@ private[hive] class HiveClientImpl(
 
   private def toHiveFunction(f: CatalogFunction, db: String): HiveFunction = {
     new HiveFunction(
-      f.name,
+      f.name.funcName,
       db,
       f.className,
       null,
@@ -623,7 +623,8 @@ private[hive] class HiveClientImpl(
   }
 
   private def fromHiveFunction(hf: HiveFunction): CatalogFunction = {
-    new CatalogFunction(hf.getFunctionName, hf.getClassName)
+    val name = FunctionIdentifier(hf.getFunctionName, Option(hf.getDbName))
+    new CatalogFunction(name, hf.getClassName)
   }
 
   private def toHiveColumn(c: CatalogColumn): FieldSchema = {
@@ -639,7 +640,7 @@ private[hive] class HiveClientImpl(
   }
 
   private def toHiveTable(table: CatalogTable): HiveTable = {
-    val hiveTable = new HiveTable(table.database, table.name)
+    val hiveTable = new HiveTable(table.database, table.name.table)
     hiveTable.setTableType(table.tableType match {
       case CatalogTableType.EXTERNAL_TABLE => HiveTableType.EXTERNAL_TABLE
       case CatalogTableType.MANAGED_TABLE => HiveTableType.MANAGED_TABLE

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 91425d1..391e297 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -38,7 +38,7 @@ case class CreateTableAsSelect(
     allowExisting: Boolean)
   extends RunnableCommand {
 
-  val tableIdentifier = TableIdentifier(tableDesc.name, Some(tableDesc.database))
+  private val tableIdentifier = tableDesc.name
 
   override def children: Seq[LogicalPlan] = Seq(query)
 
@@ -93,6 +93,6 @@ case class CreateTableAsSelect(
   }
 
   override def argString: String = {
-    s"[Database:${tableDesc.database}}, TableName: ${tableDesc.name}, InsertIntoHiveTable]"
+    s"[Database:${tableDesc.database}}, TableName: ${tableDesc.name.table}, InsertIntoHiveTable]"
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
index 6c2b88e..8a1cf2c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
@@ -44,7 +44,7 @@ private[hive] case class CreateViewAsSelect(
   assert(tableDesc.schema == Nil || tableDesc.schema.length == childSchema.length)
   assert(tableDesc.viewText.isDefined)
 
-  val tableIdentifier = TableIdentifier(tableDesc.name, Some(tableDesc.database))
+  private val tableIdentifier = tableDesc.name
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val hiveContext = sqlContext.asInstanceOf[HiveContext]
@@ -116,7 +116,7 @@ private[hive] case class CreateViewAsSelect(
     }
 
     val viewText = tableDesc.viewText.get
-    val viewName = quote(tableDesc.name)
+    val viewName = quote(tableDesc.name.table)
     s"SELECT $viewOutput FROM ($viewText) $viewName"
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
index 2809f94..0dc4fea 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
@@ -36,15 +36,12 @@ class HiveCatalogSuite extends CatalogTestCases {
       sparkConf = new SparkConf()).createClient()
   }
 
-  protected override val tableInputFormat: String =
-    "org.apache.hadoop.mapred.SequenceFileInputFormat"
-  protected override val tableOutputFormat: String =
-    "org.apache.hadoop.mapred.SequenceFileOutputFormat"
-
-  protected override def newUriForDatabase(): String = Utils.createTempDir().getAbsolutePath
+  protected override val utils: CatalogTestUtils = new CatalogTestUtils {
+    override val tableInputFormat: String = "org.apache.hadoop.mapred.SequenceFileInputFormat"
+    override val tableOutputFormat: String = "org.apache.hadoop.mapred.SequenceFileOutputFormat"
+    override def newEmptyCatalog(): ExternalCatalog = new HiveCatalog(client)
+  }
 
   protected override def resetState(): Unit = client.reset()
 
-  protected override def newEmptyCatalog(): ExternalCatalog = new HiveCatalog(client)
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
index 626550f..1c775db 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
@@ -54,8 +54,8 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
 
     val (desc, exists) = extractTableDesc(s1)
     assert(exists)
-    assert(desc.specifiedDatabase == Some("mydb"))
-    assert(desc.name == "page_view")
+    assert(desc.name.database == Some("mydb"))
+    assert(desc.name.table == "page_view")
     assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE)
     assert(desc.storage.locationUri == Some("/user/external/page_view"))
     assert(desc.schema ==
@@ -100,8 +100,8 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
 
     val (desc, exists) = extractTableDesc(s2)
     assert(exists)
-    assert(desc.specifiedDatabase == Some("mydb"))
-    assert(desc.name == "page_view")
+    assert(desc.name.database == Some("mydb"))
+    assert(desc.name.table == "page_view")
     assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE)
     assert(desc.storage.locationUri == Some("/user/external/page_view"))
     assert(desc.schema ==
@@ -127,8 +127,8 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
     val s3 = """CREATE TABLE page_view AS SELECT * FROM src"""
     val (desc, exists) = extractTableDesc(s3)
     assert(exists == false)
-    assert(desc.specifiedDatabase == None)
-    assert(desc.name == "page_view")
+    assert(desc.name.database == None)
+    assert(desc.name.table == "page_view")
     assert(desc.tableType == CatalogTableType.MANAGED_TABLE)
     assert(desc.storage.locationUri == None)
     assert(desc.schema == Seq.empty[CatalogColumn])
@@ -162,8 +162,8 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
                |   ORDER BY key, value""".stripMargin
     val (desc, exists) = extractTableDesc(s5)
     assert(exists == false)
-    assert(desc.specifiedDatabase == None)
-    assert(desc.name == "ctas2")
+    assert(desc.name.database == None)
+    assert(desc.name.table == "ctas2")
     assert(desc.tableType == CatalogTableType.MANAGED_TABLE)
     assert(desc.storage.locationUri == None)
     assert(desc.schema == Seq.empty[CatalogColumn])

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 81420fe..a80c35c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -719,8 +719,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     withTable(tableName) {
       val schema = StructType(StructField("int", IntegerType, true) :: Nil)
       val hiveTable = CatalogTable(
-        specifiedDatabase = Some("default"),
-        name = tableName,
+        name = TableIdentifier(tableName, Some("default")),
         tableType = CatalogTableType.MANAGED_TABLE,
         schema = Seq.empty,
         storage = CatalogStorageFormat(

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 6292f6c..3d54da1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.apache.hadoop.util.VersionInfo
 
 import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.util.quietly
@@ -129,8 +130,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
     test(s"$version: createTable") {
       val table =
         CatalogTable(
-          specifiedDatabase = Option("default"),
-          name = "src",
+          name = TableIdentifier("src", Some("default")),
           tableType = CatalogTableType.MANAGED_TABLE,
           schema = Seq(CatalogColumn("key", "int")),
           storage = CatalogStorageFormat(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[2/2] spark git commit: [SPARK-13923][SQL] Implement SessionCatalog

Posted by yh...@apache.org.
[SPARK-13923][SQL] Implement SessionCatalog

## What changes were proposed in this pull request?

As part of the effort to merge `SQLContext` and `HiveContext`, this patch implements an internal catalog called `SessionCatalog` that handles temporary functions and tables and delegates metastore operations to `ExternalCatalog`. Currently, this is still dead code, but in the future it will be part of `SessionState` and will replace `o.a.s.sql.catalyst.analysis.Catalog`.

A recent patch #11573 parses Hive commands ourselves in Spark, but still passes the entire query text to Hive. In a future patch, we will use `SessionCatalog` to implement the parsed commands.

## How was this patch tested?

800+ lines of tests in `SessionCatalogSuite`.

Author: Andrew Or <an...@databricks.com>

Closes #11750 from andrewor14/temp-catalog.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca9ef86c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca9ef86c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca9ef86c

Branch: refs/heads/master
Commit: ca9ef86c84ee84263f437a979017898f4bed0feb
Parents: 92b7057
Author: Andrew Or <an...@databricks.com>
Authored: Wed Mar 16 18:02:43 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed Mar 16 18:02:43 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/TableIdentifier.scala    |  35 -
 .../sql/catalyst/catalog/InMemoryCatalog.scala  |  51 +-
 .../sql/catalyst/catalog/SessionCatalog.scala   | 469 ++++++++++
 .../spark/sql/catalyst/catalog/interface.scala  |  29 +-
 .../apache/spark/sql/catalyst/identifiers.scala |  68 ++
 .../sql/catalyst/catalog/CatalogTestCases.scala | 171 ++--
 .../catalyst/catalog/InMemoryCatalogSuite.scala |   9 +-
 .../catalyst/catalog/SessionCatalogSuite.scala  | 864 +++++++++++++++++++
 .../org/apache/spark/sql/hive/HiveCatalog.scala |   9 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  26 +-
 .../org/apache/spark/sql/hive/HiveQl.scala      |  14 +-
 .../spark/sql/hive/client/HiveClient.scala      |   2 +-
 .../spark/sql/hive/client/HiveClientImpl.scala  |  15 +-
 .../hive/execution/CreateTableAsSelect.scala    |   4 +-
 .../sql/hive/execution/CreateViewAsSelect.scala |   4 +-
 .../spark/sql/hive/HiveCatalogSuite.scala       |  13 +-
 .../org/apache/spark/sql/hive/HiveQlSuite.scala |  16 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |   3 +-
 .../spark/sql/hive/client/VersionsSuite.scala   |   4 +-
 19 files changed, 1604 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala
deleted file mode 100644
index 4d4e4de..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst
-
-/**
- * Identifies a `table` in `database`.  If `database` is not defined, the current database is used.
- */
-private[sql] case class TableIdentifier(table: String, database: Option[String]) {
-  def this(table: String) = this(table, None)
-
-  override def toString: String = quotedString
-
-  def quotedString: String = database.map(db => s"`$db`.`$table`").getOrElse(s"`$table`")
-
-  def unquotedString: String = database.map(db => s"$db.$table").getOrElse(table)
-}
-
-private[sql] object TableIdentifier {
-  def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName)
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index f3fa795..7ead1dd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.catalog
 import scala.collection.mutable
 
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 
 
 /**
@@ -68,19 +69,20 @@ class InMemoryCatalog extends ExternalCatalog {
 
   private def requireFunctionExists(db: String, funcName: String): Unit = {
     if (!existsFunction(db, funcName)) {
-      throw new AnalysisException(s"Function $funcName does not exist in $db database")
+      throw new AnalysisException(s"Function '$funcName' does not exist in database '$db'")
     }
   }
 
   private def requireTableExists(db: String, table: String): Unit = {
     if (!existsTable(db, table)) {
-      throw new AnalysisException(s"Table $table does not exist in $db database")
+      throw new AnalysisException(s"Table '$table' does not exist in database '$db'")
     }
   }
 
   private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = {
     if (!existsPartition(db, table, spec)) {
-      throw new AnalysisException(s"Partition does not exist in database $db table $table: $spec")
+      throw new AnalysisException(
+        s"Partition does not exist in database '$db' table '$table': '$spec'")
     }
   }
 
@@ -93,7 +95,7 @@ class InMemoryCatalog extends ExternalCatalog {
       ignoreIfExists: Boolean): Unit = synchronized {
     if (catalog.contains(dbDefinition.name)) {
       if (!ignoreIfExists) {
-        throw new AnalysisException(s"Database ${dbDefinition.name} already exists.")
+        throw new AnalysisException(s"Database '${dbDefinition.name}' already exists.")
       }
     } else {
       catalog.put(dbDefinition.name, new DatabaseDesc(dbDefinition))
@@ -108,17 +110,17 @@ class InMemoryCatalog extends ExternalCatalog {
       if (!cascade) {
         // If cascade is false, make sure the database is empty.
         if (catalog(db).tables.nonEmpty) {
-          throw new AnalysisException(s"Database $db is not empty. One or more tables exist.")
+          throw new AnalysisException(s"Database '$db' is not empty. One or more tables exist.")
         }
         if (catalog(db).functions.nonEmpty) {
-          throw new AnalysisException(s"Database $db is not empty. One or more functions exist.")
+          throw new AnalysisException(s"Database '$db' is not empty. One or more functions exist.")
         }
       }
       // Remove the database.
       catalog.remove(db)
     } else {
       if (!ignoreIfNotExists) {
-        throw new AnalysisException(s"Database $db does not exist")
+        throw new AnalysisException(s"Database '$db' does not exist")
       }
     }
   }
@@ -156,12 +158,13 @@ class InMemoryCatalog extends ExternalCatalog {
       tableDefinition: CatalogTable,
       ignoreIfExists: Boolean): Unit = synchronized {
     requireDbExists(db)
-    if (existsTable(db, tableDefinition.name)) {
+    val table = tableDefinition.name.table
+    if (existsTable(db, table)) {
       if (!ignoreIfExists) {
-        throw new AnalysisException(s"Table ${tableDefinition.name} already exists in $db database")
+        throw new AnalysisException(s"Table '$table' already exists in database '$db'")
       }
     } else {
-      catalog(db).tables.put(tableDefinition.name, new TableDesc(tableDefinition))
+      catalog(db).tables.put(table, new TableDesc(tableDefinition))
     }
   }
 
@@ -174,7 +177,7 @@ class InMemoryCatalog extends ExternalCatalog {
       catalog(db).tables.remove(table)
     } else {
       if (!ignoreIfNotExists) {
-        throw new AnalysisException(s"Table $table does not exist in $db database")
+        throw new AnalysisException(s"Table '$table' does not exist in database '$db'")
       }
     }
   }
@@ -182,14 +185,14 @@ class InMemoryCatalog extends ExternalCatalog {
   override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
     requireTableExists(db, oldName)
     val oldDesc = catalog(db).tables(oldName)
-    oldDesc.table = oldDesc.table.copy(name = newName)
+    oldDesc.table = oldDesc.table.copy(name = TableIdentifier(newName, Some(db)))
     catalog(db).tables.put(newName, oldDesc)
     catalog(db).tables.remove(oldName)
   }
 
   override def alterTable(db: String, tableDefinition: CatalogTable): Unit = synchronized {
-    requireTableExists(db, tableDefinition.name)
-    catalog(db).tables(tableDefinition.name).table = tableDefinition
+    requireTableExists(db, tableDefinition.name.table)
+    catalog(db).tables(tableDefinition.name.table).table = tableDefinition
   }
 
   override def getTable(db: String, table: String): CatalogTable = synchronized {
@@ -222,8 +225,8 @@ class InMemoryCatalog extends ExternalCatalog {
       val dupSpecs = parts.collect { case p if existingParts.contains(p.spec) => p.spec }
       if (dupSpecs.nonEmpty) {
         val dupSpecsStr = dupSpecs.mkString("\n===\n")
-        throw new AnalysisException(
-          s"The following partitions already exist in database $db table $table:\n$dupSpecsStr")
+        throw new AnalysisException("The following partitions already exist in database " +
+          s"'$db' table '$table':\n$dupSpecsStr")
       }
     }
     parts.foreach { p => existingParts.put(p.spec, p) }
@@ -240,8 +243,8 @@ class InMemoryCatalog extends ExternalCatalog {
       val missingSpecs = partSpecs.collect { case s if !existingParts.contains(s) => s }
       if (missingSpecs.nonEmpty) {
         val missingSpecsStr = missingSpecs.mkString("\n===\n")
-        throw new AnalysisException(
-          s"The following partitions do not exist in database $db table $table:\n$missingSpecsStr")
+        throw new AnalysisException("The following partitions do not exist in database " +
+          s"'$db' table '$table':\n$missingSpecsStr")
       }
     }
     partSpecs.foreach(existingParts.remove)
@@ -292,10 +295,10 @@ class InMemoryCatalog extends ExternalCatalog {
 
   override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
     requireDbExists(db)
-    if (existsFunction(db, func.name)) {
-      throw new AnalysisException(s"Function $func already exists in $db database")
+    if (existsFunction(db, func.name.funcName)) {
+      throw new AnalysisException(s"Function '$func' already exists in '$db' database")
     } else {
-      catalog(db).functions.put(func.name, func)
+      catalog(db).functions.put(func.name.funcName, func)
     }
   }
 
@@ -306,14 +309,14 @@ class InMemoryCatalog extends ExternalCatalog {
 
   override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized {
     requireFunctionExists(db, oldName)
-    val newFunc = getFunction(db, oldName).copy(name = newName)
+    val newFunc = getFunction(db, oldName).copy(name = FunctionIdentifier(newName, Some(db)))
     catalog(db).functions.remove(oldName)
     catalog(db).functions.put(newName, newFunc)
   }
 
   override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = synchronized {
-    requireFunctionExists(db, funcDefinition.name)
-    catalog(db).functions.put(funcDefinition.name, funcDefinition)
+    requireFunctionExists(db, funcDefinition.name.funcName)
+    catalog(db).functions.put(funcDefinition.name.funcName, funcDefinition)
   }
 
   override def getFunction(db: String, funcName: String): CatalogFunction = synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
new file mode 100644
index 0000000..4dec042
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+
+
+/**
+ * An internal catalog that is used by a Spark Session. This internal catalog serves as a
+ * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
+ * tables and functions of the Spark Session that it belongs to.
+ */
+class SessionCatalog(externalCatalog: ExternalCatalog) {
+  import ExternalCatalog._
+
+  private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
+  private[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction]
+
+  // Note: we track current database here because certain operations do not explicitly
+  // specify the database (e.g. DROP TABLE my_table). In these cases we must first
+  // check whether the temporary table or function exists, then, if not, operate on
+  // the corresponding item in the current database.
+  private[this] var currentDb = "default"
+
+  // ----------------------------------------------------------------------------
+  // Databases
+  // ----------------------------------------------------------------------------
+  // All methods in this category interact directly with the underlying catalog.
+  // ----------------------------------------------------------------------------
+
+  def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
+    externalCatalog.createDatabase(dbDefinition, ignoreIfExists)
+  }
+
+  def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
+    externalCatalog.dropDatabase(db, ignoreIfNotExists, cascade)
+  }
+
+  def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
+    externalCatalog.alterDatabase(dbDefinition)
+  }
+
+  def getDatabase(db: String): CatalogDatabase = {
+    externalCatalog.getDatabase(db)
+  }
+
+  def databaseExists(db: String): Boolean = {
+    externalCatalog.databaseExists(db)
+  }
+
+  def listDatabases(): Seq[String] = {
+    externalCatalog.listDatabases()
+  }
+
+  def listDatabases(pattern: String): Seq[String] = {
+    externalCatalog.listDatabases(pattern)
+  }
+
+  def getCurrentDatabase: String = currentDb
+
+  def setCurrentDatabase(db: String): Unit = {
+    if (!databaseExists(db)) {
+      throw new AnalysisException(s"cannot set current database to non-existent '$db'")
+    }
+    currentDb = db
+  }
+
+  // ----------------------------------------------------------------------------
+  // Tables
+  // ----------------------------------------------------------------------------
+  // There are two kinds of tables, temporary tables and metastore tables.
+  // Temporary tables are isolated across sessions and do not belong to any
+  // particular database. Metastore tables can be used across multiple
+  // sessions as their metadata is persisted in the underlying catalog.
+  // ----------------------------------------------------------------------------
+
+  // ----------------------------------------------------
+  // | Methods that interact with metastore tables only |
+  // ----------------------------------------------------
+
+  /**
+   * Create a metastore table in the database specified in `tableDefinition`.
+   * If no such database is specified, create it in the current database.
+   */
+  def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
+    val db = tableDefinition.name.database.getOrElse(currentDb)
+    val newTableDefinition = tableDefinition.copy(
+      name = TableIdentifier(tableDefinition.name.table, Some(db)))
+    externalCatalog.createTable(db, newTableDefinition, ignoreIfExists)
+  }
+
+  /**
+   * Alter the metadata of an existing metastore table identified by `tableDefinition`.
+   *
+   * If no database is specified in `tableDefinition`, assume the table is in the
+   * current database.
+   *
+   * Note: If the underlying implementation does not support altering a certain field,
+   * this becomes a no-op.
+   */
+  def alterTable(tableDefinition: CatalogTable): Unit = {
+    val db = tableDefinition.name.database.getOrElse(currentDb)
+    val newTableDefinition = tableDefinition.copy(
+      name = TableIdentifier(tableDefinition.name.table, Some(db)))
+    externalCatalog.alterTable(db, newTableDefinition)
+  }
+
+  /**
+   * Retrieve the metadata of an existing metastore table.
+   * If no database is specified, assume the table is in the current database.
+   */
+  def getTable(name: TableIdentifier): CatalogTable = {
+    val db = name.database.getOrElse(currentDb)
+    externalCatalog.getTable(db, name.table)
+  }
+
+  // -------------------------------------------------------------
+  // | Methods that interact with temporary and metastore tables |
+  // -------------------------------------------------------------
+
+  /**
+   * Create a temporary table. If `ignoreIfExists` is true, an existing table is overwritten.
+   */
+  def createTempTable(
+      name: String,
+      tableDefinition: LogicalPlan,
+      ignoreIfExists: Boolean): Unit = {
+    if (tempTables.containsKey(name) && !ignoreIfExists) {
+      throw new AnalysisException(s"Temporary table '$name' already exists.")
+    }
+    tempTables.put(name, tableDefinition)
+  }
+
+  /**
+   * Rename a table.
+   *
+   * If a database is specified in `oldName`, this will rename the table in that database.
+   * If no database is specified, this will first attempt to rename a temporary table with
+   * the same name, then, if that does not exist, rename the table in the current database.
+   *
+   * Throws an AnalysisException if `oldName` and `newName` do not specify the same database.
+   */
+  def renameTable(oldName: TableIdentifier, newName: TableIdentifier): Unit = {
+    if (oldName.database != newName.database) {
+      throw new AnalysisException("rename does not support moving tables across databases")
+    }
+    val db = oldName.database.getOrElse(currentDb)
+    if (oldName.database.isDefined || !tempTables.containsKey(oldName.table)) {
+      externalCatalog.renameTable(db, oldName.table, newName.table)
+    } else {
+      val table = tempTables.remove(oldName.table)
+      tempTables.put(newName.table, table)
+    }
+  }
+
+  /**
+   * Drop a table.
+   *
+   * If a database is specified in `name`, this will drop the table from that database.
+   * If no database is specified, this will first attempt to drop a temporary table with
+   * the same name, then, if that does not exist, drop the table from the current database.
+   */
+  def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean): Unit = {
+    val db = name.database.getOrElse(currentDb)
+    if (name.database.isDefined || !tempTables.containsKey(name.table)) {
+      externalCatalog.dropTable(db, name.table, ignoreIfNotExists)
+    } else {
+      tempTables.remove(name.table)
+    }
+  }
+
+  /**
+   * Return a [[LogicalPlan]] that represents the given table.
+   *
+   * If a database is specified in `name`, this will return the table from that database.
+   * If no database is specified, this will first attempt to return a temporary table with
+   * the same name, then, if that does not exist, return the table from the current database.
+   */
+  def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = {
+    val db = name.database.getOrElse(currentDb)
+    val relation =
+      if (name.database.isDefined || !tempTables.containsKey(name.table)) {
+        val metadata = externalCatalog.getTable(db, name.table)
+        CatalogRelation(db, metadata, alias)
+      } else {
+        tempTables.get(name.table)
+      }
+    val tableWithQualifiers = SubqueryAlias(name.table, relation)
+    // If an alias was specified by the lookup, wrap the plan in a subquery so that
+    // attributes are properly qualified with this alias.
+    alias.map(a => SubqueryAlias(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
+  }
+
+  /**
+   * List all tables in the specified database, including temporary tables.
+   */
+  def listTables(db: String): Seq[TableIdentifier] = {
+    val dbTables = externalCatalog.listTables(db).map { t => TableIdentifier(t, Some(db)) }
+    val _tempTables = tempTables.keys().asScala.map { t => TableIdentifier(t) }
+    dbTables ++ _tempTables
+  }
+
+  /**
+   * List all matching tables in the specified database, including temporary tables.
+   */
+  def listTables(db: String, pattern: String): Seq[TableIdentifier] = {
+    val dbTables =
+      externalCatalog.listTables(db, pattern).map { t => TableIdentifier(t, Some(db)) }
+    val regex = pattern.replaceAll("\\*", ".*").r
+    val _tempTables = tempTables.keys().asScala
+      .filter { t => regex.pattern.matcher(t).matches() }
+      .map { t => TableIdentifier(t) }
+    dbTables ++ _tempTables
+  }
+
+  /**
+   * Return a temporary table exactly as it was stored.
+   * For testing only.
+   */
+  private[catalog] def getTempTable(name: String): Option[LogicalPlan] = {
+    Option(tempTables.get(name))
+  }
+
+  // ----------------------------------------------------------------------------
+  // Partitions
+  // ----------------------------------------------------------------------------
+  // All methods in this category interact directly with the underlying catalog.
+  // These methods are concerned with only metastore tables.
+  // ----------------------------------------------------------------------------
+
+  // TODO: We need to figure out how these methods interact with our data source
+  // tables. For such tables, we do not store values of partitioning columns in
+  // the metastore. For now, partition values of a data source table will be
+  // automatically discovered when we load the table.
+
+  /**
+   * Create partitions in a table, assuming the table exists.
+   * If no database is specified, assume the table is in the current database.
+   */
+  def createPartitions(
+      tableName: TableIdentifier,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit = {
+    val db = tableName.database.getOrElse(currentDb)
+    externalCatalog.createPartitions(db, tableName.table, parts, ignoreIfExists)
+  }
+
+  /**
+   * Drop partitions from a table, assuming they exist.
+   * If no database is specified, assume the table is in the current database.
+   */
+  def dropPartitions(
+      tableName: TableIdentifier,
+      parts: Seq[TablePartitionSpec],
+      ignoreIfNotExists: Boolean): Unit = {
+    val db = tableName.database.getOrElse(currentDb)
+    externalCatalog.dropPartitions(db, tableName.table, parts, ignoreIfNotExists)
+  }
+
+  /**
+   * Override the specs of one or many existing table partitions, assuming they exist.
+   *
+   * This assumes index i of `specs` corresponds to index i of `newSpecs`.
+   * If no database is specified, assume the table is in the current database.
+   */
+  def renamePartitions(
+      tableName: TableIdentifier,
+      specs: Seq[TablePartitionSpec],
+      newSpecs: Seq[TablePartitionSpec]): Unit = {
+    val db = tableName.database.getOrElse(currentDb)
+    externalCatalog.renamePartitions(db, tableName.table, specs, newSpecs)
+  }
+
+  /**
+   * Alter one or many table partitions whose specs match those specified in `parts`,
+   * assuming the partitions exist.
+   *
+   * If no database is specified, assume the table is in the current database.
+   *
+   * Note: If the underlying implementation does not support altering a certain field,
+   * this becomes a no-op.
+   */
+  def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = {
+    val db = tableName.database.getOrElse(currentDb)
+    externalCatalog.alterPartitions(db, tableName.table, parts)
+  }
+
+  /**
+   * Retrieve the metadata of a table partition, assuming it exists.
+   * If no database is specified, assume the table is in the current database.
+   */
+  def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = {
+    val db = tableName.database.getOrElse(currentDb)
+    externalCatalog.getPartition(db, tableName.table, spec)
+  }
+
+  /**
+   * List all partitions in a table, assuming it exists.
+   * If no database is specified, assume the table is in the current database.
+   */
+  def listPartitions(tableName: TableIdentifier): Seq[CatalogTablePartition] = {
+    val db = tableName.database.getOrElse(currentDb)
+    externalCatalog.listPartitions(db, tableName.table)
+  }
+
+  // ----------------------------------------------------------------------------
+  // Functions
+  // ----------------------------------------------------------------------------
+  // There are two kinds of functions, temporary functions and metastore
+  // functions (permanent UDFs). Temporary functions are isolated across
+  // sessions. Metastore functions can be used across multiple sessions as
+  // their metadata is persisted in the underlying catalog.
+  // ----------------------------------------------------------------------------
+
+  // -------------------------------------------------------
+  // | Methods that interact with metastore functions only |
+  // -------------------------------------------------------
+
+  /**
+   * Create a metastore function in the database specified in `funcDefinition`.
+   * If no such database is specified, create it in the current database.
+   */
+  def createFunction(funcDefinition: CatalogFunction): Unit = {
+    val db = funcDefinition.name.database.getOrElse(currentDb)
+    val newFuncDefinition = funcDefinition.copy(
+      name = FunctionIdentifier(funcDefinition.name.funcName, Some(db)))
+    externalCatalog.createFunction(db, newFuncDefinition)
+  }
+
+  /**
+   * Drop a metastore function.
+   * If no database is specified, assume the function is in the current database.
+   */
+  def dropFunction(name: FunctionIdentifier): Unit = {
+    val db = name.database.getOrElse(currentDb)
+    externalCatalog.dropFunction(db, name.funcName)
+  }
+
+  /**
+   * Alter a metastore function whose name matches the one specified in `funcDefinition`.
+   *
+   * If no database is specified in `funcDefinition`, assume the function is in the
+   * current database.
+   *
+   * Note: If the underlying implementation does not support altering a certain field,
+   * this becomes a no-op.
+   */
+  def alterFunction(funcDefinition: CatalogFunction): Unit = {
+    val db = funcDefinition.name.database.getOrElse(currentDb)
+    val newFuncDefinition = funcDefinition.copy(
+      name = FunctionIdentifier(funcDefinition.name.funcName, Some(db)))
+    externalCatalog.alterFunction(db, newFuncDefinition)
+  }
+
+  // ----------------------------------------------------------------
+  // | Methods that interact with temporary and metastore functions |
+  // ----------------------------------------------------------------
+
+  /**
+   * Create a temporary function. If `ignoreIfExists` is true, an existing one is overwritten.
+   * This assumes no database is specified in `funcDefinition`.
+   */
+  def createTempFunction(funcDefinition: CatalogFunction, ignoreIfExists: Boolean): Unit = {
+    require(funcDefinition.name.database.isEmpty,
+      "attempted to create a temporary function while specifying a database")
+    val name = funcDefinition.name.funcName
+    if (tempFunctions.containsKey(name) && !ignoreIfExists) {
+      throw new AnalysisException(s"Temporary function '$name' already exists.")
+    }
+    tempFunctions.put(name, funcDefinition)
+  }
+
+  /**
+   * Drop a temporary function.
+   */
+  // TODO: The reason that we distinguish dropFunction and dropTempFunction is that
+  // Hive has DROP FUNCTION and DROP TEMPORARY FUNCTION. We may want to consolidate
+  // dropFunction and dropTempFunction.
+  def dropTempFunction(name: String, ignoreIfNotExists: Boolean): Unit = {
+    if (!tempFunctions.containsKey(name) && !ignoreIfNotExists) {
+      throw new AnalysisException(
+        s"Temporary function '$name' cannot be dropped because it does not exist!")
+    }
+    tempFunctions.remove(name)
+  }
+
+  /**
+   * Rename a function.
+   *
+   * If a database is specified in `oldName`, this will rename the function in that database.
+   * If no database is specified, this will first attempt to rename a temporary function with
+   * the same name, then, if that does not exist, rename the function in the current database.
+   *
+   * Throws an AnalysisException if `oldName` and `newName` do not specify the same database.
+   */
+  def renameFunction(oldName: FunctionIdentifier, newName: FunctionIdentifier): Unit = {
+    if (oldName.database != newName.database) {
+      throw new AnalysisException("rename does not support moving functions across databases")
+    }
+    val db = oldName.database.getOrElse(currentDb)
+    if (oldName.database.isDefined || !tempFunctions.containsKey(oldName.funcName)) {
+      externalCatalog.renameFunction(db, oldName.funcName, newName.funcName)
+    } else {
+      val func = tempFunctions.remove(oldName.funcName)
+      val newFunc = func.copy(name = func.name.copy(funcName = newName.funcName))
+      tempFunctions.put(newName.funcName, newFunc)
+    }
+  }
+
+  /**
+   * Retrieve the metadata of an existing function.
+   *
+   * If a database is specified in `name`, this will return the function in that database.
+   * If no database is specified, this will first attempt to return a temporary function with
+   * the same name, then, if that does not exist, return the function in the current database.
+   */
+  def getFunction(name: FunctionIdentifier): CatalogFunction = {
+    val db = name.database.getOrElse(currentDb)
+    if (name.database.isDefined || !tempFunctions.containsKey(name.funcName)) {
+      externalCatalog.getFunction(db, name.funcName)
+    } else {
+      tempFunctions.get(name.funcName)
+    }
+  }
+
+  // TODO: implement lookupFunction that returns something from the registry itself
+
+  /**
+   * List all matching functions in the specified database, including temporary functions.
+   */
+  def listFunctions(db: String, pattern: String): Seq[FunctionIdentifier] = {
+    val dbFunctions =
+      externalCatalog.listFunctions(db, pattern).map { f => FunctionIdentifier(f, Some(db)) }
+    val regex = pattern.replaceAll("\\*", ".*").r
+    val _tempFunctions = tempFunctions.keys().asScala
+      .filter { f => regex.pattern.matcher(f).matches() }
+      .map { f => FunctionIdentifier(f) }
+    dbFunctions ++ _tempFunctions
+  }
+
+  /**
+   * Return a temporary function. For testing only.
+   */
+  private[catalog] def getTempFunction(name: String): Option[CatalogFunction] = {
+    Option(tempFunctions.get(name))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index db34af3..c4e4961 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -20,6 +20,9 @@ package org.apache.spark.sql.catalyst.catalog
 import javax.annotation.Nullable
 
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
 
 
 /**
@@ -167,7 +170,7 @@ abstract class ExternalCatalog {
  * @param name name of the function
  * @param className fully qualified class name, e.g. "org.apache.spark.util.MyFunc"
  */
-case class CatalogFunction(name: String, className: String)
+case class CatalogFunction(name: FunctionIdentifier, className: String)
 
 
 /**
@@ -211,8 +214,7 @@ case class CatalogTablePartition(
  * future once we have a better understanding of how we want to handle skewed columns.
  */
 case class CatalogTable(
-    specifiedDatabase: Option[String],
-    name: String,
+    name: TableIdentifier,
     tableType: CatalogTableType,
     storage: CatalogStorageFormat,
     schema: Seq[CatalogColumn],
@@ -226,12 +228,12 @@ case class CatalogTable(
     viewText: Option[String] = None) {
 
   /** Return the database this table was specified to belong to, assuming it exists. */
-  def database: String = specifiedDatabase.getOrElse {
+  def database: String = name.database.getOrElse {
     throw new AnalysisException(s"table $name did not specify database")
   }
 
   /** Return the fully qualified name of this table, assuming the database was specified. */
-  def qualifiedName: String = s"$database.$name"
+  def qualifiedName: String = name.unquotedString
 
   /** Syntactic sugar to update a field in `storage`. */
   def withNewStorage(
@@ -272,3 +274,20 @@ object ExternalCatalog {
    */
   type TablePartitionSpec = Map[String, String]
 }
+
+
+/**
+ * A [[LogicalPlan]] that wraps [[CatalogTable]].
+ * `db` must equal the database in `metadata.name`; otherwise construction fails.
+ */
+case class CatalogRelation(
+    db: String,
+    metadata: CatalogTable,
+    alias: Option[String] = None)
+  extends LeafNode {
+  // TODO: implement this
+  override def output: Seq[Attribute] = Seq.empty
+
+  require(metadata.name.database == Some(db),
+    "provided database does not match the one specified in the table definition")
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
new file mode 100644
index 0000000..87f4d1b
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+
+/**
+ * An identifier that optionally specifies a database.
+ * Format (unquoted): "name" or "db.name"
+ * Format (quoted): "`name`" or "`db`.`name`", with any embedded backtick doubled.
+ */
+sealed trait IdentifierWithDatabase {
+  val name: String
+  def database: Option[String]
+  private def quote(part: String): String = "`" + part.replace("`", "``") + "`"
+  def quotedString: String = (database.toSeq :+ name).map(quote).mkString(".")
+  def unquotedString: String = database.map(db => s"$db.$name").getOrElse(name)
+  override def toString: String = quotedString
+}
+
+
+/**
+ * Identifies a table in a database.
+ * If `database` is not defined, the current database is used.
+ */
+case class TableIdentifier(table: String, database: Option[String])
+  extends IdentifierWithDatabase {
+  // Auxiliary constructor for a table that is not qualified with a database.
+  def this(name: String) = this(name, None)
+
+  // The generic identifier name is simply the table name.
+  override val name: String = table
+}
+
+object TableIdentifier {
+  def apply(tableName: String): TableIdentifier = TableIdentifier(tableName, None)
+}
+
+
+/**
+ * Identifies a function in a database.
+ * If `database` is not defined, the current database is used.
+ */
+case class FunctionIdentifier(funcName: String, database: Option[String])
+  extends IdentifierWithDatabase {
+  // Auxiliary constructor for a function that is not qualified with a database.
+  def this(name: String) = this(name, None)
+  // The generic identifier name is simply the function name.
+  override val name: String = funcName
+}
+
+object FunctionIdentifier {
+  def apply(funcName: String): FunctionIdentifier = FunctionIdentifier(funcName, None)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
index b03ba81..a1ea619 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
@@ -21,6 +21,8 @@ import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.util.Utils
 
 
 /**
@@ -29,23 +31,10 @@ import org.apache.spark.sql.AnalysisException
  * Implementations of the [[ExternalCatalog]] interface can create test suites by extending this.
  */
 abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
-  private lazy val storageFormat = CatalogStorageFormat(
-    locationUri = None,
-    inputFormat = Some(tableInputFormat),
-    outputFormat = Some(tableOutputFormat),
-    serde = None,
-    serdeProperties = Map.empty)
-  private lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat)
-  private lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), storageFormat)
-  private lazy val part3 = CatalogTablePartition(Map("a" -> "5", "b" -> "6"), storageFormat)
-  private val funcClass = "org.apache.spark.myFunc"
-
-  // Things subclasses should override
-  protected val tableInputFormat: String = "org.apache.park.serde.MyInputFormat"
-  protected val tableOutputFormat: String = "org.apache.park.serde.MyOutputFormat"
-  protected def newUriForDatabase(): String = "uri"
+  protected val utils: CatalogTestUtils
+  import utils._
+
   protected def resetState(): Unit = { }
-  protected def newEmptyCatalog(): ExternalCatalog
 
   // Clear all state after each test
   override def afterEach(): Unit = {
@@ -56,62 +45,6 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
     }
   }
 
-  /**
-   * Creates a basic catalog, with the following structure:
-   *
-   * default
-   * db1
-   * db2
-   *   - tbl1
-   *   - tbl2
-   *     - part1
-   *     - part2
-   *   - func1
-   */
-  private def newBasicCatalog(): ExternalCatalog = {
-    val catalog = newEmptyCatalog()
-    // When testing against a real catalog, the default database may already exist
-    catalog.createDatabase(newDb("default"), ignoreIfExists = true)
-    catalog.createDatabase(newDb("db1"), ignoreIfExists = false)
-    catalog.createDatabase(newDb("db2"), ignoreIfExists = false)
-    catalog.createTable("db2", newTable("tbl1", "db2"), ignoreIfExists = false)
-    catalog.createTable("db2", newTable("tbl2", "db2"), ignoreIfExists = false)
-    catalog.createPartitions("db2", "tbl2", Seq(part1, part2), ignoreIfExists = false)
-    catalog.createFunction("db2", newFunc("func1"))
-    catalog
-  }
-
-  private def newFunc(): CatalogFunction = CatalogFunction("funcname", funcClass)
-
-  private def newDb(name: String): CatalogDatabase = {
-    CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty)
-  }
-
-  private def newTable(name: String, db: String): CatalogTable = {
-    CatalogTable(
-      specifiedDatabase = Some(db),
-      name = name,
-      tableType = CatalogTableType.EXTERNAL_TABLE,
-      storage = storageFormat,
-      schema = Seq(CatalogColumn("col1", "int"), CatalogColumn("col2", "string")),
-      partitionColumns = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string")))
-  }
-
-  private def newFunc(name: String): CatalogFunction = CatalogFunction(name, funcClass)
-
-  /**
-   * Whether the catalog's table partitions equal the ones given.
-   * Note: Hive sets some random serde things, so we just compare the specs here.
-   */
-  private def catalogPartitionsEqual(
-      catalog: ExternalCatalog,
-      db: String,
-      table: String,
-      parts: Seq[CatalogTablePartition]): Boolean = {
-    catalog.listPartitions(db, table).map(_.spec).toSet == parts.map(_.spec).toSet
-  }
-
-
   // --------------------------------------------------------------------------
   // Databases
   // --------------------------------------------------------------------------
@@ -277,7 +210,7 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
   }
 
   test("get table") {
-    assert(newBasicCatalog().getTable("db2", "tbl1").name == "tbl1")
+    assert(newBasicCatalog().getTable("db2", "tbl1").name.table == "tbl1")
   }
 
   test("get table when database/table does not exist") {
@@ -409,7 +342,7 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
 
   test("alter partitions") {
     val catalog = newBasicCatalog()
-    try{
+    try {
       // Note: Before altering table partitions in Hive, you *must* set the current database
       // to the one that contains the table of interest. Otherwise you will end up with the
       // most helpful error message ever: "Unable to alter partition. alter is not possible."
@@ -498,7 +431,8 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
 
   test("get function") {
     val catalog = newBasicCatalog()
-    assert(catalog.getFunction("db2", "func1") == newFunc("func1"))
+    assert(catalog.getFunction("db2", "func1") ==
+      CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass))
     intercept[AnalysisException] {
       catalog.getFunction("db2", "does_not_exist")
     }
@@ -517,7 +451,7 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
     assert(catalog.getFunction("db2", "func1").className == funcClass)
     catalog.renameFunction("db2", "func1", newName)
     intercept[AnalysisException] { catalog.getFunction("db2", "func1") }
-    assert(catalog.getFunction("db2", newName).name == newName)
+    assert(catalog.getFunction("db2", newName).name.funcName == newName)
     assert(catalog.getFunction("db2", newName).className == funcClass)
     intercept[AnalysisException] { catalog.renameFunction("db2", "does_not_exist", "me") }
   }
@@ -553,3 +487,88 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
   }
 
 }
+
+
+/**
+ * A collection of utility fields and methods for tests related to the [[ExternalCatalog]].
+ */
+abstract class CatalogTestUtils {
+
+  // Hooks that every concrete catalog under test has to provide
+  val tableInputFormat: String
+  val tableOutputFormat: String
+  def newEmptyCatalog(): ExternalCatalog
+
+  // Lazy so that nothing here is evaluated before a subclass has defined the hooks above
+  lazy val funcClass: String = "org.apache.spark.myFunc"
+  lazy val storageFormat: CatalogStorageFormat = CatalogStorageFormat(
+    locationUri = None,
+    inputFormat = Some(tableInputFormat),
+    outputFormat = Some(tableOutputFormat),
+    serde = None,
+    serdeProperties = Map.empty)
+  lazy val part1: CatalogTablePartition = newPart("1", "2")
+  lazy val part2: CatalogTablePartition = newPart("3", "4")
+  lazy val part3: CatalogTablePartition = newPart("5", "6")
+
+  // A partition with spec (a, b) that uses the shared storage format
+  private def newPart(a: String, b: String): CatalogTablePartition =
+    CatalogTablePartition(Map("a" -> a, "b" -> b), storageFormat)
+
+  /**
+   * Return a fresh catalog populated with the following fixture:
+   *
+   * default
+   * db1
+   * db2
+   *   - tbl1
+   *   - tbl2
+   *     - part1
+   *     - part2
+   *   - func1
+   */
+  def newBasicCatalog(): ExternalCatalog = {
+    val catalog = newEmptyCatalog()
+    // A real catalog may already contain the default database, so tolerate that one
+    catalog.createDatabase(newDb("default"), ignoreIfExists = true)
+    Seq("db1", "db2").foreach(d => catalog.createDatabase(newDb(d), ignoreIfExists = false))
+    Seq("tbl1", "tbl2").foreach { t =>
+      catalog.createTable("db2", newTable(t, "db2"), ignoreIfExists = false)
+    }
+    catalog.createPartitions("db2", "tbl2", Seq(part1, part2), ignoreIfExists = false)
+    catalog.createFunction("db2", newFunc("func1", Some("db2")))
+    catalog
+  }
+
+  def newUriForDatabase(): String = Utils.createTempDir().getAbsolutePath
+
+  def newDb(name: String): CatalogDatabase =
+    CatalogDatabase(name, s"$name description", newUriForDatabase(), Map.empty)
+
+  def newTable(name: String, db: String): CatalogTable = newTable(name, Some(db))
+  def newTable(name: String, database: Option[String] = None): CatalogTable =
+    CatalogTable(
+      name = TableIdentifier(name, database),
+      tableType = CatalogTableType.EXTERNAL_TABLE,
+      storage = storageFormat,
+      schema = Seq(CatalogColumn("col1", "int"), CatalogColumn("col2", "string")),
+      partitionColumns = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string")))
+
+  def newFunc(): CatalogFunction = newFunc("funcName")
+  def newFunc(name: String, database: Option[String] = None): CatalogFunction =
+    CatalogFunction(FunctionIdentifier(name, database), funcClass)
+
+  /**
+   * Whether the partition specs stored for `db`.`table` are exactly those of `parts`.
+   * Only specs are compared because Hive sets some random serde things on the partitions.
+   */
+  def catalogPartitionsEqual(
+      catalog: ExternalCatalog,
+      db: String,
+      table: String,
+      parts: Seq[CatalogTablePartition]): Boolean = {
+    val storedSpecs = catalog.listPartitions(db, table).map(_.spec).toSet
+    storedSpecs == parts.map(_.spec).toSet
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
index 9531758..63a7b2c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
@@ -17,7 +17,14 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+
 /** Test suite for the [[InMemoryCatalog]]. */
 class InMemoryCatalogSuite extends CatalogTestCases {
-  override protected def newEmptyCatalog(): ExternalCatalog = new InMemoryCatalog
+
+  protected override val utils: CatalogTestUtils = new CatalogTestUtils {
+    override val tableInputFormat: String = "org.apache.park.SequenceFileInputFormat"
+    override val tableOutputFormat: String = "org.apache.park.SequenceFileOutputFormat"
+    override def newEmptyCatalog(): ExternalCatalog = new InMemoryCatalog
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org