Posted to commits@spark.apache.org by da...@apache.org on 2015/10/15 01:05:43 UTC

spark git commit: [SPARK-10104] [SQL] Consolidate different forms of table identifiers

Repository: spark
Updated Branches:
  refs/heads/master 9a430a027 -> 56d7da14a


[SPARK-10104] [SQL] Consolidate different forms of table identifiers

Right now, we have QualifiedTableName, TableIdentifier, and Seq[String] to represent table identifiers. We should have only one form, and TableIdentifier is the best choice because it provides methods to get the table name and the database name, and to render the identifier as a quoted or unquoted string.
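
A minimal, self-contained sketch of the consolidated form (illustrative only; the
patched class drops the default argument and adds a one-argument companion apply):

  // Sketch of the single identifier form; not the exact Spark source.
  case class TableIdentifier(table: String, database: Option[String] = None) {
    def quotedString: String = database.map(db => s"`$db`.`$table`").getOrElse(s"`$table`")
    def unquotedString: String = database.map(db => s"$db.$table").getOrElse(table)
    override def toString: String = quotedString
  }

  // One value replaces Seq("mydb", "src") and QualifiedTableName("mydb", "src"):
  val ident = TableIdentifier("src", Some("mydb"))
  ident.quotedString    // `mydb`.`src`
  ident.unquotedString  // mydb.src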

Author: Wenchen Fan <we...@databricks.com>
Author: Wenchen Fan <cl...@163.com>

Closes #8453 from cloud-fan/table-name.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/56d7da14
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/56d7da14
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/56d7da14

Branch: refs/heads/master
Commit: 56d7da14ab8f89bf4f303b27f51fd22d23967ffb
Parents: 9a430a0
Author: Wenchen Fan <we...@databricks.com>
Authored: Wed Oct 14 16:05:37 2015 -0700
Committer: Davies Liu <da...@gmail.com>
Committed: Wed Oct 14 16:05:37 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/SqlParser.scala   |   2 +-
 .../spark/sql/catalyst/TableIdentifier.scala    |  14 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala  |   4 +-
 .../spark/sql/catalyst/analysis/Catalog.scala   | 174 +++++++------------
 .../sql/catalyst/analysis/unresolved.scala      |   6 +-
 .../apache/spark/sql/catalyst/dsl/package.scala |   3 +-
 .../sql/catalyst/analysis/AnalysisSuite.scala   |  24 ++-
 .../sql/catalyst/analysis/AnalysisTest.scala    |  10 +-
 .../analysis/DecimalPrecisionSuite.scala        |   4 +-
 .../org/apache/spark/sql/DataFrameReader.scala  |   3 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  |   4 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |   6 +-
 .../sql/execution/datasources/DDLParser.scala   |   2 +-
 .../spark/sql/execution/datasources/ddl.scala   |   7 +-
 .../spark/sql/execution/datasources/rules.scala |   4 +-
 .../org/apache/spark/sql/CachedTableSuite.scala |   7 +-
 .../scala/org/apache/spark/sql/JoinSuite.scala  |   5 +-
 .../org/apache/spark/sql/ListTablesSuite.scala  |   7 +-
 .../datasources/parquet/ParquetQuerySuite.scala |   6 +-
 .../org/apache/spark/sql/hive/HiveContext.scala |   2 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 134 ++++----------
 .../org/apache/spark/sql/hive/HiveQl.scala      |  42 ++---
 .../hive/execution/CreateTableAsSelect.scala    |  12 +-
 .../sql/hive/execution/CreateViewAsSelect.scala |   9 +-
 .../spark/sql/hive/execution/commands.scala     |  10 +-
 .../apache/spark/sql/hive/test/TestHive.scala   |   2 +-
 .../sql/hive/JavaMetastoreDataSourcesSuite.java |   6 +-
 .../apache/spark/sql/hive/ListTablesSuite.scala |   5 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |   9 +-
 .../apache/spark/sql/hive/StatisticsSuite.scala |   5 +-
 .../sql/hive/execution/SQLQuerySuite.scala      |   6 +-
 .../spark/sql/hive/orc/OrcQuerySuite.scala      |   5 +-
 32 files changed, 212 insertions(+), 327 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index dfab239..2595e1f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -170,7 +170,7 @@ object SqlParser extends AbstractSparkSQLParser with DataTypeParser {
     joinedRelation | relationFactor
 
   protected lazy val relationFactor: Parser[LogicalPlan] =
-    ( rep1sep(ident, ".") ~ (opt(AS) ~> opt(ident)) ^^ {
+    ( tableIdentifier ~ (opt(AS) ~> opt(ident)) ^^ {
         case tableIdent ~ alias => UnresolvedRelation(tableIdent, alias)
       }
       | ("(" ~> start <~ ")") ~ (AS.? ~> ident) ^^ { case s ~ a => Subquery(a, s) }

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala
index d701559..4d4e4de 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala
@@ -20,14 +20,16 @@ package org.apache.spark.sql.catalyst
 /**
  * Identifies a `table` in `database`.  If `database` is not defined, the current database is used.
  */
-private[sql] case class TableIdentifier(table: String, database: Option[String] = None) {
-  def withDatabase(database: String): TableIdentifier = this.copy(database = Some(database))
-
-  def toSeq: Seq[String] = database.toSeq :+ table
+private[sql] case class TableIdentifier(table: String, database: Option[String]) {
+  def this(table: String) = this(table, None)
 
   override def toString: String = quotedString
 
-  def quotedString: String = toSeq.map("`" + _ + "`").mkString(".")
+  def quotedString: String = database.map(db => s"`$db`.`$table`").getOrElse(s"`$table`")
+
+  def unquotedString: String = database.map(db => s"$db.$table").getOrElse(table)
+}
 
-  def unquotedString: String = toSeq.mkString(".")
+private[sql] object TableIdentifier {
+  def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 041ab22..e604605 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -105,7 +105,7 @@ class Analyzer(
         // here use the CTE definition first, check table name only and ignore database name
         // see https://github.com/apache/spark/pull/4929#discussion_r27186638 for more info
         case u : UnresolvedRelation =>
-          val substituted = cteRelations.get(u.tableIdentifier.last).map { relation =>
+          val substituted = cteRelations.get(u.tableIdentifier.table).map { relation =>
             val withAlias = u.alias.map(Subquery(_, relation))
             withAlias.getOrElse(relation)
           }
@@ -257,7 +257,7 @@ class Analyzer(
         catalog.lookupRelation(u.tableIdentifier, u.alias)
       } catch {
         case _: NoSuchTableException =>
-          u.failAnalysis(s"no such table ${u.tableName}")
+          u.failAnalysis(s"Table Not Found: ${u.tableName}")
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 4cc9a55..8f4ce74 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -42,11 +42,9 @@ trait Catalog {
 
   val conf: CatalystConf
 
-  def tableExists(tableIdentifier: Seq[String]): Boolean
+  def tableExists(tableIdent: TableIdentifier): Boolean
 
-  def lookupRelation(
-      tableIdentifier: Seq[String],
-      alias: Option[String] = None): LogicalPlan
+  def lookupRelation(tableIdent: TableIdentifier, alias: Option[String] = None): LogicalPlan
 
   /**
    * Returns tuples of (tableName, isTemporary) for all tables in the given database.
@@ -56,89 +54,59 @@ trait Catalog {
 
   def refreshTable(tableIdent: TableIdentifier): Unit
 
-  // TODO: Refactor it in the work of SPARK-10104
-  def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit
+  def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit
 
-  // TODO: Refactor it in the work of SPARK-10104
-  def unregisterTable(tableIdentifier: Seq[String]): Unit
+  def unregisterTable(tableIdent: TableIdentifier): Unit
 
   def unregisterAllTables(): Unit
 
-  // TODO: Refactor it in the work of SPARK-10104
-  protected def processTableIdentifier(tableIdentifier: Seq[String]): Seq[String] = {
-    if (conf.caseSensitiveAnalysis) {
-      tableIdentifier
-    } else {
-      tableIdentifier.map(_.toLowerCase)
-    }
-  }
-
-  // TODO: Refactor it in the work of SPARK-10104
-  protected def getDbTableName(tableIdent: Seq[String]): String = {
-    val size = tableIdent.size
-    if (size <= 2) {
-      tableIdent.mkString(".")
-    } else {
-      tableIdent.slice(size - 2, size).mkString(".")
-    }
-  }
-
-  // TODO: Refactor it in the work of SPARK-10104
-  protected def getDBTable(tableIdent: Seq[String]) : (Option[String], String) = {
-    (tableIdent.lift(tableIdent.size - 2), tableIdent.last)
-  }
-
   /**
-   * It is not allowed to specifiy database name for tables stored in [[SimpleCatalog]].
-   * We use this method to check it.
+   * Get the table name of TableIdentifier for temporary tables.
    */
-  protected def checkTableIdentifier(tableIdentifier: Seq[String]): Unit = {
-    if (tableIdentifier.length > 1) {
+  protected def getTableName(tableIdent: TableIdentifier): String = {
+    // It is not allowed to specify database name for temporary tables.
+    // We check it here and throw exception if database is defined.
+    if (tableIdent.database.isDefined) {
       throw new AnalysisException("Specifying database name or other qualifiers are not allowed " +
         "for temporary tables. If the table name has dots (.) in it, please quote the " +
         "table name with backticks (`).")
     }
+    if (conf.caseSensitiveAnalysis) {
+      tableIdent.table
+    } else {
+      tableIdent.table.toLowerCase
+    }
   }
 }
 
 class SimpleCatalog(val conf: CatalystConf) extends Catalog {
-  val tables = new ConcurrentHashMap[String, LogicalPlan]
-
-  override def registerTable(
-      tableIdentifier: Seq[String],
-      plan: LogicalPlan): Unit = {
-    checkTableIdentifier(tableIdentifier)
-    val tableIdent = processTableIdentifier(tableIdentifier)
-    tables.put(getDbTableName(tableIdent), plan)
+  private[this] val tables = new ConcurrentHashMap[String, LogicalPlan]
+
+  override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
+    tables.put(getTableName(tableIdent), plan)
   }
 
-  override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
-    checkTableIdentifier(tableIdentifier)
-    val tableIdent = processTableIdentifier(tableIdentifier)
-    tables.remove(getDbTableName(tableIdent))
+  override def unregisterTable(tableIdent: TableIdentifier): Unit = {
+    tables.remove(getTableName(tableIdent))
   }
 
   override def unregisterAllTables(): Unit = {
     tables.clear()
   }
 
-  override def tableExists(tableIdentifier: Seq[String]): Boolean = {
-    checkTableIdentifier(tableIdentifier)
-    val tableIdent = processTableIdentifier(tableIdentifier)
-    tables.containsKey(getDbTableName(tableIdent))
+  override def tableExists(tableIdent: TableIdentifier): Boolean = {
+    tables.containsKey(getTableName(tableIdent))
   }
 
   override def lookupRelation(
-      tableIdentifier: Seq[String],
+      tableIdent: TableIdentifier,
       alias: Option[String] = None): LogicalPlan = {
-    checkTableIdentifier(tableIdentifier)
-    val tableIdent = processTableIdentifier(tableIdentifier)
-    val tableFullName = getDbTableName(tableIdent)
-    val table = tables.get(tableFullName)
+    val tableName = getTableName(tableIdent)
+    val table = tables.get(tableName)
     if (table == null) {
-      sys.error(s"Table Not Found: $tableFullName")
+      throw new NoSuchTableException
     }
-    val tableWithQualifiers = Subquery(tableIdent.last, table)
+    val tableWithQualifiers = Subquery(tableName, table)
 
     // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
     // properly qualified with this alias.
@@ -146,11 +114,7 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
   }
 
   override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
-    val result = ArrayBuffer.empty[(String, Boolean)]
-    for (name <- tables.keySet().asScala) {
-      result += ((name, true))
-    }
-    result
+    tables.keySet().asScala.map(_ -> true).toSeq
   }
 
   override def refreshTable(tableIdent: TableIdentifier): Unit = {
@@ -165,68 +129,50 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
  * lost when the JVM exits.
  */
 trait OverrideCatalog extends Catalog {
+  private[this] val overrides = new ConcurrentHashMap[String, LogicalPlan]
 
-  // TODO: This doesn't work when the database changes...
-  val overrides = new mutable.HashMap[(Option[String], String), LogicalPlan]()
-
-  abstract override def tableExists(tableIdentifier: Seq[String]): Boolean = {
-    val tableIdent = processTableIdentifier(tableIdentifier)
-    // A temporary tables only has a single part in the tableIdentifier.
-    val overriddenTable = if (tableIdentifier.length > 1) {
-      None: Option[LogicalPlan]
+  private def getOverriddenTable(tableIdent: TableIdentifier): Option[LogicalPlan] = {
+    if (tableIdent.database.isDefined) {
+      None
     } else {
-      overrides.get(getDBTable(tableIdent))
+      Option(overrides.get(getTableName(tableIdent)))
     }
-    overriddenTable match {
+  }
+
+  abstract override def tableExists(tableIdent: TableIdentifier): Boolean = {
+    getOverriddenTable(tableIdent) match {
       case Some(_) => true
-      case None => super.tableExists(tableIdentifier)
+      case None => super.tableExists(tableIdent)
     }
   }
 
   abstract override def lookupRelation(
-      tableIdentifier: Seq[String],
+      tableIdent: TableIdentifier,
       alias: Option[String] = None): LogicalPlan = {
-    val tableIdent = processTableIdentifier(tableIdentifier)
-    // A temporary tables only has a single part in the tableIdentifier.
-    val overriddenTable = if (tableIdentifier.length > 1) {
-      None: Option[LogicalPlan]
-    } else {
-      overrides.get(getDBTable(tableIdent))
-    }
-    val tableWithQualifers = overriddenTable.map(r => Subquery(tableIdent.last, r))
+    getOverriddenTable(tableIdent) match {
+      case Some(table) =>
+        val tableName = getTableName(tableIdent)
+        val tableWithQualifiers = Subquery(tableName, table)
 
-    // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
-    // properly qualified with this alias.
-    val withAlias =
-      tableWithQualifers.map(r => alias.map(a => Subquery(a, r)).getOrElse(r))
+        // If an alias was specified by the lookup, wrap the plan in a sub-query so that attributes
+        // are properly qualified with this alias.
+        alias.map(a => Subquery(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
 
-    withAlias.getOrElse(super.lookupRelation(tableIdentifier, alias))
+      case None => super.lookupRelation(tableIdent, alias)
+    }
   }
 
   abstract override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
-    // We always return all temporary tables.
-    val temporaryTables = overrides.map {
-      case ((_, tableName), _) => (tableName, true)
-    }.toSeq
-
-    temporaryTables ++ super.getTables(databaseName)
+    overrides.keySet().asScala.map(_ -> true).toSeq ++ super.getTables(databaseName)
   }
 
-  override def registerTable(
-      tableIdentifier: Seq[String],
-      plan: LogicalPlan): Unit = {
-    checkTableIdentifier(tableIdentifier)
-    val tableIdent = processTableIdentifier(tableIdentifier)
-    overrides.put(getDBTable(tableIdent), plan)
+  override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
+    overrides.put(getTableName(tableIdent), plan)
   }
 
-  override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
-    // A temporary tables only has a single part in the tableIdentifier.
-    // If tableIdentifier has more than one parts, it is not a temporary table
-    // and we do not need to do anything at here.
-    if (tableIdentifier.length == 1) {
-      val tableIdent = processTableIdentifier(tableIdentifier)
-      overrides.remove(getDBTable(tableIdent))
+  override def unregisterTable(tableIdent: TableIdentifier): Unit = {
+    if (tableIdent.database.isEmpty) {
+      overrides.remove(getTableName(tableIdent))
     }
   }
 
@@ -243,12 +189,12 @@ object EmptyCatalog extends Catalog {
 
   override val conf: CatalystConf = EmptyConf
 
-  override def tableExists(tableIdentifier: Seq[String]): Boolean = {
+  override def tableExists(tableIdent: TableIdentifier): Boolean = {
     throw new UnsupportedOperationException
   }
 
   override def lookupRelation(
-      tableIdentifier: Seq[String],
+      tableIdent: TableIdentifier,
       alias: Option[String] = None): LogicalPlan = {
     throw new UnsupportedOperationException
   }
@@ -257,15 +203,17 @@ object EmptyCatalog extends Catalog {
     throw new UnsupportedOperationException
   }
 
-  override def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit = {
+  override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
     throw new UnsupportedOperationException
   }
 
-  override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
+  override def unregisterTable(tableIdent: TableIdentifier): Unit = {
     throw new UnsupportedOperationException
   }
 
-  override def unregisterAllTables(): Unit = {}
+  override def unregisterAllTables(): Unit = {
+    throw new UnsupportedOperationException
+  }
 
   override def refreshTable(tableIdent: TableIdentifier): Unit = {
     throw new UnsupportedOperationException
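
The getTableName helper introduced in this file replaces the old checkTableIdentifier
and processTableIdentifier pair. Restated as a standalone sketch (a plain Boolean and
IllegalArgumentException stand in for conf.caseSensitiveAnalysis and AnalysisException):

  def getTableName(ident: TableIdentifier, caseSensitive: Boolean): String = {
    // Temporary tables may not be database-qualified.
    if (ident.database.isDefined) {
      throw new IllegalArgumentException(
        "Specifying database name or other qualifiers are not allowed for temporary tables.")
    }
    if (caseSensitive) ident.table else ident.table.toLowerCase
  }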

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 43ee319..c973650 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.catalyst.errors
+import org.apache.spark.sql.catalyst.{TableIdentifier, errors}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.LeafNode
 import org.apache.spark.sql.catalyst.trees.TreeNode
@@ -36,11 +36,11 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
  * Holds the name of a relation that has yet to be looked up in a [[Catalog]].
  */
 case class UnresolvedRelation(
-    tableIdentifier: Seq[String],
+    tableIdentifier: TableIdentifier,
     alias: Option[String] = None) extends LeafNode {
 
   /** Returns a `.` separated name for this relation. */
-  def tableName: String = tableIdentifier.mkString(".")
+  def tableName: String = tableIdentifier.unquotedString
 
   override def output: Seq[Attribute] = Nil
 

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 699c4cc..27b3cd8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -286,7 +286,8 @@ package object dsl {
 
       def insertInto(tableName: String, overwrite: Boolean = false): LogicalPlan =
         InsertIntoTable(
-          analysis.UnresolvedRelation(Seq(tableName)), Map.empty, logicalPlan, overwrite, false)
+          analysis.UnresolvedRelation(TableIdentifier(tableName)),
+          Map.empty, logicalPlan, overwrite, false)
 
       def analyze: LogicalPlan = EliminateSubQueries(analysis.SimpleAnalyzer.execute(logicalPlan))
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 820b336..ec05cfa 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
@@ -53,32 +54,39 @@ class AnalysisSuite extends AnalysisTest {
       Project(testRelation.output, testRelation))
 
     checkAnalysis(
-      Project(Seq(UnresolvedAttribute("TbL.a")), UnresolvedRelation(Seq("TaBlE"), Some("TbL"))),
+      Project(Seq(UnresolvedAttribute("TbL.a")),
+        UnresolvedRelation(TableIdentifier("TaBlE"), Some("TbL"))),
       Project(testRelation.output, testRelation))
 
     assertAnalysisError(
-      Project(Seq(UnresolvedAttribute("tBl.a")), UnresolvedRelation(Seq("TaBlE"), Some("TbL"))),
+      Project(Seq(UnresolvedAttribute("tBl.a")), UnresolvedRelation(
+        TableIdentifier("TaBlE"), Some("TbL"))),
       Seq("cannot resolve"))
 
     checkAnalysis(
-      Project(Seq(UnresolvedAttribute("TbL.a")), UnresolvedRelation(Seq("TaBlE"), Some("TbL"))),
+      Project(Seq(UnresolvedAttribute("TbL.a")), UnresolvedRelation(
+        TableIdentifier("TaBlE"), Some("TbL"))),
       Project(testRelation.output, testRelation),
       caseSensitive = false)
 
     checkAnalysis(
-      Project(Seq(UnresolvedAttribute("tBl.a")), UnresolvedRelation(Seq("TaBlE"), Some("TbL"))),
+      Project(Seq(UnresolvedAttribute("tBl.a")), UnresolvedRelation(
+        TableIdentifier("TaBlE"), Some("TbL"))),
       Project(testRelation.output, testRelation),
       caseSensitive = false)
   }
 
   test("resolve relations") {
-    assertAnalysisError(UnresolvedRelation(Seq("tAbLe"), None), Seq("Table Not Found: tAbLe"))
+    assertAnalysisError(
+      UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq("Table Not Found: tAbLe"))
 
-    checkAnalysis(UnresolvedRelation(Seq("TaBlE"), None), testRelation)
+    checkAnalysis(UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation)
 
-    checkAnalysis(UnresolvedRelation(Seq("tAbLe"), None), testRelation, caseSensitive = false)
+    checkAnalysis(
+      UnresolvedRelation(TableIdentifier("tAbLe"), None), testRelation, caseSensitive = false)
 
-    checkAnalysis(UnresolvedRelation(Seq("TaBlE"), None), testRelation, caseSensitive = false)
+    checkAnalysis(
+      UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation, caseSensitive = false)
   }
 
   test("divide should be casted into fractional types") {

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
index 53b3695..23861ed 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.SimpleCatalystConf
+import org.apache.spark.sql.catalyst.{TableIdentifier, SimpleCatalystConf}
 
 trait AnalysisTest extends PlanTest {
 
@@ -30,8 +31,8 @@ trait AnalysisTest extends PlanTest {
     val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf)
     val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf)
 
-    caseSensitiveCatalog.registerTable(Seq("TaBlE"), TestRelations.testRelation)
-    caseInsensitiveCatalog.registerTable(Seq("TaBlE"), TestRelations.testRelation)
+    caseSensitiveCatalog.registerTable(TableIdentifier("TaBlE"), TestRelations.testRelation)
+    caseInsensitiveCatalog.registerTable(TableIdentifier("TaBlE"), TestRelations.testRelation)
 
     new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitiveConf) {
       override val extendedResolutionRules = EliminateSubQueries :: Nil
@@ -67,8 +68,7 @@ trait AnalysisTest extends PlanTest {
       expectedErrors: Seq[String],
       caseSensitive: Boolean = true): Unit = {
     val analyzer = getAnalyzer(caseSensitive)
-    // todo: make sure we throw AnalysisException during analysis
-    val e = intercept[Exception] {
+    val e = intercept[AnalysisException] {
       analyzer.checkAnalysis(analyzer.execute(inputPlan))
     }
     assert(expectedErrors.map(_.toLowerCase).forall(e.getMessage.toLowerCase.contains),

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
index b4ad618..40c4ae7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{Union, Project, LocalRelation}
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.catalyst.SimpleCatalystConf
+import org.apache.spark.sql.catalyst.{TableIdentifier, SimpleCatalystConf}
 
 class DecimalPrecisionSuite extends SparkFunSuite with BeforeAndAfter {
   val conf = new SimpleCatalystConf(true)
@@ -47,7 +47,7 @@ class DecimalPrecisionSuite extends SparkFunSuite with BeforeAndAfter {
   val b: Expression = UnresolvedAttribute("b")
 
   before {
-    catalog.registerTable(Seq("table"), relation)
+    catalog.registerTable(TableIdentifier("table"), relation)
   }
 
   private def checkType(expression: Expression, expectedType: DataType): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 97a8b65..eacdea2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
 import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.{Logging, Partition}
+import org.apache.spark.sql.catalyst.{SqlParser, TableIdentifier}
 
 /**
  * :: Experimental ::
@@ -287,7 +288,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * @since 1.4.0
    */
   def table(tableName: String): DataFrame = {
-    DataFrame(sqlContext, sqlContext.catalog.lookupRelation(Seq(tableName)))
+    DataFrame(sqlContext, sqlContext.catalog.lookupRelation(TableIdentifier(tableName)))
   }
 
   ///////////////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 03e9736..764510a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -171,7 +171,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
     val overwrite = mode == SaveMode.Overwrite
     df.sqlContext.executePlan(
       InsertIntoTable(
-        UnresolvedRelation(tableIdent.toSeq),
+        UnresolvedRelation(tableIdent),
         partitions.getOrElse(Map.empty[String, Option[String]]),
         df.logicalPlan,
         overwrite,
@@ -201,7 +201,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   }
 
   private def saveAsTable(tableIdent: TableIdentifier): Unit = {
-    val tableExists = df.sqlContext.catalog.tableExists(tableIdent.toSeq)
+    val tableExists = df.sqlContext.catalog.tableExists(tableIdent)
 
     (tableExists, mode) match {
       case (true, SaveMode.Ignore) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 3d5e35a..361eb57 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -714,7 +714,7 @@ class SQLContext private[sql](
    * only during the lifetime of this instance of SQLContext.
    */
   private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = {
-    catalog.registerTable(Seq(tableName), df.logicalPlan)
+    catalog.registerTable(TableIdentifier(tableName), df.logicalPlan)
   }
 
   /**
@@ -728,7 +728,7 @@ class SQLContext private[sql](
    */
   def dropTempTable(tableName: String): Unit = {
     cacheManager.tryUncacheQuery(table(tableName))
-    catalog.unregisterTable(Seq(tableName))
+    catalog.unregisterTable(TableIdentifier(tableName))
   }
 
   /**
@@ -795,7 +795,7 @@ class SQLContext private[sql](
   }
 
   private def table(tableIdent: TableIdentifier): DataFrame = {
-    DataFrame(this, catalog.lookupRelation(tableIdent.toSeq))
+    DataFrame(this, catalog.lookupRelation(tableIdent))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
index f7a88b9..446739d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
@@ -140,7 +140,7 @@ class DDLParser(parseQuery: String => LogicalPlan)
   protected lazy val describeTable: Parser[LogicalPlan] =
     (DESCRIBE ~> opt(EXTENDED)) ~ tableIdentifier ^^ {
       case e ~ tableIdent =>
-        DescribeCommand(UnresolvedRelation(tableIdent.toSeq, None), e.isDefined)
+        DescribeCommand(UnresolvedRelation(tableIdent, None), e.isDefined)
     }
 
   protected lazy val refreshTable: Parser[LogicalPlan] =

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 31d6b75..e7deeff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -71,7 +71,6 @@ case class CreateTableUsing(
  * can analyze the logical plan that will be used to populate the table.
  * So, [[PreWriteCheck]] can detect cases that are not allowed.
  */
-// TODO: Use TableIdentifier instead of String for tableName (SPARK-10104).
 case class CreateTableUsingAsSelect(
     tableIdent: TableIdentifier,
     provider: String,
@@ -93,7 +92,7 @@ case class CreateTempTableUsing(
     val resolved = ResolvedDataSource(
       sqlContext, userSpecifiedSchema, Array.empty[String], provider, options)
     sqlContext.catalog.registerTable(
-      tableIdent.toSeq,
+      tableIdent,
       DataFrame(sqlContext, LogicalRelation(resolved.relation)).logicalPlan)
 
     Seq.empty[Row]
@@ -112,7 +111,7 @@ case class CreateTempTableUsingAsSelect(
     val df = DataFrame(sqlContext, query)
     val resolved = ResolvedDataSource(sqlContext, provider, partitionColumns, mode, options, df)
     sqlContext.catalog.registerTable(
-      tableIdent.toSeq,
+      tableIdent,
       DataFrame(sqlContext, LogicalRelation(resolved.relation)).logicalPlan)
 
     Seq.empty[Row]
@@ -128,7 +127,7 @@ case class RefreshTable(tableIdent: TableIdentifier)
 
     // If this table is cached as a InMemoryColumnarRelation, drop the original
     // cached version and make the new version cached lazily.
-    val logicalPlan = sqlContext.catalog.lookupRelation(tableIdent.toSeq)
+    val logicalPlan = sqlContext.catalog.lookupRelation(tableIdent)
     // Use lookupCachedData directly since RefreshTable also takes databaseName.
     val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty
     if (isCached) {

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 8efc801..b00e568 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -143,9 +143,9 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
       case CreateTableUsingAsSelect(tableIdent, _, _, partitionColumns, mode, _, query) =>
         // When the SaveMode is Overwrite, we need to check if the table is an input table of
         // the query. If so, we will throw an AnalysisException to let users know it is not allowed.
-        if (mode == SaveMode.Overwrite && catalog.tableExists(tableIdent.toSeq)) {
+        if (mode == SaveMode.Overwrite && catalog.tableExists(tableIdent)) {
           // Need to remove SubQuery operator.
-          EliminateSubQueries(catalog.lookupRelation(tableIdent.toSeq)) match {
+          EliminateSubQueries(catalog.lookupRelation(tableIdent)) match {
             // Only do the check if the table is a data source table
             // (the relation is a BaseRelation).
             case l @ LogicalRelation(dest: BaseRelation, _) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 356d4ff..fd566c8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.PhysicalRDD
 
 import scala.concurrent.duration._
@@ -287,8 +288,7 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
     testData.select('key).registerTempTable("t1")
     sqlContext.table("t1")
     sqlContext.dropTempTable("t1")
-    assert(
-      intercept[RuntimeException](sqlContext.table("t1")).getMessage.startsWith("Table Not Found"))
+    intercept[NoSuchTableException](sqlContext.table("t1"))
   }
 
   test("Drops cached temporary table") {
@@ -300,8 +300,7 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
     assert(sqlContext.isCached("t2"))
 
     sqlContext.dropTempTable("t1")
-    assert(
-      intercept[RuntimeException](sqlContext.table("t1")).getMessage.startsWith("Table Not Found"))
+    intercept[NoSuchTableException](sqlContext.table("t1"))
     assert(!sqlContext.isCached("t2"))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 7a027e1..b1fb068 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.test.SharedSQLContext
 
@@ -359,8 +360,8 @@ class JoinSuite extends QueryTest with SharedSQLContext {
     upperCaseData.where('N <= 4).registerTempTable("left")
     upperCaseData.where('N >= 3).registerTempTable("right")
 
-    val left = UnresolvedRelation(Seq("left"), None)
-    val right = UnresolvedRelation(Seq("right"), None)
+    val left = UnresolvedRelation(TableIdentifier("left"), None)
+    val right = UnresolvedRelation(TableIdentifier("right"), None)
 
     checkAnswer(
       left.join(right, $"left.N" === $"right.N", "full"),

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
index eab0fbb..5688f46 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
@@ -21,6 +21,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}
+import org.apache.spark.sql.catalyst.TableIdentifier
 
 class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContext {
   import testImplicits._
@@ -32,7 +33,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex
   }
 
   after {
-    sqlContext.catalog.unregisterTable(Seq("ListTablesSuiteTable"))
+    sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
   }
 
   test("get all tables") {
@@ -44,7 +45,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex
       sql("SHOW tables").filter("tableName = 'ListTablesSuiteTable'"),
       Row("ListTablesSuiteTable", true))
 
-    sqlContext.catalog.unregisterTable(Seq("ListTablesSuiteTable"))
+    sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
     assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
   }
 
@@ -57,7 +58,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex
       sql("show TABLES in DB").filter("tableName = 'ListTablesSuiteTable'"),
       Row("ListTablesSuiteTable", true))
 
-    sqlContext.catalog.unregisterTable(Seq("ListTablesSuiteTable"))
+    sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
     assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index cc02ef8..baff7f5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -22,7 +22,7 @@ import java.io.File
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{TableIdentifier, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
 import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT}
 import org.apache.spark.sql.test.SharedSQLContext
@@ -49,7 +49,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       sql("INSERT INTO TABLE t SELECT * FROM tmp")
       checkAnswer(sqlContext.table("t"), (data ++ data).map(Row.fromTuple))
     }
-    sqlContext.catalog.unregisterTable(Seq("tmp"))
+    sqlContext.catalog.unregisterTable(TableIdentifier("tmp"))
   }
 
   test("overwriting") {
@@ -59,7 +59,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
       checkAnswer(sqlContext.table("t"), data.map(Row.fromTuple))
     }
-    sqlContext.catalog.unregisterTable(Seq("tmp"))
+    sqlContext.catalog.unregisterTable(TableIdentifier("tmp"))
   }
 
   test("self-join") {

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index e620d7f..4d8a3f7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -358,7 +358,7 @@ class HiveContext private[hive](
   @Experimental
   def analyze(tableName: String) {
     val tableIdent = SqlParser.parseTableIdentifier(tableName)
-    val relation = EliminateSubQueries(catalog.lookupRelation(tableIdent.toSeq))
+    val relation = EliminateSubQueries(catalog.lookupRelation(tableIdent))
 
     relation match {
       case relation: MetastoreRelation =>

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 1f8223e..5819cb9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.catalyst.{InternalRow, SqlParser, TableIdentifier}
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
 import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
 import org.apache.spark.sql.execution.{FileRelation, datasources}
@@ -103,10 +103,19 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
   /** Usages should lock on `this`. */
   protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf)
 
-  // TODO: Use this everywhere instead of tuples or databaseName, tableName,.
   /** A fully qualified identifier for a table (i.e., database.tableName) */
-  case class QualifiedTableName(database: String, name: String) {
-    def toLowerCase: QualifiedTableName = QualifiedTableName(database.toLowerCase, name.toLowerCase)
+  case class QualifiedTableName(database: String, name: String)
+
+  private def getQualifiedTableName(tableIdent: TableIdentifier) = {
+    QualifiedTableName(
+      tableIdent.database.getOrElse(client.currentDatabase).toLowerCase,
+      tableIdent.table.toLowerCase)
+  }
+
+  private def getQualifiedTableName(hiveTable: HiveTable) = {
+    QualifiedTableName(
+      hiveTable.specifiedDatabase.getOrElse(client.currentDatabase).toLowerCase,
+      hiveTable.name.toLowerCase)
   }
 
   /** A cache of Spark SQL data source tables that have been accessed. */
@@ -179,33 +188,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
   }
 
   def invalidateTable(tableIdent: TableIdentifier): Unit = {
-    val databaseName = tableIdent.database.getOrElse(client.currentDatabase)
-    val tableName = tableIdent.table
-
-    cachedDataSourceTables.invalidate(QualifiedTableName(databaseName, tableName).toLowerCase)
-  }
-
-  val caseSensitive: Boolean = false
-
-  /**
-   * Creates a data source table (a table created with USING clause) in Hive's metastore.
-   * Returns true when the table has been created. Otherwise, false.
-   */
-  // TODO: Remove this in SPARK-10104.
-  def createDataSourceTable(
-      tableName: String,
-      userSpecifiedSchema: Option[StructType],
-      partitionColumns: Array[String],
-      provider: String,
-      options: Map[String, String],
-      isExternal: Boolean): Unit = {
-    createDataSourceTable(
-      SqlParser.parseTableIdentifier(tableName),
-      userSpecifiedSchema,
-      partitionColumns,
-      provider,
-      options,
-      isExternal)
+    cachedDataSourceTables.invalidate(getQualifiedTableName(tableIdent))
   }
 
   def createDataSourceTable(
@@ -215,10 +198,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
       provider: String,
       options: Map[String, String],
       isExternal: Boolean): Unit = {
-    val (dbName, tblName) = {
-      val database = tableIdent.database.getOrElse(client.currentDatabase)
-      processDatabaseAndTableName(database, tableIdent.table)
-    }
+    val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
 
     val tableProperties = new mutable.HashMap[String, String]
     tableProperties.put("spark.sql.sources.provider", provider)
@@ -311,7 +291,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
 
     // TODO: Support persisting partitioned data source relations in Hive compatible format
     val qualifiedTableName = tableIdent.quotedString
-    val (hiveCompitiableTable, logMessage) = (maybeSerDe, dataSource.relation) match {
+    val (hiveCompatibleTable, logMessage) = (maybeSerDe, dataSource.relation) match {
       case (Some(serde), relation: HadoopFsRelation)
         if relation.paths.length == 1 && relation.partitionColumns.isEmpty =>
         val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
@@ -349,9 +329,9 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
         (None, message)
     }
 
-    (hiveCompitiableTable, logMessage) match {
+    (hiveCompatibleTable, logMessage) match {
       case (Some(table), message) =>
-        // We first try to save the metadata of the table in a Hive compatiable way.
+        // We first try to save the metadata of the table in a Hive compatible way.
         // If Hive throws an error, we fall back to save its metadata in the Spark SQL
         // specific way.
         try {
@@ -374,48 +354,29 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     }
   }
 
-  def hiveDefaultTableFilePath(tableName: String): String = {
-    hiveDefaultTableFilePath(SqlParser.parseTableIdentifier(tableName))
-  }
-
   def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = {
     // Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName)
-    val database = tableIdent.database.getOrElse(client.currentDatabase)
-
-    new Path(
-      new Path(client.getDatabase(database).location),
-      tableIdent.table.toLowerCase).toString
+    val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
+    new Path(new Path(client.getDatabase(dbName).location), tblName).toString
   }
 
-  def tableExists(tableIdentifier: Seq[String]): Boolean = {
-    val tableIdent = processTableIdentifier(tableIdentifier)
-    val databaseName =
-      tableIdent
-        .lift(tableIdent.size - 2)
-        .getOrElse(client.currentDatabase)
-    val tblName = tableIdent.last
-    client.getTableOption(databaseName, tblName).isDefined
+  override def tableExists(tableIdent: TableIdentifier): Boolean = {
+    val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
+    client.getTableOption(dbName, tblName).isDefined
   }
 
-  def lookupRelation(
-      tableIdentifier: Seq[String],
+  override def lookupRelation(
+      tableIdent: TableIdentifier,
       alias: Option[String]): LogicalPlan = {
-    val tableIdent = processTableIdentifier(tableIdentifier)
-    val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
-      client.currentDatabase)
-    val tblName = tableIdent.last
-    val table = client.getTable(databaseName, tblName)
+    val qualifiedTableName = getQualifiedTableName(tableIdent)
+    val table = client.getTable(qualifiedTableName.database, qualifiedTableName.name)
 
     if (table.properties.get("spark.sql.sources.provider").isDefined) {
-      val dataSourceTable =
-        cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase)
+      val dataSourceTable = cachedDataSourceTables(qualifiedTableName)
+      val tableWithQualifiers = Subquery(qualifiedTableName.name, dataSourceTable)
       // Then, if alias is specified, wrap the table with a Subquery using the alias.
       // Otherwise, wrap the table with a Subquery using the table name.
-      val withAlias =
-        alias.map(a => Subquery(a, dataSourceTable)).getOrElse(
-          Subquery(tableIdent.last, dataSourceTable))
-
-      withAlias
+      alias.map(a => Subquery(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
     } else if (table.tableType == VirtualView) {
       val viewText = table.viewText.getOrElse(sys.error("Invalid view without text."))
       alias match {
@@ -425,7 +386,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
         case Some(aliasText) => Subquery(aliasText, HiveQl.createPlan(viewText))
       }
     } else {
-      MetastoreRelation(databaseName, tblName, alias)(table)(hive)
+      MetastoreRelation(qualifiedTableName.database, qualifiedTableName.name, alias)(table)(hive)
     }
   }
 
@@ -524,26 +485,6 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     client.listTables(db).map(tableName => (tableName, false))
   }
 
-  protected def processDatabaseAndTableName(
-      databaseName: Option[String],
-      tableName: String): (Option[String], String) = {
-    if (!caseSensitive) {
-      (databaseName.map(_.toLowerCase), tableName.toLowerCase)
-    } else {
-      (databaseName, tableName)
-    }
-  }
-
-  protected def processDatabaseAndTableName(
-      databaseName: String,
-      tableName: String): (String, String) = {
-    if (!caseSensitive) {
-      (databaseName.toLowerCase, tableName.toLowerCase)
-    } else {
-      (databaseName, tableName)
-    }
-  }
-
   /**
    * When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet
    * data source relations for better performance.
@@ -597,8 +538,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
               "It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.")
           }
 
-          val (dbName, tblName) = processDatabaseAndTableName(
-            table.specifiedDatabase.getOrElse(client.currentDatabase), table.name)
+          val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
 
           execution.CreateViewAsSelect(
             table.copy(
@@ -636,7 +576,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
           val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
           CreateTableUsingAsSelect(
             TableIdentifier(desc.name),
-            hive.conf.defaultDataSourceName,
+            conf.defaultDataSourceName,
             temporary = false,
             Array.empty[String],
             mode,
@@ -652,9 +592,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
             table
           }
 
-          val (dbName, tblName) =
-            processDatabaseAndTableName(
-              desc.specifiedDatabase.getOrElse(client.currentDatabase), desc.name)
+          val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
 
           execution.CreateTableAsSelect(
             desc.copy(
@@ -712,7 +650,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
    * UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
    * For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]].
    */
-  override def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit = {
+  override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
     throw new UnsupportedOperationException
   }
 
@@ -720,7 +658,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
    * UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
    * For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]].
    */
-  override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
+  override def unregisterTable(tableIdent: TableIdentifier): Unit = {
     throw new UnsupportedOperationException
   }
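
For reference, the resolution rule encoded by getQualifiedTableName above: fall back to
the session's current database and lowercase both parts. A minimal sketch (currentDatabase
stands in for client.currentDatabase):

  def qualify(ident: TableIdentifier, currentDatabase: String): (String, String) =
    (ident.database.getOrElse(currentDatabase).toLowerCase, ident.table.toLowerCase)

  // qualify(TableIdentifier("SRC"), "Default")            == ("default", "src")
  // qualify(TableIdentifier("Src", Some("MyDB")), "dflt") == ("mydb", "src")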
 

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 1d50501..d4ff5cc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{logical, _}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.ExplainCommand
 import org.apache.spark.sql.execution.datasources.DescribeCommand
 import org.apache.spark.sql.hive.HiveShim._
@@ -442,24 +443,12 @@ private[hive] object HiveQl extends Logging {
       throw new NotImplementedError(s"No parse rules for StructField:\n ${dumpTree(a).toString} ")
   }
 
-  protected def extractDbNameTableName(tableNameParts: Node): (Option[String], String) = {
-    val (db, tableName) =
-      tableNameParts.getChildren.asScala.map {
-        case Token(part, Nil) => cleanIdentifier(part)
-      } match {
-        case Seq(tableOnly) => (None, tableOnly)
-        case Seq(databaseName, table) => (Some(databaseName), table)
-      }
-
-    (db, tableName)
-  }
-
-  protected def extractTableIdent(tableNameParts: Node): Seq[String] = {
+  protected def extractTableIdent(tableNameParts: Node): TableIdentifier = {
     tableNameParts.getChildren.asScala.map {
       case Token(part, Nil) => cleanIdentifier(part)
     } match {
-      case Seq(tableOnly) => Seq(tableOnly)
-      case Seq(databaseName, table) => Seq(databaseName, table)
+      case Seq(tableOnly) => TableIdentifier(tableOnly)
+      case Seq(databaseName, table) => TableIdentifier(table, Some(databaseName))
       case other => sys.error("Hive only supports tables names like 'tableName' " +
         s"or 'databaseName.tableName', found '$other'")
     }
@@ -518,13 +507,13 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       properties: Map[String, String],
       allowExist: Boolean,
       replace: Boolean): CreateViewAsSelect = {
-    val (db, viewName) = extractDbNameTableName(viewNameParts)
+    val TableIdentifier(viewName, dbName) = extractTableIdent(viewNameParts)
 
     val originalText = context.getTokenRewriteStream
       .toString(query.getTokenStartIndex, query.getTokenStopIndex)
 
     val tableDesc = HiveTable(
-      specifiedDatabase = db,
+      specifiedDatabase = dbName,
       name = viewName,
       schema = schema,
       partitionColumns = Seq.empty[HiveColumn],
@@ -611,7 +600,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
               case tableName =>
                 // It is describing a table with the format like "describe table".
                 DescribeCommand(
-                  UnresolvedRelation(Seq(tableName.getText), None), isExtended = extended.isDefined)
+                  UnresolvedRelation(TableIdentifier(tableName.getText), None),
+                  isExtended = extended.isDefined)
             }
           }
           // All other cases.
@@ -716,12 +706,12 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
             "TOK_TABLELOCATION",
             "TOK_TABLEPROPERTIES"),
           children)
-      val (db, tableName) = extractDbNameTableName(tableNameParts)
+      val TableIdentifier(tblName, dbName) = extractTableIdent(tableNameParts)
 
       // TODO add bucket support
       var tableDesc: HiveTable = HiveTable(
-        specifiedDatabase = db,
-        name = tableName,
+        specifiedDatabase = dbName,
+        name = tblName,
         schema = Seq.empty[HiveColumn],
         partitionColumns = Seq.empty[HiveColumn],
         properties = Map[String, String](),
@@ -1264,15 +1254,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
           nonAliasClauses)
       }
 
-      val tableIdent =
-        tableNameParts.getChildren.asScala.map {
-          case Token(part, Nil) => cleanIdentifier(part)
-        } match {
-          case Seq(tableOnly) => Seq(tableOnly)
-          case Seq(databaseName, table) => Seq(databaseName, table)
-          case other => sys.error("Hive only supports tables names like 'tableName' " +
-            s"or 'databaseName.tableName', found '$other'")
-      }
+      val tableIdent = extractTableIdent(tableNameParts)
       val alias = aliasClause.map { case Token(a, Nil) => cleanIdentifier(a) }
       val relation = UnresolvedRelation(tableIdent, alias)
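
With `extractTableIdent` now returning a `TableIdentifier`, one- and two-part names map onto the case class directly and callers destructure it as above. A quick illustration of the resulting values (illustrative only; it relies solely on the `table`, `database` and `unquotedString` members that already appear elsewhere in this diff):

    import org.apache.spark.sql.catalyst.TableIdentifier

    val simple    = TableIdentifier("src")                // table = "src", database = None
    val qualified = TableIdentifier("src", Some("mydb"))  // table = "src", database = Some("mydb")

    // unquotedString joins the parts with a dot, e.g. "mydb.src" for `qualified`.
    val asString = qualified.unquotedString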
 

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 8422287..e72a60b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive.execution
 
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
@@ -37,8 +38,7 @@ case class CreateTableAsSelect(
     allowExisting: Boolean)
   extends RunnableCommand {
 
-  def database: String = tableDesc.database
-  def tableName: String = tableDesc.name
+  val tableIdentifier = TableIdentifier(tableDesc.name, Some(tableDesc.database))
 
   override def children: Seq[LogicalPlan] = Seq(query)
 
@@ -72,18 +72,18 @@ case class CreateTableAsSelect(
       hiveContext.catalog.client.createTable(withSchema)
 
       // Get the Metastore Relation
-      hiveContext.catalog.lookupRelation(Seq(database, tableName), None) match {
+      hiveContext.catalog.lookupRelation(tableIdentifier, None) match {
         case r: MetastoreRelation => r
       }
     }
     // TODO ideally, we should get the output data ready first and then
     // add the relation into catalog, just in case of failure occurs while data
     // processing.
-    if (hiveContext.catalog.tableExists(Seq(database, tableName))) {
+    if (hiveContext.catalog.tableExists(tableIdentifier)) {
       if (allowExisting) {
         // table already exists, will do nothing, to keep consistent with Hive
       } else {
-        throw new AnalysisException(s"$database.$tableName already exists.")
+        throw new AnalysisException(s"$tableIdentifier already exists.")
       }
     } else {
       hiveContext.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true, false)).toRdd
@@ -93,6 +93,6 @@ case class CreateTableAsSelect(
   }
 
   override def argString: String = {
-    s"[Database:$database, TableName: $tableName, InsertIntoHiveTable]"
+    s"[Database:${tableDesc.database}}, TableName: ${tableDesc.name}, InsertIntoHiveTable]"
   }
 }
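
One consequence of the change above: the "already exists" message now interpolates the TableIdentifier itself, so its exact wording follows the identifier's own string rendering rather than the old hand-built "$database.$tableName" text. If the unqualified dotted form is ever wanted explicitly, the identifier exposes it directly (a hedged sketch; `unquotedString` is the same member commands.scala uses later in this diff):

    import org.apache.spark.sql.catalyst.TableIdentifier

    val ident = TableIdentifier("t1", Some("default"))
    // New behaviour: whatever the TableIdentifier renders itself as.
    val newStyle = s"$ident already exists."
    // Old-style wording, reproduced explicitly via unquotedString.
    val oldStyle = s"${ident.unquotedString} already exists."  // "default.t1 already exists."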

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
index 2b504ac..2c81115 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive.execution
 
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveContext}
 import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
@@ -38,18 +39,18 @@ private[hive] case class CreateViewAsSelect(
   assert(tableDesc.schema == Nil || tableDesc.schema.length == childSchema.length)
   assert(tableDesc.viewText.isDefined)
 
+  val tableIdentifier = TableIdentifier(tableDesc.name, Some(tableDesc.database))
+
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val hiveContext = sqlContext.asInstanceOf[HiveContext]
-    val database = tableDesc.database
-    val viewName = tableDesc.name
 
-    if (hiveContext.catalog.tableExists(Seq(database, viewName))) {
+    if (hiveContext.catalog.tableExists(tableIdentifier)) {
       if (allowExisting) {
         // view already exists, will do nothing, to keep consistent with Hive
       } else if (orReplace) {
         hiveContext.catalog.client.alertView(prepareTable())
       } else {
-        throw new AnalysisException(s"View $database.$viewName already exists. " +
+        throw new AnalysisException(s"View $tableIdentifier already exists. " +
           "If you want to update the view definition, please use ALTER VIEW AS or " +
           "CREATE OR REPLACE VIEW AS")
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 51ec92a..94210a5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -71,7 +71,7 @@ case class DropTable(
     }
     hiveContext.invalidateTable(tableName)
     hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
-    hiveContext.catalog.unregisterTable(Seq(tableName))
+    hiveContext.catalog.unregisterTable(TableIdentifier(tableName))
     Seq.empty[Row]
   }
 }
@@ -103,7 +103,6 @@ case class AddFile(path: String) extends RunnableCommand {
   }
 }
 
-// TODO: Use TableIdentifier instead of String for tableName (SPARK-10104).
 private[hive]
 case class CreateMetastoreDataSource(
     tableIdent: TableIdentifier,
@@ -131,7 +130,7 @@ case class CreateMetastoreDataSource(
     val tableName = tableIdent.unquotedString
     val hiveContext = sqlContext.asInstanceOf[HiveContext]
 
-    if (hiveContext.catalog.tableExists(tableIdent.toSeq)) {
+    if (hiveContext.catalog.tableExists(tableIdent)) {
       if (allowExisting) {
         return Seq.empty[Row]
       } else {
@@ -160,7 +159,6 @@ case class CreateMetastoreDataSource(
   }
 }
 
-// TODO: Use TableIdentifier instead of String for tableName (SPARK-10104).
 private[hive]
 case class CreateMetastoreDataSourceAsSelect(
     tableIdent: TableIdentifier,
@@ -198,7 +196,7 @@ case class CreateMetastoreDataSourceAsSelect(
       }
 
     var existingSchema = None: Option[StructType]
-    if (sqlContext.catalog.tableExists(tableIdent.toSeq)) {
+    if (sqlContext.catalog.tableExists(tableIdent)) {
       // Check if we need to throw an exception or just return.
       mode match {
         case SaveMode.ErrorIfExists =>
@@ -215,7 +213,7 @@ case class CreateMetastoreDataSourceAsSelect(
           val resolved = ResolvedDataSource(
             sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath)
           val createdRelation = LogicalRelation(resolved.relation)
-          EliminateSubQueries(sqlContext.catalog.lookupRelation(tableIdent.toSeq)) match {
+          EliminateSubQueries(sqlContext.catalog.lookupRelation(tableIdent)) match {
             case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _) =>
               if (l.relation != createdRelation.relation) {
                 val errorDescription =

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index ff39ccb..6883d30 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -191,7 +191,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
       // Make sure any test tables referenced are loaded.
       val referencedTables =
         describedTables ++
-        logical.collect { case UnresolvedRelation(tableIdent, _) => tableIdent.last }
+        logical.collect { case UnresolvedRelation(tableIdent, _) => tableIdent.table }
       val referencedTestTables = referencedTables.filter(testTables.contains)
       logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}")
       referencedTestTables.foreach(loadTestTable)

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
index c8d2727..8c4af1b 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
@@ -26,7 +26,6 @@ import java.util.Map;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.spark.sql.SaveMode;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -41,6 +40,8 @@ import org.apache.spark.sql.hive.test.TestHive$;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.catalyst.TableIdentifier;
 import org.apache.spark.util.Utils;
 
 public class JavaMetastoreDataSourcesSuite {
@@ -71,7 +72,8 @@ public class JavaMetastoreDataSourcesSuite {
     if (path.exists()) {
       path.delete();
     }
-    hiveManagedPath = new Path(sqlContext.catalog().hiveDefaultTableFilePath("javaSavedTable"));
+    hiveManagedPath = new Path(sqlContext.catalog().hiveDefaultTableFilePath(
+      new TableIdentifier("javaSavedTable")));
     fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
     if (fs.exists(hiveManagedPath)){
       fs.delete(hiveManagedPath, true);

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
index 579631d..183aca2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.Row
 
@@ -31,14 +32,14 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft
 
   override def beforeAll(): Unit = {
     // The catalog in HiveContext is a case insensitive one.
-    catalog.registerTable(Seq("ListTablesSuiteTable"), df.logicalPlan)
+    catalog.registerTable(TableIdentifier("ListTablesSuiteTable"), df.logicalPlan)
     sql("CREATE TABLE HiveListTablesSuiteTable (key int, value string)")
     sql("CREATE DATABASE IF NOT EXISTS ListTablesSuiteDB")
     sql("CREATE TABLE ListTablesSuiteDB.HiveInDBListTablesSuiteTable (key int, value string)")
   }
 
   override def afterAll(): Unit = {
-    catalog.unregisterTable(Seq("ListTablesSuiteTable"))
+    catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
     sql("DROP TABLE IF EXISTS HiveListTablesSuiteTable")
     sql("DROP TABLE IF EXISTS ListTablesSuiteDB.HiveInDBListTablesSuiteTable")
     sql("DROP DATABASE IF EXISTS ListTablesSuiteDB")

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index d356538..d292887 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.util.Utils
 
 /**
@@ -367,7 +368,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
            |)
          """.stripMargin)
 
-      val expectedPath = catalog.hiveDefaultTableFilePath("ctasJsonTable")
+      val expectedPath = catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable"))
       val filesystemPath = new Path(expectedPath)
       val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration)
       if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true)
@@ -472,7 +473,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           // Drop table will also delete the data.
           sql("DROP TABLE savedJsonTable")
           intercept[IOException] {
-            read.json(catalog.hiveDefaultTableFilePath("savedJsonTable"))
+            read.json(catalog.hiveDefaultTableFilePath(TableIdentifier("savedJsonTable")))
           }
         }
 
@@ -703,7 +704,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
 
         // Manually create a metastore data source table.
         catalog.createDataSourceTable(
-          tableName = "wide_schema",
+          tableIdent = TableIdentifier("wide_schema"),
           userSpecifiedSchema = Some(schema),
           partitionColumns = Array.empty[String],
           provider = "json",
@@ -733,7 +734,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           "EXTERNAL" -> "FALSE"),
         tableType = ManagedTable,
         serdeProperties = Map(
-          "path" -> catalog.hiveDefaultTableFilePath(tableName)))
+          "path" -> catalog.hiveDefaultTableFilePath(TableIdentifier(tableName))))
 
       catalog.client.createTable(hiveTable)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 6a692d6..9bb32f1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import scala.reflect.ClassTag
 
 import org.apache.spark.sql.{Row, SQLConf, QueryTest}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -68,7 +69,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
 
   test("analyze MetastoreRelations") {
     def queryTotalSize(tableName: String): BigInt =
-      hiveContext.catalog.lookupRelation(Seq(tableName)).statistics.sizeInBytes
+      hiveContext.catalog.lookupRelation(TableIdentifier(tableName)).statistics.sizeInBytes
 
     // Non-partitioned table
     sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect()
@@ -115,7 +116,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
     intercept[UnsupportedOperationException] {
       hiveContext.analyze("tempTable")
     }
-    hiveContext.catalog.unregisterTable(Seq("tempTable"))
+    hiveContext.catalog.unregisterTable(TableIdentifier("tempTable"))
   }
 
   test("estimates the size of a test MetastoreRelation") {

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 6aa3460..c929ba5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.DefaultParserDialect
+import org.apache.spark.sql.catalyst.{TableIdentifier, DefaultParserDialect}
 import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, EliminateSubQueries}
 import org.apache.spark.sql.catalyst.errors.DialectException
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -266,7 +266,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
 
   test("CTAS without serde") {
     def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
-      val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))
+      val relation = EliminateSubQueries(catalog.lookupRelation(TableIdentifier(tableName)))
       relation match {
         case LogicalRelation(r: ParquetRelation, _) =>
           if (!isDataSourceParquet) {
@@ -723,7 +723,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     (1 to 100).par.map { i =>
       val tableName = s"SPARK_6618_table_$i"
       sql(s"CREATE TABLE $tableName (col1 string)")
-      catalog.lookupRelation(Seq(tableName))
+      catalog.lookupRelation(TableIdentifier(tableName))
       table(tableName)
       tables()
       sql(s"DROP TABLE $tableName")

http://git-wip-us.apache.org/repos/asf/spark/blob/56d7da14/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 5eb39b1..7efeab5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.io.orc.CompressionKind
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 
@@ -218,7 +219,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
       sql("INSERT INTO TABLE t SELECT * FROM tmp")
       checkAnswer(table("t"), (data ++ data).map(Row.fromTuple))
     }
-    catalog.unregisterTable(Seq("tmp"))
+    catalog.unregisterTable(TableIdentifier("tmp"))
   }
 
   test("overwriting") {
@@ -228,7 +229,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
       sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
       checkAnswer(table("t"), data.map(Row.fromTuple))
     }
-    catalog.unregisterTable(Seq("tmp"))
+    catalog.unregisterTable(TableIdentifier("tmp"))
   }
 
   test("self-join") {

