You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/04/22 00:59:40 UTC
spark git commit: [SPARK-14798][SQL] Move native command and script
transformation parsing into SparkSqlAstBuilder
Repository: spark
Updated Branches:
refs/heads/master ef6be7bed -> 1a95397bb
[SPARK-14798][SQL] Move native command and script transformation parsing into SparkSqlAstBuilder
## What changes were proposed in this pull request?
This patch moves the parsing of native commands and script transformations into SparkSqlAstBuilder. It builds on #12561; see the last commit for the diff.
## How was this patch tested?
Updated test cases to reflect this.
Author: Reynold Xin <rx...@databricks.com>
Closes #12564 from rxin/SPARK-14798.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a95397b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a95397b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a95397b
Branch: refs/heads/master
Commit: 1a95397bb6a4e7e7a06ac450bf556fa3aa47b8cd
Parents: ef6be7b
Author: Reynold Xin <rx...@databricks.com>
Authored: Thu Apr 21 15:59:37 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Apr 21 15:59:37 2016 -0700
----------------------------------------------------------------------
.../plans/logical/ScriptTransformation.scala | 64 ++++++++++++-
.../spark/sql/execution/SparkSqlParser.scala | 95 +++++++++++++++++++-
.../execution/command/HiveNativeCommand.scala | 35 ++++++++
.../spark/sql/internal/SessionState.scala | 2 +-
.../sql/execution/command/DDLCommandSuite.scala | 20 -----
.../spark/sql/hive/HiveMetastoreCatalog.scala | 3 +-
.../spark/sql/hive/HiveQueryExecution.scala | 4 +-
.../apache/spark/sql/hive/HiveStrategies.scala | 5 +-
.../org/apache/spark/sql/hive/SQLBuilder.scala | 10 +--
.../sql/hive/execution/HiveNativeCommand.scala | 36 --------
.../sql/hive/execution/HiveSqlParser.scala | 84 +----------------
.../hive/execution/ScriptTransformation.scala | 67 ++++----------
.../apache/spark/sql/hive/test/TestHive.scala | 3 +-
.../spark/sql/hive/HiveDDLCommandSuite.scala | 3 +-
.../apache/spark/sql/hive/StatisticsSuite.scala | 1 +
.../sql/hive/execution/HiveComparisonTest.scala | 2 +-
16 files changed, 222 insertions(+), 212 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala
index 578027d..e176e9b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala
@@ -37,7 +37,65 @@ case class ScriptTransformation(
}
/**
- * A placeholder for implementation specific input and output properties when passing data
- * to a script. For example, in Hive this would specify which SerDes to use.
+ * Input and output properties when passing data to a script.
+ * For example, in Hive this would specify which SerDes to use.
*/
-trait ScriptInputOutputSchema
+case class ScriptInputOutputSchema(
+ inputRowFormat: Seq[(String, String)],
+ outputRowFormat: Seq[(String, String)],
+ inputSerdeClass: Option[String],
+ outputSerdeClass: Option[String],
+ inputSerdeProps: Seq[(String, String)],
+ outputSerdeProps: Seq[(String, String)],
+ recordReaderClass: Option[String],
+ recordWriterClass: Option[String],
+ schemaLess: Boolean) {
+
+ def inputRowFormatSQL: Option[String] =
+ getRowFormatSQL(inputRowFormat, inputSerdeClass, inputSerdeProps)
+
+ def outputRowFormatSQL: Option[String] =
+ getRowFormatSQL(outputRowFormat, outputSerdeClass, outputSerdeProps)
+
+ /**
+ * Get the row format specification
+ * Note:
+ * 1. Changes are needed when readerClause and writerClause are supported.
+ * 2. Changes are needed when "ESCAPED BY" is supported.
+ */
+ private def getRowFormatSQL(
+ rowFormat: Seq[(String, String)],
+ serdeClass: Option[String],
+ serdeProps: Seq[(String, String)]): Option[String] = {
+ if (schemaLess) return Some("")
+
+ val rowFormatDelimited =
+ rowFormat.map {
+ case ("TOK_TABLEROWFORMATFIELD", value) =>
+ "FIELDS TERMINATED BY " + value
+ case ("TOK_TABLEROWFORMATCOLLITEMS", value) =>
+ "COLLECTION ITEMS TERMINATED BY " + value
+ case ("TOK_TABLEROWFORMATMAPKEYS", value) =>
+ "MAP KEYS TERMINATED BY " + value
+ case ("TOK_TABLEROWFORMATLINES", value) =>
+ "LINES TERMINATED BY " + value
+ case ("TOK_TABLEROWFORMATNULL", value) =>
+ "NULL DEFINED AS " + value
+ case o => return None
+ }
+
+ val serdeClassSQL = serdeClass.map("'" + _ + "'").getOrElse("")
+ val serdePropsSQL =
+ if (serdeClass.nonEmpty) {
+ val props = serdeProps.map{p => s"'${p._1}' = '${p._2}'"}.mkString(", ")
+ if (props.nonEmpty) " WITH SERDEPROPERTIES(" + props + ")" else ""
+ } else {
+ ""
+ }
+ if (rowFormat.nonEmpty) {
+ Some("ROW FORMAT DELIMITED " + rowFormatDelimited.mkString(" "))
+ } else {
+ Some("ROW FORMAT SERDE " + serdeClassSQL + serdePropsSQL)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index ac12a72..05fb1ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution
import scala.collection.JavaConverters._
+import scala.util.Try
import org.antlr.v4.runtime.{ParserRuleContext, Token}
@@ -26,16 +27,27 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema}
import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand, DescribeCommand => _, _}
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
+
/**
* Concrete parser for Spark SQL statements.
*/
-class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser{
+class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser {
val astBuilder = new SparkSqlAstBuilder(conf)
+
+ private val substitutor = new VariableSubstitution(conf)
+
+ protected override def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
+ super.parse(substitutor.substitute(command))(toResult)
+ }
+
+ protected override def nativeCommand(sqlText: String): LogicalPlan = {
+ HiveNativeCommand(substitutor.substitute(sqlText))
+ }
}
/**
@@ -45,6 +57,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
import org.apache.spark.sql.catalyst.parser.ParserUtils._
/**
+ * Pass a command to Hive using a [[HiveNativeCommand]].
+ */
+ override def visitExecuteNativeCommand(
+ ctx: ExecuteNativeCommandContext): LogicalPlan = withOrigin(ctx) {
+ HiveNativeCommand(command(ctx))
+ }
+
+ /**
* Create a [[SetCommand]] logical plan.
*
* Note that we assume that everything after the SET keyword is assumed to be a part of the
@@ -1127,4 +1147,73 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
Option(col.STRING).map(string))
}
}
+
+ /**
+ * Create a [[ScriptInputOutputSchema]].
+ */
+ override protected def withScriptIOSchema(
+ ctx: QuerySpecificationContext,
+ inRowFormat: RowFormatContext,
+ recordWriter: Token,
+ outRowFormat: RowFormatContext,
+ recordReader: Token,
+ schemaLess: Boolean): ScriptInputOutputSchema = {
+ if (recordWriter != null || recordReader != null) {
+ throw new ParseException(
+ "Unsupported operation: Used defined record reader/writer classes.", ctx)
+ }
+
+ // Decode and input/output format.
+ type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String])
+ def format(fmt: RowFormatContext, configKey: String): Format = fmt match {
+ case c: RowFormatDelimitedContext =>
+ // TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema
+ // expects a seq of pairs in which the old parsers' token names are used as keys.
+ // Transforming the result of visitRowFormatDelimited would be quite a bit messier than
+ // retrieving the key value pairs ourselves.
+ def entry(key: String, value: Token): Seq[(String, String)] = {
+ Option(value).map(t => key -> t.getText).toSeq
+ }
+ val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++
+ entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++
+ entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++
+ entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++
+ entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs)
+
+ (entries, None, Seq.empty, None)
+
+ case c: RowFormatSerdeContext =>
+ // Use a serde format.
+ val CatalogStorageFormat(None, None, None, Some(name), props) = visitRowFormatSerde(c)
+
+ // SPARK-10310: Special cases LazySimpleSerDe
+ val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") {
+ Try(conf.getConfString(configKey)).toOption
+ } else {
+ None
+ }
+ (Seq.empty, Option(name), props.toSeq, recordHandler)
+
+ case null =>
+ // Use default (serde) format.
+ val name = conf.getConfString("hive.script.serde",
+ "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
+ val props = Seq("field.delim" -> "\t")
+ val recordHandler = Try(conf.getConfString(configKey)).toOption
+ (Nil, Option(name), props, recordHandler)
+ }
+
+ val (inFormat, inSerdeClass, inSerdeProps, reader) =
+ format(inRowFormat, "hive.script.recordreader")
+
+ val (outFormat, outSerdeClass, outSerdeProps, writer) =
+ format(outRowFormat, "hive.script.recordwriter")
+
+ ScriptInputOutputSchema(
+ inFormat, outFormat,
+ inSerdeClass, outSerdeClass,
+ inSerdeProps, outSerdeProps,
+ reader, writer,
+ schemaLess)
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala
new file mode 100644
index 0000000..39e441f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.types.StringType
+
+/**
+ * A command that we delegate to Hive. Eventually we should remove this.
+ */
+case class HiveNativeCommand(sql: String) extends RunnableCommand {
+
+ override def output: Seq[AttributeReference] =
+ Seq(AttributeReference("result", StringType, nullable = false)())
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ sqlContext.sessionState.runNativeSql(sql).map(Row(_))
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 8563dc3..e1be4b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -167,7 +167,7 @@ private[sql] class SessionState(ctx: SQLContext) {
}
def runNativeSql(sql: String): Seq[String] = {
- throw new UnsupportedOperationException
+ throw new AnalysisException("Unsupported query: " + sql)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index e99eb02..a1ffda9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -611,26 +611,6 @@ class DDLCommandSuite extends PlanTest {
comparePlans(parsed2, expected2)
}
- test("unsupported operations") {
- intercept[ParseException] {
- parser.parsePlan("DROP TABLE tab PURGE")
- }
- intercept[ParseException] {
- parser.parsePlan("DROP TABLE tab FOR REPLICATION('eventid')")
- }
- intercept[ParseException] {
- parser.parsePlan(
- """
- |CREATE EXTERNAL TABLE oneToTenDef
- |USING org.apache.spark.sql.sources
- |OPTIONS (from '1', to '10')
- """.stripMargin)
- }
- intercept[ParseException] {
- parser.parsePlan("SELECT TRANSFORM (key, value) USING 'cat' AS (tKey, tValue) FROM testData")
- }
- }
-
test("SPARK-14383: DISTRIBUTE and UNSET as non-keywords") {
val sql = "SELECT distribute, unset FROM x"
val parsed = parser.parsePlan(sql)
http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 8732285..ca39791 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -32,10 +32,9 @@ import org.apache.spark.sql.catalyst.parser.DataTypeParser
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand}
+import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand, HiveNativeCommand}
import org.apache.spark.sql.execution.datasources.{Partition => _, _}
import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation}
-import org.apache.spark.sql.hive.execution.HiveNativeCommand
import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource}
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.types._
http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala
index 1c1bfb6..0ee34f0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.hive
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.execution.command.{ExecutedCommand, SetCommand}
-import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
+import org.apache.spark.sql.execution.command.{ExecutedCommand, HiveNativeCommand, SetCommand}
+import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index bbdcc8c..8720e54 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -38,8 +38,9 @@ private[hive] trait HiveStrategies {
object Scripts extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case logical.ScriptTransformation(input, script, output, child, schema: HiveScriptIOSchema) =>
- ScriptTransformation(input, script, output, planLater(child), schema)(hiveconf) :: Nil
+ case logical.ScriptTransformation(input, script, output, child, ioschema) =>
+ val hiveIoSchema = HiveScriptIOSchema(ioschema)
+ ScriptTransformation(input, script, output, planLater(child), hiveIoSchema)(hiveconf) :: Nil
case _ => Nil
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
index 2d44813..86115d0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.hive.execution.HiveScriptIOSchema
import org.apache.spark.sql.types.{ByteType, DataType, IntegerType, NullType}
/**
@@ -210,13 +209,12 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
}
private def scriptTransformationToSQL(plan: ScriptTransformation): String = {
- val ioSchema = plan.ioschema.asInstanceOf[HiveScriptIOSchema]
- val inputRowFormatSQL = ioSchema.inputRowFormatSQL.getOrElse(
+ val inputRowFormatSQL = plan.ioschema.inputRowFormatSQL.getOrElse(
throw new UnsupportedOperationException(
- s"unsupported row format ${ioSchema.inputRowFormat}"))
- val outputRowFormatSQL = ioSchema.outputRowFormatSQL.getOrElse(
+ s"unsupported row format ${plan.ioschema.inputRowFormat}"))
+ val outputRowFormatSQL = plan.ioschema.outputRowFormatSQL.getOrElse(
throw new UnsupportedOperationException(
- s"unsupported row format ${ioSchema.outputRowFormat}"))
+ s"unsupported row format ${plan.ioschema.outputRowFormat}"))
val outputSchema = plan.output.map { attr =>
s"${attr.sql} ${attr.dataType.simpleString}"
http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
deleted file mode 100644
index 8c1f4a8..0000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import org.apache.spark.sql.{Row, SQLContext}
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.hive.HiveSessionState
-import org.apache.spark.sql.types.StringType
-
-private[hive]
-case class HiveNativeCommand(sql: String) extends RunnableCommand {
-
- override def output: Seq[AttributeReference] =
- Seq(AttributeReference("result", StringType, nullable = false)())
-
- override def run(sqlContext: SQLContext): Seq[Row] = {
- sqlContext.sessionState.asInstanceOf[HiveSessionState].runNativeSql(sql).map(Row(_))
- }
-
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
index 989da92..35530b9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
@@ -17,16 +17,11 @@
package org.apache.spark.sql.hive.execution
-import scala.util.Try
-
-import org.antlr.v4.runtime.Token
-import org.apache.hadoop.hive.serde.serdeConstants
-
-import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkSqlAstBuilder
+import org.apache.spark.sql.execution.command.HiveNativeCommand
import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
/**
@@ -55,14 +50,6 @@ class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
import ParserUtils._
/**
- * Pass a command to Hive using a [[HiveNativeCommand]].
- */
- override def visitExecuteNativeCommand(
- ctx: ExecuteNativeCommandContext): LogicalPlan = withOrigin(ctx) {
- HiveNativeCommand(command(ctx))
- }
-
- /**
* Create an [[AnalyzeTable]] command. This currently only implements the NOSCAN option (other
* options are passed on to Hive) e.g.:
* {{{
@@ -80,73 +67,4 @@ class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
}
}
-
- /**
- * Create a [[HiveScriptIOSchema]].
- */
- override protected def withScriptIOSchema(
- ctx: QuerySpecificationContext,
- inRowFormat: RowFormatContext,
- recordWriter: Token,
- outRowFormat: RowFormatContext,
- recordReader: Token,
- schemaLess: Boolean): HiveScriptIOSchema = {
- if (recordWriter != null || recordReader != null) {
- throw new ParseException(
- "Unsupported operation: Used defined record reader/writer classes.", ctx)
- }
-
- // Decode and input/output format.
- type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String])
- def format(fmt: RowFormatContext, configKey: String): Format = fmt match {
- case c: RowFormatDelimitedContext =>
- // TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema
- // expects a seq of pairs in which the old parsers' token names are used as keys.
- // Transforming the result of visitRowFormatDelimited would be quite a bit messier than
- // retrieving the key value pairs ourselves.
- def entry(key: String, value: Token): Seq[(String, String)] = {
- Option(value).map(t => key -> t.getText).toSeq
- }
- val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++
- entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++
- entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++
- entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++
- entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs)
-
- (entries, None, Seq.empty, None)
-
- case c: RowFormatSerdeContext =>
- // Use a serde format.
- val CatalogStorageFormat(None, None, None, Some(name), props) = visitRowFormatSerde(c)
-
- // SPARK-10310: Special cases LazySimpleSerDe
- val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") {
- Try(conf.getConfString(configKey)).toOption
- } else {
- None
- }
- (Seq.empty, Option(name), props.toSeq, recordHandler)
-
- case null =>
- // Use default (serde) format.
- val name = conf.getConfString("hive.script.serde",
- "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
- val props = Seq(serdeConstants.FIELD_DELIM -> "\t")
- val recordHandler = Try(conf.getConfString(configKey)).toOption
- (Nil, Option(name), props, recordHandler)
- }
-
- val (inFormat, inSerdeClass, inSerdeProps, reader) =
- format(inRowFormat, "hive.script.recordreader")
-
- val (outFormat, outSerdeClass, outSerdeProps, writer) =
- format(outRowFormat, "hive.script.recordwriter")
-
- HiveScriptIOSchema(
- inFormat, outFormat,
- inSerdeClass, outSerdeClass,
- inSerdeProps, outSerdeProps,
- reader, writer,
- schemaLess)
- }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 2f7cec3..8c8becf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -312,6 +312,22 @@ private class ScriptTransformationWriterThread(
}
}
+private[hive]
+object HiveScriptIOSchema {
+ def apply(input: ScriptInputOutputSchema): HiveScriptIOSchema = {
+ HiveScriptIOSchema(
+ input.inputRowFormat,
+ input.outputRowFormat,
+ input.inputSerdeClass,
+ input.outputSerdeClass,
+ input.inputSerdeProps,
+ input.outputSerdeProps,
+ input.recordReaderClass,
+ input.recordWriterClass,
+ input.schemaLess)
+ }
+}
+
/**
* The wrapper class of Hive input and output schema properties
*/
@@ -325,7 +341,8 @@ case class HiveScriptIOSchema (
outputSerdeProps: Seq[(String, String)],
recordReaderClass: Option[String],
recordWriterClass: Option[String],
- schemaLess: Boolean) extends ScriptInputOutputSchema with HiveInspectors {
+ schemaLess: Boolean)
+ extends HiveInspectors {
private val defaultFormat = Map(
("TOK_TABLEROWFORMATFIELD", "\t"),
@@ -402,52 +419,4 @@ case class HiveScriptIOSchema (
instance
}
}
-
- def inputRowFormatSQL: Option[String] =
- getRowFormatSQL(inputRowFormat, inputSerdeClass, inputSerdeProps)
-
- def outputRowFormatSQL: Option[String] =
- getRowFormatSQL(outputRowFormat, outputSerdeClass, outputSerdeProps)
-
- /**
- * Get the row format specification
- * Note:
- * 1. Changes are needed when readerClause and writerClause are supported.
- * 2. Changes are needed when "ESCAPED BY" is supported.
- */
- private def getRowFormatSQL(
- rowFormat: Seq[(String, String)],
- serdeClass: Option[String],
- serdeProps: Seq[(String, String)]): Option[String] = {
- if (schemaLess) return Some("")
-
- val rowFormatDelimited =
- rowFormat.map {
- case ("TOK_TABLEROWFORMATFIELD", value) =>
- "FIELDS TERMINATED BY " + value
- case ("TOK_TABLEROWFORMATCOLLITEMS", value) =>
- "COLLECTION ITEMS TERMINATED BY " + value
- case ("TOK_TABLEROWFORMATMAPKEYS", value) =>
- "MAP KEYS TERMINATED BY " + value
- case ("TOK_TABLEROWFORMATLINES", value) =>
- "LINES TERMINATED BY " + value
- case ("TOK_TABLEROWFORMATNULL", value) =>
- "NULL DEFINED AS " + value
- case o => return None
- }
-
- val serdeClassSQL = serdeClass.map("'" + _ + "'").getOrElse("")
- val serdePropsSQL =
- if (serdeClass.nonEmpty) {
- val props = serdeProps.map{p => s"'${p._1}' = '${p._2}'"}.mkString(", ")
- if (props.nonEmpty) " WITH SERDEPROPERTIES(" + props + ")" else ""
- } else {
- ""
- }
- if (rowFormat.nonEmpty) {
- Some("ROW FORMAT DELIMITED " + rowFormatDelimited.mkString(" "))
- } else {
- Some("ROW FORMAT SERDE " + serdeClassSQL + serdePropsSQL)
- }
- }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 2bb1399..741e3bd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -39,10 +39,9 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.CacheTableCommand
+import org.apache.spark.sql.execution.command.{CacheTableCommand, HiveNativeCommand}
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.hive.execution.HiveNativeCommand
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.{ShutdownHookManager, Utils}
http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 4c90dbe..e352256 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -29,8 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.JsonTuple
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
-import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewAsSelectLogicalCommand}
-import org.apache.spark.sql.hive.execution.HiveNativeCommand
+import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewAsSelectLogicalCommand, HiveNativeCommand}
import org.apache.spark.sql.hive.test.TestHive
class HiveDDLCommandSuite extends PlanTest {
http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index e918704..7a6f1ce 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -21,6 +21,7 @@ import scala.reflect.ClassTag
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.HiveNativeCommand
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.hive.test.TestHiveSingleton
http://git-wip-us.apache.org/repos/asf/spark/blob/1a95397b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 994dc4a..77906ef 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -28,7 +28,7 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.command.{ExplainCommand, SetCommand}
+import org.apache.spark.sql.execution.command.{ExplainCommand, HiveNativeCommand, SetCommand}
import org.apache.spark.sql.execution.datasources.DescribeCommand
import org.apache.spark.sql.hive.{InsertIntoHiveTable => LogicalInsertIntoHiveTable}
import org.apache.spark.sql.hive.SQLBuilder
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org