You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/03/16 18:34:19 UTC

[2/3] spark git commit: [SPARK-19945][SQL] add test suite for SessionCatalog with HiveExternalCatalog

http://git-wip-us.apache.org/repos/asf/spark/blob/8e8f8983/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 7e74dcd..bb87763 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -27,41 +27,67 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View}
 
+class InMemorySessionCatalogSuite extends SessionCatalogSuite {
+  protected val utils = new CatalogTestUtils {
+    override val tableInputFormat: String = "com.fruit.eyephone.CameraInputFormat"
+    override val tableOutputFormat: String = "com.fruit.eyephone.CameraOutputFormat"
+    override val defaultProvider: String = "parquet"
+    override def newEmptyCatalog(): ExternalCatalog = new InMemoryCatalog
+  }
+}
+
 /**
- * Tests for [[SessionCatalog]] that assume that [[InMemoryCatalog]] is correctly implemented.
+ * Tests for [[SessionCatalog]]. Concrete subclasses provide the [[ExternalCatalog]] via `utils`.
  *
  * Note: many of the methods here are very similar to the ones in [[ExternalCatalogSuite]].
  * This is because [[SessionCatalog]] and [[ExternalCatalog]] share many similar method
  * signatures but do not extend a common parent. This is largely by design but
  * unfortunately leads to very similar test code in two places.
  */
-class SessionCatalogSuite extends PlanTest {
-  private val utils = new CatalogTestUtils {
-    override val tableInputFormat: String = "com.fruit.eyephone.CameraInputFormat"
-    override val tableOutputFormat: String = "com.fruit.eyephone.CameraOutputFormat"
-    override val defaultProvider: String = "parquet"
-    override def newEmptyCatalog(): ExternalCatalog = new InMemoryCatalog
-  }
+abstract class SessionCatalogSuite extends PlanTest {
+  protected val utils: CatalogTestUtils
+
+  protected val isHiveExternalCatalog = false
 
   import utils._
 
+  private def withBasicCatalog(f: SessionCatalog => Unit): Unit = {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    catalog.createDatabase(newDb("default"), ignoreIfExists = true)
+    try {
+      f(catalog)
+    } finally {
+      catalog.reset()
+    }
+  }
+
+  private def withEmptyCatalog(f: SessionCatalog => Unit): Unit = {
+    val catalog = new SessionCatalog(newEmptyCatalog())
+    catalog.createDatabase(newDb("default"), ignoreIfExists = true)
+    try {
+      f(catalog)
+    } finally {
+      catalog.reset()
+    }
+  }
   // --------------------------------------------------------------------------
   // Databases
   // --------------------------------------------------------------------------
 
   test("basic create and list databases") {
-    val catalog = new SessionCatalog(newEmptyCatalog())
-    catalog.createDatabase(newDb("default"), ignoreIfExists = true)
-    assert(catalog.databaseExists("default"))
-    assert(!catalog.databaseExists("testing"))
-    assert(!catalog.databaseExists("testing2"))
-    catalog.createDatabase(newDb("testing"), ignoreIfExists = false)
-    assert(catalog.databaseExists("testing"))
-    assert(catalog.listDatabases().toSet == Set("default", "testing"))
-    catalog.createDatabase(newDb("testing2"), ignoreIfExists = false)
-    assert(catalog.listDatabases().toSet == Set("default", "testing", "testing2"))
-    assert(catalog.databaseExists("testing2"))
-    assert(!catalog.databaseExists("does_not_exist"))
+    withEmptyCatalog { catalog =>
+      catalog.createDatabase(newDb("default"), ignoreIfExists = true)
+      assert(catalog.databaseExists("default"))
+      assert(!catalog.databaseExists("testing"))
+      assert(!catalog.databaseExists("testing2"))
+      catalog.createDatabase(newDb("testing"), ignoreIfExists = false)
+      assert(catalog.databaseExists("testing"))
+      assert(catalog.listDatabases().toSet == Set("default", "testing"))
+      catalog.createDatabase(newDb("testing2"), ignoreIfExists = false)
+      assert(catalog.listDatabases().toSet == Set("default", "testing", "testing2"))
+      assert(catalog.databaseExists("testing2"))
+      assert(!catalog.databaseExists("does_not_exist"))
+    }
   }
 
   def testInvalidName(func: (String) => Unit) {
@@ -76,121 +102,141 @@ class SessionCatalogSuite extends PlanTest {
   }
 
   test("create databases using invalid names") {
-    val catalog = new SessionCatalog(newEmptyCatalog())
-    testInvalidName(name => catalog.createDatabase(newDb(name), ignoreIfExists = true))
+    withEmptyCatalog { catalog =>
+      testInvalidName(
+        name => catalog.createDatabase(newDb(name), ignoreIfExists = true))
+    }
   }
 
   test("get database when a database exists") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    val db1 = catalog.getDatabaseMetadata("db1")
-    assert(db1.name == "db1")
-    assert(db1.description.contains("db1"))
+    withBasicCatalog { catalog =>
+      val db1 = catalog.getDatabaseMetadata("db1")
+      assert(db1.name == "db1")
+      assert(db1.description.contains("db1"))
+    }
   }
 
   test("get database should throw exception when the database does not exist") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[NoSuchDatabaseException] {
-      catalog.getDatabaseMetadata("db_that_does_not_exist")
+    withBasicCatalog { catalog =>
+      intercept[NoSuchDatabaseException] {
+        catalog.getDatabaseMetadata("db_that_does_not_exist")
+      }
     }
   }
 
   test("list databases without pattern") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    assert(catalog.listDatabases().toSet == Set("default", "db1", "db2", "db3"))
+    withBasicCatalog { catalog =>
+      assert(catalog.listDatabases().toSet == Set("default", "db1", "db2", "db3"))
+    }
   }
 
   test("list databases with pattern") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    assert(catalog.listDatabases("db").toSet == Set.empty)
-    assert(catalog.listDatabases("db*").toSet == Set("db1", "db2", "db3"))
-    assert(catalog.listDatabases("*1").toSet == Set("db1"))
-    assert(catalog.listDatabases("db2").toSet == Set("db2"))
+    withBasicCatalog { catalog =>
+      assert(catalog.listDatabases("db").toSet == Set.empty)
+      assert(catalog.listDatabases("db*").toSet == Set("db1", "db2", "db3"))
+      assert(catalog.listDatabases("*1").toSet == Set("db1"))
+      assert(catalog.listDatabases("db2").toSet == Set("db2"))
+    }
   }
 
   test("drop database") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    catalog.dropDatabase("db1", ignoreIfNotExists = false, cascade = false)
-    assert(catalog.listDatabases().toSet == Set("default", "db2", "db3"))
+    withBasicCatalog { catalog =>
+      catalog.dropDatabase("db1", ignoreIfNotExists = false, cascade = false)
+      assert(catalog.listDatabases().toSet == Set("default", "db2", "db3"))
+    }
   }
 
   test("drop database when the database is not empty") {
     // Throw exception if there are functions left
-    val externalCatalog1 = newBasicCatalog()
-    val sessionCatalog1 = new SessionCatalog(externalCatalog1)
-    externalCatalog1.dropTable("db2", "tbl1", ignoreIfNotExists = false, purge = false)
-    externalCatalog1.dropTable("db2", "tbl2", ignoreIfNotExists = false, purge = false)
-    intercept[AnalysisException] {
-      sessionCatalog1.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
+    withBasicCatalog { catalog =>
+      catalog.externalCatalog.dropTable("db2", "tbl1", ignoreIfNotExists = false, purge = false)
+      catalog.externalCatalog.dropTable("db2", "tbl2", ignoreIfNotExists = false, purge = false)
+      intercept[AnalysisException] {
+        catalog.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
+      }
     }
-
-    // Throw exception if there are tables left
-    val externalCatalog2 = newBasicCatalog()
-    val sessionCatalog2 = new SessionCatalog(externalCatalog2)
-    externalCatalog2.dropFunction("db2", "func1")
-    intercept[AnalysisException] {
-      sessionCatalog2.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
+    withBasicCatalog { catalog =>
+      // Throw exception if there are tables left
+      catalog.externalCatalog.dropFunction("db2", "func1")
+      intercept[AnalysisException] {
+        catalog.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
+      }
     }
 
-    // When cascade is true, it should drop them
-    val externalCatalog3 = newBasicCatalog()
-    val sessionCatalog3 = new SessionCatalog(externalCatalog3)
-    externalCatalog3.dropDatabase("db2", ignoreIfNotExists = false, cascade = true)
-    assert(sessionCatalog3.listDatabases().toSet == Set("default", "db1", "db3"))
+    withBasicCatalog { catalog =>
+      // When cascade is true, it should drop them
+      catalog.externalCatalog.dropDatabase("db2", ignoreIfNotExists = false, cascade = true)
+      assert(catalog.listDatabases().toSet == Set("default", "db1", "db3"))
+    }
   }
 
   test("drop database when the database does not exist") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[NoSuchDatabaseException] {
-      catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = false, cascade = false)
+    withBasicCatalog { catalog =>
+      // TODO: fix this inconsistency between HiveExternalCatalog and InMemoryCatalog
+      if (isHiveExternalCatalog) {
+        val e = intercept[AnalysisException] {
+          catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = false, cascade = false)
+        }.getMessage
+        assert(e.contains(
+          "org.apache.hadoop.hive.metastore.api.NoSuchObjectException: db_that_does_not_exist"))
+      } else {
+        intercept[NoSuchDatabaseException] {
+          catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = false, cascade = false)
+        }
+      }
+      catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = true, cascade = false)
     }
-    catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = true, cascade = false)
   }
 
   test("drop current database and drop default database") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    catalog.setCurrentDatabase("db1")
-    assert(catalog.getCurrentDatabase == "db1")
-    catalog.dropDatabase("db1", ignoreIfNotExists = false, cascade = true)
-    intercept[NoSuchDatabaseException] {
-      catalog.createTable(newTable("tbl1", "db1"), ignoreIfExists = false)
-    }
-    catalog.setCurrentDatabase("default")
-    assert(catalog.getCurrentDatabase == "default")
-    intercept[AnalysisException] {
-      catalog.dropDatabase("default", ignoreIfNotExists = false, cascade = true)
+    withBasicCatalog { catalog =>
+      catalog.setCurrentDatabase("db1")
+      assert(catalog.getCurrentDatabase == "db1")
+      catalog.dropDatabase("db1", ignoreIfNotExists = false, cascade = true)
+      intercept[NoSuchDatabaseException] {
+        catalog.createTable(newTable("tbl1", "db1"), ignoreIfExists = false)
+      }
+      catalog.setCurrentDatabase("default")
+      assert(catalog.getCurrentDatabase == "default")
+      intercept[AnalysisException] {
+        catalog.dropDatabase("default", ignoreIfNotExists = false, cascade = true)
+      }
     }
   }
 
   test("alter database") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    val db1 = catalog.getDatabaseMetadata("db1")
-    // Note: alter properties here because Hive does not support altering other fields
-    catalog.alterDatabase(db1.copy(properties = Map("k" -> "v3", "good" -> "true")))
-    val newDb1 = catalog.getDatabaseMetadata("db1")
-    assert(db1.properties.isEmpty)
-    assert(newDb1.properties.size == 2)
-    assert(newDb1.properties.get("k") == Some("v3"))
-    assert(newDb1.properties.get("good") == Some("true"))
+    withBasicCatalog { catalog =>
+      val db1 = catalog.getDatabaseMetadata("db1")
+      // Note: alter properties here because Hive does not support altering other fields
+      catalog.alterDatabase(db1.copy(properties = Map("k" -> "v3", "good" -> "true")))
+      val newDb1 = catalog.getDatabaseMetadata("db1")
+      assert(db1.properties.isEmpty)
+      assert(newDb1.properties.size == 2)
+      assert(newDb1.properties.get("k") == Some("v3"))
+      assert(newDb1.properties.get("good") == Some("true"))
+    }
   }
 
   test("alter database should throw exception when the database does not exist") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[NoSuchDatabaseException] {
-      catalog.alterDatabase(newDb("unknown_db"))
+    withBasicCatalog { catalog =>
+      intercept[NoSuchDatabaseException] {
+        catalog.alterDatabase(newDb("unknown_db"))
+      }
     }
   }
 
   test("get/set current database") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    assert(catalog.getCurrentDatabase == "default")
-    catalog.setCurrentDatabase("db2")
-    assert(catalog.getCurrentDatabase == "db2")
-    intercept[NoSuchDatabaseException] {
+    withBasicCatalog { catalog =>
+      assert(catalog.getCurrentDatabase == "default")
+      catalog.setCurrentDatabase("db2")
+      assert(catalog.getCurrentDatabase == "db2")
+      intercept[NoSuchDatabaseException] {
+        catalog.setCurrentDatabase("deebo")
+      }
+      catalog.createDatabase(newDb("deebo"), ignoreIfExists = false)
       catalog.setCurrentDatabase("deebo")
+      assert(catalog.getCurrentDatabase == "deebo")
     }
-    catalog.createDatabase(newDb("deebo"), ignoreIfExists = false)
-    catalog.setCurrentDatabase("deebo")
-    assert(catalog.getCurrentDatabase == "deebo")
   }
 
   // --------------------------------------------------------------------------
@@ -198,346 +244,360 @@ class SessionCatalogSuite extends PlanTest {
   // --------------------------------------------------------------------------
 
   test("create table") {
-    val externalCatalog = newBasicCatalog()
-    val sessionCatalog = new SessionCatalog(externalCatalog)
-    assert(externalCatalog.listTables("db1").isEmpty)
-    assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
-    sessionCatalog.createTable(newTable("tbl3", "db1"), ignoreIfExists = false)
-    sessionCatalog.createTable(newTable("tbl3", "db2"), ignoreIfExists = false)
-    assert(externalCatalog.listTables("db1").toSet == Set("tbl3"))
-    assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2", "tbl3"))
-    // Create table without explicitly specifying database
-    sessionCatalog.setCurrentDatabase("db1")
-    sessionCatalog.createTable(newTable("tbl4"), ignoreIfExists = false)
-    assert(externalCatalog.listTables("db1").toSet == Set("tbl3", "tbl4"))
-    assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2", "tbl3"))
+    withBasicCatalog { catalog =>
+      assert(catalog.externalCatalog.listTables("db1").isEmpty)
+      assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+      catalog.createTable(newTable("tbl3", "db1"), ignoreIfExists = false)
+      catalog.createTable(newTable("tbl3", "db2"), ignoreIfExists = false)
+      assert(catalog.externalCatalog.listTables("db1").toSet == Set("tbl3"))
+      assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2", "tbl3"))
+      // Create table without explicitly specifying database
+      catalog.setCurrentDatabase("db1")
+      catalog.createTable(newTable("tbl4"), ignoreIfExists = false)
+      assert(catalog.externalCatalog.listTables("db1").toSet == Set("tbl3", "tbl4"))
+      assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2", "tbl3"))
+    }
   }
 
   test("create tables using invalid names") {
-    val catalog = new SessionCatalog(newEmptyCatalog())
-    testInvalidName(name => catalog.createTable(newTable(name, "db1"), ignoreIfExists = false))
+    withEmptyCatalog { catalog =>
+      testInvalidName(name => catalog.createTable(newTable(name, "db1"), ignoreIfExists = false))
+    }
   }
 
   test("create table when database does not exist") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    // Creating table in non-existent database should always fail
-    intercept[NoSuchDatabaseException] {
-      catalog.createTable(newTable("tbl1", "does_not_exist"), ignoreIfExists = false)
-    }
-    intercept[NoSuchDatabaseException] {
-      catalog.createTable(newTable("tbl1", "does_not_exist"), ignoreIfExists = true)
-    }
-    // Table already exists
-    intercept[TableAlreadyExistsException] {
-      catalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = false)
+    withBasicCatalog { catalog =>
+      // Creating table in non-existent database should always fail
+      intercept[NoSuchDatabaseException] {
+        catalog.createTable(newTable("tbl1", "does_not_exist"), ignoreIfExists = false)
+      }
+      intercept[NoSuchDatabaseException] {
+        catalog.createTable(newTable("tbl1", "does_not_exist"), ignoreIfExists = true)
+      }
+      // Table already exists
+      intercept[TableAlreadyExistsException] {
+        catalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = false)
+      }
+      catalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = true)
     }
-    catalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = true)
   }
 
   test("create temp table") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    val tempTable1 = Range(1, 10, 1, 10)
-    val tempTable2 = Range(1, 20, 2, 10)
-    catalog.createTempView("tbl1", tempTable1, overrideIfExists = false)
-    catalog.createTempView("tbl2", tempTable2, overrideIfExists = false)
-    assert(catalog.getTempView("tbl1") == Option(tempTable1))
-    assert(catalog.getTempView("tbl2") == Option(tempTable2))
-    assert(catalog.getTempView("tbl3").isEmpty)
-    // Temporary table already exists
-    intercept[TempTableAlreadyExistsException] {
+    withBasicCatalog { catalog =>
+      val tempTable1 = Range(1, 10, 1, 10)
+      val tempTable2 = Range(1, 20, 2, 10)
       catalog.createTempView("tbl1", tempTable1, overrideIfExists = false)
+      catalog.createTempView("tbl2", tempTable2, overrideIfExists = false)
+      assert(catalog.getTempView("tbl1") == Option(tempTable1))
+      assert(catalog.getTempView("tbl2") == Option(tempTable2))
+      assert(catalog.getTempView("tbl3").isEmpty)
+      // Temporary table already exists
+      intercept[TempTableAlreadyExistsException] {
+        catalog.createTempView("tbl1", tempTable1, overrideIfExists = false)
+      }
+      // Temporary table already exists but we override it
+      catalog.createTempView("tbl1", tempTable2, overrideIfExists = true)
+      assert(catalog.getTempView("tbl1") == Option(tempTable2))
     }
-    // Temporary table already exists but we override it
-    catalog.createTempView("tbl1", tempTable2, overrideIfExists = true)
-    assert(catalog.getTempView("tbl1") == Option(tempTable2))
   }
 
   test("drop table") {
-    val externalCatalog = newBasicCatalog()
-    val sessionCatalog = new SessionCatalog(externalCatalog)
-    assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
-    sessionCatalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false,
-      purge = false)
-    assert(externalCatalog.listTables("db2").toSet == Set("tbl2"))
-    // Drop table without explicitly specifying database
-    sessionCatalog.setCurrentDatabase("db2")
-    sessionCatalog.dropTable(TableIdentifier("tbl2"), ignoreIfNotExists = false, purge = false)
-    assert(externalCatalog.listTables("db2").isEmpty)
+    withBasicCatalog { catalog =>
+      assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+      catalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false,
+        purge = false)
+      assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl2"))
+      // Drop table without explicitly specifying database
+      catalog.setCurrentDatabase("db2")
+      catalog.dropTable(TableIdentifier("tbl2"), ignoreIfNotExists = false, purge = false)
+      assert(catalog.externalCatalog.listTables("db2").isEmpty)
+    }
   }
 
   test("drop table when database/table does not exist") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    // Should always throw exception when the database does not exist
-    intercept[NoSuchDatabaseException] {
-      catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = false,
-        purge = false)
-    }
-    intercept[NoSuchDatabaseException] {
-      catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = true,
-        purge = false)
-    }
-    intercept[NoSuchTableException] {
-      catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = false,
+    withBasicCatalog { catalog =>
+      // Should always throw exception when the database does not exist
+      intercept[NoSuchDatabaseException] {
+        catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = false,
+          purge = false)
+      }
+      intercept[NoSuchDatabaseException] {
+        catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = true,
+          purge = false)
+      }
+      intercept[NoSuchTableException] {
+        catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = false,
+          purge = false)
+      }
+      catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = true,
         purge = false)
     }
-    catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = true,
-      purge = false)
   }
 
   test("drop temp table") {
-    val externalCatalog = newBasicCatalog()
-    val sessionCatalog = new SessionCatalog(externalCatalog)
-    val tempTable = Range(1, 10, 2, 10)
-    sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false)
-    sessionCatalog.setCurrentDatabase("db2")
-    assert(sessionCatalog.getTempView("tbl1") == Some(tempTable))
-    assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
-    // If database is not specified, temp table should be dropped first
-    sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
-    assert(sessionCatalog.getTempView("tbl1") == None)
-    assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
-    // If temp table does not exist, the table in the current database should be dropped
-    sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
-    assert(externalCatalog.listTables("db2").toSet == Set("tbl2"))
-    // If database is specified, temp tables are never dropped
-    sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false)
-    sessionCatalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = false)
-    sessionCatalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false,
-      purge = false)
-    assert(sessionCatalog.getTempView("tbl1") == Some(tempTable))
-    assert(externalCatalog.listTables("db2").toSet == Set("tbl2"))
+    withBasicCatalog { catalog =>
+      val tempTable = Range(1, 10, 2, 10)
+      catalog.createTempView("tbl1", tempTable, overrideIfExists = false)
+      catalog.setCurrentDatabase("db2")
+      assert(catalog.getTempView("tbl1") == Some(tempTable))
+      assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+      // If database is not specified, temp table should be dropped first
+      catalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
+      assert(catalog.getTempView("tbl1") == None)
+      assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+      // If temp table does not exist, the table in the current database should be dropped
+      catalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
+      assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl2"))
+      // If database is specified, temp tables are never dropped
+      catalog.createTempView("tbl1", tempTable, overrideIfExists = false)
+      catalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = false)
+      catalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false,
+        purge = false)
+      assert(catalog.getTempView("tbl1") == Some(tempTable))
+      assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl2"))
+    }
   }
 
   test("rename table") {
-    val externalCatalog = newBasicCatalog()
-    val sessionCatalog = new SessionCatalog(externalCatalog)
-    assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
-    sessionCatalog.renameTable(TableIdentifier("tbl1", Some("db2")), TableIdentifier("tblone"))
-    assert(externalCatalog.listTables("db2").toSet == Set("tblone", "tbl2"))
-    sessionCatalog.renameTable(TableIdentifier("tbl2", Some("db2")), TableIdentifier("tbltwo"))
-    assert(externalCatalog.listTables("db2").toSet == Set("tblone", "tbltwo"))
-    // Rename table without explicitly specifying database
-    sessionCatalog.setCurrentDatabase("db2")
-    sessionCatalog.renameTable(TableIdentifier("tbltwo"), TableIdentifier("table_two"))
-    assert(externalCatalog.listTables("db2").toSet == Set("tblone", "table_two"))
-    // Renaming "db2.tblone" to "db1.tblones" should fail because databases don't match
-    intercept[AnalysisException] {
-      sessionCatalog.renameTable(
-        TableIdentifier("tblone", Some("db2")), TableIdentifier("tblones", Some("db1")))
-    }
-    // The new table already exists
-    intercept[TableAlreadyExistsException] {
-      sessionCatalog.renameTable(
-        TableIdentifier("tblone", Some("db2")),
-        TableIdentifier("table_two"))
+    withBasicCatalog { catalog =>
+      assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+      catalog.renameTable(TableIdentifier("tbl1", Some("db2")), TableIdentifier("tblone"))
+      assert(catalog.externalCatalog.listTables("db2").toSet == Set("tblone", "tbl2"))
+      catalog.renameTable(TableIdentifier("tbl2", Some("db2")), TableIdentifier("tbltwo"))
+      assert(catalog.externalCatalog.listTables("db2").toSet == Set("tblone", "tbltwo"))
+      // Rename table without explicitly specifying database
+      catalog.setCurrentDatabase("db2")
+      catalog.renameTable(TableIdentifier("tbltwo"), TableIdentifier("table_two"))
+      assert(catalog.externalCatalog.listTables("db2").toSet == Set("tblone", "table_two"))
+      // Renaming "db2.tblone" to "db1.tblones" should fail because databases don't match
+      intercept[AnalysisException] {
+        catalog.renameTable(
+          TableIdentifier("tblone", Some("db2")), TableIdentifier("tblones", Some("db1")))
+      }
+      // The new table already exists
+      intercept[TableAlreadyExistsException] {
+        catalog.renameTable(
+          TableIdentifier("tblone", Some("db2")),
+          TableIdentifier("table_two"))
+      }
     }
   }
 
   test("rename tables to an invalid name") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    testInvalidName(
-      name => catalog.renameTable(TableIdentifier("tbl1", Some("db2")), TableIdentifier(name)))
+    withBasicCatalog { catalog =>
+      testInvalidName(
+        name => catalog.renameTable(TableIdentifier("tbl1", Some("db2")), TableIdentifier(name)))
+    }
   }
 
   test("rename table when database/table does not exist") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[NoSuchDatabaseException] {
-      catalog.renameTable(TableIdentifier("tbl1", Some("unknown_db")), TableIdentifier("tbl2"))
-    }
-    intercept[NoSuchTableException] {
-      catalog.renameTable(TableIdentifier("unknown_table", Some("db2")), TableIdentifier("tbl2"))
+    withBasicCatalog { catalog =>
+      intercept[NoSuchDatabaseException] {
+        catalog.renameTable(TableIdentifier("tbl1", Some("unknown_db")), TableIdentifier("tbl2"))
+      }
+      intercept[NoSuchTableException] {
+        catalog.renameTable(TableIdentifier("unknown_table", Some("db2")), TableIdentifier("tbl2"))
+      }
     }
   }
 
   test("rename temp table") {
-    val externalCatalog = newBasicCatalog()
-    val sessionCatalog = new SessionCatalog(externalCatalog)
-    val tempTable = Range(1, 10, 2, 10)
-    sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false)
-    sessionCatalog.setCurrentDatabase("db2")
-    assert(sessionCatalog.getTempView("tbl1") == Option(tempTable))
-    assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
-    // If database is not specified, temp table should be renamed first
-    sessionCatalog.renameTable(TableIdentifier("tbl1"), TableIdentifier("tbl3"))
-    assert(sessionCatalog.getTempView("tbl1").isEmpty)
-    assert(sessionCatalog.getTempView("tbl3") == Option(tempTable))
-    assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
-    // If database is specified, temp tables are never renamed
-    sessionCatalog.renameTable(TableIdentifier("tbl2", Some("db2")), TableIdentifier("tbl4"))
-    assert(sessionCatalog.getTempView("tbl3") == Option(tempTable))
-    assert(sessionCatalog.getTempView("tbl4").isEmpty)
-    assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl4"))
+    withBasicCatalog { catalog =>
+      val tempTable = Range(1, 10, 2, 10)
+      catalog.createTempView("tbl1", tempTable, overrideIfExists = false)
+      catalog.setCurrentDatabase("db2")
+      assert(catalog.getTempView("tbl1") == Option(tempTable))
+      assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+      // If database is not specified, temp table should be renamed first
+      catalog.renameTable(TableIdentifier("tbl1"), TableIdentifier("tbl3"))
+      assert(catalog.getTempView("tbl1").isEmpty)
+      assert(catalog.getTempView("tbl3") == Option(tempTable))
+      assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+      // If database is specified, temp tables are never renamed
+      catalog.renameTable(TableIdentifier("tbl2", Some("db2")), TableIdentifier("tbl4"))
+      assert(catalog.getTempView("tbl3") == Option(tempTable))
+      assert(catalog.getTempView("tbl4").isEmpty)
+      assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl4"))
+    }
   }
 
   test("alter table") {
-    val externalCatalog = newBasicCatalog()
-    val sessionCatalog = new SessionCatalog(externalCatalog)
-    val tbl1 = externalCatalog.getTable("db2", "tbl1")
-    sessionCatalog.alterTable(tbl1.copy(properties = Map("toh" -> "frem")))
-    val newTbl1 = externalCatalog.getTable("db2", "tbl1")
-    assert(!tbl1.properties.contains("toh"))
-    assert(newTbl1.properties.size == tbl1.properties.size + 1)
-    assert(newTbl1.properties.get("toh") == Some("frem"))
-    // Alter table without explicitly specifying database
-    sessionCatalog.setCurrentDatabase("db2")
-    sessionCatalog.alterTable(tbl1.copy(identifier = TableIdentifier("tbl1")))
-    val newestTbl1 = externalCatalog.getTable("db2", "tbl1")
-    assert(newestTbl1 == tbl1)
+    withBasicCatalog { catalog =>
+      val tbl1 = catalog.externalCatalog.getTable("db2", "tbl1")
+      catalog.alterTable(tbl1.copy(properties = Map("toh" -> "frem")))
+      val newTbl1 = catalog.externalCatalog.getTable("db2", "tbl1")
+      assert(!tbl1.properties.contains("toh"))
+      assert(newTbl1.properties.size == tbl1.properties.size + 1)
+      assert(newTbl1.properties.get("toh") == Some("frem"))
+      // Alter table without explicitly specifying database
+      catalog.setCurrentDatabase("db2")
+      catalog.alterTable(tbl1.copy(identifier = TableIdentifier("tbl1")))
+      val newestTbl1 = catalog.externalCatalog.getTable("db2", "tbl1")
+      // For a Hive serde table, the Hive metastore sets transient_lastDdlTime in the table's
+      // properties and later changes its value, so we drop all properties before comparing.
+      assert(newestTbl1.copy(properties = Map.empty) == tbl1.copy(properties = Map.empty))
+    }
   }
 
   test("alter table when database/table does not exist") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[NoSuchDatabaseException] {
-      catalog.alterTable(newTable("tbl1", "unknown_db"))
-    }
-    intercept[NoSuchTableException] {
-      catalog.alterTable(newTable("unknown_table", "db2"))
+    withBasicCatalog { catalog =>
+      intercept[NoSuchDatabaseException] {
+        catalog.alterTable(newTable("tbl1", "unknown_db"))
+      }
+      intercept[NoSuchTableException] {
+        catalog.alterTable(newTable("unknown_table", "db2"))
+      }
     }
   }
 
   test("get table") {
-    val externalCatalog = newBasicCatalog()
-    val sessionCatalog = new SessionCatalog(externalCatalog)
-    assert(sessionCatalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))
-      == externalCatalog.getTable("db2", "tbl1"))
-    // Get table without explicitly specifying database
-    sessionCatalog.setCurrentDatabase("db2")
-    assert(sessionCatalog.getTableMetadata(TableIdentifier("tbl1"))
-      == externalCatalog.getTable("db2", "tbl1"))
+    withBasicCatalog { catalog =>
+      assert(catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))
+        == catalog.externalCatalog.getTable("db2", "tbl1"))
+      // Get table without explicitly specifying database
+      catalog.setCurrentDatabase("db2")
+      assert(catalog.getTableMetadata(TableIdentifier("tbl1"))
+        == catalog.externalCatalog.getTable("db2", "tbl1"))
+    }
   }
 
   test("get table when database/table does not exist") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[NoSuchDatabaseException] {
-      catalog.getTableMetadata(TableIdentifier("tbl1", Some("unknown_db")))
-    }
-    intercept[NoSuchTableException] {
-      catalog.getTableMetadata(TableIdentifier("unknown_table", Some("db2")))
+    withBasicCatalog { catalog =>
+      intercept[NoSuchDatabaseException] {
+        catalog.getTableMetadata(TableIdentifier("tbl1", Some("unknown_db")))
+      }
+      intercept[NoSuchTableException] {
+        catalog.getTableMetadata(TableIdentifier("unknown_table", Some("db2")))
+      }
     }
   }
 
   test("get option of table metadata") {
-    val externalCatalog = newBasicCatalog()
-    val catalog = new SessionCatalog(externalCatalog)
-    assert(catalog.getTableMetadataOption(TableIdentifier("tbl1", Some("db2")))
-      == Option(externalCatalog.getTable("db2", "tbl1")))
-    assert(catalog.getTableMetadataOption(TableIdentifier("unknown_table", Some("db2"))).isEmpty)
-    intercept[NoSuchDatabaseException] {
-      catalog.getTableMetadataOption(TableIdentifier("tbl1", Some("unknown_db")))
+    withBasicCatalog { catalog =>
+      assert(catalog.getTableMetadataOption(TableIdentifier("tbl1", Some("db2")))
+        == Option(catalog.externalCatalog.getTable("db2", "tbl1")))
+      assert(catalog.getTableMetadataOption(TableIdentifier("unknown_table", Some("db2"))).isEmpty)
+      intercept[NoSuchDatabaseException] {
+        catalog.getTableMetadataOption(TableIdentifier("tbl1", Some("unknown_db")))
+      }
     }
   }
 
   test("lookup table relation") {
-    val externalCatalog = newBasicCatalog()
-    val sessionCatalog = new SessionCatalog(externalCatalog)
-    val tempTable1 = Range(1, 10, 1, 10)
-    val metastoreTable1 = externalCatalog.getTable("db2", "tbl1")
-    sessionCatalog.createTempView("tbl1", tempTable1, overrideIfExists = false)
-    sessionCatalog.setCurrentDatabase("db2")
-    // If we explicitly specify the database, we'll look up the relation in that database
-    assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))).children.head
-      .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1)
-    // Otherwise, we'll first look up a temporary table with the same name
-    assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
-      == SubqueryAlias("tbl1", tempTable1))
-    // Then, if that does not exist, look up the relation in the current database
-    sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
-    assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1")).children.head
-      .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1)
+    withBasicCatalog { catalog =>
+      val tempTable1 = Range(1, 10, 1, 10)
+      val metastoreTable1 = catalog.externalCatalog.getTable("db2", "tbl1")
+      catalog.createTempView("tbl1", tempTable1, overrideIfExists = false)
+      catalog.setCurrentDatabase("db2")
+      // If we explicitly specify the database, we'll look up the relation in that database
+      assert(catalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))).children.head
+        .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1)
+      // Otherwise, we'll first look up a temporary table with the same name
+      assert(catalog.lookupRelation(TableIdentifier("tbl1"))
+        == SubqueryAlias("tbl1", tempTable1))
+      // Then, if that does not exist, look up the relation in the current database
+      catalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
+      assert(catalog.lookupRelation(TableIdentifier("tbl1")).children.head
+        .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1)
+    }
   }
 
   test("look up view relation") {
-    val externalCatalog = newBasicCatalog()
-    val sessionCatalog = new SessionCatalog(externalCatalog)
-    val metadata = externalCatalog.getTable("db3", "view1")
-    sessionCatalog.setCurrentDatabase("default")
-    // Look up a view.
-    assert(metadata.viewText.isDefined)
-    val view = View(desc = metadata, output = metadata.schema.toAttributes,
-      child = CatalystSqlParser.parsePlan(metadata.viewText.get))
-    comparePlans(sessionCatalog.lookupRelation(TableIdentifier("view1", Some("db3"))),
-      SubqueryAlias("view1", view))
-    // Look up a view using current database of the session catalog.
-    sessionCatalog.setCurrentDatabase("db3")
-    comparePlans(sessionCatalog.lookupRelation(TableIdentifier("view1")),
-      SubqueryAlias("view1", view))
+    withBasicCatalog { catalog =>
+      val metadata = catalog.externalCatalog.getTable("db3", "view1")
+      catalog.setCurrentDatabase("default")
+      // Look up a view.
+      assert(metadata.viewText.isDefined)
+      val view = View(desc = metadata, output = metadata.schema.toAttributes,
+        child = CatalystSqlParser.parsePlan(metadata.viewText.get))
+      comparePlans(catalog.lookupRelation(TableIdentifier("view1", Some("db3"))),
+        SubqueryAlias("view1", view))
+      // Look up a view using current database of the session catalog.
+      catalog.setCurrentDatabase("db3")
+      comparePlans(catalog.lookupRelation(TableIdentifier("view1")),
+        SubqueryAlias("view1", view))
+    }
   }
 
   test("table exists") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    assert(catalog.tableExists(TableIdentifier("tbl1", Some("db2"))))
-    assert(catalog.tableExists(TableIdentifier("tbl2", Some("db2"))))
-    assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2"))))
-    assert(!catalog.tableExists(TableIdentifier("tbl1", Some("db1"))))
-    assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1"))))
-    // If database is explicitly specified, do not check temporary tables
-    val tempTable = Range(1, 10, 1, 10)
-    assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2"))))
-    // If database is not explicitly specified, check the current database
-    catalog.setCurrentDatabase("db2")
-    assert(catalog.tableExists(TableIdentifier("tbl1")))
-    assert(catalog.tableExists(TableIdentifier("tbl2")))
-
-    catalog.createTempView("tbl3", tempTable, overrideIfExists = false)
-    // tableExists should not check temp view.
-    assert(!catalog.tableExists(TableIdentifier("tbl3")))
+    withBasicCatalog { catalog =>
+      assert(catalog.tableExists(TableIdentifier("tbl1", Some("db2"))))
+      assert(catalog.tableExists(TableIdentifier("tbl2", Some("db2"))))
+      assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2"))))
+      assert(!catalog.tableExists(TableIdentifier("tbl1", Some("db1"))))
+      assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1"))))
+      // If database is explicitly specified, do not check temporary tables
+      val tempTable = Range(1, 10, 1, 10)
+      assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2"))))
+      // If database is not explicitly specified, check the current database
+      catalog.setCurrentDatabase("db2")
+      assert(catalog.tableExists(TableIdentifier("tbl1")))
+      assert(catalog.tableExists(TableIdentifier("tbl2")))
+
+      catalog.createTempView("tbl3", tempTable, overrideIfExists = false)
+      // tableExists should not check temp view.
+      assert(!catalog.tableExists(TableIdentifier("tbl3")))
+    }
   }
 
   test("getTempViewOrPermanentTableMetadata on temporary views") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    val tempTable = Range(1, 10, 2, 10)
-    intercept[NoSuchTableException] {
-      catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1"))
-    }.getMessage
+    withBasicCatalog { catalog =>
+      val tempTable = Range(1, 10, 2, 10)
+      intercept[NoSuchTableException] {
+        catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1"))
+      }.getMessage
 
-    intercept[NoSuchTableException] {
-      catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1", Some("default")))
-    }.getMessage
+      intercept[NoSuchTableException] {
+        catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1", Some("default")))
+      }.getMessage
 
-    catalog.createTempView("view1", tempTable, overrideIfExists = false)
-    assert(catalog.getTempViewOrPermanentTableMetadata(
-      TableIdentifier("view1")).identifier.table == "view1")
-    assert(catalog.getTempViewOrPermanentTableMetadata(
-      TableIdentifier("view1")).schema(0).name == "id")
+      catalog.createTempView("view1", tempTable, overrideIfExists = false)
+      assert(catalog.getTempViewOrPermanentTableMetadata(
+        TableIdentifier("view1")).identifier.table == "view1")
+      assert(catalog.getTempViewOrPermanentTableMetadata(
+        TableIdentifier("view1")).schema(0).name == "id")
 
-    intercept[NoSuchTableException] {
-      catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1", Some("default")))
-    }.getMessage
+      intercept[NoSuchTableException] {
+        catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1", Some("default")))
+      }.getMessage
+    }
   }
 
   test("list tables without pattern") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    val tempTable = Range(1, 10, 2, 10)
-    catalog.createTempView("tbl1", tempTable, overrideIfExists = false)
-    catalog.createTempView("tbl4", tempTable, overrideIfExists = false)
-    assert(catalog.listTables("db1").toSet ==
-      Set(TableIdentifier("tbl1"), TableIdentifier("tbl4")))
-    assert(catalog.listTables("db2").toSet ==
-      Set(TableIdentifier("tbl1"),
-        TableIdentifier("tbl4"),
-        TableIdentifier("tbl1", Some("db2")),
-        TableIdentifier("tbl2", Some("db2"))))
-    intercept[NoSuchDatabaseException] {
-      catalog.listTables("unknown_db")
+    withBasicCatalog { catalog =>
+      val tempTable = Range(1, 10, 2, 10)
+      catalog.createTempView("tbl1", tempTable, overrideIfExists = false)
+      catalog.createTempView("tbl4", tempTable, overrideIfExists = false)
+      assert(catalog.listTables("db1").toSet ==
+        Set(TableIdentifier("tbl1"), TableIdentifier("tbl4")))
+      assert(catalog.listTables("db2").toSet ==
+        Set(TableIdentifier("tbl1"),
+          TableIdentifier("tbl4"),
+          TableIdentifier("tbl1", Some("db2")),
+          TableIdentifier("tbl2", Some("db2"))))
+      intercept[NoSuchDatabaseException] {
+        catalog.listTables("unknown_db")
+      }
     }
   }
 
   test("list tables with pattern") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    val tempTable = Range(1, 10, 2, 10)
-    catalog.createTempView("tbl1", tempTable, overrideIfExists = false)
-    catalog.createTempView("tbl4", tempTable, overrideIfExists = false)
-    assert(catalog.listTables("db1", "*").toSet == catalog.listTables("db1").toSet)
-    assert(catalog.listTables("db2", "*").toSet == catalog.listTables("db2").toSet)
-    assert(catalog.listTables("db2", "tbl*").toSet ==
-      Set(TableIdentifier("tbl1"),
-        TableIdentifier("tbl4"),
-        TableIdentifier("tbl1", Some("db2")),
-        TableIdentifier("tbl2", Some("db2"))))
-    assert(catalog.listTables("db2", "*1").toSet ==
-      Set(TableIdentifier("tbl1"), TableIdentifier("tbl1", Some("db2"))))
-    intercept[NoSuchDatabaseException] {
-      catalog.listTables("unknown_db", "*")
+    withBasicCatalog { catalog =>
+      val tempTable = Range(1, 10, 2, 10)
+      catalog.createTempView("tbl1", tempTable, overrideIfExists = false)
+      catalog.createTempView("tbl4", tempTable, overrideIfExists = false)
+      assert(catalog.listTables("db1", "*").toSet == catalog.listTables("db1").toSet)
+      assert(catalog.listTables("db2", "*").toSet == catalog.listTables("db2").toSet)
+      assert(catalog.listTables("db2", "tbl*").toSet ==
+        Set(TableIdentifier("tbl1"),
+          TableIdentifier("tbl4"),
+          TableIdentifier("tbl1", Some("db2")),
+          TableIdentifier("tbl2", Some("db2"))))
+      assert(catalog.listTables("db2", "*1").toSet ==
+        Set(TableIdentifier("tbl1"), TableIdentifier("tbl1", Some("db2"))))
+      intercept[NoSuchDatabaseException] {
+        catalog.listTables("unknown_db", "*")
+      }
     }
   }
 
@@ -546,451 +606,477 @@ class SessionCatalogSuite extends PlanTest {
   // --------------------------------------------------------------------------
 
   test("basic create and list partitions") {
-    val externalCatalog = newEmptyCatalog()
-    val sessionCatalog = new SessionCatalog(externalCatalog)
-    sessionCatalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
-    sessionCatalog.createTable(newTable("tbl", "mydb"), ignoreIfExists = false)
-    sessionCatalog.createPartitions(
-      TableIdentifier("tbl", Some("mydb")), Seq(part1, part2), ignoreIfExists = false)
-    assert(catalogPartitionsEqual(externalCatalog.listPartitions("mydb", "tbl"), part1, part2))
-    // Create partitions without explicitly specifying database
-    sessionCatalog.setCurrentDatabase("mydb")
-    sessionCatalog.createPartitions(
-      TableIdentifier("tbl"), Seq(partWithMixedOrder), ignoreIfExists = false)
-    assert(catalogPartitionsEqual(
-      externalCatalog.listPartitions("mydb", "tbl"), part1, part2, partWithMixedOrder))
+    withEmptyCatalog { catalog =>
+      catalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
+      catalog.createTable(newTable("tbl", "mydb"), ignoreIfExists = false)
+      catalog.createPartitions(
+        TableIdentifier("tbl", Some("mydb")), Seq(part1, part2), ignoreIfExists = false)
+      assert(catalogPartitionsEqual(
+        catalog.externalCatalog.listPartitions("mydb", "tbl"), part1, part2))
+      // Create partitions without explicitly specifying database
+      catalog.setCurrentDatabase("mydb")
+      catalog.createPartitions(
+        TableIdentifier("tbl"), Seq(partWithMixedOrder), ignoreIfExists = false)
+      assert(catalogPartitionsEqual(
+        catalog.externalCatalog.listPartitions("mydb", "tbl"), part1, part2, partWithMixedOrder))
+    }
   }
 
   test("create partitions when database/table does not exist") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[NoSuchDatabaseException] {
-      catalog.createPartitions(
-        TableIdentifier("tbl1", Some("unknown_db")), Seq(), ignoreIfExists = false)
-    }
-    intercept[NoSuchTableException] {
-      catalog.createPartitions(
-        TableIdentifier("does_not_exist", Some("db2")), Seq(), ignoreIfExists = false)
+    withBasicCatalog { catalog =>
+      intercept[NoSuchDatabaseException] {
+        catalog.createPartitions(
+          TableIdentifier("tbl1", Some("unknown_db")), Seq(), ignoreIfExists = false)
+      }
+      intercept[NoSuchTableException] {
+        catalog.createPartitions(
+          TableIdentifier("does_not_exist", Some("db2")), Seq(), ignoreIfExists = false)
+      }
     }
   }
 
   test("create partitions that already exist") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[AnalysisException] {
+    withBasicCatalog { catalog =>
+      intercept[AnalysisException] {
+        catalog.createPartitions(
+          TableIdentifier("tbl2", Some("db2")), Seq(part1), ignoreIfExists = false)
+      }
       catalog.createPartitions(
-        TableIdentifier("tbl2", Some("db2")), Seq(part1), ignoreIfExists = false)
+        TableIdentifier("tbl2", Some("db2")), Seq(part1), ignoreIfExists = true)
     }
-    catalog.createPartitions(
-      TableIdentifier("tbl2", Some("db2")), Seq(part1), ignoreIfExists = true)
   }
 
   test("create partitions with invalid part spec") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    var e = intercept[AnalysisException] {
-      catalog.createPartitions(
-        TableIdentifier("tbl2", Some("db2")),
-        Seq(part1, partWithLessColumns), ignoreIfExists = false)
-    }
-    assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " +
-      "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
-    e = intercept[AnalysisException] {
-      catalog.createPartitions(
-        TableIdentifier("tbl2", Some("db2")),
-        Seq(part1, partWithMoreColumns), ignoreIfExists = true)
-    }
-    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " +
-      "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
-    e = intercept[AnalysisException] {
-      catalog.createPartitions(
-        TableIdentifier("tbl2", Some("db2")),
-        Seq(partWithUnknownColumns, part1), ignoreIfExists = true)
-    }
-    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
-      "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
-    e = intercept[AnalysisException] {
-      catalog.createPartitions(
-        TableIdentifier("tbl2", Some("db2")),
-        Seq(partWithEmptyValue, part1), ignoreIfExists = true)
+    withBasicCatalog { catalog =>
+      var e = intercept[AnalysisException] {
+        catalog.createPartitions(
+          TableIdentifier("tbl2", Some("db2")),
+          Seq(part1, partWithLessColumns), ignoreIfExists = false)
+      }
+      assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " +
+        "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+      e = intercept[AnalysisException] {
+        catalog.createPartitions(
+          TableIdentifier("tbl2", Some("db2")),
+          Seq(part1, partWithMoreColumns), ignoreIfExists = true)
+      }
+      assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " +
+        "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+      e = intercept[AnalysisException] {
+        catalog.createPartitions(
+          TableIdentifier("tbl2", Some("db2")),
+          Seq(partWithUnknownColumns, part1), ignoreIfExists = true)
+      }
+      assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
+        "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+      e = intercept[AnalysisException] {
+        catalog.createPartitions(
+          TableIdentifier("tbl2", Some("db2")),
+          Seq(partWithEmptyValue, part1), ignoreIfExists = true)
+      }
+      assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+        "empty partition column value"))
     }
-    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
-      "empty partition column value"))
   }
 
   test("drop partitions") {
-    val externalCatalog = newBasicCatalog()
-    val sessionCatalog = new SessionCatalog(externalCatalog)
-    assert(catalogPartitionsEqual(externalCatalog.listPartitions("db2", "tbl2"), part1, part2))
-    sessionCatalog.dropPartitions(
-      TableIdentifier("tbl2", Some("db2")),
-      Seq(part1.spec),
-      ignoreIfNotExists = false,
-      purge = false,
-      retainData = false)
-    assert(catalogPartitionsEqual(externalCatalog.listPartitions("db2", "tbl2"), part2))
-    // Drop partitions without explicitly specifying database
-    sessionCatalog.setCurrentDatabase("db2")
-    sessionCatalog.dropPartitions(
-      TableIdentifier("tbl2"),
-      Seq(part2.spec),
-      ignoreIfNotExists = false,
-      purge = false,
-      retainData = false)
-    assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
-    // Drop multiple partitions at once
-    sessionCatalog.createPartitions(
-      TableIdentifier("tbl2", Some("db2")), Seq(part1, part2), ignoreIfExists = false)
-    assert(catalogPartitionsEqual(externalCatalog.listPartitions("db2", "tbl2"), part1, part2))
-    sessionCatalog.dropPartitions(
-      TableIdentifier("tbl2", Some("db2")),
-      Seq(part1.spec, part2.spec),
-      ignoreIfNotExists = false,
-      purge = false,
-      retainData = false)
-    assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
-  }
-
-  test("drop partitions when database/table does not exist") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[NoSuchDatabaseException] {
+    withBasicCatalog { catalog =>
+      assert(catalogPartitionsEqual(
+        catalog.externalCatalog.listPartitions("db2", "tbl2"), part1, part2))
       catalog.dropPartitions(
-        TableIdentifier("tbl1", Some("unknown_db")),
-        Seq(),
+        TableIdentifier("tbl2", Some("db2")),
+        Seq(part1.spec),
         ignoreIfNotExists = false,
         purge = false,
         retainData = false)
-    }
-    intercept[NoSuchTableException] {
+      assert(catalogPartitionsEqual(
+        catalog.externalCatalog.listPartitions("db2", "tbl2"), part2))
+      // Drop partitions without explicitly specifying database
+      catalog.setCurrentDatabase("db2")
       catalog.dropPartitions(
-        TableIdentifier("does_not_exist", Some("db2")),
-        Seq(),
+        TableIdentifier("tbl2"),
+        Seq(part2.spec),
         ignoreIfNotExists = false,
         purge = false,
         retainData = false)
-    }
-  }
-
-  test("drop partitions that do not exist") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[AnalysisException] {
+      assert(catalog.externalCatalog.listPartitions("db2", "tbl2").isEmpty)
+      // Drop multiple partitions at once
+      catalog.createPartitions(
+        TableIdentifier("tbl2", Some("db2")), Seq(part1, part2), ignoreIfExists = false)
+      assert(catalogPartitionsEqual(
+        catalog.externalCatalog.listPartitions("db2", "tbl2"), part1, part2))
       catalog.dropPartitions(
         TableIdentifier("tbl2", Some("db2")),
-        Seq(part3.spec),
+        Seq(part1.spec, part2.spec),
         ignoreIfNotExists = false,
         purge = false,
         retainData = false)
+      assert(catalog.externalCatalog.listPartitions("db2", "tbl2").isEmpty)
     }
-    catalog.dropPartitions(
-      TableIdentifier("tbl2", Some("db2")),
-      Seq(part3.spec),
-      ignoreIfNotExists = true,
-      purge = false,
-      retainData = false)
   }
 
-  test("drop partitions with invalid partition spec") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    var e = intercept[AnalysisException] {
-      catalog.dropPartitions(
-        TableIdentifier("tbl2", Some("db2")),
-        Seq(partWithMoreColumns.spec),
-        ignoreIfNotExists = false,
-        purge = false,
-        retainData = false)
+  test("drop partitions when database/table does not exist") {
+    withBasicCatalog { catalog =>
+      intercept[NoSuchDatabaseException] {
+        catalog.dropPartitions(
+          TableIdentifier("tbl1", Some("unknown_db")),
+          Seq(),
+          ignoreIfNotExists = false,
+          purge = false,
+          retainData = false)
+      }
+      intercept[NoSuchTableException] {
+        catalog.dropPartitions(
+          TableIdentifier("does_not_exist", Some("db2")),
+          Seq(),
+          ignoreIfNotExists = false,
+          purge = false,
+          retainData = false)
+      }
     }
-    assert(e.getMessage.contains(
-      "Partition spec is invalid. The spec (a, b, c) must be contained within " +
-        "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
-    e = intercept[AnalysisException] {
+  }
+
+  test("drop partitions that do not exist") {
+    withBasicCatalog { catalog =>
+      intercept[AnalysisException] {
+        catalog.dropPartitions(
+          TableIdentifier("tbl2", Some("db2")),
+          Seq(part3.spec),
+          ignoreIfNotExists = false,
+          purge = false,
+          retainData = false)
+      }
       catalog.dropPartitions(
         TableIdentifier("tbl2", Some("db2")),
-        Seq(partWithUnknownColumns.spec),
-        ignoreIfNotExists = false,
+        Seq(part3.spec),
+        ignoreIfNotExists = true,
         purge = false,
         retainData = false)
     }
-    assert(e.getMessage.contains(
-      "Partition spec is invalid. The spec (a, unknown) must be contained within " +
-        "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
-    e = intercept[AnalysisException] {
-      catalog.dropPartitions(
-        TableIdentifier("tbl2", Some("db2")),
-        Seq(partWithEmptyValue.spec, part1.spec),
-        ignoreIfNotExists = false,
-        purge = false,
-        retainData = false)
+  }
+
+  test("drop partitions with invalid partition spec") {
+    withBasicCatalog { catalog =>
+      var e = intercept[AnalysisException] {
+        catalog.dropPartitions(
+          TableIdentifier("tbl2", Some("db2")),
+          Seq(partWithMoreColumns.spec),
+          ignoreIfNotExists = false,
+          purge = false,
+          retainData = false)
+      }
+      assert(e.getMessage.contains(
+        "Partition spec is invalid. The spec (a, b, c) must be contained within " +
+          "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+      e = intercept[AnalysisException] {
+        catalog.dropPartitions(
+          TableIdentifier("tbl2", Some("db2")),
+          Seq(partWithUnknownColumns.spec),
+          ignoreIfNotExists = false,
+          purge = false,
+          retainData = false)
+      }
+      assert(e.getMessage.contains(
+        "Partition spec is invalid. The spec (a, unknown) must be contained within " +
+          "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+      e = intercept[AnalysisException] {
+        catalog.dropPartitions(
+          TableIdentifier("tbl2", Some("db2")),
+          Seq(partWithEmptyValue.spec, part1.spec),
+          ignoreIfNotExists = false,
+          purge = false,
+          retainData = false)
+      }
+      assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+        "empty partition column value"))
     }
-    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
-      "empty partition column value"))
   }
 
   test("get partition") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    assert(catalog.getPartition(
-      TableIdentifier("tbl2", Some("db2")), part1.spec).spec == part1.spec)
-    assert(catalog.getPartition(
-      TableIdentifier("tbl2", Some("db2")), part2.spec).spec == part2.spec)
-    // Get partition without explicitly specifying database
-    catalog.setCurrentDatabase("db2")
-    assert(catalog.getPartition(TableIdentifier("tbl2"), part1.spec).spec == part1.spec)
-    assert(catalog.getPartition(TableIdentifier("tbl2"), part2.spec).spec == part2.spec)
-    // Get non-existent partition
-    intercept[AnalysisException] {
-      catalog.getPartition(TableIdentifier("tbl2"), part3.spec)
+    withBasicCatalog { catalog =>
+      assert(catalog.getPartition(
+        TableIdentifier("tbl2", Some("db2")), part1.spec).spec == part1.spec)
+      assert(catalog.getPartition(
+        TableIdentifier("tbl2", Some("db2")), part2.spec).spec == part2.spec)
+      // Get partition without explicitly specifying database
+      catalog.setCurrentDatabase("db2")
+      assert(catalog.getPartition(TableIdentifier("tbl2"), part1.spec).spec == part1.spec)
+      assert(catalog.getPartition(TableIdentifier("tbl2"), part2.spec).spec == part2.spec)
+      // Get non-existent partition
+      intercept[AnalysisException] {
+        catalog.getPartition(TableIdentifier("tbl2"), part3.spec)
+      }
     }
   }
 
   test("get partition when database/table does not exist") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[NoSuchDatabaseException] {
-      catalog.getPartition(TableIdentifier("tbl1", Some("unknown_db")), part1.spec)
-    }
-    intercept[NoSuchTableException] {
-      catalog.getPartition(TableIdentifier("does_not_exist", Some("db2")), part1.spec)
+    withBasicCatalog { catalog =>
+      intercept[NoSuchDatabaseException] {
+        catalog.getPartition(TableIdentifier("tbl1", Some("unknown_db")), part1.spec)
+      }
+      intercept[NoSuchTableException] {
+        catalog.getPartition(TableIdentifier("does_not_exist", Some("db2")), part1.spec)
+      }
     }
   }
 
   test("get partition with invalid partition spec") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    var e = intercept[AnalysisException] {
-      catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithLessColumns.spec)
-    }
-    assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " +
-      "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
-    e = intercept[AnalysisException] {
-      catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithMoreColumns.spec)
-    }
-    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " +
-      "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
-    e = intercept[AnalysisException] {
-      catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithUnknownColumns.spec)
-    }
-    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
-      "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
-    e = intercept[AnalysisException] {
-      catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithEmptyValue.spec)
+    withBasicCatalog { catalog =>
+      var e = intercept[AnalysisException] {
+        catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithLessColumns.spec)
+      }
+      assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " +
+        "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+      e = intercept[AnalysisException] {
+        catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithMoreColumns.spec)
+      }
+      assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " +
+        "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+      e = intercept[AnalysisException] {
+        catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithUnknownColumns.spec)
+      }
+      assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
+        "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+      e = intercept[AnalysisException] {
+        catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithEmptyValue.spec)
+      }
+      assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+        "empty partition column value"))
     }
-    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
-      "empty partition column value"))
   }
 
   test("rename partitions") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    val newPart1 = part1.copy(spec = Map("a" -> "100", "b" -> "101"))
-    val newPart2 = part2.copy(spec = Map("a" -> "200", "b" -> "201"))
-    val newSpecs = Seq(newPart1.spec, newPart2.spec)
-    catalog.renamePartitions(
-      TableIdentifier("tbl2", Some("db2")), Seq(part1.spec, part2.spec), newSpecs)
-    assert(catalog.getPartition(
-      TableIdentifier("tbl2", Some("db2")), newPart1.spec).spec === newPart1.spec)
-    assert(catalog.getPartition(
-      TableIdentifier("tbl2", Some("db2")), newPart2.spec).spec === newPart2.spec)
-    intercept[AnalysisException] {
-      catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part1.spec)
-    }
-    intercept[AnalysisException] {
-      catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part2.spec)
-    }
-    // Rename partitions without explicitly specifying database
-    catalog.setCurrentDatabase("db2")
-    catalog.renamePartitions(TableIdentifier("tbl2"), newSpecs, Seq(part1.spec, part2.spec))
-    assert(catalog.getPartition(TableIdentifier("tbl2"), part1.spec).spec === part1.spec)
-    assert(catalog.getPartition(TableIdentifier("tbl2"), part2.spec).spec === part2.spec)
-    intercept[AnalysisException] {
-      catalog.getPartition(TableIdentifier("tbl2"), newPart1.spec)
-    }
-    intercept[AnalysisException] {
-      catalog.getPartition(TableIdentifier("tbl2"), newPart2.spec)
+    withBasicCatalog { catalog =>
+      val newPart1 = part1.copy(spec = Map("a" -> "100", "b" -> "101"))
+      val newPart2 = part2.copy(spec = Map("a" -> "200", "b" -> "201"))
+      val newSpecs = Seq(newPart1.spec, newPart2.spec)
+      catalog.renamePartitions(
+        TableIdentifier("tbl2", Some("db2")), Seq(part1.spec, part2.spec), newSpecs)
+      assert(catalog.getPartition(
+        TableIdentifier("tbl2", Some("db2")), newPart1.spec).spec === newPart1.spec)
+      assert(catalog.getPartition(
+        TableIdentifier("tbl2", Some("db2")), newPart2.spec).spec === newPart2.spec)
+      intercept[AnalysisException] {
+        catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part1.spec)
+      }
+      intercept[AnalysisException] {
+        catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part2.spec)
+      }
+      // Rename partitions without explicitly specifying database
+      catalog.setCurrentDatabase("db2")
+      catalog.renamePartitions(TableIdentifier("tbl2"), newSpecs, Seq(part1.spec, part2.spec))
+      assert(catalog.getPartition(TableIdentifier("tbl2"), part1.spec).spec === part1.spec)
+      assert(catalog.getPartition(TableIdentifier("tbl2"), part2.spec).spec === part2.spec)
+      intercept[AnalysisException] {
+        catalog.getPartition(TableIdentifier("tbl2"), newPart1.spec)
+      }
+      intercept[AnalysisException] {
+        catalog.getPartition(TableIdentifier("tbl2"), newPart2.spec)
+      }
     }
   }
 
   test("rename partitions when database/table does not exist") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[NoSuchDatabaseException] {
-      catalog.renamePartitions(
-        TableIdentifier("tbl1", Some("unknown_db")), Seq(part1.spec), Seq(part2.spec))
-    }
-    intercept[NoSuchTableException] {
-      catalog.renamePartitions(
-        TableIdentifier("does_not_exist", Some("db2")), Seq(part1.spec), Seq(part2.spec))
+    withBasicCatalog { catalog =>
+      intercept[NoSuchDatabaseException] {
+        catalog.renamePartitions(
+          TableIdentifier("tbl1", Some("unknown_db")), Seq(part1.spec), Seq(part2.spec))
+      }
+      intercept[NoSuchTableException] {
+        catalog.renamePartitions(
+          TableIdentifier("does_not_exist", Some("db2")), Seq(part1.spec), Seq(part2.spec))
+      }
     }
   }
 
   test("rename partition with invalid partition spec") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    var e = intercept[AnalysisException] {
-      catalog.renamePartitions(
-        TableIdentifier("tbl1", Some("db2")),
-        Seq(part1.spec), Seq(partWithLessColumns.spec))
-    }
-    assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " +
-      "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
-    e = intercept[AnalysisException] {
-      catalog.renamePartitions(
-        TableIdentifier("tbl1", Some("db2")),
-        Seq(part1.spec), Seq(partWithMoreColumns.spec))
-    }
-    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " +
-      "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
-    e = intercept[AnalysisException] {
-      catalog.renamePartitions(
-        TableIdentifier("tbl1", Some("db2")),
-        Seq(part1.spec), Seq(partWithUnknownColumns.spec))
-    }
-    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
-      "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
-    e = intercept[AnalysisException] {
-      catalog.renamePartitions(
-        TableIdentifier("tbl1", Some("db2")),
-        Seq(part1.spec), Seq(partWithEmptyValue.spec))
+    withBasicCatalog { catalog =>
+      var e = intercept[AnalysisException] {
+        catalog.renamePartitions(
+          TableIdentifier("tbl1", Some("db2")),
+          Seq(part1.spec), Seq(partWithLessColumns.spec))
+      }
+      assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " +
+        "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+      e = intercept[AnalysisException] {
+        catalog.renamePartitions(
+          TableIdentifier("tbl1", Some("db2")),
+          Seq(part1.spec), Seq(partWithMoreColumns.spec))
+      }
+      assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " +
+        "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+      e = intercept[AnalysisException] {
+        catalog.renamePartitions(
+          TableIdentifier("tbl1", Some("db2")),
+          Seq(part1.spec), Seq(partWithUnknownColumns.spec))
+      }
+      assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
+        "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+      e = intercept[AnalysisException] {
+        catalog.renamePartitions(
+          TableIdentifier("tbl1", Some("db2")),
+          Seq(part1.spec), Seq(partWithEmptyValue.spec))
+      }
+      assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+        "empty partition column value"))
     }
-    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
-      "empty partition column value"))
   }
 
   test("alter partitions") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    val newLocation = newUriForDatabase()
-    // Alter but keep spec the same
-    val oldPart1 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part1.spec)
-    val oldPart2 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part2.spec)
-    catalog.alterPartitions(TableIdentifier("tbl2", Some("db2")), Seq(
-      oldPart1.copy(storage = storageFormat.copy(locationUri = Some(newLocation))),
-      oldPart2.copy(storage = storageFormat.copy(locationUri = Some(newLocation)))))
-    val newPart1 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part1.spec)
-    val newPart2 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part2.spec)
-    assert(newPart1.storage.locationUri == Some(newLocation))
-    assert(newPart2.storage.locationUri == Some(newLocation))
-    assert(oldPart1.storage.locationUri != Some(newLocation))
-    assert(oldPart2.storage.locationUri != Some(newLocation))
-    // Alter partitions without explicitly specifying database
-    catalog.setCurrentDatabase("db2")
-    catalog.alterPartitions(TableIdentifier("tbl2"), Seq(oldPart1, oldPart2))
-    val newerPart1 = catalog.getPartition(TableIdentifier("tbl2"), part1.spec)
-    val newerPart2 = catalog.getPartition(TableIdentifier("tbl2"), part2.spec)
-    assert(oldPart1.storage.locationUri == newerPart1.storage.locationUri)
-    assert(oldPart2.storage.locationUri == newerPart2.storage.locationUri)
-    // Alter but change spec, should fail because new partition specs do not exist yet
-    val badPart1 = part1.copy(spec = Map("a" -> "v1", "b" -> "v2"))
-    val badPart2 = part2.copy(spec = Map("a" -> "v3", "b" -> "v4"))
-    intercept[AnalysisException] {
-      catalog.alterPartitions(TableIdentifier("tbl2", Some("db2")), Seq(badPart1, badPart2))
+    withBasicCatalog { catalog =>
+      val newLocation = newUriForDatabase()
+      // Alter but keep spec the same
+      val oldPart1 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part1.spec)
+      val oldPart2 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part2.spec)
+      catalog.alterPartitions(TableIdentifier("tbl2", Some("db2")), Seq(
+        oldPart1.copy(storage = storageFormat.copy(locationUri = Some(newLocation))),
+        oldPart2.copy(storage = storageFormat.copy(locationUri = Some(newLocation)))))
+      val newPart1 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part1.spec)
+      val newPart2 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part2.spec)
+      assert(newPart1.storage.locationUri == Some(newLocation))
+      assert(newPart2.storage.locationUri == Some(newLocation))
+      assert(oldPart1.storage.locationUri != Some(newLocation))
+      assert(oldPart2.storage.locationUri != Some(newLocation))
+      // Alter partitions without explicitly specifying database
+      catalog.setCurrentDatabase("db2")
+      catalog.alterPartitions(TableIdentifier("tbl2"), Seq(oldPart1, oldPart2))
+      val newerPart1 = catalog.getPartition(TableIdentifier("tbl2"), part1.spec)
+      val newerPart2 = catalog.getPartition(TableIdentifier("tbl2"), part2.spec)
+      assert(oldPart1.storage.locationUri == newerPart1.storage.locationUri)
+      assert(oldPart2.storage.locationUri == newerPart2.storage.locationUri)
+      // Alter but change spec, should fail because new partition specs do not exist yet
+      val badPart1 = part1.copy(spec = Map("a" -> "v1", "b" -> "v2"))
+      val badPart2 = part2.copy(spec = Map("a" -> "v3", "b" -> "v4"))
+      intercept[AnalysisException] {
+        catalog.alterPartitions(TableIdentifier("tbl2", Some("db2")), Seq(badPart1, badPart2))
+      }
     }
   }
 
   test("alter partitions when database/table does not exist") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[NoSuchDatabaseException] {
-      catalog.alterPartitions(TableIdentifier("tbl1", Some("unknown_db")), Seq(part1))
-    }
-    intercept[NoSuchTableException] {
-      catalog.alterPartitions(TableIdentifier("does_not_exist", Some("db2")), Seq(part1))
+    withBasicCatalog { catalog =>
+      intercept[NoSuchDatabaseException] {
+        catalog.alterPartitions(TableIdentifier("tbl1", Some("unknown_db")), Seq(part1))
+      }
+      intercept[NoSuchTableException] {
+        catalog.alterPartitions(TableIdentifier("does_not_exist", Some("db2")), Seq(part1))
+      }
     }
   }
 
   test("alter partition with invalid partition spec") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    var e = intercept[AnalysisException] {
-      catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithLessColumns))
-    }
-    assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " +
-      "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
-    e = intercept[AnalysisException] {
-      catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithMoreColumns))
-    }
-    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " +
-      "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
-    e = intercept[AnalysisException] {
-      catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithUnknownColumns))
-    }
-    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
-      "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
-    e = intercept[AnalysisException] {
-      catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithEmptyValue))
+    withBasicCatalog { catalog =>
+      var e = intercept[AnalysisException] {
+        catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithLessColumns))
+      }
+      assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " +
+        "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+      e = intercept[AnalysisException] {
+        catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithMoreColumns))
+      }
+      assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " +
+        "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+      e = intercept[AnalysisException] {
+        catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithUnknownColumns))
+      }
+      assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
+        "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+      e = intercept[AnalysisException] {
+        catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithEmptyValue))
+      }
+      assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+        "empty partition column value"))
     }
-    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
-      "empty partition column value"))
   }
 
   test("list partition names") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    val expectedPartitionNames = Seq("a=1/b=2", "a=3/b=4")
-    assert(catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2"))) ==
-      expectedPartitionNames)
-    // List partition names without explicitly specifying database
-    catalog.setCurrentDatabase("db2")
-    assert(catalog.listPartitionNames(TableIdentifier("tbl2")) == expectedPartitionNames)
+    withBasicCatalog { catalog =>
+      val expectedPartitionNames = Seq("a=1/b=2", "a=3/b=4")
+      assert(catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2"))) ==
+        expectedPartitionNames)
+      // List partition names without explicitly specifying database
+      catalog.setCurrentDatabase("db2")
+      assert(catalog.listPartitionNames(TableIdentifier("tbl2")) == expectedPartitionNames)
+    }
   }
 
   test("list partition names with partial partition spec") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    assert(
-      catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")), Some(Map("a" -> "1"))) ==
-        Seq("a=1/b=2"))
+    withBasicCatalog { catalog =>
+      assert(
+        catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")), Some(Map("a" -> "1"))) ==
+          Seq("a=1/b=2"))
+    }
   }
 
   test("list partition names with invalid partial partition spec") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    var e = intercept[AnalysisException] {
-      catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
-        Some(partWithMoreColumns.spec))
-    }
-    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must be " +
-      "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
-    e = intercept[AnalysisException] {
-      catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
-        Some(partWithUnknownColumns.spec))
-    }
-    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must be " +
-      "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
-    e = intercept[AnalysisException] {
-      catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
-        Some(partWithEmptyValue.spec))
+    withBasicCatalog { catalog =>
+      var e = intercept[AnalysisException] {
+        catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
+          Some(partWithMoreColumns.spec))
+      }
+      assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must be " +
+        "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+      e = intercept[AnalysisException] {
+        catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
+          Some(partWithUnknownColumns.spec))
+      }
+      assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must be " +
+        "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+      e = intercept[AnalysisException] {
+        catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
+          Some(partWithEmptyValue.spec))
+      }
+      assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+        "empty partition column value"))
     }
-    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
-      "empty partition column value"))
   }
 
   test("list partitions") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    assert(catalogPartitionsEqual(
-      catalog.listPartitions(TableIdentifier("tbl2", Some("db2"))), part1, part2))
-    // List partitions without explicitly specifying database
-    catalog.setCurrentDatabase("db2")
-    assert(catalogPartitionsEqual(catalog.listPartitions(TableIdentifier("tbl2")), part1, part2))
+    withBasicCatalog { catalog =>
+      assert(catalogPartitionsEqual(
+        catalog.listPartitions(TableIdentifier("tbl2", Some("db2"))), part1, part2))
+      // List partitions without explicitly specifying database
+      catalog.setCurrentDatabase("db2")
+      assert(catalogPartitionsEqual(catalog.listPartitions(TableIdentifier("tbl2")), part1, part2))
+    }
   }
 
   test("list partitions with partial partition spec") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    assert(catalogPartitionsEqual(
-      catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(Map("a" -> "1"))), part1))
+    withBasicCatalog { catalog =>
+      assert(catalogPartitionsEqual(
+        catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(Map("a" -> "1"))), part1))
+    }
   }
 
   test("list partitions with invalid partial partition spec") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    var e = intercept[AnalysisException] {
-      catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(partWithMoreColumns.spec))
-    }
-    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must be " +
-      "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
-    e = intercept[AnalysisException] {
-      catalog.listPartitions(TableIdentifier("tbl2", Some("db2")),
-        Some(partWithUnknownColumns.spec))
-    }
-    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must be " +
-      "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
-    e = intercept[AnalysisException] {
-      catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(partWithEmptyValue.spec))
+    withBasicCatalog { catalog =>
+      var e = intercept[AnalysisException] {
+        catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(partWithMoreColumns.spec))
+      }
+      assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must be " +
+        "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+      e = intercept[AnalysisException] {
+        catalog.listPartitions(TableIdentifier("tbl2", Some("db2")),
+          Some(partWithUnknownColumns.spec))
+      }
+      assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must be " +
+        "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+      e = intercept[AnalysisException] {
+        catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(partWithEmptyValue.spec))
+      }
+      assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+        "empty partition column value"))
     }
-    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
-      "empty partition column value"))
   }
 
   test("list partitions when database/table does not exist") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[NoSuchDatabaseException] {
-      catalog.listPartitions(TableIdentifier("tbl1", Some("unknown_db")))
-    }
-    intercept[NoSuchTableException] {
-      catalog.listPartitions(TableIdentifier("does_not_exist", Some("db2")))
+    withBasicCatalog { catalog =>
+      intercept[NoSuchDatabaseException] {
+        catalog.listPartitions(TableIdentifier("tbl1", Some("unknown_db")))
+      }
+      intercept[NoSuchTableException] {
+        catalog.listPartitions(TableIdentifier("does_not_exist", Some("db2")))
+      }
     }
   }
 
@@ -999,8 +1085,17 @@ class SessionCatalogSuite extends PlanTest {
       expectedParts: CatalogTablePartition*): Boolean = {
     // ExternalCatalog may set a default location for partitions, here we ignore the partition
     // location when comparing them.
-    actualParts.map(p => p.copy(storage = p.storage.copy(locationUri = None))).toSet ==
-      expectedParts.map(p => p.copy(storage = p.storage.copy(locationUri = None))).toSet
+    // For Hive serde tables, the Hive metastore sets some values (e.g. transient_lastDdlTime)
+    // in the partition's parameters and storage properties, so we ignore those here as well.
+    val actualPartsNormalize = actualParts.map(p =>
+      p.copy(parameters = Map.empty, storage = p.storage.copy(
+        properties = Map.empty, locationUri = None, serde = None))).toSet
+
+    val expectedPartsNormalize = expectedParts.map(p =>
+        p.copy(parameters = Map.empty, storage = p.storage.copy(
+          properties = Map.empty, locationUri = None, serde = None))).toSet
+
+    actualPartsNormalize == expectedPartsNormalize
   }
 
   // --------------------------------------------------------------------------
@@ -1008,248 +1103,258 @@ class SessionCatalogSuite extends PlanTest {
   // --------------------------------------------------------------------------
 
   test("basic create and list functions") {
-    val externalCatalog = newEmptyCatalog()
-    val sessionCatalog = new SessionCatalog(externalCatalog)
-    sessionCatalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
-    sessionCatalog.createFunction(newFunc("myfunc", Some("mydb")), ignoreIfExists = false)
-    assert(externalCatalog.listFunctions("mydb", "*").toSet == Set("myfunc"))
-    // Create function without explicitly specifying database
-    sessionCatalog.setCurrentDatabase("mydb")
-    sessionCatalog.createFunction(newFunc("myfunc2"), ignoreIfExists = false)
-    assert(externalCatalog.listFunctions("mydb", "*").toSet == Set("myfunc", "myfunc2"))
+    withEmptyCatalog { catalog =>
+      catalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
+      catalog.createFunction(newFunc("myfunc", Some("mydb")), ignoreIfExists = false)
+      assert(catalog.externalCatalog.listFunctions("mydb", "*").toSet == Set("myfunc"))
+      // Create function without explicitly specifying database
+      catalog.setCurrentDatabase("mydb")
+      catalog.createFunction(newFunc("myfunc2"), ignoreIfExists = false)
+      assert(catalog.externalCatalog.listFunctions("mydb", "*").toSet == Set("myfunc", "myfunc2"))
+    }
   }
 
   test("create function when database does not exist") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[NoSuchDatabaseException] {
-      catalog.createFunction(
-        newFunc("func5", Some("does_not_exist")), ignoreIfExists = false)
+    withBasicCatalog { catalog =>
+      intercept[NoSuchDatabaseException] {
+        catalog.createFunction(
+          newFunc("func5", Some("does_not_exist")), ignoreIfExists = false)
+      }
     }
   }
 
   tes

<TRUNCATED>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org