You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/01/10 23:08:09 UTC
spark git commit: [SPARK-4861][SQL] Refactory command in spark sql
Repository: spark
Updated Branches:
refs/heads/master 693a323a7 -> b3e86dc62
[SPARK-4861][SQL] Refactory command in spark sql
Follow up for #3712.
This PR finally removes ```CommandStrategy``` and makes all commands extend ```RunnableCommand```, so they can all be handled by the single rule ```case r: RunnableCommand => ExecutedCommand(r) :: Nil```.
The one exception is Hive's ```DescribeCommand```, which is a special case because it needs to distinguish Hive tables from temporary tables, so ```HiveCommandStrategy``` is still kept for it.
Author: scwf <wa...@huawei.com>
Closes #3948 from scwf/followup-SPARK-4861 and squashes the following commits:
6b48e64 [scwf] minor style fix
2c62e9d [scwf] fix for hive module
5a7a819 [scwf] Refactory command in spark sql
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b3e86dc6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b3e86dc6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b3e86dc6
Branch: refs/heads/master
Commit: b3e86dc62476abb03b330f86a788aa19a6565317
Parents: 693a323
Author: scwf <wa...@huawei.com>
Authored: Sat Jan 10 14:08:04 2015 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Sat Jan 10 14:08:04 2015 -0800
----------------------------------------------------------------------
.../sql/catalyst/AbstractSparkSQLParser.scala | 107 +++++++++++
.../spark/sql/catalyst/SparkSQLParser.scala | 176 -------------------
.../sql/catalyst/plans/logical/commands.scala | 48 +----
.../scala/org/apache/spark/sql/SQLContext.scala | 3 +-
.../org/apache/spark/sql/SparkSQLParser.scala | 97 ++++++++++
.../spark/sql/execution/SparkStrategies.scala | 20 +--
.../apache/spark/sql/execution/commands.scala | 2 +-
.../spark/sql/hive/thriftserver/Shim12.scala | 4 +-
.../spark/sql/hive/thriftserver/Shim13.scala | 4 +-
.../org/apache/spark/sql/hive/HiveContext.scala | 4 +-
.../org/apache/spark/sql/hive/HiveQl.scala | 31 +++-
.../apache/spark/sql/hive/HiveStrategies.scala | 5 +-
.../org/apache/spark/sql/hive/TestHive.scala | 3 +-
.../sql/hive/execution/HiveComparisonTest.scala | 2 +
14 files changed, 248 insertions(+), 258 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/b3e86dc6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala
new file mode 100644
index 0000000..93d74ad
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import scala.language.implicitConversions
+import scala.util.parsing.combinator.lexical.StdLexical
+import scala.util.parsing.combinator.syntactical.StandardTokenParsers
+import scala.util.parsing.combinator.{PackratParsers, RegexParsers}
+import scala.util.parsing.input.CharArrayReader.EofCh
+
+import org.apache.spark.sql.catalyst.plans.logical._
+
+private[sql] abstract class AbstractSparkSQLParser
+ extends StandardTokenParsers with PackratParsers {
+ // Parses `input` with the `start` rule; a failure or error result is raised through sys.error.
+ def apply(input: String): LogicalPlan = phrase(start)(new lexical.Scanner(input)) match {
+ case Success(plan, _) => plan
+ case failureOrError => sys.error(failureOrError.toString)
+ }
+
+ protected case class Keyword(str: String)
+ // Top-level grammar rule; every concrete parser has to provide it.
+ protected def start: Parser[LogicalPlan]
+
+ // Returns the whole input string, whatever the current offset, and advances by the source length
+ protected lazy val wholeInput: Parser[String] = new Parser[String] {
+ def apply(in: Input): ParseResult[String] =
+ Success(in.source.toString, in.drop(in.source.length()))
+ }
+
+ // Returns the part of the input string, from the current offset on, that is not parsed yet
+ protected lazy val restInput: Parser[String] = new Parser[String] {
+ def apply(in: Input): ParseResult[String] =
+ Success(
+ in.source.subSequence(in.offset, in.source.length()).toString,
+ in.drop(in.source.length()))
+ }
+}
+
+class SqlLexical(val keywords: Seq[String]) extends StdLexical {
+ case class FloatLit(chars: String) extends Token {
+ override def toString = chars
+ }
+ // Reserve every upper/lower-case spelling of each keyword (see allCaseVersions).
+ reserved ++= keywords.flatMap(w => allCaseVersions(w))
+
+ delimiters += (
+ "@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
+ ",", ";", "%", "{", "}", ":", "[", "]", ".", "&", "|", "^", "~", "<=>"
+ )
+ // Alternatives in order: identifier, number, quoted text, end of input, errors, delimiter.
+ override lazy val token: Parser[Token] =
+ ( identChar ~ (identChar | digit).* ^^
+ { case first ~ rest => processIdent((first :: rest).mkString) }
+ | rep1(digit) ~ ('.' ~> digit.*).? ^^ {
+ case i ~ None => NumericLit(i.mkString)
+ case i ~ Some(d) => FloatLit(i.mkString + "." + d.mkString)
+ }
+ | '\'' ~> chrExcept('\'', '\n', EofCh).* <~ '\'' ^^
+ { case chars => StringLit(chars mkString "") }
+ | '"' ~> chrExcept('"', '\n', EofCh).* <~ '"' ^^
+ { case chars => StringLit(chars mkString "") }
+ | '`' ~> chrExcept('`', '\n', EofCh).* <~ '`' ^^
+ { case chars => Identifier(chars mkString "") }
+ | EofCh ^^^ EOF
+ | '\'' ~> failure("unclosed string literal")
+ | '"' ~> failure("unclosed string literal")
+ | delim
+ | failure("illegal character")
+ )
+ // Identifier characters are letters and underscore; `token` also allows digits after the first.
+ override def identChar = letter | elem('_')
+ // Whitespace also includes block comments and line comments introduced by `//`, `#` or `--`.
+ override def whitespace: Parser[Any] =
+ ( whitespaceChar
+ | '/' ~ '*' ~ comment
+ | '/' ~ '/' ~ chrExcept(EofCh, '\n').*
+ | '#' ~ chrExcept(EofCh, '\n').*
+ | '-' ~ '-' ~ chrExcept(EofCh, '\n').*
+ | '/' ~ '*' ~ failure("unclosed comment")
+ ).*
+
+ /** Generate every upper/lower-case variation of `s` appended to `prefix`: 2^length strings. */
+ def allCaseVersions(s: String, prefix: String = ""): Stream[String] = {
+ if (s.isEmpty) {
+ Stream(prefix)
+ } else {
+ allCaseVersions(s.tail, prefix + s.head.toLower) #:::
+ allCaseVersions(s.tail, prefix + s.head.toUpper)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/b3e86dc6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala
deleted file mode 100644
index f1a1ca6..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst
-
-import scala.language.implicitConversions
-import scala.util.parsing.combinator.lexical.StdLexical
-import scala.util.parsing.combinator.syntactical.StandardTokenParsers
-import scala.util.parsing.combinator.{PackratParsers, RegexParsers}
-import scala.util.parsing.input.CharArrayReader.EofCh
-
-import org.apache.spark.sql.catalyst.plans.logical._
-
-private[sql] abstract class AbstractSparkSQLParser
- extends StandardTokenParsers with PackratParsers {
-
- def apply(input: String): LogicalPlan = phrase(start)(new lexical.Scanner(input)) match {
- case Success(plan, _) => plan
- case failureOrError => sys.error(failureOrError.toString)
- }
-
- protected case class Keyword(str: String)
-
- protected def start: Parser[LogicalPlan]
-
- // Returns the whole input string
- protected lazy val wholeInput: Parser[String] = new Parser[String] {
- def apply(in: Input): ParseResult[String] =
- Success(in.source.toString, in.drop(in.source.length()))
- }
-
- // Returns the rest of the input string that are not parsed yet
- protected lazy val restInput: Parser[String] = new Parser[String] {
- def apply(in: Input): ParseResult[String] =
- Success(
- in.source.subSequence(in.offset, in.source.length()).toString,
- in.drop(in.source.length()))
- }
-}
-
-class SqlLexical(val keywords: Seq[String]) extends StdLexical {
- case class FloatLit(chars: String) extends Token {
- override def toString = chars
- }
-
- reserved ++= keywords.flatMap(w => allCaseVersions(w))
-
- delimiters += (
- "@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
- ",", ";", "%", "{", "}", ":", "[", "]", ".", "&", "|", "^", "~", "<=>"
- )
-
- override lazy val token: Parser[Token] =
- ( identChar ~ (identChar | digit).* ^^
- { case first ~ rest => processIdent((first :: rest).mkString) }
- | rep1(digit) ~ ('.' ~> digit.*).? ^^ {
- case i ~ None => NumericLit(i.mkString)
- case i ~ Some(d) => FloatLit(i.mkString + "." + d.mkString)
- }
- | '\'' ~> chrExcept('\'', '\n', EofCh).* <~ '\'' ^^
- { case chars => StringLit(chars mkString "") }
- | '"' ~> chrExcept('"', '\n', EofCh).* <~ '"' ^^
- { case chars => StringLit(chars mkString "") }
- | '`' ~> chrExcept('`', '\n', EofCh).* <~ '`' ^^
- { case chars => Identifier(chars mkString "") }
- | EofCh ^^^ EOF
- | '\'' ~> failure("unclosed string literal")
- | '"' ~> failure("unclosed string literal")
- | delim
- | failure("illegal character")
- )
-
- override def identChar = letter | elem('_')
-
- override def whitespace: Parser[Any] =
- ( whitespaceChar
- | '/' ~ '*' ~ comment
- | '/' ~ '/' ~ chrExcept(EofCh, '\n').*
- | '#' ~ chrExcept(EofCh, '\n').*
- | '-' ~ '-' ~ chrExcept(EofCh, '\n').*
- | '/' ~ '*' ~ failure("unclosed comment")
- ).*
-
- /** Generate all variations of upper and lower case of a given string */
- def allCaseVersions(s: String, prefix: String = ""): Stream[String] = {
- if (s.isEmpty) {
- Stream(prefix)
- } else {
- allCaseVersions(s.tail, prefix + s.head.toLower) #:::
- allCaseVersions(s.tail, prefix + s.head.toUpper)
- }
- }
-}
-
-/**
- * The top level Spark SQL parser. This parser recognizes syntaxes that are available for all SQL
- * dialects supported by Spark SQL, and delegates all the other syntaxes to the `fallback` parser.
- *
- * @param fallback A function that parses an input string to a logical plan
- */
-private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends AbstractSparkSQLParser {
-
- // A parser for the key-value part of the "SET [key = [value ]]" syntax
- private object SetCommandParser extends RegexParsers {
- private val key: Parser[String] = "(?m)[^=]+".r
-
- private val value: Parser[String] = "(?m).*$".r
-
- private val pair: Parser[LogicalPlan] =
- (key ~ ("=".r ~> value).?).? ^^ {
- case None => SetCommand(None)
- case Some(k ~ v) => SetCommand(Some(k.trim -> v.map(_.trim)))
- }
-
- def apply(input: String): LogicalPlan = parseAll(pair, input) match {
- case Success(plan, _) => plan
- case x => sys.error(x.toString)
- }
- }
-
- protected val AS = Keyword("AS")
- protected val CACHE = Keyword("CACHE")
- protected val LAZY = Keyword("LAZY")
- protected val SET = Keyword("SET")
- protected val TABLE = Keyword("TABLE")
- protected val UNCACHE = Keyword("UNCACHE")
-
- protected implicit def asParser(k: Keyword): Parser[String] =
- lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)
-
- private val reservedWords: Seq[String] =
- this
- .getClass
- .getMethods
- .filter(_.getReturnType == classOf[Keyword])
- .map(_.invoke(this).asInstanceOf[Keyword].str)
-
- override val lexical = new SqlLexical(reservedWords)
-
- override protected lazy val start: Parser[LogicalPlan] = cache | uncache | set | others
-
- private lazy val cache: Parser[LogicalPlan] =
- CACHE ~> LAZY.? ~ (TABLE ~> ident) ~ (AS ~> restInput).? ^^ {
- case isLazy ~ tableName ~ plan =>
- CacheTableCommand(tableName, plan.map(fallback), isLazy.isDefined)
- }
-
- private lazy val uncache: Parser[LogicalPlan] =
- UNCACHE ~ TABLE ~> ident ^^ {
- case tableName => UncacheTableCommand(tableName)
- }
-
- private lazy val set: Parser[LogicalPlan] =
- SET ~> restInput ^^ {
- case input => SetCommandParser(input)
- }
-
- private lazy val others: Parser[LogicalPlan] =
- wholeInput ^^ {
- case input => fallback(input)
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/b3e86dc6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
index 5a18639..45905f8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
@@ -17,8 +17,7 @@
package org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.catalyst.types.StringType
+import org.apache.spark.sql.catalyst.expressions.Attribute
/**
* A logical node that represents a non-query command to be executed by the system. For example,
@@ -28,48 +27,3 @@ abstract class Command extends LeafNode {
self: Product =>
def output: Seq[Attribute] = Seq.empty
}
-
-/**
- *
- * Commands of the form "SET [key [= value] ]".
- */
-case class SetCommand(kv: Option[(String, Option[String])]) extends Command {
- override def output = Seq(
- AttributeReference("", StringType, nullable = false)())
-}
-
-/**
- * Returned by a parser when the users only wants to see what query plan would be executed, without
- * actually performing the execution.
- */
-case class ExplainCommand(plan: LogicalPlan, extended: Boolean = false) extends Command {
- override def output =
- Seq(AttributeReference("plan", StringType, nullable = false)())
-}
-
-/**
- * Returned for the "CACHE TABLE tableName [AS SELECT ...]" command.
- */
-case class CacheTableCommand(tableName: String, plan: Option[LogicalPlan], isLazy: Boolean)
- extends Command
-
-/**
- * Returned for the "UNCACHE TABLE tableName" command.
- */
-case class UncacheTableCommand(tableName: String) extends Command
-
-/**
- * Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
- * @param table The table to be described.
- * @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
- * It is effective only when the table is a Hive table.
- */
-case class DescribeCommand(
- table: LogicalPlan,
- isExtended: Boolean) extends Command {
- override def output = Seq(
- // Column names are based on Hive.
- AttributeReference("col_name", StringType, nullable = false)(),
- AttributeReference("data_type", StringType, nullable = false)(),
- AttributeReference("comment", StringType, nullable = false)())
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/b3e86dc6/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 9962937..6c575dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -76,7 +76,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
@transient
protected[sql] val sqlParser = {
val fallback = new catalyst.SqlParser
- new catalyst.SparkSQLParser(fallback(_))
+ new SparkSQLParser(fallback(_))
}
protected[sql] def parseSql(sql: String): LogicalPlan = {
@@ -329,7 +329,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
def strategies: Seq[Strategy] =
extraStrategies ++ (
- CommandStrategy ::
DataSourceStrategy ::
TakeOrdered ::
HashAggregation ::
http://git-wip-us.apache.org/repos/asf/spark/blob/b3e86dc6/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
new file mode 100644
index 0000000..65358b7
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.{SqlLexical, AbstractSparkSQLParser}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{UncacheTableCommand, CacheTableCommand, SetCommand}
+
+import scala.util.parsing.combinator.RegexParsers
+
+/**
+ * The top level Spark SQL parser. This parser recognizes syntaxes that are available for all SQL
+ * dialects supported by Spark SQL, and delegates all the other syntaxes to the `fallback` parser.
+ *
+ * @param fallback A function that parses an input string to a logical plan
+ */
+private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends AbstractSparkSQLParser {
+
+ // A regex-based parser for the key-value part of the "SET [key = [value ]]" syntax
+ private object SetCommandParser extends RegexParsers {
+ private val key: Parser[String] = "(?m)[^=]+".r
+
+ private val value: Parser[String] = "(?m).*$".r
+ // Schema given to every SetCommand built here: one non-nullable string column with an empty name.
+ private val output: Seq[Attribute] = Seq(AttributeReference("", StringType, nullable = false)())
+ // No text yields SetCommand(None, ...); otherwise the key and the optional value are trimmed.
+ private val pair: Parser[LogicalPlan] =
+ (key ~ ("=".r ~> value).?).? ^^ {
+ case None => SetCommand(None, output)
+ case Some(k ~ v) => SetCommand(Some(k.trim -> v.map(_.trim)), output)
+ }
+ // Parses the text that follows SET; anything but a full successful parse goes to sys.error.
+ def apply(input: String): LogicalPlan = parseAll(pair, input) match {
+ case Success(plan, _) => plan
+ case x => sys.error(x.toString)
+ }
+ }
+ // Keywords used by the rules below; `reservedWords` discovers them through reflection.
+ protected val AS = Keyword("AS")
+ protected val CACHE = Keyword("CACHE")
+ protected val LAZY = Keyword("LAZY")
+ protected val SET = Keyword("SET")
+ protected val TABLE = Keyword("TABLE")
+ protected val UNCACHE = Keyword("UNCACHE")
+ // Lets a Keyword be used as a parser that accepts any upper/lower-case spelling of its text.
+ protected implicit def asParser(k: Keyword): Parser[String] =
+ lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)
+ // Invokes every method reported by getMethods whose return type is Keyword and keeps its text.
+ private val reservedWords: Seq[String] =
+ this
+ .getClass
+ .getMethods
+ .filter(_.getReturnType == classOf[Keyword])
+ .map(_.invoke(this).asInstanceOf[Keyword].str)
+
+ override val lexical = new SqlLexical(reservedWords)
+ // Rules are tried left to right; input matching none of the first three goes to `others`.
+ override protected lazy val start: Parser[LogicalPlan] = cache | uncache | set | others
+ // CACHE [LAZY] TABLE name [AS query]: the text after AS is parsed by `fallback`.
+ private lazy val cache: Parser[LogicalPlan] =
+ CACHE ~> LAZY.? ~ (TABLE ~> ident) ~ (AS ~> restInput).? ^^ {
+ case isLazy ~ tableName ~ plan =>
+ CacheTableCommand(tableName, plan.map(fallback), isLazy.isDefined)
+ }
+ // UNCACHE TABLE name
+ private lazy val uncache: Parser[LogicalPlan] =
+ UNCACHE ~ TABLE ~> ident ^^ {
+ case tableName => UncacheTableCommand(tableName)
+ }
+ // SET followed by arbitrary text, which is handed to SetCommandParser.
+ private lazy val set: Parser[LogicalPlan] =
+ SET ~> restInput ^^ {
+ case input => SetCommandParser(input)
+ }
+ // Anything else: the whole input string is passed to `fallback`.
+ private lazy val others: Parser[LogicalPlan] =
+ wholeInput ^^ {
+ case input => fallback(input)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/b3e86dc6/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index ce878c1..99b6611 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -259,6 +259,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
def numPartitions = self.numPartitions
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case r: RunnableCommand => ExecutedCommand(r) :: Nil
+
case logical.Distinct(child) =>
execution.Distinct(partial = false,
execution.Distinct(partial = true, planLater(child))) :: Nil
@@ -308,22 +310,4 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case _ => Nil
}
}
-
- case object CommandStrategy extends Strategy {
- def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case r: RunnableCommand => ExecutedCommand(r) :: Nil
- case logical.SetCommand(kv) =>
- Seq(ExecutedCommand(execution.SetCommand(kv, plan.output)))
- case logical.ExplainCommand(logicalPlan, extended) =>
- Seq(ExecutedCommand(
- execution.ExplainCommand(logicalPlan, plan.output, extended)))
- case logical.CacheTableCommand(tableName, optPlan, isLazy) =>
- Seq(ExecutedCommand(
- execution.CacheTableCommand(tableName, optPlan, isLazy)))
- case logical.UncacheTableCommand(tableName) =>
- Seq(ExecutedCommand(
- execution.UncacheTableCommand(tableName)))
- case _ => Nil
- }
- }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/b3e86dc6/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index b8fa4b0..0d765c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -113,7 +113,7 @@ case class SetCommand(
@DeveloperApi
case class ExplainCommand(
logicalPlan: LogicalPlan,
- override val output: Seq[Attribute], extended: Boolean) extends RunnableCommand {
+ override val output: Seq[Attribute], extended: Boolean = false) extends RunnableCommand {
// Run through the optimizer to generate the physical plan.
override def run(sqlContext: SQLContext) = try {
http://git-wip-us.apache.org/repos/asf/spark/blob/b3e86dc6/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
index 5550183..80733ea 100644
--- a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
+++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
@@ -33,8 +33,8 @@ import org.apache.hive.service.cli.operation.ExecuteStatementOperation
import org.apache.hive.service.cli.session.HiveSession
import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.plans.logical.SetCommand
import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.execution.SetCommand
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
import org.apache.spark.sql.{SQLConf, SchemaRDD, Row => SparkRow}
@@ -190,7 +190,7 @@ private[hive] class SparkExecuteStatementOperation(
result = hiveContext.sql(statement)
logDebug(result.queryExecution.toString())
result.queryExecution.logical match {
- case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) =>
+ case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value))), _) =>
sessionToActivePool(parentSession.getSessionHandle) = value
logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
case _ =>
http://git-wip-us.apache.org/repos/asf/spark/blob/b3e86dc6/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
index 798a690..19d8514 100644
--- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
+++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
@@ -31,7 +31,7 @@ import org.apache.hive.service.cli.operation.ExecuteStatementOperation
import org.apache.hive.service.cli.session.HiveSession
import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.plans.logical.SetCommand
+import org.apache.spark.sql.execution.SetCommand
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
@@ -161,7 +161,7 @@ private[hive] class SparkExecuteStatementOperation(
result = hiveContext.sql(statement)
logDebug(result.queryExecution.toString())
result.queryExecution.logical match {
- case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) =>
+ case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value))), _) =>
sessionToActivePool(parentSession.getSessionHandle) = value
logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
case _ =>
http://git-wip-us.apache.org/repos/asf/spark/blob/b3e86dc6/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 1648fa8..02eac43 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -38,8 +38,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperators, OverrideCatalog, OverrideFunctionRegistry}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.types.DecimalType
-import org.apache.spark.sql.catalyst.types.decimal.Decimal
-import org.apache.spark.sql.execution.{SparkPlan, ExecutedCommand, ExtractPythonUdfs, QueryExecutionException}
+import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, SetCommand, QueryExecutionException}
import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DescribeHiveTableCommand}
import org.apache.spark.sql.sources.DataSourceStrategy
@@ -340,7 +339,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
override def strategies: Seq[Strategy] = extraStrategies ++ Seq(
DataSourceStrategy,
- CommandStrategy,
HiveCommandStrategy(self),
TakeOrdered,
ParquetOperations,
http://git-wip-us.apache.org/repos/asf/spark/blob/b3e86dc6/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index c2ab357..28de03c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -24,8 +24,8 @@ import org.apache.hadoop.hive.ql.lib.Node
import org.apache.hadoop.hive.ql.metadata.Table
import org.apache.hadoop.hive.ql.parse._
import org.apache.hadoop.hive.ql.plan.PlanUtils
+import org.apache.spark.sql.SparkSQLParser
-import org.apache.spark.sql.catalyst.SparkSQLParser
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.types.decimal.Decimal
+import org.apache.spark.sql.execution.ExplainCommand
import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DropTable, AnalyzeTable}
/* Implicit conversions */
@@ -45,6 +46,22 @@ import scala.collection.JavaConversions._
*/
private[hive] case object NativePlaceholder extends Command
+/**
+ * Logical command node returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
+ * @param table The logical plan of the table to be described.
+ * @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
+ * It is effective only when the table is a Hive table.
+ */
+case class DescribeCommand(
+ table: LogicalPlan,
+ isExtended: Boolean) extends Command {
+ override def output = Seq(
+ // Three non-nullable string columns; the column names are based on Hive.
+ AttributeReference("col_name", StringType, nullable = false)(),
+ AttributeReference("data_type", StringType, nullable = false)(),
+ AttributeReference("comment", StringType, nullable = false)())
+}
+
/** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
private[hive] object HiveQl {
protected val nativeCommands = Seq(
@@ -457,17 +474,23 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
// Just fake explain for any of the native commands.
case Token("TOK_EXPLAIN", explainArgs)
if noExplainCommands.contains(explainArgs.head.getText) =>
- ExplainCommand(NoRelation)
+ ExplainCommand(NoRelation, Seq(AttributeReference("plan", StringType, nullable = false)()))
case Token("TOK_EXPLAIN", explainArgs)
if "TOK_CREATETABLE" == explainArgs.head.getText =>
val Some(crtTbl) :: _ :: extended :: Nil =
getClauses(Seq("TOK_CREATETABLE", "FORMATTED", "EXTENDED"), explainArgs)
- ExplainCommand(nodeToPlan(crtTbl), extended != None)
+ ExplainCommand(
+ nodeToPlan(crtTbl),
+ Seq(AttributeReference("plan", StringType,nullable = false)()),
+ extended != None)
case Token("TOK_EXPLAIN", explainArgs) =>
// Ignore FORMATTED if present.
val Some(query) :: _ :: extended :: Nil =
getClauses(Seq("TOK_QUERY", "FORMATTED", "EXTENDED"), explainArgs)
- ExplainCommand(nodeToPlan(query), extended != None)
+ ExplainCommand(
+ nodeToPlan(query),
+ Seq(AttributeReference("plan", StringType, nullable = false)()),
+ extended != None)
case Token("TOK_DESCTABLE", describeArgs) =>
// Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
http://git-wip-us.apache.org/repos/asf/spark/blob/b3e86dc6/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index d3f6381..c439b9e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.types.StringType
+import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive
import org.apache.spark.sql.hive.execution._
@@ -209,14 +210,14 @@ private[hive] trait HiveStrategies {
case class HiveCommandStrategy(context: HiveContext) extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case describe: logical.DescribeCommand =>
+ case describe: DescribeCommand =>
val resolvedTable = context.executePlan(describe.table).analyzed
resolvedTable match {
case t: MetastoreRelation =>
ExecutedCommand(
DescribeHiveTableCommand(t, describe.output, describe.isExtended)) :: Nil
case o: LogicalPlan =>
- ExecutedCommand(DescribeCommand(planLater(o), describe.output)) :: Nil
+ ExecutedCommand(RunnableDescribeCommand(planLater(o), describe.output)) :: Nil
}
case _ => Nil
http://git-wip-us.apache.org/repos/asf/spark/blob/b3e86dc6/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 8f2311c..1358a0e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -34,8 +34,9 @@ import org.apache.hadoop.hive.serde2.avro.AvroSerDe
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.Utils
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.plans.logical.{CacheTableCommand, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution.CacheTableCommand
import org.apache.spark.sql.hive._
import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.hive.execution.HiveNativeCommand
http://git-wip-us.apache.org/repos/asf/spark/blob/b3e86dc6/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 4104df8..f8a957d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -22,6 +22,8 @@ import java.io._
import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen}
import org.apache.spark.Logging
+import org.apache.spark.sql.execution.{SetCommand, ExplainCommand}
+import org.apache.spark.sql.hive.DescribeCommand
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util._
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org