You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2015/10/08 21:42:34 UTC

spark git commit: [SPARK-10337] [SQL] fix hive views on non-hive-compatible tables.

Repository: spark
Updated Branches:
  refs/heads/master 82d275f27 -> af2a55448


[SPARK-10337] [SQL] fix hive views on non-hive-compatible tables.

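Add a new config (`spark.sql.canonicalizeView`) to deal with this special case: when it is true, CREATE VIEW is handled by Spark SQL instead of Hive native commands.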

Author: Wenchen Fan <cl...@163.com>

Closes #8990 from cloud-fan/view-master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/af2a5544
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/af2a5544
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/af2a5544

Branch: refs/heads/master
Commit: af2a5544875b23b3b62fb6d4f3bf432828720008
Parents: 82d275f
Author: Wenchen Fan <cl...@163.com>
Authored: Thu Oct 8 12:42:10 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Thu Oct 8 12:42:10 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLConf.scala    |  15 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  23 +++
 .../org/apache/spark/sql/hive/HiveQl.scala      | 164 ++++++++++++++++---
 .../spark/sql/hive/client/ClientInterface.scala |  13 +-
 .../spark/sql/hive/client/ClientWrapper.scala   |  31 ++++
 .../sql/hive/execution/CreateViewAsSelect.scala |  97 +++++++++++
 .../sql/hive/execution/SQLQuerySuite.scala      | 117 +++++++++++++
 7 files changed, 433 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/af2a5544/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index e7bbc7d..8f0f891 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -319,6 +319,15 @@ private[spark] object SQLConf {
     doc = "When true, some predicates will be pushed down into the Hive metastore so that " +
           "unmatching partitions can be eliminated earlier.")
 
+  val CANONICALIZE_VIEW = booleanConf("spark.sql.canonicalizeView",
+    defaultValue = Some(false),
+    doc = "When true, CREATE VIEW will be handled by Spark SQL instead of Hive native commands.  " +
+          "Note that this function is experimental and should ony be used when you are using " +
+          "non-hive-compatible tables written by Spark SQL.  The SQL string used to create " +
+          "view should be fully qualified, i.e. use `tbl1`.`col1` instead of `*` whenever " +
+          "possible, or you may get wrong result.",
+    isPublic = false)
+
   val COLUMN_NAME_OF_CORRUPT_RECORD = stringConf("spark.sql.columnNameOfCorruptRecord",
     defaultValue = Some("_corrupt_record"),
     doc = "<TODO>")
@@ -362,7 +371,7 @@ private[spark] object SQLConf {
 
   val PARTITION_DISCOVERY_ENABLED = booleanConf("spark.sql.sources.partitionDiscovery.enabled",
     defaultValue = Some(true),
-    doc = "When true, automtically discover data partitions.")
+    doc = "When true, automatically discover data partitions.")
 
   val PARTITION_COLUMN_TYPE_INFERENCE =
     booleanConf("spark.sql.sources.partitionColumnTypeInference.enabled",
@@ -372,7 +381,7 @@ private[spark] object SQLConf {
   val PARTITION_MAX_FILES =
     intConf("spark.sql.sources.maxConcurrentWrites",
       defaultValue = Some(5),
-      doc = "The maximum number of concurent files to open before falling back on sorting when " +
+      doc = "The maximum number of concurrent files to open before falling back on sorting when " +
             "writing out files using dynamic partitioning.")
 
   // The output committer class used by HadoopFsRelation. The specified class needs to be a
@@ -471,6 +480,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
 
   private[spark] def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
 
+  private[spark] def canonicalizeView: Boolean = getConf(CANONICALIZE_VIEW)
+
   private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN)
 
   private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, getConf(TUNGSTEN_ENABLED))

http://git-wip-us.apache.org/repos/asf/spark/blob/af2a5544/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index ea1521a..cf59bc0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
 import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
 import org.apache.spark.sql.execution.{FileRelation, datasources}
 import org.apache.spark.sql.hive.client._
+import org.apache.spark.sql.hive.execution.HiveNativeCommand
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
@@ -588,6 +589,28 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
       // Wait until children are resolved.
       case p: LogicalPlan if !p.childrenResolved => p
       case p: LogicalPlan if p.resolved => p
+
+      case CreateViewAsSelect(table, child, allowExisting, replace, sql) =>
+        if (conf.canonicalizeView) {
+          if (allowExisting && replace) {
+            throw new AnalysisException(
+              "It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.")
+          }
+
+          val (dbName, tblName) = processDatabaseAndTableName(
+            table.specifiedDatabase.getOrElse(client.currentDatabase), table.name)
+
+          execution.CreateViewAsSelect(
+            table.copy(
+              specifiedDatabase = Some(dbName),
+              name = tblName),
+            child.output,
+            allowExisting,
+            replace)
+        } else {
+          HiveNativeCommand(sql)
+        }
+
       case p @ CreateTableAsSelect(table, child, allowExisting) =>
         val schema = if (table.schema.nonEmpty) {
           table.schema

http://git-wip-us.apache.org/repos/asf/spark/blob/af2a5544/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 256440a..2bf22f5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -77,6 +77,16 @@ private[hive] case class CreateTableAsSelect(
     childrenResolved
 }
 
+private[hive] case class CreateViewAsSelect(
+    tableDesc: HiveTable,
+    child: LogicalPlan,
+    allowExisting: Boolean,
+    replace: Boolean,
+    sql: String) extends UnaryNode with Command {
+  override def output: Seq[Attribute] = Seq.empty[Attribute]
+  override lazy val resolved: Boolean = false
+}
+
 /** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
 private[hive] object HiveQl extends Logging {
   protected val nativeCommands = Seq(
@@ -99,7 +109,6 @@ private[hive] object HiveQl extends Logging {
     "TOK_ALTERTABLE_SKEWED",
     "TOK_ALTERTABLE_TOUCH",
     "TOK_ALTERTABLE_UNARCHIVE",
-    "TOK_ALTERVIEW",
     "TOK_ALTERVIEW_ADDPARTS",
     "TOK_ALTERVIEW_AS",
     "TOK_ALTERVIEW_DROPPARTS",
@@ -110,7 +119,6 @@ private[hive] object HiveQl extends Logging {
     "TOK_CREATEFUNCTION",
     "TOK_CREATEINDEX",
     "TOK_CREATEROLE",
-    "TOK_CREATEVIEW",
 
     "TOK_DESCDATABASE",
     "TOK_DESCFUNCTION",
@@ -254,12 +262,17 @@ private[hive] object HiveQl extends Logging {
      * Otherwise, there will be Null pointer exception,
      * when retrieving properties form HiveConf.
      */
-    val hContext = new Context(SessionState.get().getConf())
-    val node = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql, hContext))
+    val hContext = createContext()
+    val node = getAst(sql, hContext)
     hContext.clear()
     node
   }
 
+  private def createContext(): Context = new Context(SessionState.get().getConf())
+
+  private def getAst(sql: String, context: Context) =
+    ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql, context))
+
   /**
    * Returns the HiveConf
    */
@@ -280,15 +293,18 @@ private[hive] object HiveQl extends Logging {
   /** Creates LogicalPlan for a given HiveQL string. */
   def createPlan(sql: String): LogicalPlan = {
     try {
-      val tree = getAst(sql)
-      if (nativeCommands contains tree.getText) {
+      val context = createContext()
+      val tree = getAst(sql, context)
+      val plan = if (nativeCommands contains tree.getText) {
         HiveNativeCommand(sql)
       } else {
-        nodeToPlan(tree) match {
+        nodeToPlan(tree, context) match {
           case NativePlaceholder => HiveNativeCommand(sql)
           case other => other
         }
       }
+      context.clear()
+      plan
     } catch {
       case pe: org.apache.hadoop.hive.ql.parse.ParseException =>
         pe.getMessage match {
@@ -342,7 +358,9 @@ private[hive] object HiveQl extends Logging {
     }
   }
 
-  protected def getClauses(clauseNames: Seq[String], nodeList: Seq[ASTNode]): Seq[Option[Node]] = {
+  protected def getClauses(
+      clauseNames: Seq[String],
+      nodeList: Seq[ASTNode]): Seq[Option[ASTNode]] = {
     var remainingNodes = nodeList
     val clauses = clauseNames.map { clauseName =>
       val (matches, nonMatches) = remainingNodes.partition(_.getText.toUpperCase == clauseName)
@@ -489,7 +507,43 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       }
   }
 
-  protected def nodeToPlan(node: Node): LogicalPlan = node match {
+  private def createView(
+      view: ASTNode,
+      context: Context,
+      viewNameParts: ASTNode,
+      query: ASTNode,
+      schema: Seq[HiveColumn],
+      properties: Map[String, String],
+      allowExist: Boolean,
+      replace: Boolean): CreateViewAsSelect = {
+    val (db, viewName) = extractDbNameTableName(viewNameParts)
+
+    val originalText = context.getTokenRewriteStream
+      .toString(query.getTokenStartIndex, query.getTokenStopIndex)
+
+    val tableDesc = HiveTable(
+      specifiedDatabase = db,
+      name = viewName,
+      schema = schema,
+      partitionColumns = Seq.empty[HiveColumn],
+      properties = properties,
+      serdeProperties = Map[String, String](),
+      tableType = VirtualView,
+      location = None,
+      inputFormat = None,
+      outputFormat = None,
+      serde = None,
+      viewText = Some(originalText))
+
+    // We need to keep the original SQL string so that if `spark.sql.canonicalizeView` is
+    // false, we can fall back to use hive native command later.
+    // We can remove this once the parser is configurable (can access SQLConf) in the future.
+    val sql = context.getTokenRewriteStream
+      .toString(view.getTokenStartIndex, view.getTokenStopIndex)
+    CreateViewAsSelect(tableDesc, nodeToPlan(query, context), allowExist, replace, sql)
+  }
+
+  protected def nodeToPlan(node: ASTNode, context: Context): LogicalPlan = node match {
     // Special drop table that also uncaches.
     case Token("TOK_DROPTABLE",
            Token("TOK_TABNAME", tableNameParts) ::
@@ -521,14 +575,14 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       val Some(crtTbl) :: _ :: extended :: Nil =
         getClauses(Seq("TOK_CREATETABLE", "FORMATTED", "EXTENDED"), explainArgs)
       ExplainCommand(
-        nodeToPlan(crtTbl),
+        nodeToPlan(crtTbl, context),
         extended = extended.isDefined)
     case Token("TOK_EXPLAIN", explainArgs) =>
       // Ignore FORMATTED if present.
       val Some(query) :: _ :: extended :: Nil =
         getClauses(Seq("TOK_QUERY", "FORMATTED", "EXTENDED"), explainArgs)
       ExplainCommand(
-        nodeToPlan(query),
+        nodeToPlan(query, context),
         extended = extended.isDefined)
 
     case Token("TOK_DESCTABLE", describeArgs) =>
@@ -563,6 +617,73 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
         }
       }
 
+    case view @ Token("TOK_ALTERVIEW", children) =>
+      val Some(viewNameParts) :: maybeQuery :: ignores =
+        getClauses(Seq(
+          "TOK_TABNAME",
+          "TOK_QUERY",
+          "TOK_ALTERVIEW_ADDPARTS",
+          "TOK_ALTERVIEW_DROPPARTS",
+          "TOK_ALTERVIEW_PROPERTIES",
+          "TOK_ALTERVIEW_RENAME"), children)
+
+      // If ALTER VIEW doesn't have a query part, let Hive handle it.
+      maybeQuery.map { query =>
+        createView(view, context, viewNameParts, query, Nil, Map(), false, true)
+      }.getOrElse(NativePlaceholder)
+
+    case view @ Token("TOK_CREATEVIEW", children)
+        if children.collect { case t @ Token("TOK_QUERY", _) => t }.nonEmpty =>
+      val Seq(
+        Some(viewNameParts),
+        Some(query),
+        maybeComment,
+        replace,
+        allowExisting,
+        maybeProperties,
+        maybeColumns,
+        maybePartCols
+      ) = getClauses(Seq(
+        "TOK_TABNAME",
+        "TOK_QUERY",
+        "TOK_TABLECOMMENT",
+        "TOK_ORREPLACE",
+        "TOK_IFNOTEXISTS",
+        "TOK_TABLEPROPERTIES",
+        "TOK_TABCOLNAME",
+        "TOK_VIEWPARTCOLS"), children)
+
+      // If the view is partitioned, we let hive handle it.
+      if (maybePartCols.isDefined) {
+        NativePlaceholder
+      } else {
+        val schema = maybeColumns.map { cols =>
+          BaseSemanticAnalyzer.getColumns(cols, true).asScala.map { field =>
+            // We can't specify column types when creating a view, so fill it with null first, and
+            // update it after the schema has been resolved later.
+            HiveColumn(field.getName, null, field.getComment)
+          }
+        }.getOrElse(Seq.empty[HiveColumn])
+
+        val properties = scala.collection.mutable.Map.empty[String, String]
+
+        maybeProperties.foreach {
+          case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
+            properties ++= getProperties(list)
+        }
+
+        maybeComment.foreach {
+          case Token("TOK_TABLECOMMENT", child :: Nil) =>
+            val comment = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
+            if (comment ne null) {
+              properties += ("comment" -> comment)
+            }
+        }
+
+        createView(view, context, viewNameParts, query, schema, properties.toMap,
+          allowExisting.isDefined, replace.isDefined)
+      }
+
     case Token("TOK_CREATETABLE", children)
         if children.collect { case t @ Token("TOK_QUERY", _) => t }.nonEmpty =>
       // Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
@@ -774,7 +895,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
         case _ => // Unsupport features
       }
 
-      CreateTableAsSelect(tableDesc, nodeToPlan(query), allowExisting != None)
+      CreateTableAsSelect(tableDesc, nodeToPlan(query, context), allowExisting != None)
 
     // If its not a "CTAS" like above then take it as a native command
     case Token("TOK_CREATETABLE", _) => NativePlaceholder
@@ -793,7 +914,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
             insertClauses.last match {
               case Token("TOK_CTE", cteClauses) =>
                 val cteRelations = cteClauses.map(node => {
-                  val relation = nodeToRelation(node).asInstanceOf[Subquery]
+                  val relation = nodeToRelation(node, context).asInstanceOf[Subquery]
                   (relation.alias, relation)
                 }).toMap
                 (Some(args.head), insertClauses.init, Some(cteRelations))
@@ -847,7 +968,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
         }
 
         val relations = fromClause match {
-          case Some(f) => nodeToRelation(f)
+          case Some(f) => nodeToRelation(f, context)
           case None => OneRowRelation
         }
 
@@ -1094,7 +1215,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       cteRelations.map(With(query, _)).getOrElse(query)
 
     // HIVE-9039 renamed TOK_UNION => TOK_UNIONALL while adding TOK_UNIONDISTINCT
-    case Token("TOK_UNIONALL", left :: right :: Nil) => Union(nodeToPlan(left), nodeToPlan(right))
+    case Token("TOK_UNIONALL", left :: right :: Nil) =>
+      Union(nodeToPlan(left, context), nodeToPlan(right, context))
 
     case a: ASTNode =>
       throw new NotImplementedError(s"No parse rules for $node:\n ${dumpTree(a).toString} ")
@@ -1102,10 +1224,10 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
 
   val allJoinTokens = "(TOK_.*JOIN)".r
   val laterViewToken = "TOK_LATERAL_VIEW(.*)".r
-  def nodeToRelation(node: Node): LogicalPlan = node match {
+  def nodeToRelation(node: Node, context: Context): LogicalPlan = node match {
     case Token("TOK_SUBQUERY",
            query :: Token(alias, Nil) :: Nil) =>
-      Subquery(cleanIdentifier(alias), nodeToPlan(query))
+      Subquery(cleanIdentifier(alias), nodeToPlan(query, context))
 
     case Token(laterViewToken(isOuter), selectClause :: relationClause :: Nil) =>
       val Token("TOK_SELECT",
@@ -1121,7 +1243,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
           outer = isOuter.nonEmpty,
           Some(alias.toLowerCase),
           attributes.map(UnresolvedAttribute(_)),
-          nodeToRelation(relationClause))
+          nodeToRelation(relationClause, context))
 
     /* All relations, possibly with aliases or sampling clauses. */
     case Token("TOK_TABREF", clauses) =>
@@ -1189,7 +1311,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
         }.map(_._2)
 
       val isPreserved = tableOrdinals.map(i => (i - 1 < 0) || joinArgs(i - 1).getText == "PRESERVE")
-      val tables = tableOrdinals.map(i => nodeToRelation(joinArgs(i)))
+      val tables = tableOrdinals.map(i => nodeToRelation(joinArgs(i), context))
       val joinExpressions =
         tableOrdinals.map(i => joinArgs(i + 1).getChildren.asScala.map(nodeToExpr))
 
@@ -1244,8 +1366,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
         case "TOK_FULLOUTERJOIN" => FullOuter
         case "TOK_LEFTSEMIJOIN" => LeftSemi
       }
-      Join(nodeToRelation(relation1),
-        nodeToRelation(relation2),
+      Join(nodeToRelation(relation1, context),
+        nodeToRelation(relation2, context),
         joinType,
         other.headOption.map(nodeToExpr))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/af2a5544/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
index 3811c15..915eae9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
@@ -19,13 +19,12 @@ package org.apache.spark.sql.hive.client
 
 import java.io.PrintStream
 import java.util.{Map => JMap}
+import javax.annotation.Nullable
 
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
 import org.apache.spark.sql.catalyst.expressions.Expression
 
-private[hive] case class HiveDatabase(
-    name: String,
-    location: String)
+private[hive] case class HiveDatabase(name: String, location: String)
 
 private[hive] abstract class TableType { val name: String }
 private[hive] case object ExternalTable extends TableType { override val name = "EXTERNAL_TABLE" }
@@ -45,7 +44,7 @@ private[hive] case class HivePartition(
     values: Seq[String],
     storage: HiveStorageDescriptor)
 
-private[hive] case class HiveColumn(name: String, hiveType: String, comment: String)
+private[hive] case class HiveColumn(name: String, @Nullable hiveType: String, comment: String)
 private[hive] case class HiveTable(
     specifiedDatabase: Option[String],
     name: String,
@@ -126,6 +125,12 @@ private[hive] trait ClientInterface {
   /** Returns the metadata for the specified table or None if it doens't exist. */
   def getTableOption(dbName: String, tableName: String): Option[HiveTable]
 
+  /** Creates a view with the given metadata. */
+  def createView(view: HiveTable): Unit
+
+  /** Updates the given view with new metadata. */
+  def alertView(view: HiveTable): Unit
+
   /** Creates a table with the given metadata. */
   def createTable(table: HiveTable): Unit
 

http://git-wip-us.apache.org/repos/asf/spark/blob/af2a5544/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 4d1e3ed..8f6d448 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -354,6 +354,37 @@ private[hive] class ClientWrapper(
     qlTable
   }
 
+  private def toViewTable(view: HiveTable): metadata.Table = {
+    // TODO: this is duplicated with `toQlTable` except the table type stuff.
+    val tbl = new metadata.Table(view.database, view.name)
+    tbl.setTableType(HTableType.VIRTUAL_VIEW)
+    tbl.setSerializationLib(null)
+    tbl.clearSerDeInfo()
+
+    // TODO: we will save the same SQL string to original and expanded text, which is different
+    // from Hive.
+    tbl.setViewOriginalText(view.viewText.get)
+    tbl.setViewExpandedText(view.viewText.get)
+
+    tbl.setFields(view.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
+    view.properties.foreach { case (k, v) => tbl.setProperty(k, v) }
+
+    // set owner
+    tbl.setOwner(conf.getUser)
+    // set create time
+    tbl.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int])
+
+    tbl
+  }
+
+  override def createView(view: HiveTable): Unit = withHiveState {
+    client.createTable(toViewTable(view))
+  }
+
+  override def alertView(view: HiveTable): Unit = withHiveState {
+    client.alterTable(view.qualifiedName, toViewTable(view))
+  }
+
   override def createTable(table: HiveTable): Unit = withHiveState {
     val qlTable = toQlTable(table)
     client.createTable(qlTable)

http://git-wip-us.apache.org/repos/asf/spark/blob/af2a5544/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
new file mode 100644
index 0000000..2b504ac
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveContext}
+import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.sql.execution.RunnableCommand
+import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
+
+/**
+ * Create Hive view on non-hive-compatible tables by specifying schema ourselves instead of
+ * depending on Hive meta-store.
+ */
+// TODO: Note that this class can NOT canonicalize the view SQL string entirely, which is different
+// from Hive and may not work for some cases like create view on self join.
+private[hive] case class CreateViewAsSelect(
+    tableDesc: HiveTable,
+    childSchema: Seq[Attribute],
+    allowExisting: Boolean,
+    orReplace: Boolean) extends RunnableCommand {
+
+  assert(tableDesc.schema == Nil || tableDesc.schema.length == childSchema.length)
+  assert(tableDesc.viewText.isDefined)
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val hiveContext = sqlContext.asInstanceOf[HiveContext]
+    val database = tableDesc.database
+    val viewName = tableDesc.name
+
+    if (hiveContext.catalog.tableExists(Seq(database, viewName))) {
+      if (allowExisting) {
+        // view already exists, will do nothing, to keep consistent with Hive
+      } else if (orReplace) {
+        hiveContext.catalog.client.alertView(prepareTable())
+      } else {
+        throw new AnalysisException(s"View $database.$viewName already exists. " +
+          "If you want to update the view definition, please use ALTER VIEW AS or " +
+          "CREATE OR REPLACE VIEW AS")
+      }
+    } else {
+      hiveContext.catalog.client.createView(prepareTable())
+    }
+
+    Seq.empty[Row]
+  }
+
+  private def prepareTable(): HiveTable = {
+    // setup column types according to the schema of child.
+    val schema = if (tableDesc.schema == Nil) {
+      childSchema.map { attr =>
+        HiveColumn(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), null)
+      }
+    } else {
+      childSchema.zip(tableDesc.schema).map { case (attr, col) =>
+        HiveColumn(col.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), col.comment)
+      }
+    }
+
+    val columnNames = childSchema.map(f => verbose(f.name))
+
+    // When user specified column names for view, we should create a project to do the renaming.
+    // When no column name specified, we still need to create a project to declare the columns
+    // we need, to make us more robust to top level `*`s.
+    val projectList = if (tableDesc.schema == Nil) {
+      columnNames.mkString(", ")
+    } else {
+      columnNames.zip(tableDesc.schema.map(f => verbose(f.name))).map {
+        case (name, alias) => s"$name AS $alias"
+      }.mkString(", ")
+    }
+
+    val viewName = verbose(tableDesc.name)
+
+    val expandedText = s"SELECT $projectList FROM (${tableDesc.viewText.get}) $viewName"
+
+    tableDesc.copy(schema = schema, viewText = Some(expandedText))
+  }
+
+  // Escape each backtick in the name by doubling it, then wrap the whole name in backticks.
+  private def verbose(name: String) = s"`${name.replaceAll("`", "``")}`"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/af2a5544/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 8c3f9ac..ec5b83b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1248,4 +1248,121 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         """.stripMargin), Row("b", 6.0) :: Row("a", 7.0) :: Nil)
     }
   }
+
+  test("correctly parse CREATE VIEW statement") {
+    withSQLConf(SQLConf.CANONICALIZE_VIEW.key -> "true") {
+      withTable("jt") {
+        val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+        df.write.format("json").saveAsTable("jt")
+        sql(
+          """CREATE VIEW IF NOT EXISTS
+            |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
+            |COMMENT 'blabla'
+            |TBLPROPERTIES ('a' = 'b')
+            |AS SELECT * FROM jt""".stripMargin)
+        checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
+        sql("DROP VIEW testView")
+      }
+    }
+  }
+
+  test("correctly handle CREATE VIEW IF NOT EXISTS") {
+    withSQLConf(SQLConf.CANONICALIZE_VIEW.key -> "true") {
+      withTable("jt", "jt2") {
+        sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+        sql("CREATE VIEW testView AS SELECT id FROM jt")
+
+        val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+        df.write.format("json").saveAsTable("jt2")
+        sql("CREATE VIEW IF NOT EXISTS testView AS SELECT * FROM jt2")
+
+        // make sure our view doesn't change.
+        checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
+        sql("DROP VIEW testView")
+      }
+    }
+  }
+
+  test("correctly handle CREATE OR REPLACE VIEW") {
+    withSQLConf(SQLConf.CANONICALIZE_VIEW.key -> "true") {
+      withTable("jt", "jt2") {
+        sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+        sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt")
+        checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
+
+        val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+        df.write.format("json").saveAsTable("jt2")
+        sql("CREATE OR REPLACE VIEW testView AS SELECT * FROM jt2")
+        // make sure the view has been changed.
+        checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
+
+        sql("DROP VIEW testView")
+
+        val e = intercept[AnalysisException] {
+          sql("CREATE OR REPLACE VIEW IF NOT EXISTS testView AS SELECT id FROM jt")
+        }
+        assert(e.message.contains("not allowed to define a view"))
+      }
+    }
+  }
+
+  test("correctly handle ALTER VIEW") {
+    withSQLConf(SQLConf.CANONICALIZE_VIEW.key -> "true") {
+      withTable("jt", "jt2") {
+        sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+        sql("CREATE VIEW testView AS SELECT id FROM jt")
+
+        val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+        df.write.format("json").saveAsTable("jt2")
+        sql("ALTER VIEW testView AS SELECT * FROM jt2")
+        // make sure the view has been changed.
+        checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
+
+        sql("DROP VIEW testView")
+      }
+    }
+  }
+
+  test("create hive view for json table") {
+    // A JSON table is not Hive-compatible; make sure the new flag fixes that.
+    withSQLConf(SQLConf.CANONICALIZE_VIEW.key -> "true") {
+      withTable("jt") {
+        sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+        sql("CREATE VIEW testView AS SELECT id FROM jt")
+        checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
+        sql("DROP VIEW testView")
+      }
+    }
+  }
+
+  test("create hive view for partitioned parquet table") {
+    // A partitioned Parquet table is not Hive-compatible; make sure the new flag fixes that.
+    withSQLConf(SQLConf.CANONICALIZE_VIEW.key -> "true") {
+      withTable("parTable") {
+        val df = Seq(1 -> "a").toDF("i", "j")
+        df.write.format("parquet").partitionBy("i").saveAsTable("parTable")
+        sql("CREATE VIEW testView AS SELECT i, j FROM parTable")
+        checkAnswer(sql("SELECT * FROM testView"), Row(1, "a"))
+        sql("DROP VIEW testView")
+      }
+    }
+  }
+
+  test("create hive view for joined tables") {
+    // make sure the new flag can handle some complex cases like join and schema change.
+    withSQLConf(SQLConf.CANONICALIZE_VIEW.key -> "true") {
+      withTable("jt1", "jt2") {
+        sqlContext.range(1, 10).toDF("id1").write.format("json").saveAsTable("jt1")
+        sqlContext.range(1, 10).toDF("id2").write.format("json").saveAsTable("jt2")
+        sql("CREATE VIEW testView AS SELECT * FROM jt1 JOIN jt2 ON id1 == id2")
+        checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i)))
+
+        val df = (1 until 10).map(i => i -> i).toDF("id1", "newCol")
+        df.write.format("json").mode(SaveMode.Overwrite).saveAsTable("jt1")
+        checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i)))
+
+        sql("DROP VIEW testView")
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org