You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/01/06 20:16:57 UTC
[2/8] spark git commit: [SPARK-12573][SPARK-12574][SQL] Move SQL
Parser from Hive to Catalyst
http://git-wip-us.apache.org/repos/asf/spark/blob/ea489f14/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 31d82eb..bf3fe12 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -17,41 +17,30 @@
package org.apache.spark.sql.hive
-import java.sql.Date
import java.util.Locale
import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
+import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
-import org.apache.hadoop.hive.ql.exec.{FunctionInfo, FunctionRegistry}
-import org.apache.hadoop.hive.ql.lib.Node
-import org.apache.hadoop.hive.ql.parse.SemanticException
-import org.apache.hadoop.hive.ql.plan.PlanUtils
+import org.apache.hadoop.hive.ql.exec.{FunctionRegistry, FunctionInfo}
+import org.apache.hadoop.hive.ql.parse.EximUtil
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
-
import org.apache.spark.Logging
-import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.parser._
+import org.apache.spark.sql.catalyst.parser.ParseUtils._
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.trees.CurrentOrigin
-import org.apache.spark.sql.execution.ExplainCommand
-import org.apache.spark.sql.execution.datasources.DescribeCommand
-import org.apache.spark.sql.hive.HiveShim._
+import org.apache.spark.sql.execution.SparkQl
+import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
import org.apache.spark.sql.hive.client._
-import org.apache.spark.sql.hive.execution.{AnalyzeTable, DropTable, HiveNativeCommand, HiveScriptIOSchema}
-import org.apache.spark.sql.parser._
+import org.apache.spark.sql.hive.execution.{HiveNativeCommand, AnalyzeTable, DropTable, HiveScriptIOSchema}
import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.CalendarInterval
-import org.apache.spark.util.random.RandomSampler
+import org.apache.spark.sql.AnalysisException
/**
* Used when we need to start parsing the AST before deciding that we are going to pass the command
@@ -71,7 +60,7 @@ private[hive] case class CreateTableAsSelect(
override def output: Seq[Attribute] = Seq.empty[Attribute]
override lazy val resolved: Boolean =
tableDesc.specifiedDatabase.isDefined &&
- tableDesc.schema.size > 0 &&
+ tableDesc.schema.nonEmpty &&
tableDesc.serde.isDefined &&
tableDesc.inputFormat.isDefined &&
tableDesc.outputFormat.isDefined &&
@@ -89,7 +78,7 @@ private[hive] case class CreateViewAsSelect(
}
/** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
-private[hive] object HiveQl extends Logging {
+private[hive] object HiveQl extends SparkQl with Logging {
protected val nativeCommands = Seq(
"TOK_ALTERDATABASE_OWNER",
"TOK_ALTERDATABASE_PROPERTIES",
@@ -181,103 +170,6 @@ private[hive] object HiveQl extends Logging {
protected val hqlParser = new ExtendedHiveQlParser
/**
- * A set of implicit transformations that allow Hive ASTNodes to be rewritten by transformations
- * similar to [[catalyst.trees.TreeNode]].
- *
- * Note that this should be considered very experimental and is not indented as a replacement
- * for TreeNode. Primarily it should be noted ASTNodes are not immutable and do not appear to
- * have clean copy semantics. Therefore, users of this class should take care when
- * copying/modifying trees that might be used elsewhere.
- */
- implicit class TransformableNode(n: ASTNode) {
- /**
- * Returns a copy of this node where `rule` has been recursively applied to it and all of its
- * children. When `rule` does not apply to a given node it is left unchanged.
- * @param rule the function use to transform this nodes children
- */
- def transform(rule: PartialFunction[ASTNode, ASTNode]): ASTNode = {
- try {
- val afterRule = rule.applyOrElse(n, identity[ASTNode])
- afterRule.withChildren(
- nilIfEmpty(afterRule.getChildren)
- .asInstanceOf[Seq[ASTNode]]
- .map(ast => Option(ast).map(_.transform(rule)).orNull))
- } catch {
- case e: Exception =>
- logError(dumpTree(n).toString)
- throw e
- }
- }
-
- /**
- * Returns a scala.Seq equivalent to [s] or Nil if [s] is null.
- */
- private def nilIfEmpty[A](s: java.util.List[A]): Seq[A] =
- Option(s).map(_.asScala).getOrElse(Nil)
-
- /**
- * Returns this ASTNode with the text changed to `newText`.
- */
- def withText(newText: String): ASTNode = {
- n.token.asInstanceOf[org.antlr.runtime.CommonToken].setText(newText)
- n
- }
-
- /**
- * Returns this ASTNode with the children changed to `newChildren`.
- */
- def withChildren(newChildren: Seq[ASTNode]): ASTNode = {
- (1 to n.getChildCount).foreach(_ => n.deleteChild(0))
- newChildren.foreach(n.addChild(_))
- n
- }
-
- /**
- * Throws an error if this is not equal to other.
- *
- * Right now this function only checks the name, type, text and children of the node
- * for equality.
- */
- def checkEquals(other: ASTNode): Unit = {
- def check(field: String, f: ASTNode => Any): Unit = if (f(n) != f(other)) {
- sys.error(s"$field does not match for trees. " +
- s"'${f(n)}' != '${f(other)}' left: ${dumpTree(n)}, right: ${dumpTree(other)}")
- }
- check("name", _.getName)
- check("type", _.getType)
- check("text", _.getText)
- check("numChildren", n => nilIfEmpty(n.getChildren).size)
-
- val leftChildren = nilIfEmpty(n.getChildren).asInstanceOf[Seq[ASTNode]]
- val rightChildren = nilIfEmpty(other.getChildren).asInstanceOf[Seq[ASTNode]]
- leftChildren zip rightChildren foreach {
- case (l, r) => l checkEquals r
- }
- }
- }
-
- /**
- * Returns the AST for the given SQL string.
- */
- def getAst(sql: String): ASTNode = {
- /*
- * Context has to be passed in hive0.13.1.
- * Otherwise, there will be Null pointer exception,
- * when retrieving properties form HiveConf.
- */
- val hContext = createContext()
- val node = getAst(sql, hContext)
- hContext.clear()
- node
- }
-
- private def createContext(): Context = new Context(hiveConf)
-
- private def getAst(sql: String, context: Context) =
- ParseUtils.findRootNonNullToken(
- (new ParseDriver).parse(sql, context))
-
- /**
* Returns the HiveConf
*/
private[this] def hiveConf: HiveConf = {
@@ -296,226 +188,16 @@ private[hive] object HiveQl extends Logging {
/** Returns a LogicalPlan for a given HiveQL string. */
def parseSql(sql: String): LogicalPlan = hqlParser.parse(sql)
- val errorRegEx = "line (\\d+):(\\d+) (.*)".r
-
- /** Creates LogicalPlan for a given HiveQL string. */
- def createPlan(sql: String): LogicalPlan = {
- try {
- val context = createContext()
- val tree = getAst(sql, context)
- val plan = if (nativeCommands contains tree.getText) {
- HiveNativeCommand(sql)
- } else {
- nodeToPlan(tree, context) match {
- case NativePlaceholder => HiveNativeCommand(sql)
- case other => other
- }
- }
- context.clear()
- plan
- } catch {
- case pe: ParseException =>
- pe.getMessage match {
- case errorRegEx(line, start, message) =>
- throw new AnalysisException(message, Some(line.toInt), Some(start.toInt))
- case otherMessage =>
- throw new AnalysisException(otherMessage)
- }
- case e: MatchError => throw e
- case e: Exception =>
- throw new AnalysisException(e.getMessage)
- case e: NotImplementedError =>
- throw new AnalysisException(
- s"""
- |Unsupported language features in query: $sql
- |${dumpTree(getAst(sql))}
- |$e
- |${e.getStackTrace.head}
- """.stripMargin)
- }
- }
-
- def parseDdl(ddl: String): Seq[Attribute] = {
- val tree =
- try {
- ParseUtils.findRootNonNullToken(
- (new ParseDriver)
- .parse(ddl, null /* no context required for parsing alone */))
- } catch {
- case pe: org.apache.hadoop.hive.ql.parse.ParseException =>
- throw new RuntimeException(s"Failed to parse ddl: '$ddl'", pe)
- }
- assert(tree.asInstanceOf[ASTNode].getText == "TOK_CREATETABLE", "Only CREATE TABLE supported.")
- val tableOps = tree.getChildren
- val colList =
- tableOps.asScala
- .find(_.asInstanceOf[ASTNode].getText == "TOK_TABCOLLIST")
- .getOrElse(sys.error("No columnList!")).getChildren
-
- colList.asScala.map(nodeToAttribute)
- }
-
- /** Extractor for matching Hive's AST Tokens. */
- private[hive] case class Token(name: String, children: Seq[ASTNode]) extends Node {
- def getName(): String = name
- def getChildren(): java.util.List[Node] = {
- val col = new java.util.ArrayList[Node](children.size)
- children.foreach(col.add(_))
- col
- }
- }
- object Token {
- /** @return matches of the form (tokenName, children). */
- def unapply(t: Any): Option[(String, Seq[ASTNode])] = t match {
- case t: ASTNode =>
- CurrentOrigin.setPosition(t.getLine, t.getCharPositionInLine)
- Some((t.getText,
- Option(t.getChildren).map(_.asScala.toList).getOrElse(Nil).asInstanceOf[Seq[ASTNode]]))
- case t: Token => Some((t.name, t.children))
- case _ => None
- }
- }
-
- protected def getClauses(
- clauseNames: Seq[String],
- nodeList: Seq[ASTNode]): Seq[Option[ASTNode]] = {
- var remainingNodes = nodeList
- val clauses = clauseNames.map { clauseName =>
- val (matches, nonMatches) = remainingNodes.partition(_.getText.toUpperCase == clauseName)
- remainingNodes = nonMatches ++ (if (matches.nonEmpty) matches.tail else Nil)
- matches.headOption
- }
-
- if (remainingNodes.nonEmpty) {
- sys.error(
- s"""Unhandled clauses: ${remainingNodes.map(dumpTree(_)).mkString("\n")}.
- |You are likely trying to use an unsupported Hive feature."""".stripMargin)
- }
- clauses
- }
-
- def getClause(clauseName: String, nodeList: Seq[Node]): Node =
- getClauseOption(clauseName, nodeList).getOrElse(sys.error(
- s"Expected clause $clauseName missing from ${nodeList.map(dumpTree(_)).mkString("\n")}"))
-
- def getClauseOption(clauseName: String, nodeList: Seq[Node]): Option[Node] = {
- nodeList.filter { case ast: ASTNode => ast.getText == clauseName } match {
- case Seq(oneMatch) => Some(oneMatch)
- case Seq() => None
- case _ => sys.error(s"Found multiple instances of clause $clauseName")
- }
- }
-
- protected def nodeToAttribute(node: Node): Attribute = node match {
- case Token("TOK_TABCOL", Token(colName, Nil) :: dataType :: Nil) =>
- AttributeReference(colName, nodeToDataType(dataType), true)()
-
- case a: ASTNode =>
- throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ")
- }
-
- protected def nodeToDataType(node: Node): DataType = node match {
- case Token("TOK_DECIMAL", precision :: scale :: Nil) =>
- DecimalType(precision.getText.toInt, scale.getText.toInt)
- case Token("TOK_DECIMAL", precision :: Nil) =>
- DecimalType(precision.getText.toInt, 0)
- case Token("TOK_DECIMAL", Nil) => DecimalType.USER_DEFAULT
- case Token("TOK_BIGINT", Nil) => LongType
- case Token("TOK_INT", Nil) => IntegerType
- case Token("TOK_TINYINT", Nil) => ByteType
- case Token("TOK_SMALLINT", Nil) => ShortType
- case Token("TOK_BOOLEAN", Nil) => BooleanType
- case Token("TOK_STRING", Nil) => StringType
- case Token("TOK_VARCHAR", Token(_, Nil) :: Nil) => StringType
- case Token("TOK_FLOAT", Nil) => FloatType
- case Token("TOK_DOUBLE", Nil) => DoubleType
- case Token("TOK_DATE", Nil) => DateType
- case Token("TOK_TIMESTAMP", Nil) => TimestampType
- case Token("TOK_BINARY", Nil) => BinaryType
- case Token("TOK_LIST", elementType :: Nil) => ArrayType(nodeToDataType(elementType))
- case Token("TOK_STRUCT",
- Token("TOK_TABCOLLIST", fields) :: Nil) =>
- StructType(fields.map(nodeToStructField))
- case Token("TOK_MAP",
- keyType ::
- valueType :: Nil) =>
- MapType(nodeToDataType(keyType), nodeToDataType(valueType))
- case a: ASTNode =>
- throw new NotImplementedError(s"No parse rules for DataType:\n ${dumpTree(a).toString} ")
- }
-
- protected def nodeToStructField(node: Node): StructField = node match {
- case Token("TOK_TABCOL",
- Token(fieldName, Nil) ::
- dataType :: Nil) =>
- StructField(fieldName, nodeToDataType(dataType), nullable = true)
- case Token("TOK_TABCOL",
- Token(fieldName, Nil) ::
- dataType ::
- _ /* comment */:: Nil) =>
- StructField(fieldName, nodeToDataType(dataType), nullable = true)
- case a: ASTNode =>
- throw new NotImplementedError(s"No parse rules for StructField:\n ${dumpTree(a).toString} ")
- }
-
- protected def extractTableIdent(tableNameParts: Node): TableIdentifier = {
- tableNameParts.getChildren.asScala.map {
- case Token(part, Nil) => cleanIdentifier(part)
- } match {
- case Seq(tableOnly) => TableIdentifier(tableOnly)
- case Seq(databaseName, table) => TableIdentifier(table, Some(databaseName))
- case other => sys.error("Hive only supports tables names like 'tableName' " +
- s"or 'databaseName.tableName', found '$other'")
- }
- }
-
- /**
- * SELECT MAX(value) FROM src GROUP BY k1, k2, k3 GROUPING SETS((k1, k2), (k2))
- * is equivalent to
- * SELECT MAX(value) FROM src GROUP BY k1, k2 UNION SELECT MAX(value) FROM src GROUP BY k2
- * Check the following link for details.
- *
-https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C+Grouping+and+Rollup
- *
- * The bitmask denotes the grouping expressions validity for a grouping set,
- * the bitmask also be called as grouping id (`GROUPING__ID`, the virtual column in Hive)
- * e.g. In superset (k1, k2, k3), (bit 0: k1, bit 1: k2, and bit 2: k3), the grouping id of
- * GROUPING SETS (k1, k2) and (k2) should be 3 and 2 respectively.
- */
- protected def extractGroupingSet(children: Seq[ASTNode]): (Seq[Expression], Seq[Int]) = {
- val (keyASTs, setASTs) = children.partition( n => n match {
- case Token("TOK_GROUPING_SETS_EXPRESSION", children) => false // grouping sets
- case _ => true // grouping keys
- })
-
- val keys = keyASTs.map(nodeToExpr).toSeq
- val keyMap = keyASTs.map(_.toStringTree).zipWithIndex.toMap
-
- val bitmasks: Seq[Int] = setASTs.map(set => set match {
- case Token("TOK_GROUPING_SETS_EXPRESSION", null) => 0
- case Token("TOK_GROUPING_SETS_EXPRESSION", children) =>
- children.foldLeft(0)((bitmap, col) => {
- val colString = col.asInstanceOf[ASTNode].toStringTree()
- require(keyMap.contains(colString), s"$colString doens't show up in the GROUP BY list")
- bitmap | 1 << keyMap(colString)
- })
- case _ => sys.error("Expect GROUPING SETS clause")
- })
-
- (keys, bitmasks)
- }
-
- protected def getProperties(node: Node): Seq[(String, String)] = node match {
+ protected def getProperties(node: ASTNode): Seq[(String, String)] = node match {
case Token("TOK_TABLEPROPLIST", list) =>
list.map {
case Token("TOK_TABLEPROPERTY", Token(key, Nil) :: Token(value, Nil) :: Nil) =>
- (unquoteString(key) -> unquoteString(value))
+ unquoteString(key) -> unquoteString(value)
}
}
private def createView(
view: ASTNode,
- context: Context,
viewNameParts: ASTNode,
query: ASTNode,
schema: Seq[HiveColumn],
@@ -524,8 +206,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
replace: Boolean): CreateViewAsSelect = {
val TableIdentifier(viewName, dbName) = extractTableIdent(viewNameParts)
- val originalText = context.getTokenRewriteStream
- .toString(query.getTokenStartIndex, query.getTokenStopIndex)
+ val originalText = query.source
val tableDesc = HiveTable(
specifiedDatabase = dbName,
@@ -544,104 +225,67 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
// We need to keep the original SQL string so that if `spark.sql.nativeView` is
// false, we can fall back to use hive native command later.
// We can remove this when parser is configurable(can access SQLConf) in the future.
- val sql = context.getTokenRewriteStream
- .toString(view.getTokenStartIndex, view.getTokenStopIndex)
- CreateViewAsSelect(tableDesc, nodeToPlan(query, context), allowExist, replace, sql)
+ val sql = view.source
+ CreateViewAsSelect(tableDesc, nodeToPlan(query), allowExist, replace, sql)
}
- protected def nodeToPlan(node: ASTNode, context: Context): LogicalPlan = node match {
- // Special drop table that also uncaches.
- case Token("TOK_DROPTABLE",
- Token("TOK_TABNAME", tableNameParts) ::
- ifExists) =>
- val tableName = tableNameParts.map { case Token(p, Nil) => p }.mkString(".")
- DropTable(tableName, ifExists.nonEmpty)
- // Support "ANALYZE TABLE tableNmae COMPUTE STATISTICS noscan"
- case Token("TOK_ANALYZE",
- Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) ::
- isNoscan) =>
- // Reference:
- // https://cwiki.apache.org/confluence/display/Hive/StatsDev#StatsDev-ExistingTables
- if (partitionSpec.nonEmpty) {
- // Analyze partitions will be treated as a Hive native command.
- NativePlaceholder
- } else if (isNoscan.isEmpty) {
- // If users do not specify "noscan", it will be treated as a Hive native command.
- NativePlaceholder
- } else {
- val tableName = tableNameParts.map { case Token(p, Nil) => p }.mkString(".")
- AnalyzeTable(tableName)
+ protected override def createPlan(
+ sql: String,
+ node: ASTNode): LogicalPlan = {
+ if (nativeCommands.contains(node.text)) {
+ HiveNativeCommand(sql)
+ } else {
+ nodeToPlan(node) match {
+ case NativePlaceholder => HiveNativeCommand(sql)
+ case plan => plan
}
- // Just fake explain for any of the native commands.
- case Token("TOK_EXPLAIN", explainArgs)
- if noExplainCommands.contains(explainArgs.head.getText) =>
- ExplainCommand(OneRowRelation)
- case Token("TOK_EXPLAIN", explainArgs)
- if "TOK_CREATETABLE" == explainArgs.head.getText =>
- val Some(crtTbl) :: _ :: extended :: Nil =
- getClauses(Seq("TOK_CREATETABLE", "FORMATTED", "EXTENDED"), explainArgs)
- ExplainCommand(
- nodeToPlan(crtTbl, context),
- extended = extended.isDefined)
- case Token("TOK_EXPLAIN", explainArgs) =>
- // Ignore FORMATTED if present.
- val Some(query) :: _ :: extended :: Nil =
- getClauses(Seq("TOK_QUERY", "FORMATTED", "EXTENDED"), explainArgs)
- ExplainCommand(
- nodeToPlan(query, context),
- extended = extended.isDefined)
-
- case Token("TOK_DESCTABLE", describeArgs) =>
- // Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
- val Some(tableType) :: formatted :: extended :: pretty :: Nil =
- getClauses(Seq("TOK_TABTYPE", "FORMATTED", "EXTENDED", "PRETTY"), describeArgs)
- if (formatted.isDefined || pretty.isDefined) {
- // FORMATTED and PRETTY are not supported and this statement will be treated as
- // a Hive native command.
- NativePlaceholder
- } else {
- tableType match {
- case Token("TOK_TABTYPE", Token("TOK_TABNAME", nameParts :: Nil) :: Nil) => {
- nameParts match {
- case Token(".", dbName :: tableName :: Nil) =>
- // It is describing a table with the format like "describe db.table".
- // TODO: Actually, a user may mean tableName.columnName. Need to resolve this issue.
- val tableIdent = extractTableIdent(nameParts)
- DescribeCommand(
- UnresolvedRelation(tableIdent, None), isExtended = extended.isDefined)
- case Token(".", dbName :: tableName :: colName :: Nil) =>
- // It is describing a column with the format like "describe db.table column".
- NativePlaceholder
- case tableName =>
- // It is describing a table with the format like "describe table".
- DescribeCommand(
- UnresolvedRelation(TableIdentifier(tableName.getText), None),
- isExtended = extended.isDefined)
- }
- }
- // All other cases.
- case _ => NativePlaceholder
+ }
+ }
+
+ protected override def isNoExplainCommand(command: String): Boolean =
+ noExplainCommands.contains(command)
+
+ protected override def nodeToPlan(node: ASTNode): LogicalPlan = {
+ node match {
+ // Special drop table that also uncaches.
+ case Token("TOK_DROPTABLE", Token("TOK_TABNAME", tableNameParts) :: ifExists) =>
+ val tableName = tableNameParts.map { case Token(p, Nil) => p }.mkString(".")
+ DropTable(tableName, ifExists.nonEmpty)
+
+ // Support "ANALYZE TABLE tableName COMPUTE STATISTICS noscan"
+ case Token("TOK_ANALYZE",
+ Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) :: isNoscan) =>
+ // Reference:
+ // https://cwiki.apache.org/confluence/display/Hive/StatsDev#StatsDev-ExistingTables
+ if (partitionSpec.nonEmpty) {
+ // Analyze partitions will be treated as a Hive native command.
+ NativePlaceholder
+ } else if (isNoscan.isEmpty) {
+ // If users do not specify "noscan", it will be treated as a Hive native command.
+ NativePlaceholder
+ } else {
+ val tableName = tableNameParts.map { case Token(p, Nil) => p }.mkString(".")
+ AnalyzeTable(tableName)
}
- }
- case view @ Token("TOK_ALTERVIEW", children) =>
- val Some(viewNameParts) :: maybeQuery :: ignores =
- getClauses(Seq(
- "TOK_TABNAME",
- "TOK_QUERY",
- "TOK_ALTERVIEW_ADDPARTS",
- "TOK_ALTERVIEW_DROPPARTS",
- "TOK_ALTERVIEW_PROPERTIES",
- "TOK_ALTERVIEW_RENAME"), children)
+ case view @ Token("TOK_ALTERVIEW", children) =>
+ val Some(nameParts) :: maybeQuery :: _ =
+ getClauses(Seq(
+ "TOK_TABNAME",
+ "TOK_QUERY",
+ "TOK_ALTERVIEW_ADDPARTS",
+ "TOK_ALTERVIEW_DROPPARTS",
+ "TOK_ALTERVIEW_PROPERTIES",
+ "TOK_ALTERVIEW_RENAME"), children)
- // if ALTER VIEW doesn't have query part, let hive to handle it.
- maybeQuery.map { query =>
- createView(view, context, viewNameParts, query, Nil, Map(), false, true)
- }.getOrElse(NativePlaceholder)
+ // If ALTER VIEW doesn't have a query part, let Hive handle it.
+ maybeQuery.map { query =>
+ createView(view, nameParts, query, Nil, Map(), allowExist = false, replace = true)
+ }.getOrElse(NativePlaceholder)
- case view @ Token("TOK_CREATEVIEW", children)
+ case view @ Token("TOK_CREATEVIEW", children)
if children.collect { case t @ Token("TOK_QUERY", _) => t }.nonEmpty =>
- val Seq(
+ val Seq(
Some(viewNameParts),
Some(query),
maybeComment,
@@ -650,1224 +294,466 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
maybeProperties,
maybeColumns,
maybePartCols
- ) = getClauses(Seq(
- "TOK_TABNAME",
- "TOK_QUERY",
- "TOK_TABLECOMMENT",
- "TOK_ORREPLACE",
- "TOK_IFNOTEXISTS",
- "TOK_TABLEPROPERTIES",
- "TOK_TABCOLNAME",
- "TOK_VIEWPARTCOLS"), children)
-
- // If the view is partitioned, we let hive handle it.
- if (maybePartCols.isDefined) {
- NativePlaceholder
- } else {
- val schema = maybeColumns.map { cols =>
- SemanticAnalyzer.getColumns(cols, true).asScala.map { field =>
+ ) = getClauses(Seq(
+ "TOK_TABNAME",
+ "TOK_QUERY",
+ "TOK_TABLECOMMENT",
+ "TOK_ORREPLACE",
+ "TOK_IFNOTEXISTS",
+ "TOK_TABLEPROPERTIES",
+ "TOK_TABCOLNAME",
+ "TOK_VIEWPARTCOLS"), children)
+
+ // If the view is partitioned, we let hive handle it.
+ if (maybePartCols.isDefined) {
+ NativePlaceholder
+ } else {
+ val schema = maybeColumns.map { cols =>
// We can't specify column types when create view, so fill it with null first, and
// update it after the schema has been resolved later.
- HiveColumn(field.getName, null, field.getComment)
- }
- }.getOrElse(Seq.empty[HiveColumn])
-
- val properties = scala.collection.mutable.Map.empty[String, String]
-
- maybeProperties.foreach {
- case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
- properties ++= getProperties(list)
- }
-
- maybeComment.foreach {
- case Token("TOK_TABLECOMMENT", child :: Nil) =>
- val comment = SemanticAnalyzer.unescapeSQLString(child.getText)
- if (comment ne null) {
- properties += ("comment" -> comment)
- }
- }
-
- createView(view, context, viewNameParts, query, schema, properties.toMap,
- allowExisting.isDefined, replace.isDefined)
- }
-
- case Token("TOK_CREATETABLE", children)
- if children.collect { case t @ Token("TOK_QUERY", _) => t }.nonEmpty =>
- // Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
- val (
- Some(tableNameParts) ::
- _ /* likeTable */ ::
- externalTable ::
- Some(query) ::
- allowExisting +:
- ignores) =
- getClauses(
- Seq(
- "TOK_TABNAME",
- "TOK_LIKETABLE",
- "EXTERNAL",
- "TOK_QUERY",
- "TOK_IFNOTEXISTS",
- "TOK_TABLECOMMENT",
- "TOK_TABCOLLIST",
- "TOK_TABLEPARTCOLS", // Partitioned by
- "TOK_TABLEBUCKETS", // Clustered by
- "TOK_TABLESKEWED", // Skewed by
- "TOK_TABLEROWFORMAT",
- "TOK_TABLESERIALIZER",
- "TOK_FILEFORMAT_GENERIC",
- "TOK_TABLEFILEFORMAT", // User-provided InputFormat and OutputFormat
- "TOK_STORAGEHANDLER", // Storage handler
- "TOK_TABLELOCATION",
- "TOK_TABLEPROPERTIES"),
- children)
- val TableIdentifier(tblName, dbName) = extractTableIdent(tableNameParts)
-
- // TODO add bucket support
- var tableDesc: HiveTable = HiveTable(
- specifiedDatabase = dbName,
- name = tblName,
- schema = Seq.empty[HiveColumn],
- partitionColumns = Seq.empty[HiveColumn],
- properties = Map[String, String](),
- serdeProperties = Map[String, String](),
- tableType = if (externalTable.isDefined) ExternalTable else ManagedTable,
- location = None,
- inputFormat = None,
- outputFormat = None,
- serde = None,
- viewText = None)
-
- // default storage type abbreviation (e.g. RCFile, ORC, PARQUET etc.)
- val defaultStorageType = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT)
- // handle the default format for the storage type abbreviation
- val hiveSerDe = HiveSerDe.sourceToSerDe(defaultStorageType, hiveConf).getOrElse {
- HiveSerDe(
- inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
- outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
- }
+ nodeToColumns(cols, lowerCase = true).map(_.copy(hiveType = null))
+ }.getOrElse(Seq.empty[HiveColumn])
- hiveSerDe.inputFormat.foreach(f => tableDesc = tableDesc.copy(inputFormat = Some(f)))
- hiveSerDe.outputFormat.foreach(f => tableDesc = tableDesc.copy(outputFormat = Some(f)))
- hiveSerDe.serde.foreach(f => tableDesc = tableDesc.copy(serde = Some(f)))
+ val properties = scala.collection.mutable.Map.empty[String, String]
- children.collect {
- case list @ Token("TOK_TABCOLLIST", _) =>
- val cols = SemanticAnalyzer.getColumns(list, true)
- if (cols != null) {
- tableDesc = tableDesc.copy(
- schema = cols.asScala.map { field =>
- HiveColumn(field.getName, field.getType, field.getComment)
- })
+ maybeProperties.foreach {
+ case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
+ properties ++= getProperties(list)
}
- case Token("TOK_TABLECOMMENT", child :: Nil) =>
- val comment = SemanticAnalyzer.unescapeSQLString(child.getText)
- // TODO support the sql text
- tableDesc = tableDesc.copy(viewText = Option(comment))
- case Token("TOK_TABLEPARTCOLS", list @ Token("TOK_TABCOLLIST", _) :: Nil) =>
- val cols = SemanticAnalyzer.getColumns(list(0), false)
- if (cols != null) {
- tableDesc = tableDesc.copy(
- partitionColumns = cols.asScala.map { field =>
- HiveColumn(field.getName, field.getType, field.getComment)
- })
- }
- case Token("TOK_TABLEROWFORMAT", Token("TOK_SERDEPROPS", child :: Nil) :: Nil) =>
- val serdeParams = new java.util.HashMap[String, String]()
- child match {
- case Token("TOK_TABLEROWFORMATFIELD", rowChild1 :: rowChild2) =>
- val fieldDelim = SemanticAnalyzer.unescapeSQLString (rowChild1.getText())
- serdeParams.put(serdeConstants.FIELD_DELIM, fieldDelim)
- serdeParams.put(serdeConstants.SERIALIZATION_FORMAT, fieldDelim)
- if (rowChild2.length > 1) {
- val fieldEscape = SemanticAnalyzer.unescapeSQLString (rowChild2(0).getText)
- serdeParams.put(serdeConstants.ESCAPE_CHAR, fieldEscape)
- }
- case Token("TOK_TABLEROWFORMATCOLLITEMS", rowChild :: Nil) =>
- val collItemDelim = SemanticAnalyzer.unescapeSQLString(rowChild.getText)
- serdeParams.put(serdeConstants.COLLECTION_DELIM, collItemDelim)
- case Token("TOK_TABLEROWFORMATMAPKEYS", rowChild :: Nil) =>
- val mapKeyDelim = SemanticAnalyzer.unescapeSQLString(rowChild.getText)
- serdeParams.put(serdeConstants.MAPKEY_DELIM, mapKeyDelim)
- case Token("TOK_TABLEROWFORMATLINES", rowChild :: Nil) =>
- val lineDelim = SemanticAnalyzer.unescapeSQLString(rowChild.getText)
- if (!(lineDelim == "\n") && !(lineDelim == "10")) {
- throw new AnalysisException(
- SemanticAnalyzer.generateErrorMessage(
- rowChild,
- ErrorMsg.LINES_TERMINATED_BY_NON_NEWLINE.getMsg))
- }
- serdeParams.put(serdeConstants.LINE_DELIM, lineDelim)
- case Token("TOK_TABLEROWFORMATNULL", rowChild :: Nil) =>
- val nullFormat = SemanticAnalyzer.unescapeSQLString(rowChild.getText)
- // TODO support the nullFormat
- case _ => assert(false)
- }
- tableDesc = tableDesc.copy(
- serdeProperties = tableDesc.serdeProperties ++ serdeParams.asScala)
- case Token("TOK_TABLELOCATION", child :: Nil) =>
- var location = SemanticAnalyzer.unescapeSQLString(child.getText)
- location = SemanticAnalyzer.relativeToAbsolutePath(hiveConf, location)
- tableDesc = tableDesc.copy(location = Option(location))
- case Token("TOK_TABLESERIALIZER", child :: Nil) =>
- tableDesc = tableDesc.copy(
- serde = Option(SemanticAnalyzer.unescapeSQLString(child.getChild(0).getText)))
- if (child.getChildCount == 2) {
- val serdeParams = new java.util.HashMap[String, String]()
- SemanticAnalyzer.readProps(
- (child.getChild(1).getChild(0)).asInstanceOf[ASTNode], serdeParams)
- tableDesc = tableDesc.copy(
- serdeProperties = tableDesc.serdeProperties ++ serdeParams.asScala)
- }
- case Token("TOK_FILEFORMAT_GENERIC", child :: Nil) =>
- child.getText().toLowerCase(Locale.ENGLISH) match {
- case "orc" =>
- tableDesc = tableDesc.copy(
- inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
- outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
- if (tableDesc.serde.isEmpty) {
- tableDesc = tableDesc.copy(
- serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
- }
-
- case "parquet" =>
- tableDesc = tableDesc.copy(
- inputFormat =
- Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
- outputFormat =
- Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
- if (tableDesc.serde.isEmpty) {
- tableDesc = tableDesc.copy(
- serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
- }
-
- case "rcfile" =>
- tableDesc = tableDesc.copy(
- inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
- outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
- if (tableDesc.serde.isEmpty) {
- tableDesc = tableDesc.copy(
- serde = Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))
- }
- case "textfile" =>
- tableDesc = tableDesc.copy(
- inputFormat =
- Option("org.apache.hadoop.mapred.TextInputFormat"),
- outputFormat =
- Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
-
- case "sequencefile" =>
- tableDesc = tableDesc.copy(
- inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
- outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat"))
-
- case "avro" =>
- tableDesc = tableDesc.copy(
- inputFormat =
- Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"),
- outputFormat =
- Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"))
- if (tableDesc.serde.isEmpty) {
- tableDesc = tableDesc.copy(
- serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))
+ maybeComment.foreach {
+ case Token("TOK_TABLECOMMENT", child :: Nil) =>
+ val comment = unescapeSQLString(child.text)
+ if (comment ne null) {
+ properties += ("comment" -> comment)
}
-
- case _ =>
- throw new SemanticException(
- s"Unrecognized file format in STORED AS clause: ${child.getText}")
- }
-
- case Token("TOK_TABLESERIALIZER",
- Token("TOK_SERDENAME", Token(serdeName, Nil) :: otherProps) :: Nil) =>
- tableDesc = tableDesc.copy(serde = Option(unquoteString(serdeName)))
-
- otherProps match {
- case Token("TOK_TABLEPROPERTIES", list :: Nil) :: Nil =>
- tableDesc = tableDesc.copy(
- serdeProperties = tableDesc.serdeProperties ++ getProperties(list))
- case Nil =>
}
- case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
- tableDesc = tableDesc.copy(properties = tableDesc.properties ++ getProperties(list))
- case list @ Token("TOK_TABLEFILEFORMAT", children) =>
- tableDesc = tableDesc.copy(
- inputFormat =
- Option(SemanticAnalyzer.unescapeSQLString(list.getChild(0).getText)),
- outputFormat =
- Option(SemanticAnalyzer.unescapeSQLString(list.getChild(1).getText)))
- case Token("TOK_STORAGEHANDLER", _) =>
- throw new AnalysisException(ErrorMsg.CREATE_NON_NATIVE_AS.getMsg())
- case _ => // Unsupport features
- }
-
- CreateTableAsSelect(tableDesc, nodeToPlan(query, context), allowExisting != None)
-
- // If its not a "CTAS" like above then take it as a native command
- case Token("TOK_CREATETABLE", _) => NativePlaceholder
-
- // Support "TRUNCATE TABLE table_name [PARTITION partition_spec]"
- case Token("TOK_TRUNCATETABLE",
- Token("TOK_TABLE_PARTITION", table) :: Nil) => NativePlaceholder
-
- case Token("TOK_QUERY", queryArgs)
- if Seq("TOK_CTE", "TOK_FROM", "TOK_INSERT").contains(queryArgs.head.getText) =>
-
- val (fromClause: Option[ASTNode], insertClauses, cteRelations) =
- queryArgs match {
- case Token("TOK_CTE", ctes) :: Token("TOK_FROM", from) :: inserts =>
- val cteRelations = ctes.map { node =>
- val relation = nodeToRelation(node, context).asInstanceOf[Subquery]
- relation.alias -> relation
- }
- (Some(from.head), inserts, Some(cteRelations.toMap))
- case Token("TOK_FROM", from) :: inserts =>
- (Some(from.head), inserts, None)
- case Token("TOK_INSERT", _) :: Nil =>
- (None, queryArgs, None)
+ createView(view, viewNameParts, query, schema, properties.toMap,
+ allowExisting.isDefined, replace.isDefined)
}
- // Return one query for each insert clause.
- val queries = insertClauses.map { case Token("TOK_INSERT", singleInsert) =>
+ case Token("TOK_CREATETABLE", children)
+ if children.collect { case t @ Token("TOK_QUERY", _) => t }.nonEmpty =>
+ // Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
val (
- intoClause ::
- destClause ::
- selectClause ::
- selectDistinctClause ::
- whereClause ::
- groupByClause ::
- rollupGroupByClause ::
- cubeGroupByClause ::
- groupingSetsClause ::
- orderByClause ::
- havingClause ::
- sortByClause ::
- clusterByClause ::
- distributeByClause ::
- limitClause ::
- lateralViewClause ::
- windowClause :: Nil) = {
+ Some(tableNameParts) ::
+ _ /* likeTable */ ::
+ externalTable ::
+ Some(query) ::
+ allowExisting +:
+ _) =
getClauses(
Seq(
- "TOK_INSERT_INTO",
- "TOK_DESTINATION",
- "TOK_SELECT",
- "TOK_SELECTDI",
- "TOK_WHERE",
- "TOK_GROUPBY",
- "TOK_ROLLUP_GROUPBY",
- "TOK_CUBE_GROUPBY",
- "TOK_GROUPING_SETS",
- "TOK_ORDERBY",
- "TOK_HAVING",
- "TOK_SORTBY",
- "TOK_CLUSTERBY",
- "TOK_DISTRIBUTEBY",
- "TOK_LIMIT",
- "TOK_LATERAL_VIEW",
- "WINDOW"),
- singleInsert)
+ "TOK_TABNAME",
+ "TOK_LIKETABLE",
+ "EXTERNAL",
+ "TOK_QUERY",
+ "TOK_IFNOTEXISTS",
+ "TOK_TABLECOMMENT",
+ "TOK_TABCOLLIST",
+ "TOK_TABLEPARTCOLS", // Partitioned by
+ "TOK_TABLEBUCKETS", // Clustered by
+ "TOK_TABLESKEWED", // Skewed by
+ "TOK_TABLEROWFORMAT",
+ "TOK_TABLESERIALIZER",
+ "TOK_FILEFORMAT_GENERIC",
+ "TOK_TABLEFILEFORMAT", // User-provided InputFormat and OutputFormat
+ "TOK_STORAGEHANDLER", // Storage handler
+ "TOK_TABLELOCATION",
+ "TOK_TABLEPROPERTIES"),
+ children)
+ val TableIdentifier(tblName, dbName) = extractTableIdent(tableNameParts)
+
+ // TODO add bucket support
+ var tableDesc: HiveTable = HiveTable(
+ specifiedDatabase = dbName,
+ name = tblName,
+ schema = Seq.empty[HiveColumn],
+ partitionColumns = Seq.empty[HiveColumn],
+ properties = Map[String, String](),
+ serdeProperties = Map[String, String](),
+ tableType = if (externalTable.isDefined) ExternalTable else ManagedTable,
+ location = None,
+ inputFormat = None,
+ outputFormat = None,
+ serde = None,
+ viewText = None)
+
+ // default storage type abbreviation (e.g. RCFile, ORC, PARQUET etc.)
+ val defaultStorageType = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT)
+ // handle the default format for the storage type abbreviation
+ val hiveSerDe = HiveSerDe.sourceToSerDe(defaultStorageType, hiveConf).getOrElse {
+ HiveSerDe(
+ inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
+ outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
}
- val relations = fromClause match {
- case Some(f) => nodeToRelation(f, context)
- case None => OneRowRelation
- }
+ hiveSerDe.inputFormat.foreach(f => tableDesc = tableDesc.copy(inputFormat = Some(f)))
+ hiveSerDe.outputFormat.foreach(f => tableDesc = tableDesc.copy(outputFormat = Some(f)))
+ hiveSerDe.serde.foreach(f => tableDesc = tableDesc.copy(serde = Some(f)))
- val withWhere = whereClause.map { whereNode =>
- val Seq(whereExpr) = whereNode.getChildren.asScala
- Filter(nodeToExpr(whereExpr), relations)
- }.getOrElse(relations)
-
- val select =
- (selectClause orElse selectDistinctClause).getOrElse(sys.error("No select clause."))
-
- // Script transformations are expressed as a select clause with a single expression of type
- // TOK_TRANSFORM
- val transformation = select.getChildren.iterator().next() match {
- case Token("TOK_SELEXPR",
- Token("TOK_TRANSFORM",
- Token("TOK_EXPLIST", inputExprs) ::
- Token("TOK_SERDE", inputSerdeClause) ::
- Token("TOK_RECORDWRITER", writerClause) ::
- // TODO: Need to support other types of (in/out)put
- Token(script, Nil) ::
- Token("TOK_SERDE", outputSerdeClause) ::
- Token("TOK_RECORDREADER", readerClause) ::
- outputClause) :: Nil) =>
-
- val (output, schemaLess) = outputClause match {
- case Token("TOK_ALIASLIST", aliases) :: Nil =>
- (aliases.map { case Token(name, Nil) => AttributeReference(name, StringType)() },
- false)
- case Token("TOK_TABCOLLIST", attributes) :: Nil =>
- (attributes.map { case Token("TOK_TABCOL", Token(name, Nil) :: dataType :: Nil) =>
- AttributeReference(name, nodeToDataType(dataType))() }, false)
- case Nil =>
- (List(AttributeReference("key", StringType)(),
- AttributeReference("value", StringType)()), true)
+ children.collect {
+ case list @ Token("TOK_TABCOLLIST", _) =>
+ val cols = nodeToColumns(list, lowerCase = true)
+ if (cols != null) {
+ tableDesc = tableDesc.copy(schema = cols)
}
-
- type SerDeInfo = (
- Seq[(String, String)], // Input row format information
- Option[String], // Optional input SerDe class
- Seq[(String, String)], // Input SerDe properties
- Boolean // Whether to use default record reader/writer
- )
-
- def matchSerDe(clause: Seq[ASTNode]): SerDeInfo = clause match {
- case Token("TOK_SERDEPROPS", propsClause) :: Nil =>
- val rowFormat = propsClause.map {
- case Token(name, Token(value, Nil) :: Nil) => (name, value)
+ case Token("TOK_TABLECOMMENT", child :: Nil) =>
+ val comment = unescapeSQLString(child.text)
+ // TODO support the sql text
+ tableDesc = tableDesc.copy(viewText = Option(comment))
+ case Token("TOK_TABLEPARTCOLS", list @ Token("TOK_TABCOLLIST", _) :: Nil) =>
+ val cols = nodeToColumns(list.head, lowerCase = false)
+ if (cols != null) {
+ tableDesc = tableDesc.copy(partitionColumns = cols)
+ }
+ case Token("TOK_TABLEROWFORMAT", Token("TOK_SERDEPROPS", child :: Nil) :: Nil) =>
+ val serdeParams = new java.util.HashMap[String, String]()
+ child match {
+ case Token("TOK_TABLEROWFORMATFIELD", rowChild1 :: rowChild2) =>
+ val fieldDelim = unescapeSQLString (rowChild1.text)
+ serdeParams.put(serdeConstants.FIELD_DELIM, fieldDelim)
+ serdeParams.put(serdeConstants.SERIALIZATION_FORMAT, fieldDelim)
+ if (rowChild2.length > 1) {
+ val fieldEscape = unescapeSQLString (rowChild2.head.text)
+ serdeParams.put(serdeConstants.ESCAPE_CHAR, fieldEscape)
}
- (rowFormat, None, Nil, false)
-
- case Token("TOK_SERDENAME", Token(serdeClass, Nil) :: Nil) :: Nil =>
- (Nil, Some(SemanticAnalyzer.unescapeSQLString(serdeClass)), Nil, false)
-
- case Token("TOK_SERDENAME", Token(serdeClass, Nil) ::
- Token("TOK_TABLEPROPERTIES",
- Token("TOK_TABLEPROPLIST", propsClause) :: Nil) :: Nil) :: Nil =>
- val serdeProps = propsClause.map {
- case Token("TOK_TABLEPROPERTY", Token(name, Nil) :: Token(value, Nil) :: Nil) =>
- (SemanticAnalyzer.unescapeSQLString(name),
- SemanticAnalyzer.unescapeSQLString(value))
+ case Token("TOK_TABLEROWFORMATCOLLITEMS", rowChild :: Nil) =>
+ val collItemDelim = unescapeSQLString(rowChild.text)
+ serdeParams.put(serdeConstants.COLLECTION_DELIM, collItemDelim)
+ case Token("TOK_TABLEROWFORMATMAPKEYS", rowChild :: Nil) =>
+ val mapKeyDelim = unescapeSQLString(rowChild.text)
+ serdeParams.put(serdeConstants.MAPKEY_DELIM, mapKeyDelim)
+ case Token("TOK_TABLEROWFORMATLINES", rowChild :: Nil) =>
+ val lineDelim = unescapeSQLString(rowChild.text)
+ if (!(lineDelim == "\n") && !(lineDelim == "10")) {
+ throw new AnalysisException(
+ s"LINES TERMINATED BY only supports newline '\\n' right now: $rowChild")
}
-
- // SPARK-10310: Special cases LazySimpleSerDe
- // TODO Fully supports user-defined record reader/writer classes
- val unescapedSerDeClass = SemanticAnalyzer.unescapeSQLString(serdeClass)
- val useDefaultRecordReaderWriter =
- unescapedSerDeClass == classOf[LazySimpleSerDe].getCanonicalName
- (Nil, Some(unescapedSerDeClass), serdeProps, useDefaultRecordReaderWriter)
-
- case Nil =>
- // Uses default TextRecordReader/TextRecordWriter, sets field delimiter here
- val serdeProps = Seq(serdeConstants.FIELD_DELIM -> "\t")
- (Nil, Option(hiveConf.getVar(ConfVars.HIVESCRIPTSERDE)), serdeProps, true)
- }
-
- val (inRowFormat, inSerdeClass, inSerdeProps, useDefaultRecordReader) =
- matchSerDe(inputSerdeClause)
-
- val (outRowFormat, outSerdeClass, outSerdeProps, useDefaultRecordWriter) =
- matchSerDe(outputSerdeClause)
-
- val unescapedScript = SemanticAnalyzer.unescapeSQLString(script)
-
- // TODO Adds support for user-defined record reader/writer classes
- val recordReaderClass = if (useDefaultRecordReader) {
- Option(hiveConf.getVar(ConfVars.HIVESCRIPTRECORDREADER))
- } else {
- None
+ serdeParams.put(serdeConstants.LINE_DELIM, lineDelim)
+ case Token("TOK_TABLEROWFORMATNULL", rowChild :: Nil) =>
+ val nullFormat = unescapeSQLString(rowChild.text)
+ // TODO support the nullFormat
+ case _ => assert(false)
}
-
- val recordWriterClass = if (useDefaultRecordWriter) {
- Option(hiveConf.getVar(ConfVars.HIVESCRIPTRECORDWRITER))
- } else {
- None
+ tableDesc = tableDesc.copy(
+ serdeProperties = tableDesc.serdeProperties ++ serdeParams.asScala)
+ case Token("TOK_TABLELOCATION", child :: Nil) =>
+ val location = EximUtil.relativeToAbsolutePath(hiveConf, unescapeSQLString(child.text))
+ tableDesc = tableDesc.copy(location = Option(location))
+ case Token("TOK_TABLESERIALIZER", child :: Nil) =>
+ tableDesc = tableDesc.copy(
+ serde = Option(unescapeSQLString(child.children.head.text)))
+ if (child.numChildren == 2) {
+ // This is based on the readProps(..) method in
+ // ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java:
+ val serdeParams = child.children(1).children.head.children.map {
+ case Token(_, Token(prop, Nil) :: valueNode) =>
+ val value = valueNode.headOption
+ .map(_.text)
+ .map(unescapeSQLString)
+ .orNull
+ (unescapeSQLString(prop), value)
+ }.toMap
+ tableDesc = tableDesc.copy(serdeProperties = tableDesc.serdeProperties ++ serdeParams)
}
+ case Token("TOK_FILEFORMAT_GENERIC", child :: Nil) =>
+ child.text.toLowerCase(Locale.ENGLISH) match {
+ case "orc" =>
+ tableDesc = tableDesc.copy(
+ inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
+ outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
+ if (tableDesc.serde.isEmpty) {
+ tableDesc = tableDesc.copy(
+ serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+ }
- val schema = HiveScriptIOSchema(
- inRowFormat, outRowFormat,
- inSerdeClass, outSerdeClass,
- inSerdeProps, outSerdeProps,
- recordReaderClass, recordWriterClass,
- schemaLess)
-
- Some(
- logical.ScriptTransformation(
- inputExprs.map(nodeToExpr),
- unescapedScript,
- output,
- withWhere, schema))
- case _ => None
- }
-
- val withLateralView = lateralViewClause.map { lv =>
- val Token("TOK_SELECT",
- Token("TOK_SELEXPR", clauses) :: Nil) = lv.getChildren.iterator().next()
-
- val alias = getClause("TOK_TABALIAS", clauses).getChildren.iterator().next()
- .asInstanceOf[ASTNode].getText
-
- val (generator, attributes) = nodesToGenerator(clauses)
- Generate(
- generator,
- join = true,
- outer = false,
- Some(alias.toLowerCase),
- attributes.map(UnresolvedAttribute(_)),
- withWhere)
- }.getOrElse(withWhere)
-
- // The projection of the query can either be a normal projection, an aggregation
- // (if there is a group by) or a script transformation.
- val withProject: LogicalPlan = transformation.getOrElse {
- val selectExpressions =
- select.getChildren.asScala.flatMap(selExprNodeToExpr).map(UnresolvedAlias(_))
- Seq(
- groupByClause.map(e => e match {
- case Token("TOK_GROUPBY", children) =>
- // Not a transformation so must be either project or aggregation.
- Aggregate(children.map(nodeToExpr), selectExpressions, withLateralView)
- case _ => sys.error("Expect GROUP BY")
- }),
- groupingSetsClause.map(e => e match {
- case Token("TOK_GROUPING_SETS", children) =>
- val(groupByExprs, masks) = extractGroupingSet(children)
- GroupingSets(masks, groupByExprs, withLateralView, selectExpressions)
- case _ => sys.error("Expect GROUPING SETS")
- }),
- rollupGroupByClause.map(e => e match {
- case Token("TOK_ROLLUP_GROUPBY", children) =>
- Aggregate(Seq(Rollup(children.map(nodeToExpr))), selectExpressions, withLateralView)
- case _ => sys.error("Expect WITH ROLLUP")
- }),
- cubeGroupByClause.map(e => e match {
- case Token("TOK_CUBE_GROUPBY", children) =>
- Aggregate(Seq(Cube(children.map(nodeToExpr))), selectExpressions, withLateralView)
- case _ => sys.error("Expect WITH CUBE")
- }),
- Some(Project(selectExpressions, withLateralView))).flatten.head
- }
+ case "parquet" =>
+ tableDesc = tableDesc.copy(
+ inputFormat =
+ Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
+ outputFormat =
+ Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
+ if (tableDesc.serde.isEmpty) {
+ tableDesc = tableDesc.copy(
+ serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+ }
- // Handle HAVING clause.
- val withHaving = havingClause.map { h =>
- val havingExpr = h.getChildren.asScala match { case Seq(hexpr) => nodeToExpr(hexpr) }
- // Note that we added a cast to boolean. If the expression itself is already boolean,
- // the optimizer will get rid of the unnecessary cast.
- Filter(Cast(havingExpr, BooleanType), withProject)
- }.getOrElse(withProject)
-
- // Handle SELECT DISTINCT
- val withDistinct =
- if (selectDistinctClause.isDefined) Distinct(withHaving) else withHaving
-
- // Handle ORDER BY, SORT BY, DISTRIBUTE BY, and CLUSTER BY clause.
- val withSort =
- (orderByClause, sortByClause, distributeByClause, clusterByClause) match {
- case (Some(totalOrdering), None, None, None) =>
- Sort(totalOrdering.getChildren.asScala.map(nodeToSortOrder), true, withDistinct)
- case (None, Some(perPartitionOrdering), None, None) =>
- Sort(
- perPartitionOrdering.getChildren.asScala.map(nodeToSortOrder),
- false, withDistinct)
- case (None, None, Some(partitionExprs), None) =>
- RepartitionByExpression(
- partitionExprs.getChildren.asScala.map(nodeToExpr), withDistinct)
- case (None, Some(perPartitionOrdering), Some(partitionExprs), None) =>
- Sort(
- perPartitionOrdering.getChildren.asScala.map(nodeToSortOrder), false,
- RepartitionByExpression(
- partitionExprs.getChildren.asScala.map(nodeToExpr),
- withDistinct))
- case (None, None, None, Some(clusterExprs)) =>
- Sort(
- clusterExprs.getChildren.asScala.map(nodeToExpr).map(SortOrder(_, Ascending)),
- false,
- RepartitionByExpression(
- clusterExprs.getChildren.asScala.map(nodeToExpr),
- withDistinct))
- case (None, None, None, None) => withDistinct
- case _ => sys.error("Unsupported set of ordering / distribution clauses.")
- }
+ case "rcfile" =>
+ tableDesc = tableDesc.copy(
+ inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
+ outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
+ if (tableDesc.serde.isEmpty) {
+ tableDesc = tableDesc.copy(serde =
+ Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))
+ }
- val withLimit =
- limitClause.map(l => nodeToExpr(l.getChildren.iterator().next()))
- .map(Limit(_, withSort))
- .getOrElse(withSort)
-
- // Collect all window specifications defined in the WINDOW clause.
- val windowDefinitions = windowClause.map(_.getChildren.asScala.collect {
- case Token("TOK_WINDOWDEF",
- Token(windowName, Nil) :: Token("TOK_WINDOWSPEC", spec) :: Nil) =>
- windowName -> nodesToWindowSpecification(spec)
- }.toMap)
- // Handle cases like
- // window w1 as (partition by p_mfgr order by p_name
- // range between 2 preceding and 2 following),
- // w2 as w1
- val resolvedCrossReference = windowDefinitions.map {
- windowDefMap => windowDefMap.map {
- case (windowName, WindowSpecReference(other)) =>
- (windowName, windowDefMap(other).asInstanceOf[WindowSpecDefinition])
- case o => o.asInstanceOf[(String, WindowSpecDefinition)]
- }
- }
+ case "textfile" =>
+ tableDesc = tableDesc.copy(
+ inputFormat =
+ Option("org.apache.hadoop.mapred.TextInputFormat"),
+ outputFormat =
+ Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
- val withWindowDefinitions =
- resolvedCrossReference.map(WithWindowDefinition(_, withLimit)).getOrElse(withLimit)
-
- // TOK_INSERT_INTO means to add files to the table.
- // TOK_DESTINATION means to overwrite the table.
- val resultDestination =
- (intoClause orElse destClause).getOrElse(sys.error("No destination found."))
- val overwrite = intoClause.isEmpty
- nodeToDest(
- resultDestination,
- withWindowDefinitions,
- overwrite)
- }
+ case "sequencefile" =>
+ tableDesc = tableDesc.copy(
+ inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
+ outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat"))
- // If there are multiple INSERTS just UNION them together into on query.
- val query = queries.reduceLeft(Union)
+ case "avro" =>
+ tableDesc = tableDesc.copy(
+ inputFormat =
+ Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"),
+ outputFormat =
+ Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"))
+ if (tableDesc.serde.isEmpty) {
+ tableDesc = tableDesc.copy(
+ serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))
+ }
- // return With plan if there is CTE
- cteRelations.map(With(query, _)).getOrElse(query)
+ case _ =>
+ throw new AnalysisException(
+ s"Unrecognized file format in STORED AS clause: ${child.text}")
+ }
- // HIVE-9039 renamed TOK_UNION => TOK_UNIONALL while adding TOK_UNIONDISTINCT
- case Token("TOK_UNIONALL", left :: right :: Nil) =>
- Union(nodeToPlan(left, context), nodeToPlan(right, context))
+ case Token("TOK_TABLESERIALIZER",
+ Token("TOK_SERDENAME", Token(serdeName, Nil) :: otherProps) :: Nil) =>
+ tableDesc = tableDesc.copy(serde = Option(unquoteString(serdeName)))
- case a: ASTNode =>
- throw new NotImplementedError(s"No parse rules for $node:\n ${dumpTree(a).toString} ")
- }
+ otherProps match {
+ case Token("TOK_TABLEPROPERTIES", list :: Nil) :: Nil =>
+ tableDesc = tableDesc.copy(
+ serdeProperties = tableDesc.serdeProperties ++ getProperties(list))
+ case _ =>
+ }
- val allJoinTokens = "(TOK_.*JOIN)".r
- val laterViewToken = "TOK_LATERAL_VIEW(.*)".r
- def nodeToRelation(node: Node, context: Context): LogicalPlan = node match {
- case Token("TOK_SUBQUERY",
- query :: Token(alias, Nil) :: Nil) =>
- Subquery(cleanIdentifier(alias), nodeToPlan(query, context))
-
- case Token(laterViewToken(isOuter), selectClause :: relationClause :: Nil) =>
- val Token("TOK_SELECT",
- Token("TOK_SELEXPR", clauses) :: Nil) = selectClause
-
- val alias = getClause("TOK_TABALIAS", clauses).getChildren.iterator().next()
- .asInstanceOf[ASTNode].getText
-
- val (generator, attributes) = nodesToGenerator(clauses)
- Generate(
- generator,
- join = true,
- outer = isOuter.nonEmpty,
- Some(alias.toLowerCase),
- attributes.map(UnresolvedAttribute(_)),
- nodeToRelation(relationClause, context))
-
- /* All relations, possibly with aliases or sampling clauses. */
- case Token("TOK_TABREF", clauses) =>
- // If the last clause is not a token then it's the alias of the table.
- val (nonAliasClauses, aliasClause) =
- if (clauses.last.getText.startsWith("TOK")) {
- (clauses, None)
- } else {
- (clauses.dropRight(1), Some(clauses.last))
+ case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
+ tableDesc = tableDesc.copy(properties = tableDesc.properties ++ getProperties(list))
+ case list @ Token("TOK_TABLEFILEFORMAT", _) =>
+ tableDesc = tableDesc.copy(
+ inputFormat =
+ Option(unescapeSQLString(list.children.head.text)),
+ outputFormat =
+ Option(unescapeSQLString(list.children(1).text)))
+ case Token("TOK_STORAGEHANDLER", _) =>
+ throw new AnalysisException(
+ "CREATE TABLE AS SELECT cannot be used for a non-native table")
+ case _ => // Unsupport features
}
- val (Some(tableNameParts) ::
- splitSampleClause ::
- bucketSampleClause :: Nil) = {
- getClauses(Seq("TOK_TABNAME", "TOK_TABLESPLITSAMPLE", "TOK_TABLEBUCKETSAMPLE"),
- nonAliasClauses)
- }
-
- val tableIdent = extractTableIdent(tableNameParts)
- val alias = aliasClause.map { case Token(a, Nil) => cleanIdentifier(a) }
- val relation = UnresolvedRelation(tableIdent, alias)
-
- // Apply sampling if requested.
- (bucketSampleClause orElse splitSampleClause).map {
- case Token("TOK_TABLESPLITSAMPLE",
- Token("TOK_ROWCOUNT", Nil) ::
- Token(count, Nil) :: Nil) =>
- Limit(Literal(count.toInt), relation)
- case Token("TOK_TABLESPLITSAMPLE",
- Token("TOK_PERCENT", Nil) ::
- Token(fraction, Nil) :: Nil) =>
- // The range of fraction accepted by Sample is [0, 1]. Because Hive's block sampling
- // function takes X PERCENT as the input and the range of X is [0, 100], we need to
- // adjust the fraction.
- require(
- fraction.toDouble >= (0.0 - RandomSampler.roundingEpsilon)
- && fraction.toDouble <= (100.0 + RandomSampler.roundingEpsilon),
- s"Sampling fraction ($fraction) must be on interval [0, 100]")
- Sample(0.0, fraction.toDouble / 100, withReplacement = false, (math.random * 1000).toInt,
- relation)
- case Token("TOK_TABLEBUCKETSAMPLE",
- Token(numerator, Nil) ::
- Token(denominator, Nil) :: Nil) =>
- val fraction = numerator.toDouble / denominator.toDouble
- Sample(0.0, fraction, withReplacement = false, (math.random * 1000).toInt, relation)
- case a: ASTNode =>
- throw new NotImplementedError(
- s"""No parse rules for sampling clause: ${a.getType}, text: ${a.getText} :
- |${dumpTree(a).toString}" +
- """.stripMargin)
- }.getOrElse(relation)
-
- case Token("TOK_UNIQUEJOIN", joinArgs) =>
- val tableOrdinals =
- joinArgs.zipWithIndex.filter {
- case (arg, i) => arg.getText == "TOK_TABREF"
- }.map(_._2)
-
- val isPreserved = tableOrdinals.map(i => (i - 1 < 0) || joinArgs(i - 1).getText == "PRESERVE")
- val tables = tableOrdinals.map(i => nodeToRelation(joinArgs(i), context))
- val joinExpressions =
- tableOrdinals.map(i => joinArgs(i + 1).getChildren.asScala.map(nodeToExpr))
-
- val joinConditions = joinExpressions.sliding(2).map {
- case Seq(c1, c2) =>
- val predicates = (c1, c2).zipped.map { case (e1, e2) => EqualTo(e1, e2): Expression }
- predicates.reduceLeft(And)
- }.toBuffer
-
- val joinType = isPreserved.sliding(2).map {
- case Seq(true, true) => FullOuter
- case Seq(true, false) => LeftOuter
- case Seq(false, true) => RightOuter
- case Seq(false, false) => Inner
- }.toBuffer
-
- val joinedTables = tables.reduceLeft(Join(_, _, Inner, None))
-
- // Must be transform down.
- val joinedResult = joinedTables transform {
- case j: Join =>
- j.copy(
- condition = Some(joinConditions.remove(joinConditions.length - 1)),
- joinType = joinType.remove(joinType.length - 1))
- }
-
- val groups = joinExpressions.head.indices.map(i => Coalesce(joinExpressions.map(_(i))))
-
- // Unique join is not really the same as an outer join so we must group together results where
- // the joinExpressions are the same, taking the First of each value is only okay because the
- // user of a unique join is implicitly promising that there is only one result.
- // TODO: This doesn't actually work since [[Star]] is not a valid aggregate expression.
- // instead we should figure out how important supporting this feature is and whether it is
- // worth the number of hacks that will be required to implement it. Namely, we need to add
- // some sort of mapped star expansion that would expand all child output row to be similarly
- // named output expressions where some aggregate expression has been applied (i.e. First).
- // Aggregate(groups, Star(None, First(_)) :: Nil, joinedResult)
- throw new UnsupportedOperationException
-
- case Token(allJoinTokens(joinToken),
- relation1 ::
- relation2 :: other) =>
- if (!(other.size <= 1)) {
- sys.error(s"Unsupported join operation: $other")
- }
-
- val joinType = joinToken match {
- case "TOK_JOIN" => Inner
- case "TOK_CROSSJOIN" => Inner
- case "TOK_RIGHTOUTERJOIN" => RightOuter
- case "TOK_LEFTOUTERJOIN" => LeftOuter
- case "TOK_FULLOUTERJOIN" => FullOuter
- case "TOK_LEFTSEMIJOIN" => LeftSemi
- case "TOK_ANTIJOIN" => throw new NotImplementedError("Anti join not supported")
- }
- Join(nodeToRelation(relation1, context),
- nodeToRelation(relation2, context),
- joinType,
- other.headOption.map(nodeToExpr))
-
- case a: ASTNode =>
- throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ")
- }
+ CreateTableAsSelect(tableDesc, nodeToPlan(query), allowExisting.isDefined)
- def nodeToSortOrder(node: Node): SortOrder = node match {
- case Token("TOK_TABSORTCOLNAMEASC", sortExpr :: Nil) =>
- SortOrder(nodeToExpr(sortExpr), Ascending)
- case Token("TOK_TABSORTCOLNAMEDESC", sortExpr :: Nil) =>
- SortOrder(nodeToExpr(sortExpr), Descending)
+ // If it's not a "CTAS" like above, then take it as a native command
+ case Token("TOK_CREATETABLE", _) =>
+ NativePlaceholder
- case a: ASTNode =>
- throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ")
- }
+ // Support "TRUNCATE TABLE table_name [PARTITION partition_spec]"
+ case Token("TOK_TRUNCATETABLE", Token("TOK_TABLE_PARTITION", table) :: Nil) =>
+ NativePlaceholder
- val destinationToken = "TOK_DESTINATION|TOK_INSERT_INTO".r
- protected def nodeToDest(
- node: Node,
- query: LogicalPlan,
- overwrite: Boolean): LogicalPlan = node match {
- case Token(destinationToken(),
- Token("TOK_DIR",
- Token("TOK_TMP_FILE", Nil) :: Nil) :: Nil) =>
- query
-
- case Token(destinationToken(),
- Token("TOK_TAB",
- tableArgs) :: Nil) =>
- val Some(tableNameParts) :: partitionClause :: Nil =
- getClauses(Seq("TOK_TABNAME", "TOK_PARTSPEC"), tableArgs)
-
- val tableIdent = extractTableIdent(tableNameParts)
-
- val partitionKeys = partitionClause.map(_.getChildren.asScala.map {
- // Parse partitions. We also make keys case insensitive.
- case Token("TOK_PARTVAL", Token(key, Nil) :: Token(value, Nil) :: Nil) =>
- cleanIdentifier(key.toLowerCase) -> Some(PlanUtils.stripQuotes(value))
- case Token("TOK_PARTVAL", Token(key, Nil) :: Nil) =>
- cleanIdentifier(key.toLowerCase) -> None
- }.toMap).getOrElse(Map.empty)
-
- InsertIntoTable(UnresolvedRelation(tableIdent, None), partitionKeys, query, overwrite, false)
-
- case Token(destinationToken(),
- Token("TOK_TAB",
- tableArgs) ::
- Token("TOK_IFNOTEXISTS",
- ifNotExists) :: Nil) =>
- val Some(tableNameParts) :: partitionClause :: Nil =
- getClauses(Seq("TOK_TABNAME", "TOK_PARTSPEC"), tableArgs)
-
- val tableIdent = extractTableIdent(tableNameParts)
-
- val partitionKeys = partitionClause.map(_.getChildren.asScala.map {
- // Parse partitions. We also make keys case insensitive.
- case Token("TOK_PARTVAL", Token(key, Nil) :: Token(value, Nil) :: Nil) =>
- cleanIdentifier(key.toLowerCase) -> Some(PlanUtils.stripQuotes(value))
- case Token("TOK_PARTVAL", Token(key, Nil) :: Nil) =>
- cleanIdentifier(key.toLowerCase) -> None
- }.toMap).getOrElse(Map.empty)
-
- InsertIntoTable(UnresolvedRelation(tableIdent, None), partitionKeys, query, overwrite, true)
-
- case a: ASTNode =>
- throw new NotImplementedError(s"No parse rules for ${a.getName}:" +
- s"\n ${dumpTree(a).toString} ")
+ case _ =>
+ super.nodeToPlan(node)
+ }
}
- protected def selExprNodeToExpr(node: Node): Option[Expression] = node match {
- case Token("TOK_SELEXPR", e :: Nil) =>
- Some(nodeToExpr(e))
-
- case Token("TOK_SELEXPR", e :: Token(alias, Nil) :: Nil) =>
- Some(Alias(nodeToExpr(e), cleanIdentifier(alias))())
-
- case Token("TOK_SELEXPR", e :: aliasChildren) =>
- var aliasNames = ArrayBuffer[String]()
- aliasChildren.foreach { _ match {
- case Token(name, Nil) => aliasNames += cleanIdentifier(name)
+ protected override def nodeToDescribeFallback(node: ASTNode): LogicalPlan = NativePlaceholder // ignores `node` and always returns NativePlaceholder; presumably so DESCRIBE forms the parent parser does not handle run as native commands (TODO confirm)
+
+ protected override def nodeToTransformation(
+ node: ASTNode,
+ child: LogicalPlan): Option[ScriptTransformation] = node match {
+ case Token("TOK_SELEXPR",
+ Token("TOK_TRANSFORM",
+ Token("TOK_EXPLIST", inputExprs) ::
+ Token("TOK_SERDE", inputSerdeClause) ::
+ Token("TOK_RECORDWRITER", writerClause) ::
+ // TODO: Need to support other types of (in/out)put
+ Token(script, Nil) ::
+ Token("TOK_SERDE", outputSerdeClause) ::
+ Token("TOK_RECORDREADER", readerClause) ::
+ outputClause) :: Nil) =>
+
+ val (output, schemaLess) = outputClause match {
+ case Token("TOK_ALIASLIST", aliases) :: Nil =>
+ (aliases.map { case Token(name, Nil) => AttributeReference(name, StringType)() },
+ false)
+ case Token("TOK_TABCOLLIST", attributes) :: Nil =>
+ (attributes.map { case Token("TOK_TABCOL", Token(name, Nil) :: dataType :: Nil) =>
+ AttributeReference(name, nodeToDataType(dataType))() }, false)
+ case Nil =>
+ (List(AttributeReference("key", StringType)(),
+ AttributeReference("value", StringType)()), true)
case _ =>
- }
+ noParseRule("Transform", node)
}
- Some(MultiAlias(nodeToExpr(e), aliasNames))
-
- /* Hints are ignored */
- case Token("TOK_HINTLIST", _) => None
- case a: ASTNode =>
- throw new NotImplementedError(s"No parse rules for ${a.getName }:" +
- s"\n ${dumpTree(a).toString } ")
- }
-
- protected val escapedIdentifier = "`([^`]+)`".r
- protected val doubleQuotedString = "\"([^\"]+)\"".r
- protected val singleQuotedString = "'([^']+)'".r
+ type SerDeInfo = (
+ Seq[(String, String)], // Input row format information
+ Option[String], // Optional input SerDe class
+ Seq[(String, String)], // Input SerDe properties
+ Boolean // Whether to use default record reader/writer
+ )
+
+ def matchSerDe(clause: Seq[ASTNode]): SerDeInfo = clause match {
+ case Token("TOK_SERDEPROPS", propsClause) :: Nil =>
+ val rowFormat = propsClause.map {
+ case Token(name, Token(value, Nil) :: Nil) => (name, value)
+ }
+ (rowFormat, None, Nil, false)
- protected def unquoteString(str: String) = str match {
- case singleQuotedString(s) => s
- case doubleQuotedString(s) => s
- case other => other
- }
+ case Token("TOK_SERDENAME", Token(serdeClass, Nil) :: Nil) :: Nil =>
+ (Nil, Some(unescapeSQLString(serdeClass)), Nil, false)
- /** Strips backticks from ident if present */
- protected def cleanIdentifier(ident: String): String = ident match {
- case escapedIdentifier(i) => i
- case plainIdent => plainIdent
- }
+ case Token("TOK_SERDENAME", Token(serdeClass, Nil) ::
+ Token("TOK_TABLEPROPERTIES",
+ Token("TOK_TABLEPROPLIST", propsClause) :: Nil) :: Nil) :: Nil =>
+ val serdeProps = propsClause.map {
+ case Token("TOK_TABLEPROPERTY", Token(name, Nil) :: Token(value, Nil) :: Nil) =>
+ (unescapeSQLString(name), unescapeSQLString(value))
+ }
- val numericAstTypes = Seq(
- SparkSqlParser.Number,
- SparkSqlParser.TinyintLiteral,
- SparkSqlParser.SmallintLiteral,
- SparkSqlParser.BigintLiteral,
- SparkSqlParser.DecimalLiteral)
-
- /* Case insensitive matches */
- val COUNT = "(?i)COUNT".r
- val SUM = "(?i)SUM".r
- val AND = "(?i)AND".r
- val OR = "(?i)OR".r
- val NOT = "(?i)NOT".r
- val TRUE = "(?i)TRUE".r
- val FALSE = "(?i)FALSE".r
- val LIKE = "(?i)LIKE".r
- val RLIKE = "(?i)RLIKE".r
- val REGEXP = "(?i)REGEXP".r
- val IN = "(?i)IN".r
- val DIV = "(?i)DIV".r
- val BETWEEN = "(?i)BETWEEN".r
- val WHEN = "(?i)WHEN".r
- val CASE = "(?i)CASE".r
-
- protected def nodeToExpr(node: Node): Expression = node match {
- /* Attribute References */
- case Token("TOK_TABLE_OR_COL",
- Token(name, Nil) :: Nil) =>
- UnresolvedAttribute.quoted(cleanIdentifier(name))
- case Token(".", qualifier :: Token(attr, Nil) :: Nil) =>
- nodeToExpr(qualifier) match {
- case UnresolvedAttribute(nameParts) =>
- UnresolvedAttribute(nameParts :+ cleanIdentifier(attr))
- case other => UnresolvedExtractValue(other, Literal(attr))
+ // SPARK-10310: Special cases LazySimpleSerDe
+ // TODO Fully supports user-defined record reader/writer classes
+ val unescapedSerDeClass = unescapeSQLString(serdeClass)
+ val useDefaultRecordReaderWriter =
+ unescapedSerDeClass == classOf[LazySimpleSerDe].getCanonicalName
+ (Nil, Some(unescapedSerDeClass), serdeProps, useDefaultRecordReaderWriter)
+
+ case Nil =>
+ // Uses default TextRecordReader/TextRecordWriter, sets field delimiter here
+ val serdeProps = Seq(serdeConstants.FIELD_DELIM -> "\t")
+ (Nil, Option(hiveConf.getVar(ConfVars.HIVESCRIPTSERDE)), serdeProps, true)
}
- /* Stars (*) */
- case Token("TOK_ALLCOLREF", Nil) => UnresolvedStar(None)
- // The format of dbName.tableName.* cannot be parsed by HiveParser. TOK_TABNAME will only
- // has a single child which is tableName.
- case Token("TOK_ALLCOLREF", Token("TOK_TABNAME", Token(name, Nil) :: Nil) :: Nil) =>
- UnresolvedStar(Some(UnresolvedAttribute.parseAttributeName(name)))
-
- /* Aggregate Functions */
- case Token("TOK_FUNCTIONDI", Token(COUNT(), Nil) :: args) =>
- Count(args.map(nodeToExpr)).toAggregateExpression(isDistinct = true)
- case Token("TOK_FUNCTIONSTAR", Token(COUNT(), Nil) :: Nil) =>
- Count(Literal(1)).toAggregateExpression()
-
- /* Casts */
- case Token("TOK_FUNCTION", Token("TOK_STRING", Nil) :: arg :: Nil) =>
- Cast(nodeToExpr(arg), StringType)
- case Token("TOK_FUNCTION", Token("TOK_VARCHAR", _) :: arg :: Nil) =>
- Cast(nodeToExpr(arg), StringType)
- case Token("TOK_FUNCTION", Token("TOK_CHAR", _) :: arg :: Nil) =>
- Cast(nodeToExpr(arg), StringType)
- case Token("TOK_FUNCTION", Token("TOK_INT", Nil) :: arg :: Nil) =>
- Cast(nodeToExpr(arg), IntegerType)
- case Token("TOK_FUNCTION", Token("TOK_BIGINT", Nil) :: arg :: Nil) =>
- Cast(nodeToExpr(arg), LongType)
- case Token("TOK_FUNCTION", Token("TOK_FLOAT", Nil) :: arg :: Nil) =>
- Cast(nodeToExpr(arg), FloatType)
- case Token("TOK_FUNCTION", Token("TOK_DOUBLE", Nil) :: arg :: Nil) =>
- Cast(nodeToExpr(arg), DoubleType)
- case Token("TOK_FUNCTION", Token("TOK_SMALLINT", Nil) :: arg :: Nil) =>
- Cast(nodeToExpr(arg), ShortType)
- case Token("TOK_FUNCTION", Token("TOK_TINYINT", Nil) :: arg :: Nil) =>
- Cast(nodeToExpr(arg), ByteType)
- case Token("TOK_FUNCTION", Token("TOK_BINARY", Nil) :: arg :: Nil) =>
- Cast(nodeToExpr(arg), BinaryType)
- case Token("TOK_FUNCTION", Token("TOK_BOOLEAN", Nil) :: arg :: Nil) =>
- Cast(nodeToExpr(arg), BooleanType)
- case Token("TOK_FUNCTION", Token("TOK_DECIMAL", precision :: scale :: nil) :: arg :: Nil) =>
- Cast(nodeToExpr(arg), DecimalType(precision.getText.toInt, scale.getText.toInt))
- case Token("TOK_FUNCTION", Token("TOK_DECIMAL", precision :: Nil) :: arg :: Nil) =>
- Cast(nodeToExpr(arg), DecimalType(precision.getText.toInt, 0))
- case Token("TOK_FUNCTION", Token("TOK_DECIMAL", Nil) :: arg :: Nil) =>
- Cast(nodeToExpr(arg), DecimalType.USER_DEFAULT)
- case Token("TOK_FUNCTION", Token("TOK_TIMESTAMP", Nil) :: arg :: Nil) =>
- Cast(nodeToExpr(arg), TimestampType)
- case Token("TOK_FUNCTION", Token("TOK_DATE", Nil) :: arg :: Nil) =>
- Cast(nodeToExpr(arg), DateType)
-
- /* Arithmetic */
- case Token("+", child :: Nil) => nodeToExpr(child)
- case Token("-", child :: Nil) => UnaryMinus(nodeToExpr(child))
- case Token("~", child :: Nil) => BitwiseNot(nodeToExpr(child))
- case Token("+", left :: right:: Nil) => Add(nodeToExpr(left), nodeToExpr(right))
- case Token("-", left :: right:: Nil) => Subtract(nodeToExpr(left), nodeToExpr(right))
- case Token("*", left :: right:: Nil) => Multiply(nodeToExpr(left), nodeToExpr(right))
- case Token("/", left :: right:: Nil) => Divide(nodeToExpr(left), nodeToExpr(right))
- case Token(DIV(), left :: right:: Nil) =>
- Cast(Divide(nodeToExpr(left), nodeToExpr(right)), LongType)
- case Token("%", left :: right:: Nil) => Remainder(nodeToExpr(left), nodeToExpr(right))
- case Token("&", left :: right:: Nil) => BitwiseAnd(nodeToExpr(left), nodeToExpr(right))
- case Token("|", left :: right:: Nil) => BitwiseOr(nodeToExpr(left), nodeToExpr(right))
- case Token("^", left :: right:: Nil) => BitwiseXor(nodeToExpr(left), nodeToExpr(right))
-
- /* Comparisons */
- case Token("=", left :: right:: Nil) => EqualTo(nodeToExpr(left), nodeToExpr(right))
- case Token("==", left :: right:: Nil) => EqualTo(nodeToExpr(left), nodeToExpr(right))
- case Token("<=>", left :: right:: Nil) => EqualNullSafe(nodeToExpr(left), nodeToExpr(right))
- case Token("!=", left :: right:: Nil) => Not(EqualTo(nodeToExpr(left), nodeToExpr(right)))
- case Token("<>", left :: right:: Nil) => Not(EqualTo(nodeToExpr(left), nodeToExpr(right)))
- case Token(">", left :: right:: Nil) => GreaterThan(nodeToExpr(left), nodeToExpr(right))
- case Token(">=", left :: right:: Nil) => GreaterThanOrEqual(nodeToExpr(left), nodeToExpr(right))
- case Token("<", left :: right:: Nil) => LessThan(nodeToExpr(left), nodeToExpr(right))
- case Token("<=", left :: right:: Nil) => LessThanOrEqual(nodeToExpr(left), nodeToExpr(right))
- case Token(LIKE(), left :: right:: Nil) => Like(nodeToExpr(left), nodeToExpr(right))
- case Token(RLIKE(), left :: right:: Nil) => RLike(nodeToExpr(left), nodeToExpr(right))
- case Token(REGEXP(), left :: right:: Nil) => RLike(nodeToExpr(left), nodeToExpr(right))
- case Token("TOK_FUNCTION", Token("TOK_ISNOTNULL", Nil) :: child :: Nil) =>
- IsNotNull(nodeToExpr(child))
- case Token("TOK_FUNCTION", Token("TOK_ISNULL", Nil) :: child :: Nil) =>
- IsNull(nodeToExpr(child))
- case Token("TOK_FUNCTION", Token(IN(), Nil) :: value :: list) =>
- In(nodeToExpr(value), list.map(nodeToExpr))
- case Token("TOK_FUNCTION",
- Token(BETWEEN(), Nil) ::
- kw ::
- target ::
- minValue ::
- maxValue :: Nil) =>
-
- val targetExpression = nodeToExpr(target)
- val betweenExpr =
- And(
- GreaterThanOrEqual(targetExpression, nodeToExpr(minValue)),
- LessThanOrEqual(targetExpression, nodeToExpr(maxValue)))
- kw match {
- case Token("KW_FALSE", Nil) => betweenExpr
- case Token("KW_TRUE", Nil) => Not(betweenExpr)
- }
+ val (inRowFormat, inSerdeClass, inSerdeProps, useDefaultRecordReader) =
+ matchSerDe(inputSerdeClause)
- /* Boolean Logic */
- case Token(AND(), left :: right:: Nil) => And(nodeToExpr(left), nodeToExpr(right))
- case Token(OR(), left :: right:: Nil) => Or(nodeToExpr(left), nodeToExpr(right))
- case Token(NOT(), child :: Nil) => Not(nodeToExpr(child))
- case Token("!", child :: Nil) => Not(nodeToExpr(child))
-
- /* Case statements */
- case Token("TOK_FUNCTION", Token(WHEN(), Nil) :: branches) =>
- CaseWhen(branches.map(nodeToExpr))
- case Token("TOK_FUNCTION", Token(CASE(), Nil) :: branches) =>
- val keyExpr = nodeToExpr(branches.head)
- CaseKeyWhen(keyExpr, branches.drop(1).map(nodeToExpr))
-
- /* Complex datatype manipulation */
- case Token("[", child :: ordinal :: Nil) =>
- UnresolvedExtractValue(nodeToExpr(child), nodeToExpr(ordinal))
-
- /* Window Functions */
- case Token(name, args :+ Token("TOK_WINDOWSPEC", spec)) =>
- val function = nodeToExpr(Token(name, args))
- nodesToWindowSpecification(spec) match {
- case reference: WindowSpecReference =>
- UnresolvedWindowExpression(function, reference)
- case definition: WindowSpecDefinition =>
- WindowExpression(function, definition)
- }
+ val (outRowFormat, outSerdeClass, outSerdeProps, useDefaultRecordWriter) =
+ matchSerDe(outputSerdeClause)
- /* UDFs - Must be last otherwise will preempt built in functions */
- case Token("TOK_FUNCTION", Token(name, Nil) :: args) =>
- UnresolvedFunction(name, args.map(nodeToExpr), isDistinct = false)
- // Aggregate function with DISTINCT keyword.
- case Token("TOK_FUNCTIONDI", Token(name, Nil) :: args) =>
- UnresolvedFunction(name, args.map(nodeToExpr), isDistinct = true)
- case Token("TOK_FUNCTIONSTAR", Token(name, Nil) :: args) =>
- UnresolvedFunction(name, UnresolvedStar(None) :: Nil, isDistinct = false)
-
- /* Literals */
- case Token("TOK_NULL", Nil) => Literal.create(null, NullType)
- case Token(TRUE(), Nil) => Literal.create(true, BooleanType)
- case Token(FALSE(), Nil) => Literal.create(false, BooleanType)
- case Token("TOK_STRINGLITERALSEQUENCE", strings) =>
- Literal(strings.map(s => SemanticAnalyzer.unescapeSQLString(s.getText)).mkString)
-
- // This code is adapted from
- // /ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java#L223
- case ast: ASTNode if numericAstTypes contains ast.getType =>
- var v: Literal = null
- try {
- if (ast.getText.endsWith("L")) {
- // Literal bigint.
- v = Literal.create(ast.getText.substring(0, ast.getText.length() - 1).toLong, LongType)
- } else if (ast.getText.endsWith("S")) {
- // Literal smallint.
- v = Literal.create(ast.getText.substring(0, ast.getText.length() - 1).toShort, ShortType)
- } else if (ast.getText.endsWith("Y")) {
- // Literal tinyint.
- v = Literal.create(ast.getText.substring(0, ast.getText.length() - 1).toByte, ByteType)
- } else if (ast.getText.endsWith("BD") || ast.getText.endsWith("D")) {
- // Literal decimal
- val strVal = ast.getText.stripSuffix("D").stripSuffix("B")
- v = Literal(Decimal(strVal))
- } else {
- v = Literal.create(ast.getText.toDouble, DoubleType)
- v = Literal.create(ast.getText.toLong, LongType)
- v = Literal.create(ast.getText.toInt, IntegerType)
- }
- } catch {
- case nfe: NumberFormatException => // Do nothing
- }
+ val unescapedScript = unescapeSQLString(script)
- if (v == null) {
- sys.error(s"Failed to parse number '${ast.getText}'.")
+ // TODO Adds support for user-defined record reader/writer classes
+ val recordReaderClass = if (useDefaultRecordReader) {
+ Option(hiveConf.getVar(ConfVars.HIVESCRIPTRECORDREADER))
} else {
- v
+ None
}
- case ast: ASTNode if ast.getType == SparkSqlParser.StringLiteral =>
- Literal(SemanticAnalyzer.unescapeSQLString(ast.getText))
-
- case ast: ASTNode if ast.getType == SparkSqlParser.TOK_DATELITERAL =>
- Literal(Date.valueOf(ast.getText.substring(1, ast.getText.length - 1)))
-
- case ast: ASTNode if ast.getType == SparkSqlParser.TOK_CHARSETLITERAL =>
- Literal(SemanticAnalyzer.charSetString(ast.getChild(0).getText, ast.getChild(1).getText))
-
- case ast: ASTNode if ast.getType == SparkSqlParser.TOK_INTERVAL_YEAR_MONTH_LITERAL =>
- Literal(CalendarInterval.fromYearMonthString(ast.getText))
-
- case ast: ASTNode if ast.getType == SparkSqlParser.TOK_INTERVAL_DAY_TIME_LITERAL =>
- Literal(CalendarInterval.fromDayTimeString(ast.getText))
-
- case ast: ASTNode if ast.getType == SparkSqlParser.TOK_INTERVAL_YEAR_LITERAL =>
- Literal(CalendarInterval.fromSingleUnitString("year", ast.getText))
-
- case ast: ASTNode if ast.getType == SparkSqlParser.TOK_INTERVAL_MONTH_LITERAL =>
- Literal(CalendarInterval.fromSingleUnitString("month", ast.getText))
-
- case ast: ASTNode if ast.getType == SparkSqlParser.TOK_INTERVAL_DAY_LITERAL =>
- Literal(CalendarInterval.fromSingleUnitString("day", ast.getText))
-
- case ast: ASTNode if ast.getType == SparkSqlParser.TOK_INTERVAL_HOUR_LITERAL =>
- Literal(CalendarInterval.fromSingleUnitString("hour", ast.getText))
-
- case ast: ASTNode if ast.getType == SparkSqlParser.TOK_INTERVAL_MINUTE_LITERAL =>
- Literal(CalendarInterval.fromSingleUnitString("minute", ast.getText))
-
- case ast: ASTNode if ast.g
<TRUNCATED>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org