You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2017/01/11 21:44:17 UTC

spark git commit: [SPARK-18801][SQL] Support resolve a nested view

Repository: spark
Updated Branches:
  refs/heads/master 3bc2eff88 -> 30a07071f


[SPARK-18801][SQL] Support resolve a nested view

## What changes were proposed in this pull request?

We should be able to resolve a nested view. The main advantage is that if you update an underlying view, the current view also gets updated.
The new approach should be compatible with older versions of SPARK/HIVE, that means:
1. The new approach should be able to resolve the views that created by older versions of SPARK/HIVE;
2. The new approach should be able to resolve the views that are currently supported by SPARK SQL.

The new approach mainly brings in the following changes:
1. Add a new operator called `View` to keep track of the CatalogTable that describes the view, and the output attributes as well as the child of the view;
2. Update the `ResolveRelations` rule to resolve the relations and views, note that a nested view should be resolved correctly;
3. Add `viewDefaultDatabase` variable to `CatalogTable` to keep track of the default database name used to resolve a view, if the `CatalogTable` is not a view, then the variable should be `None`;
4. Add `AnalysisContext` to enable us to still support a view created with CTE/Windows query;
5. Enables the view support without enabling Hive support (i.e., enableHiveSupport);
6. Fix a weird behavior: the result of a view query may have different schema if the referenced table has been changed. After this PR, we try to cast the child output attributes to that from the view schema, throw an AnalysisException if cast is not allowed.

Note this is compatible with the views defined by older versions of Spark(before 2.2), which have empty `defaultDatabase` and all the relations in `viewText` have database part defined.

## How was this patch tested?
1. Add new tests in `SessionCatalogSuite` to test the function `lookupRelation`;
2. Add new test case in `SQLViewSuite` to test resolve a nested view.

Author: jiangxingbo <ji...@gmail.com>

Closes #16233 from jiangxb1987/resolve-view.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/30a07071
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/30a07071
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/30a07071

Branch: refs/heads/master
Commit: 30a07071f099c0ebcf04c4df61f8d414dcbad7b5
Parents: 3bc2eff
Author: jiangxingbo <ji...@gmail.com>
Authored: Wed Jan 11 13:44:07 2017 -0800
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Wed Jan 11 13:44:07 2017 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 158 +++++++++++++++----
 .../spark/sql/catalyst/analysis/view.scala      |  80 ++++++++++
 .../sql/catalyst/catalog/SessionCatalog.scala   |  31 +++-
 .../spark/sql/catalyst/catalog/interface.scala  |   9 ++
 .../sql/catalyst/optimizer/Optimizer.scala      |   1 +
 .../plans/logical/basicLogicalOperators.scala   |  30 ++++
 .../catalyst/catalog/ExternalCatalogSuite.scala |  30 +++-
 .../catalyst/catalog/SessionCatalogSuite.scala  |  36 +++--
 .../apache/spark/sql/catalyst/SQLBuilder.scala  |   3 +
 .../spark/sql/internal/SessionState.scala       |   3 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala    |  10 ++
 .../spark/sql/hive/HiveExternalCatalog.scala    |   4 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  20 ++-
 .../spark/sql/hive/HiveSessionCatalog.scala     |  12 +-
 .../spark/sql/hive/HiveSessionState.scala       |   3 +-
 .../spark/sql/hive/execution/SQLViewSuite.scala | 155 ++++++++++++++++++
 16 files changed, 516 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/30a07071/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 73e9206..d461531 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -21,8 +21,8 @@ import scala.annotation.tailrec
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf}
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog}
+import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.OuterScopes
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -50,6 +50,39 @@ object SimpleAnalyzer extends Analyzer(
     new SimpleCatalystConf(caseSensitiveAnalysis = true))
 
 /**
+ * Provides a way to keep state during the analysis, this enables us to decouple the concerns
+ * of analysis environment from the catalog.
+ *
+ * Note this is thread local.
+ *
+ * @param defaultDatabase The default database used in the view resolution, this overrules the
+ *                        current catalog database.
+ * @param nestedViewLevel The nested level in the view resolution, this enables us to limit the
+ *                        depth of nested views.
+ *                        TODO Limit the depth of nested views.
+ */
+case class AnalysisContext(
+    defaultDatabase: Option[String] = None,
+    nestedViewLevel: Int = 0)
+
+object AnalysisContext {
+  private val value = new ThreadLocal[AnalysisContext]() {
+    override def initialValue: AnalysisContext = AnalysisContext()
+  }
+
+  def get: AnalysisContext = value.get()
+  private def set(context: AnalysisContext): Unit = value.set(context)
+
+  def withAnalysisContext[A](database: Option[String])(f: => A): A = {
+    val originContext = value.get()
+    val context = AnalysisContext(defaultDatabase = database,
+      nestedViewLevel = originContext.nestedViewLevel + 1)
+    set(context)
+    try f finally { set(originContext) }
+  }
+}
+
+/**
  * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
  * [[UnresolvedRelation]]s into fully typed objects using information in a
  * [[SessionCatalog]] and a [[FunctionRegistry]].
@@ -106,6 +139,8 @@ class Analyzer(
       ResolveInlineTables ::
       TypeCoercion.typeCoercionRules ++
       extendedResolutionRules : _*),
+    Batch("View", Once,
+      AliasViewChild(conf)),
     Batch("Nondeterministic", Once,
       PullOutNondeterministic),
     Batch("UDF", Once,
@@ -510,32 +545,87 @@ class Analyzer(
    * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
    */
   object ResolveRelations extends Rule[LogicalPlan] {
-    private def lookupTableFromCatalog(u: UnresolvedRelation): LogicalPlan = {
+
+    // If the unresolved relation is running directly on files, we just return the original
+    // UnresolvedRelation, the plan will get resolved later. Else we look up the table from catalog
+    // and change the default database name(in AnalysisContext) if it is a view.
+    // We usually look up a table from the default database if the table identifier has an empty
+    // database part, for a view the default database should be the currentDb when the view was
+    // created. When the case comes to resolving a nested view, the view may have different default
+    // database with that the referenced view has, so we need to use
+    // `AnalysisContext.defaultDatabase` to track the current default database.
+    // When the relation we resolve is a view, we fetch the view.desc(which is a CatalogTable), and
+    // then set the value of `CatalogTable.viewDefaultDatabase` to
+    // `AnalysisContext.defaultDatabase`, we look up the relations that the view references using
+    // the default database.
+    // For example:
+    // |- view1 (defaultDatabase = db1)
+    //   |- operator
+    //     |- table2 (defaultDatabase = db1)
+    //     |- view2 (defaultDatabase = db2)
+    //        |- view3 (defaultDatabase = db3)
+    //   |- view4 (defaultDatabase = db4)
+    // In this case, the view `view1` is a nested view, it directly references `table2`\u3001`view2`
+    // and `view4`, the view `view2` references `view3`. On resolving the table, we look up the
+    // relations `table2`\u3001`view2`\u3001`view4` using the default database `db1`, and look up the
+    // relation `view3` using the default database `db2`.
+    //
+    // Note this is compatible with the views defined by older versions of Spark(before 2.2), which
+    // have empty defaultDatabase and all the relations in viewText have database part defined.
+    def resolveRelation(plan: LogicalPlan): LogicalPlan = plan match {
+      case u: UnresolvedRelation if !isRunningDirectlyOnFiles(u.tableIdentifier) =>
+        val defaultDatabase = AnalysisContext.get.defaultDatabase
+        val relation = lookupTableFromCatalog(u, defaultDatabase)
+        resolveRelation(relation)
+      // The view's child should be a logical plan parsed from the `desc.viewText`, the variable
+      // `viewText` should be defined, or else we throw an error on the generation of the View
+      // operator.
+      case view @ View(desc, _, child) if !child.resolved =>
+        // Resolve all the UnresolvedRelations and Views in the child.
+        val newChild = AnalysisContext.withAnalysisContext(desc.viewDefaultDatabase) {
+          execute(child)
+        }
+        view.copy(child = newChild)
+      case p @ SubqueryAlias(_, view: View, _) =>
+        val newChild = resolveRelation(view)
+        p.copy(child = newChild)
+      case _ => plan
+    }
+
+    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+      case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
+        i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
+      case u: UnresolvedRelation => resolveRelation(u)
+    }
+
+    // Look up the table with the given name from catalog. The database we used is decided by the
+    // precedence:
+    // 1. Use the database part of the table identifier, if it is defined;
+    // 2. Use defaultDatabase, if it is defined(In this case, no temporary objects can be used,
+    //    and the default database is only used to look up a view);
+    // 3. Use the currentDb of the SessionCatalog.
+    private def lookupTableFromCatalog(
+        u: UnresolvedRelation,
+        defaultDatabase: Option[String] = None): LogicalPlan = {
       try {
-        catalog.lookupRelation(u.tableIdentifier, u.alias)
+        val tableIdentWithDb = u.tableIdentifier.copy(
+          database = u.tableIdentifier.database.orElse(defaultDatabase))
+        catalog.lookupRelation(tableIdentWithDb, u.alias)
       } catch {
         case _: NoSuchTableException =>
           u.failAnalysis(s"Table or view not found: ${u.tableName}")
       }
     }
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-      case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
-        i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
-      case u: UnresolvedRelation =>
-        val table = u.tableIdentifier
-        if (table.database.isDefined && conf.runSQLonFile && !catalog.isTemporaryTable(table) &&
-            (!catalog.databaseExists(table.database.get) || !catalog.tableExists(table))) {
-          // If the database part is specified, and we support running SQL directly on files, and
-          // it's not a temporary view, and the table does not exist, then let's just return the
-          // original UnresolvedRelation. It is possible we are matching a query like "select *
-          // from parquet.`/path/to/query`". The plan will get resolved later.
-          // Note that we are testing (!db_exists || !table_exists) because the catalog throws
-          // an exception from tableExists if the database does not exist.
-          u
-        } else {
-          lookupTableFromCatalog(u)
-        }
+    // If the database part is specified, and we support running SQL directly on files, and
+    // it's not a temporary view, and the table does not exist, then let's just return the
+    // original UnresolvedRelation. It is possible we are matching a query like "select *
+    // from parquet.`/path/to/query`". The plan will get resolved in the rule `ResolveDataSource`.
+    // Note that we are testing (!db_exists || !table_exists) because the catalog throws
+    // an exception from tableExists if the database does not exist.
+    private def isRunningDirectlyOnFiles(table: TableIdentifier): Boolean = {
+      table.database.isDefined && conf.runSQLonFile && !catalog.isTemporaryTable(table) &&
+        (!catalog.databaseExists(table.database.get) || !catalog.tableExists(table))
     }
   }
 
@@ -767,19 +857,19 @@ class Analyzer(
     }
   }
 
- /**
-  * In many dialects of SQL it is valid to use ordinal positions in order/sort by and group by
-  * clauses. This rule is to convert ordinal positions to the corresponding expressions in the
-  * select list. This support is introduced in Spark 2.0.
-  *
-  * - When the sort references or group by expressions are not integer but foldable expressions,
-  * just ignore them.
-  * - When spark.sql.orderByOrdinal/spark.sql.groupByOrdinal is set to false, ignore the position
-  * numbers too.
-  *
-  * Before the release of Spark 2.0, the literals in order/sort by and group by clauses
-  * have no effect on the results.
-  */
+  /**
+   * In many dialects of SQL it is valid to use ordinal positions in order/sort by and group by
+   * clauses. This rule is to convert ordinal positions to the corresponding expressions in the
+   * select list. This support is introduced in Spark 2.0.
+   *
+   * - When the sort references or group by expressions are not integer but foldable expressions,
+   * just ignore them.
+   * - When spark.sql.orderByOrdinal/spark.sql.groupByOrdinal is set to false, ignore the position
+   * numbers too.
+   *
+   * Before the release of Spark 2.0, the literals in order/sort by and group by clauses
+   * have no effect on the results.
+   */
   object ResolveOrdinalInOrderByAndGroupBy extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
       case p if !p.childrenResolved => p

http://git-wip-us.apache.org/repos/asf/spark/blob/30a07071/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
new file mode 100644
index 0000000..737f846
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+/**
+ * This file defines analysis rules related to views.
+ */
+
+/**
+ * Make sure that a view's child plan produces the view's output attributes. We wrap the child
+ * with a Project and add an alias for each output attribute. The attributes are resolved by
+ * name. This should be only done after the batch of Resolution, because the view attributes are
+ * not completely resolved during the batch of Resolution.
+ */
+case class AliasViewChild(conf: CatalystConf) extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case v @ View(_, output, child) if child.resolved =>
+      val resolver = conf.resolver
+      val newOutput = output.map { attr =>
+        val originAttr = findAttributeByName(attr.name, child.output, resolver)
+        // The dataType of the output attributes may be not the same with that of the view output,
+        // so we should cast the attribute to the dataType of the view output attribute. If the
+        // cast can't perform, will throw an AnalysisException.
+        Alias(Cast(originAttr, attr.dataType), attr.name)(exprId = attr.exprId,
+          qualifier = attr.qualifier, explicitMetadata = Some(attr.metadata))
+      }
+      v.copy(child = Project(newOutput, child))
+  }
+
+  /**
+   * Find the attribute that has the expected attribute name from an attribute list, the names
+   * are compared using conf.resolver.
+   * If the expected attribute is not found, throw an AnalysisException.
+   */
+  private def findAttributeByName(
+      name: String,
+      attrs: Seq[Attribute],
+      resolver: Resolver): Attribute = {
+    attrs.find { attr =>
+      resolver(attr.name, name)
+    }.getOrElse(throw new AnalysisException(
+      s"Attribute with name '$name' is not found in " +
+        s"'${attrs.map(_.name).mkString("(", ",", ")")}'"))
+  }
+}
+
+/**
+ * Removes [[View]] operators from the plan. The operator is respected till the end of analysis
+ * stage because we want to see which part of an analyzed logical plan is generated from a view.
+ */
+object EliminateView extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    // The child should have the same output attributes with the View operator, so we simply
+    // remove the View operator.
+    case View(_, output, child) =>
+      assert(output == child.output, "The output of the child is different from the view output")
+      child
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/30a07071/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 2060d53..12af9e0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -31,7 +31,8 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View}
 import org.apache.spark.sql.catalyst.util.StringUtils
 
 object SessionCatalog {
@@ -51,7 +52,8 @@ class SessionCatalog(
     functionResourceLoader: FunctionResourceLoader,
     functionRegistry: FunctionRegistry,
     conf: CatalystConf,
-    hadoopConf: Configuration) extends Logging {
+    hadoopConf: Configuration,
+    parser: ParserInterface) extends Logging {
   import SessionCatalog._
   import CatalogTypes.TablePartitionSpec
 
@@ -66,7 +68,8 @@ class SessionCatalog(
       DummyFunctionResourceLoader,
       functionRegistry,
       conf,
-      new Configuration())
+      new Configuration(),
+      CatalystSqlParser)
   }
 
   // For testing only.
@@ -556,8 +559,11 @@ class SessionCatalog(
    * Note that, the global temp view database is also valid here, this will return the global temp
    * view matching the given name.
    *
-   * If the relation is a view, the relation will be wrapped in a [[SubqueryAlias]] which will
-   * track the name of the view.
+   * If the relation is a view, we generate a [[View]] operator from the view description, and
+   * wrap the logical plan in a [[SubqueryAlias]] which will track the name of the view.
+   *
+   * @param name The name of the table/view that we look up.
+   * @param alias The alias name of the table/view that we look up.
    */
   def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = {
     synchronized {
@@ -570,10 +576,19 @@ class SessionCatalog(
         }.getOrElse(throw new NoSuchTableException(db, table))
       } else if (name.database.isDefined || !tempTables.contains(table)) {
         val metadata = externalCatalog.getTable(db, table)
-        val view = Option(metadata.tableType).collect {
-          case CatalogTableType.VIEW => name
+        if (metadata.tableType == CatalogTableType.VIEW) {
+          val viewText = metadata.viewText.getOrElse(sys.error("Invalid view without text."))
+          // The relation is a view, so we wrap the relation by:
+          // 1. Add a [[View]] operator over the relation to keep track of the view desc;
+          // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view.
+          val child = View(
+            desc = metadata,
+            output = metadata.schema.toAttributes,
+            child = parser.parsePlan(viewText))
+          SubqueryAlias(relationAlias, child, Option(name))
+        } else {
+          SubqueryAlias(relationAlias, SimpleCatalogRelation(metadata), None)
         }
-        SubqueryAlias(relationAlias, SimpleCatalogRelation(metadata), view)
       } else {
         SubqueryAlias(relationAlias, tempTables(table), Option(name))
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/30a07071/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index b402bd2..a9de107 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -196,6 +196,12 @@ case class CatalogTable(
   /** Return the fully qualified name of this table, assuming the database was specified. */
   def qualifiedName: String = identifier.unquotedString
 
+  /**
+   * Return the default database name we use to resolve a view, should be None if the CatalogTable
+   * is not a View.
+   */
+  def viewDefaultDatabase: Option[String] = properties.get(CatalogTable.VIEW_DEFAULT_DATABASE)
+
   /** Syntactic sugar to update a field in `storage`. */
   def withNewStorage(
       locationUri: Option[String] = storage.locationUri,
@@ -246,6 +252,9 @@ case class CatalogTable(
   }
 }
 
+object CatalogTable {
+  val VIEW_DEFAULT_DATABASE = "view.default.database"
+}
 
 /**
  * This class of statistics is used in [[CatalogTable]] to interact with metastore.

http://git-wip-us.apache.org/repos/asf/spark/blob/30a07071/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index cef17b8..009c517 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -52,6 +52,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
     // we do not eliminate subqueries or compute current time in the analyzer.
     Batch("Finish Analysis", Once,
       EliminateSubqueryAliases,
+      EliminateView,
       ReplaceExpressions,
       ComputeCurrentTime,
       GetCurrentDatabase(sessionCatalog),

http://git-wip-us.apache.org/repos/asf/spark/blob/30a07071/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 9bdae5e..48f68a6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTypes}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
@@ -388,6 +389,35 @@ case class InsertIntoTable(
 }
 
 /**
+ * A container for holding the view description(CatalogTable), and the output of the view. The
+ * child should be a logical plan parsed from the `CatalogTable.viewText`, should throw an error
+ * if the `viewText` is not defined.
+ * This operator will be removed at the end of analysis stage.
+ *
+ * @param desc A view description(CatalogTable) that provides necessary information to resolve the
+ *             view.
+ * @param output The output of a view operator, this is generated during planning the view, so that
+ *               we are able to decouple the output from the underlying structure.
+ * @param child The logical plan of a view operator, it should be a logical plan parsed from the
+ *              `CatalogTable.viewText`, should throw an error if the `viewText` is not defined.
+ */
+case class View(
+    desc: CatalogTable,
+    output: Seq[Attribute],
+    child: LogicalPlan) extends LogicalPlan with MultiInstanceRelation {
+
+  override lazy val resolved: Boolean = child.resolved
+
+  override def children: Seq[LogicalPlan] = child :: Nil
+
+  override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))
+
+  override def simpleString: String = {
+    s"View (${desc.identifier}, ${output.mkString("[", ",", "]")})"
+  }
+}
+
+/**
  * A container for holding named common table expressions (CTEs) and a query plan.
  * This operator will be removed during analysis and the relations will be substituted into child.
  *

http://git-wip-us.apache.org/repos/asf/spark/blob/30a07071/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 176cccc..91f464b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -81,13 +81,13 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
 
   test("list databases without pattern") {
     val catalog = newBasicCatalog()
-    assert(catalog.listDatabases().toSet == Set("default", "db1", "db2"))
+    assert(catalog.listDatabases().toSet == Set("default", "db1", "db2", "db3"))
   }
 
   test("list databases with pattern") {
     val catalog = newBasicCatalog()
     assert(catalog.listDatabases("db").toSet == Set.empty)
-    assert(catalog.listDatabases("db*").toSet == Set("db1", "db2"))
+    assert(catalog.listDatabases("db*").toSet == Set("db1", "db2", "db3"))
     assert(catalog.listDatabases("*1").toSet == Set("db1"))
     assert(catalog.listDatabases("db2").toSet == Set("db2"))
   }
@@ -95,7 +95,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
   test("drop database") {
     val catalog = newBasicCatalog()
     catalog.dropDatabase("db1", ignoreIfNotExists = false, cascade = false)
-    assert(catalog.listDatabases().toSet == Set("default", "db2"))
+    assert(catalog.listDatabases().toSet == Set("default", "db2", "db3"))
   }
 
   test("drop database when the database is not empty") {
@@ -119,7 +119,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     // When cascade is true, it should drop them
     val catalog3 = newBasicCatalog()
     catalog3.dropDatabase("db2", ignoreIfNotExists = false, cascade = true)
-    assert(catalog3.listDatabases().toSet == Set("default", "db1"))
+    assert(catalog3.listDatabases().toSet == Set("default", "db1", "db3"))
   }
 
   test("drop database when the database does not exist") {
@@ -861,6 +861,8 @@ abstract class CatalogTestUtils {
    *     - part1
    *     - part2
    *   - func1
+   * db3
+   *   - view1
    */
   def newBasicCatalog(): ExternalCatalog = {
     val catalog = newEmptyCatalog()
@@ -868,8 +870,10 @@ abstract class CatalogTestUtils {
     catalog.createDatabase(newDb("default"), ignoreIfExists = true)
     catalog.createDatabase(newDb("db1"), ignoreIfExists = false)
     catalog.createDatabase(newDb("db2"), ignoreIfExists = false)
+    catalog.createDatabase(newDb("db3"), ignoreIfExists = false)
     catalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = false)
     catalog.createTable(newTable("tbl2", "db2"), ignoreIfExists = false)
+    catalog.createTable(newView("view1", Some("db3")), ignoreIfExists = false)
     catalog.createPartitions("db2", "tbl2", Seq(part1, part2), ignoreIfExists = false)
     catalog.createFunction("db2", newFunc("func1", Some("db2")))
     catalog
@@ -900,6 +904,24 @@ abstract class CatalogTestUtils {
       bucketSpec = Some(BucketSpec(4, Seq("col1"), Nil)))
   }
 
+  def newView(
+      name: String,
+      database: Option[String] = None): CatalogTable = {
+    val viewDefaultDatabase = database.getOrElse("default")
+    CatalogTable(
+      identifier = TableIdentifier(name, database),
+      tableType = CatalogTableType.VIEW,
+      storage = CatalogStorageFormat.empty,
+      schema = new StructType()
+        .add("col1", "int")
+        .add("col2", "string")
+        .add("a", "int")
+        .add("b", "string"),
+      viewOriginalText = Some("SELECT * FROM tbl1"),
+      viewText = Some("SELECT * FROM tbl1"),
+      properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> viewDefaultDatabase))
+  }
+
   def newFunc(name: String, database: Option[String] = None): CatalogFunction = {
     CatalogFunction(FunctionIdentifier(name, database), funcClass, Seq.empty[FunctionResource])
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/30a07071/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 19b7a46..ae93dff 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -17,13 +17,13 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
-import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, Literal}
-import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias}
-
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View}
 
 /**
  * Tests for [[SessionCatalog]] that assume that [[InMemoryCatalog]] is correctly implemented.
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias}
  * signatures but do not extend a common parent. This is largely by design but
  * unfortunately leads to very similar test code in two places.
  */
-class SessionCatalogSuite extends SparkFunSuite {
+class SessionCatalogSuite extends PlanTest {
   private val utils = new CatalogTestUtils {
     override val tableInputFormat: String = "com.fruit.eyephone.CameraInputFormat"
     override val tableOutputFormat: String = "com.fruit.eyephone.CameraOutputFormat"
@@ -93,13 +93,13 @@ class SessionCatalogSuite extends SparkFunSuite {
 
   test("list databases without pattern") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    assert(catalog.listDatabases().toSet == Set("default", "db1", "db2"))
+    assert(catalog.listDatabases().toSet == Set("default", "db1", "db2", "db3"))
   }
 
   test("list databases with pattern") {
     val catalog = new SessionCatalog(newBasicCatalog())
     assert(catalog.listDatabases("db").toSet == Set.empty)
-    assert(catalog.listDatabases("db*").toSet == Set("db1", "db2"))
+    assert(catalog.listDatabases("db*").toSet == Set("db1", "db2", "db3"))
     assert(catalog.listDatabases("*1").toSet == Set("db1"))
     assert(catalog.listDatabases("db2").toSet == Set("db2"))
   }
@@ -107,7 +107,7 @@ class SessionCatalogSuite extends SparkFunSuite {
   test("drop database") {
     val catalog = new SessionCatalog(newBasicCatalog())
     catalog.dropDatabase("db1", ignoreIfNotExists = false, cascade = false)
-    assert(catalog.listDatabases().toSet == Set("default", "db2"))
+    assert(catalog.listDatabases().toSet == Set("default", "db2", "db3"))
   }
 
   test("drop database when the database is not empty") {
@@ -132,7 +132,7 @@ class SessionCatalogSuite extends SparkFunSuite {
     val externalCatalog3 = newBasicCatalog()
     val sessionCatalog3 = new SessionCatalog(externalCatalog3)
     externalCatalog3.dropDatabase("db2", ignoreIfNotExists = false, cascade = true)
-    assert(sessionCatalog3.listDatabases().toSet == Set("default", "db1"))
+    assert(sessionCatalog3.listDatabases().toSet == Set("default", "db1", "db3"))
   }
 
   test("drop database when the database does not exist") {
@@ -465,6 +465,23 @@ class SessionCatalogSuite extends SparkFunSuite {
     assert(plan == SubqueryAlias("range", tmpView, Option(TableIdentifier("vw1"))))
   }
 
+  test("look up view relation") {
+    val externalCatalog = newBasicCatalog()
+    val sessionCatalog = new SessionCatalog(externalCatalog)
+    val metadata = externalCatalog.getTable("db3", "view1")
+    sessionCatalog.setCurrentDatabase("default")
+    // Look up a view.
+    assert(metadata.viewText.isDefined)
+    val view = View(desc = metadata, output = metadata.schema.toAttributes,
+      child = CatalystSqlParser.parsePlan(metadata.viewText.get))
+    comparePlans(sessionCatalog.lookupRelation(TableIdentifier("view1", Some("db3"))),
+      SubqueryAlias("view1", view, Some(TableIdentifier("view1", Some("db3")))))
+    // Look up a view using current database of the session catalog.
+    sessionCatalog.setCurrentDatabase("db3")
+    comparePlans(sessionCatalog.lookupRelation(TableIdentifier("view1")),
+      SubqueryAlias("view1", view, Some(TableIdentifier("view1"))))
+  }
+
   test("table exists") {
     val catalog = new SessionCatalog(newBasicCatalog())
     assert(catalog.tableExists(TableIdentifier("tbl1", Some("db2"))))
@@ -1140,5 +1157,4 @@ class SessionCatalogSuite extends SparkFunSuite {
       catalog.listFunctions("unknown_db", "func*")
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/30a07071/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
index 3804542..0384c0f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
@@ -219,6 +219,9 @@ class SQLBuilder private (
     case OneRowRelation =>
       ""
 
+    case p: View =>
+      toSQL(p.child)
+
     case _ =>
       throw new UnsupportedOperationException(s"unsupported plan $node")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/30a07071/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index c9075ce..64ec62f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -99,7 +99,8 @@ private[sql] class SessionState(sparkSession: SparkSession) {
     functionResourceLoader,
     functionRegistry,
     conf,
-    newHadoopConf())
+    newHadoopConf(),
+    sqlParser)
 
   /**
    * Interface exposed to the user for registering user-defined functions.

http://git-wip-us.apache.org/repos/asf/spark/blob/30a07071/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 563d068..605dec4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2499,4 +2499,14 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("should be able to resolve a persistent view") {
+    withTable("t1") {
+      withView("v1") {
+        sql("CREATE TABLE `t1` USING parquet AS SELECT * FROM VALUES(1, 1) AS t1(a, b)")
+        sql("CREATE VIEW `v1` AS SELECT * FROM t1")
+        checkAnswer(spark.table("v1"), Row(1, 1))
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/30a07071/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 474a2c8..208c8c9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -606,8 +606,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 
   /**
-   * Restores table metadata from the table properties if it's a datasouce table. This method is
-   * kind of a opposite version of [[createTable]].
+   * Restores table metadata from the table properties. This method is kind of a opposite version
+   * of [[createTable]].
    *
    * It reads table schema, provider, partition column names and bucket specification from table
    * properties, and filter out these special entries from table properties.

http://git-wip-us.apache.org/repos/asf/spark/blob/30a07071/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index ee4589f..0c110d3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -111,6 +111,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     new Path(new Path(dbLocation), tblName).toString
   }
 
+  /**
+   * Returns a [[LogicalPlan]] that represents the given table or view from Hive metastore.
+   *
+   * @param tableIdent The name of the table/view that we look up.
+   * @param alias The alias name of the table/view that we look up.
+   * @return a [[LogicalPlan]] that represents the given table or view from Hive metastore.
+   */
   def lookupRelation(
       tableIdent: TableIdentifier,
       alias: Option[String]): LogicalPlan = {
@@ -125,11 +132,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       // Otherwise, wrap the table with a Subquery using the table name.
       alias.map(a => SubqueryAlias(a, qualifiedTable, None)).getOrElse(qualifiedTable)
     } else if (table.tableType == CatalogTableType.VIEW) {
+      val tableIdentifier = table.identifier
       val viewText = table.viewText.getOrElse(sys.error("Invalid view without text."))
-      SubqueryAlias(
-        alias.getOrElse(table.identifier.table),
-        sparkSession.sessionState.sqlParser.parsePlan(viewText),
-        Option(table.identifier))
+      // The relation is a view, so we wrap the relation by:
+      // 1. Add a [[View]] operator over the relation to keep track of the view desc;
+      // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view.
+      val child = View(
+        desc = table,
+        output = table.schema.toAttributes,
+        child = sparkSession.sessionState.sqlParser.parsePlan(viewText))
+      SubqueryAlias(alias.getOrElse(tableIdentifier.table), child, Option(tableIdentifier))
     } else {
       val qualifiedTable =
         MetastoreRelation(

http://git-wip-us.apache.org/repos/asf/spark/blob/30a07071/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 462b3c2..b3cbbed 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchTableExce
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo}
+import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
@@ -46,16 +47,18 @@ private[sql] class HiveSessionCatalog(
     functionResourceLoader: FunctionResourceLoader,
     functionRegistry: FunctionRegistry,
     conf: SQLConf,
-    hadoopConf: Configuration)
+    hadoopConf: Configuration,
+    parser: ParserInterface)
   extends SessionCatalog(
     externalCatalog,
     globalTempViewManager,
     functionResourceLoader,
     functionRegistry,
     conf,
-    hadoopConf) {
+    hadoopConf,
+    parser) {
 
-  override def lookupRelation(name: TableIdentifier, alias: Option[String]): LogicalPlan = {
+  override def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = {
     synchronized {
       val table = formatTableName(name.table)
       val db = formatDatabaseName(name.database.getOrElse(currentDb))
@@ -65,8 +68,7 @@ private[sql] class HiveSessionCatalog(
           SubqueryAlias(relationAlias, viewDef, Some(name))
         }.getOrElse(throw new NoSuchTableException(db, table))
       } else if (name.database.isDefined || !tempTables.contains(table)) {
-        val database = name.database.map(formatDatabaseName)
-        val newName = name.copy(database = database, table = table)
+        val newName = name.copy(database = Some(db), table = table)
         metastoreCatalog.lookupRelation(newName, alias)
       } else {
         val relation = tempTables(table)

http://git-wip-us.apache.org/repos/asf/spark/blob/30a07071/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index aebee85..9b4b8b6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -50,7 +50,8 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
       functionResourceLoader,
       functionRegistry,
       conf,
-      newHadoopConf())
+      newHadoopConf(),
+      sqlParser)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/30a07071/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
index ba65db7..e06d0ae 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
@@ -20,8 +20,10 @@ package org.apache.spark.sql.hive.execution
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.StructType
 
 /**
  * A suite for testing view related functionality.
@@ -543,4 +545,157 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       }
     }
   }
+
+  test("correctly resolve a nested view") {
+    withTempDatabase { db =>
+      withView(s"$db.view1", s"$db.view2") {
+        val view1 = CatalogTable(
+          identifier = TableIdentifier("view1", Some(db)),
+          tableType = CatalogTableType.VIEW,
+          storage = CatalogStorageFormat.empty,
+          schema = new StructType().add("id", "int").add("id1", "int"),
+          viewOriginalText = Some("SELECT * FROM jt"),
+          viewText = Some("SELECT * FROM jt"),
+          properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+        val view2 = CatalogTable(
+          identifier = TableIdentifier("view2", Some(db)),
+          tableType = CatalogTableType.VIEW,
+          storage = CatalogStorageFormat.empty,
+          schema = new StructType().add("id", "int").add("id1", "int"),
+          viewOriginalText = Some("SELECT * FROM view1"),
+          viewText = Some("SELECT * FROM view1"),
+          properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> db))
+        activateDatabase(db) {
+          hiveContext.sessionState.catalog.createTable(view1, ignoreIfExists = false)
+          hiveContext.sessionState.catalog.createTable(view2, ignoreIfExists = false)
+          checkAnswer(sql("SELECT * FROM view2 ORDER BY id"), (1 to 9).map(i => Row(i, i)))
+        }
+      }
+    }
+  }
+
+  test("correctly resolve a view with CTE") {
+    withView("cte_view") {
+      val cte_view = CatalogTable(
+        identifier = TableIdentifier("cte_view"),
+        tableType = CatalogTableType.VIEW,
+        storage = CatalogStorageFormat.empty,
+        schema = new StructType().add("n", "int"),
+        viewOriginalText = Some("WITH w AS (SELECT 1 AS n) SELECT n FROM w"),
+        viewText = Some("WITH w AS (SELECT 1 AS n) SELECT n FROM w"),
+        properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+      hiveContext.sessionState.catalog.createTable(cte_view, ignoreIfExists = false)
+      checkAnswer(sql("SELECT * FROM cte_view"), Row(1))
+    }
+  }
+
+  test("correctly resolve a view in a self join") {
+    withView("join_view") {
+      val join_view = CatalogTable(
+        identifier = TableIdentifier("join_view"),
+        tableType = CatalogTableType.VIEW,
+        storage = CatalogStorageFormat.empty,
+        schema = new StructType().add("id", "int").add("id1", "int"),
+        viewOriginalText = Some("SELECT * FROM jt"),
+        viewText = Some("SELECT * FROM jt"),
+        properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+      hiveContext.sessionState.catalog.createTable(join_view, ignoreIfExists = false)
+      checkAnswer(
+        sql("SELECT * FROM join_view t1 JOIN join_view t2 ON t1.id = t2.id ORDER BY t1.id"),
+        (1 to 9).map(i => Row(i, i, i, i)))
+    }
+  }
+
+  private def assertInvalidReference(query: String): Unit = {
+    val e = intercept[AnalysisException] {
+      sql(query)
+    }.getMessage
+    assert(e.contains("Table or view not found"))
+  }
+
+  test("error handling: fail if the referenced table or view is invalid") {
+    withView("view1", "view2", "view3") {
+      // Fail if the referenced table is defined in a invalid database.
+      val view1 = CatalogTable(
+        identifier = TableIdentifier("view1"),
+        tableType = CatalogTableType.VIEW,
+        storage = CatalogStorageFormat.empty,
+        schema = new StructType().add("id", "int").add("id1", "int"),
+        viewOriginalText = Some("SELECT * FROM invalid_db.jt"),
+        viewText = Some("SELECT * FROM invalid_db.jt"),
+        properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+      hiveContext.sessionState.catalog.createTable(view1, ignoreIfExists = false)
+      assertInvalidReference("SELECT * FROM view1")
+
+      // Fail if the referenced table is invalid.
+      val view2 = CatalogTable(
+        identifier = TableIdentifier("view2"),
+        tableType = CatalogTableType.VIEW,
+        storage = CatalogStorageFormat.empty,
+        schema = new StructType().add("id", "int").add("id1", "int"),
+        viewOriginalText = Some("SELECT * FROM invalid_table"),
+        viewText = Some("SELECT * FROM invalid_table"),
+        properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+      hiveContext.sessionState.catalog.createTable(view2, ignoreIfExists = false)
+      assertInvalidReference("SELECT * FROM view2")
+
+      // Fail if the referenced view is invalid.
+      val view3 = CatalogTable(
+        identifier = TableIdentifier("view3"),
+        tableType = CatalogTableType.VIEW,
+        storage = CatalogStorageFormat.empty,
+        schema = new StructType().add("id", "int").add("id1", "int"),
+        viewOriginalText = Some("SELECT * FROM view2"),
+        viewText = Some("SELECT * FROM view2"),
+        properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+      hiveContext.sessionState.catalog.createTable(view3, ignoreIfExists = false)
+      assertInvalidReference("SELECT * FROM view3")
+    }
+  }
+
+  test("make sure we can resolve view created by old version of Spark") {
+    withTable("hive_table") {
+      withView("old_view") {
+        spark.sql("CREATE TABLE hive_table AS SELECT 1 AS a, 2 AS b")
+        // The views defined by older versions of Spark(before 2.2) will have empty view default
+        // database name, and all the relations referenced in the viewText will have database part
+        // defined.
+        val view = CatalogTable(
+          identifier = TableIdentifier("old_view"),
+          tableType = CatalogTableType.VIEW,
+          storage = CatalogStorageFormat.empty,
+          schema = new StructType().add("a", "int").add("b", "int"),
+          viewOriginalText = Some(s"SELECT * FROM hive_table"),
+          viewText = Some("SELECT `gen_attr_0` AS `a`, `gen_attr_1` AS `b` FROM (SELECT " +
+            "`gen_attr_0`, `gen_attr_1` FROM (SELECT `a` AS `gen_attr_0`, `b` AS " +
+            "`gen_attr_1` FROM hive_table) AS gen_subquery_0) AS hive_table")
+        )
+        hiveContext.sessionState.catalog.createTable(view, ignoreIfExists = false)
+        val df = sql("SELECT * FROM old_view")
+        // Check the output rows.
+        checkAnswer(df, Row(1, 2))
+        // Check the output schema.
+        assert(df.schema.sameType(view.schema))
+      }
+    }
+  }
+
+  test("correctly handle type casting between view output and child output") {
+    withTable("testTable") {
+      withView("testView") {
+        spark.range(1, 10).toDF("id1").write.format("json").saveAsTable("testTable")
+        sql("CREATE VIEW testView AS SELECT * FROM testTable")
+
+        // Allow casting from IntegerType to LongType
+        val df = (1 until 10).map(i => i).toDF("id1")
+        df.write.format("json").mode(SaveMode.Overwrite).saveAsTable("testTable")
+        checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i)))
+
+        // Can't cast from ArrayType to LongType, throw an AnalysisException.
+        val df2 = (1 until 10).map(i => Seq(i)).toDF("id1")
+        df2.write.format("json").mode(SaveMode.Overwrite).saveAsTable("testTable")
+        intercept[AnalysisException](sql("SELECT * FROM testView ORDER BY id1"))
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org